Windowing & Joins
In real-time streaming, “now” is a moving target. To calculate aggregates (like “average price”), you must define a specific slice of time. This is Windowing.
1. Time Semantics
Before defining windows, we must decide “which time” matters:
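- Event Time: When the event actually occurred, taken from a timestamp embedded in the record. This is usually the correct choice, because results do not depend on network delays or on when the record happens to be processed.
- Processing Time: When the Kafka Streams application processes the event, measured by the wall clock of the machine running it.
- Ingestion Time: When the broker appended the event to the Kafka topic.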
2. Window Types
Tumbling Window
Fixed-size, non-overlapping, gap-less windows.
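- Example: “Every 5 minutes” (00:00-00:05, 00:05-00:10). Each window includes its start and excludes its end, so an event at exactly 00:05 belongs to the second window only.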
- Use Case: Standard periodic reporting.
Hopping Window
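Fixed-size windows defined by a window size and an advance interval (the “hop”). When the advance is smaller than the size, the windows overlap and a single event can belong to several of them.
- Example: “Last 10 minutes, updated every 5 minutes” (size 10 min, advance 5 min, so each event falls into two windows).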
- Use Case: Rolling averages, trend analysis.
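The overlap can be made concrete with a small Go sketch that lists every hopping window containing a given timestamp. The function name `hoppingWindowStarts` is hypothetical, and the sketch assumes timestamps in seconds, windows aligned to the epoch, and `0 < advance <= size`.

```go
package main

import "fmt"

// hoppingWindowStarts returns the start (in seconds) of every hopping window
// [start, start+size) that contains ts, newest window first.
// Assumes ts >= 0 and 0 < advance <= size, with windows aligned to time 0.
func hoppingWindowStarts(ts, size, advance int64) []int64 {
	var starts []int64
	// Latest window start that is <= ts.
	last := ts - (ts % advance)
	// Walk backwards while the window still covers ts (start+size > ts).
	for start := last; start > ts-size && start >= 0; start -= advance {
		starts = append(starts, start)
	}
	return starts
}

func main() {
	// Event at 00:07:00 (420s), window size 10 min, advance 5 min.
	fmt.Println(hoppingWindowStarts(420, 600, 300))
}
```

With a size of 600s and an advance of 300s, the event at 420s lands in the windows starting at 300s and at 0s. Setting `advance` equal to `size` reduces this to a tumbling window with exactly one match.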
Session Window
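Dynamically sized windows driven by activity per key. A session stays open as long as events for that key keep arriving within an inactivity gap of each other; a longer pause closes it, and the next event starts a new session.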
- Example: “User session ends after 30 minutes of inactivity”.
- Use Case: User behavior analysis.
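As a rough illustration of the gap rule, the Go sketch below groups the timestamps of one user into sessions. The `Session` type and `sessionize` function are hypothetical, timestamps are in seconds, and the sketch assumes that an event arriving exactly `gap` seconds after the previous one still extends the session. It works on a complete batch; a real stream processor would merge sessions incrementally as events arrive.

```go
package main

import (
	"fmt"
	"sort"
)

// Session is a hypothetical summary of one burst of activity.
type Session struct {
	Start, End int64 // first and last event timestamp (seconds)
	Count      int   // number of events in the session
}

// sessionize groups timestamps into sessions: an event joins the current
// session if it arrives at most gap seconds after that session's last event.
func sessionize(timestamps []int64, gap int64) []Session {
	// Sort a copy so the caller's slice is left untouched.
	sorted := append([]int64(nil), timestamps...)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })

	var sessions []Session
	for _, ts := range sorted {
		n := len(sessions)
		if n > 0 && ts-sessions[n-1].End <= gap {
			sessions[n-1].End = ts
			sessions[n-1].Count++
			continue
		}
		sessions = append(sessions, Session{Start: ts, End: ts, Count: 1})
	}
	return sessions
}

func main() {
	// Two clicks 10 minutes apart, a 40-minute pause, then two more clicks.
	events := []int64{0, 600, 3000, 3100}
	for _, s := range sessionize(events, 1800) { // 1800s = 30m inactivity gap
		fmt.Printf("session %d..%d with %d events\n", s.Start, s.End, s.Count)
	}
}
```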
3. Real-Time Joins
Merging data from different sources is a common requirement.
Stream-Stream Join
- Goal: Correlate two event streams.
- Requirement: The join must be windowed, because neither side of an unbounded stream can be buffered forever. The rule is: “Join A and B if they share a key and occur within X minutes of each other.”
- Example: AdClick stream joined with AdImpression stream (within 5 min).
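The windowed join rule can be sketched in Go over two small in-memory batches. The `Event` and `Pair` types and the `windowedJoin` function are hypothetical; timestamps are in seconds. This is a batch approximation for illustration only: a streaming engine buffers each side in a state store and emits matches as records arrive.

```go
package main

import "fmt"

// Event is a hypothetical keyed record with an event-time timestamp (seconds).
type Event struct {
	Key string
	Ts  int64
}

// Pair is one join result.
type Pair struct {
	Left, Right Event
}

// windowedJoin pairs every left event with every right event that has the
// same key and a timestamp at most window seconds away (before or after).
func windowedJoin(left, right []Event, window int64) []Pair {
	// Index the right side by key so each left event only scans candidates.
	byKey := make(map[string][]Event)
	for _, r := range right {
		byKey[r.Key] = append(byKey[r.Key], r)
	}

	var out []Pair
	for _, l := range left {
		for _, r := range byKey[l.Key] {
			diff := l.Ts - r.Ts
			if diff < 0 {
				diff = -diff
			}
			if diff <= window {
				out = append(out, Pair{Left: l, Right: r})
			}
		}
	}
	return out
}

func main() {
	impressions := []Event{{Key: "ad-1", Ts: 0}, {Key: "ad-2", Ts: 0}}
	clicks := []Event{{Key: "ad-1", Ts: 120}, {Key: "ad-2", Ts: 400}}

	// Join clicks with impressions that happened within 5 minutes (300s).
	for _, p := range windowedJoin(clicks, impressions, 300) {
		fmt.Printf("click on %s at %ds matched impression at %ds\n", p.Left.Key, p.Left.Ts, p.Right.Ts)
	}
}
```

Only the `ad-1` click is matched here; the `ad-2` click arrives 400s after its impression, outside the 300s window.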
Stream-Table Join
- Goal: Enrich a stream with reference data.
- Requirement: Only new stream records trigger the join; each one looks up the current table value for its key. Updates to the Table do not emit join results on their own.
- Example: Order stream joined with User table (to get user address).
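In its simplest form, this kind of enrichment is a keyed lookup, sketched below in Go. The `Order`, `User`, and `EnrichedOrder` types and the `enrichOrder` function are hypothetical, and a plain map stands in for the table. The sketch assumes inner-join behavior, meaning orders without a matching user are dropped.

```go
package main

import "fmt"

// Hypothetical record types for the order stream and the user table.
type Order struct {
	OrderID string
	UserID  string
}

type User struct {
	Address string
}

type EnrichedOrder struct {
	Order
	Address string
}

// enrichOrder looks up the order's user in the table (a map keyed by user ID).
// It returns false when no user is found, mirroring an inner join.
func enrichOrder(o Order, users map[string]User) (EnrichedOrder, bool) {
	u, ok := users[o.UserID]
	if !ok {
		return EnrichedOrder{}, false
	}
	return EnrichedOrder{Order: o, Address: u.Address}, true
}

func main() {
	users := map[string]User{"u1": {Address: "1 Main St"}}
	orders := []Order{{OrderID: "o1", UserID: "u1"}, {OrderID: "o2", UserID: "u9"}}

	// Each incoming order triggers exactly one lookup; the table is never scanned.
	for _, o := range orders {
		if e, ok := enrichOrder(o, users); ok {
			fmt.Printf("order %s ships to %s\n", e.OrderID, e.Address)
		}
	}
}
```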
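[!WARNING] Co-Partitioning is Mandatory! To join two topics, they must have the same number of partitions and be partitioned by the same key with the same partitioning strategy. If not, record A might be on partition 1 of topic X while its matching record B is on partition 3 of topic Y, handled by a different task and possibly a different server. Kafka Streams throws an exception at runtime when the partition counts differ, but it cannot verify the partitioning strategy, so a mismatch there silently produces missing or wrong join results. (Joins against a GlobalKTable are the exception, because every application instance holds a full copy of the table.)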
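The reason partition counts must match follows from how a keyed record is placed: the partition is a hash of the key modulo the number of partitions. The Go sketch below illustrates this with a hypothetical `partitionFor` function. It uses FNV-1a purely as a stand-in hash (Kafka's default partitioner uses murmur2), so the partition numbers it prints will not match a real cluster.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition as hash(key) % numPartitions.
// FNV-1a is an assumption made for this sketch, not Kafka's actual hash.
func partitionFor(key string, numPartitions uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % numPartitions
}

func main() {
	key := "user-42"

	// Same key, same partition count: always the same partition number,
	// so matching records from both topics reach the same task.
	fmt.Println("topic X (4 partitions):", partitionFor(key, 4))
	fmt.Println("topic Y (4 partitions):", partitionFor(key, 4))

	// Same key, different partition count: the modulo changes,
	// so the matching record may land on a different partition.
	fmt.Println("topic Z (6 partitions):", partitionFor(key, 6))
}
```

When a topic does not meet the requirement, the usual remedy is to re-key or repartition it into a new topic with the matching partition count before joining.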
4. Code Implementation
Java (Windowed Aggregation)
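import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStream<String, String> clicks = ...;
// Count clicks per user, per 5-minute tumbling window.
// ofSizeWithNoGrace (Kafka 3.0+) replaces the deprecated TimeWindows.of;
// it drops late records, so use ofSizeAndGrace to accept them for a while.
KTable<Windowed<String>, Long> clickCounts = clicks
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();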
Go (Manual Windowing Logic)
Kafka Streams is a JVM library with no official Go port, so in Go you typically implement the windowing logic yourself, often by bucketing timestamps.
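// Concept: tumbling window bucket.
// getWindowStart rounds a Unix timestamp (seconds) down to the start of its window.
func getWindowStart(timestamp int64, windowSize int64) int64 {
    return timestamp - (timestamp % windowSize)
}

// Before the consumer loop: one counter per (key, window) pair.
counts := make(map[string]int)

// In your consumer loop (msg comes from your Kafka client library):
msgTime := msg.Timestamp.Unix()
windowStart := getWindowStart(msgTime, 300) // 300s = 5m
key := fmt.Sprintf("%s-%d", msg.Key, windowStart)
// Aggregate in the in-memory map; use Redis or similar if counts must survive restarts.
counts[key]++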