Windowing & Joins

Imagine you are building a real-time dashboard for a global e-commerce platform like Amazon on Black Friday. You need to show the “Total Sales in the Last 5 Minutes.”

If this were a static SQL database, you would just run SELECT SUM(price) FROM sales WHERE timestamp >= NOW() - INTERVAL '5 minutes'. But in a real-time streaming system, “now” is a constantly moving target, and data never stops arriving. To calculate these types of aggregates, you must define specific, bounded slices of time over an unbounded stream of data. This concept is called Windowing.

1. Time Semantics

Before defining windows, we must decide “which time” matters:

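  • Event Time: When the event actually occurred (the timestamp carried in the record). This is usually the correct choice, because it is unaffected by network delays or by when the data happens to be processed.
  • Processing Time: When the Kafka Streams application processes the event (the wall-clock time of the processing machine).
  • Ingestion Time: When the Kafka broker appended the event to the topic.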
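
To see why event time is usually preferred, consider a click that happened at 00:04:59 but reached the application at 00:05:02. The minimal Go sketch below (the `clickEvent` type and its fields are hypothetical) assigns the same record to a 5-minute tumbling window under both semantics: event time puts it in the window that starts at 00:00, where it belongs, while processing time pushes it into the next window.

```go
package main

import (
	"fmt"
	"time"
)

// clickEvent is a hypothetical record type. EventTime is the timestamp the
// producer embedded in the record; ArrivedAt is when our application saw it.
type clickEvent struct {
	EventTime time.Time
	ArrivedAt time.Time
}

const windowSize = 5 * time.Minute

// sample is a click that happened just before 00:05 but arrived just after.
var sample = clickEvent{
	EventTime: time.Date(2024, 11, 29, 0, 4, 59, 0, time.UTC),
	ArrivedAt: time.Date(2024, 11, 29, 0, 5, 2, 0, time.UTC),
}

// windowStart rounds t down to the start of its tumbling window.
// time.Time.Truncate rounds relative to the zero time, which keeps
// minute-sized windows aligned to UTC clock boundaries.
func windowStart(t time.Time, size time.Duration) time.Time {
	return t.Truncate(size)
}

func main() {
	fmt.Println("event-time window:     ", windowStart(sample.EventTime, windowSize).Format("15:04"))
	fmt.Println("processing-time window:", windowStart(sample.ArrivedAt, windowSize).Format("15:04"))
}
```

It prints 00:00 for the event-time window and 00:05 for the processing-time window.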

2. Window Types

Tumbling Window

Fixed-size, non-overlapping, gap-less windows, so every event belongs to exactly one window.

  • Example: “Every 5 minutes” (00:00-00:05, 00:05-00:10).
  • Use Case: Standard periodic reporting.

Hopping Window

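Fixed-size windows defined by a size and an advance interval. When the advance is smaller than the size, windows overlap and each event falls into several of them; a tumbling window is the special case where the advance equals the size.

  • Example: “Last 10 minutes, updated every 5 minutes” (00:00-00:10, 00:05-00:15, 00:10-00:20, ...).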
  • Use Case: Rolling averages, trend analysis.
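
The relationship between tumbling and hopping windows can be made concrete with a small sketch. The hypothetical `hoppingWindows` helper below lists every window that contains a given timestamp, assuming windows start at multiples of the advance counted from epoch 0 (Kafka Streams also aligns its time windows to the epoch) and that the advance is positive and no larger than the size. Setting the advance equal to the size yields exactly one window, i.e. a tumbling window.

```go
package main

import "fmt"

// window is a half-open interval [Start, End), in seconds.
type window struct {
	Start, End int64
}

// hoppingWindows returns, in ascending order, every window of the given
// size (advancing by advance) that contains ts. Windows start at multiples
// of advance counted from epoch 0. Assumes ts >= 0 and 0 < advance <= size.
func hoppingWindows(ts, size, advance int64) []window {
	var out []window
	// Walk back from the latest window start that is <= ts.
	for start := ts - ts%advance; start > ts-size && start >= 0; start -= advance {
		out = append(out, window{start, start + size})
	}
	// Reverse so the earliest window comes first.
	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
		out[i], out[j] = out[j], out[i]
	}
	return out
}

func main() {
	// 10-minute windows advancing every 5 minutes, expressed in seconds.
	for _, w := range hoppingWindows(7*60, 10*60, 5*60) {
		fmt.Printf("[%d min, %d min)\n", w.Start/60, w.End/60)
	}
}
```

Running it prints `[0 min, 10 min)` and `[5 min, 15 min)`: an event at minute 7 is counted in both overlapping windows.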

Session Window

Dynamically sized windows based on activity, tracked separately for each key. A session stays open as long as each new event arrives within the inactivity gap of the previous one.

  • Example: “User session ends after 30 minutes of inactivity”.
  • Use Case: User behavior analysis.
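
Session boundaries depend only on the gaps between one key's events. The sketch below groups a single user's timestamps offline, under the assumption that two events belong to the same session when they are at most `gap` seconds apart (inclusive); `sessionize` and the `session` type are illustrative names, not a library API. A streaming engine does the same work incrementally, merging sessions as new or out-of-order records arrive.

```go
package main

import (
	"fmt"
	"sort"
)

// session covers the inclusive span [Start, End] of event timestamps
// (seconds) for one key, plus how many events it contains.
type session struct {
	Start, End int64
	Count      int
}

// sessionize starts a new session whenever an event arrives more than gap
// seconds after the previous one.
func sessionize(timestamps []int64, gap int64) []session {
	ts := append([]int64(nil), timestamps...) // copy so the caller's slice is untouched
	sort.Slice(ts, func(i, j int) bool { return ts[i] < ts[j] })

	var out []session
	for _, t := range ts {
		if n := len(out); n > 0 && t-out[n-1].End <= gap {
			out[n-1].End = t // still within the gap: extend the current session
			out[n-1].Count++
			continue
		}
		out = append(out, session{Start: t, End: t, Count: 1})
	}
	return out
}

func main() {
	const gap = 30 * 60 // 30 minutes of inactivity ends a session
	// Hypothetical click times for one user, in minutes.
	var ts []int64
	for _, m := range []int64{0, 10, 35, 120, 125} {
		ts = append(ts, m*60)
	}
	for _, s := range sessionize(ts, gap) {
		fmt.Printf("session %d-%d min, %d events\n", s.Start/60, s.End/60, s.Count)
	}
}
```

With a 30-minute gap, clicks at minutes 0, 10, 35, 120 and 125 form two sessions: 0-35 (3 events) and 120-125 (2 events).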

Window Visualizer (interactive figure). Events on a timeline from 0 to 40 fall into tumbling windows of size 10: [0-10), [10-20), [20-30), [30-40).

3. Real-Time Joins

Merging data from different sources is a common requirement.

Stream-Stream Join

  • Goal: Correlate two event streams.
  • Requirement: The join must be windowed: “Join A and B if they happen within X minutes of each other.” Recent records from both sides are buffered in state for the length of the window.
  • Example: AdClick stream joined with AdImpression stream (within 5 min).
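
The windowed join condition can be sketched in a few lines. The version below is a batch illustration of the semantics only, with hypothetical `event` and `joined` types: it pairs every impression and click that share an ad ID and lie at most `window` seconds apart, in either direction. A real stream processor evaluates the same condition incrementally, buffering recent records from both sides until they fall out of the window.

```go
package main

import "fmt"

// event is a hypothetical keyed record: AdID is the join key and TS the
// event time in seconds.
type event struct {
	AdID string
	TS   int64
}

// joined pairs an impression with a click on the same ad.
type joined struct {
	AdID                  string
	ImpressionTS, ClickTS int64
}

// windowedJoin emits every (impression, click) pair that shares a key and
// whose timestamps are at most window seconds apart, in either direction.
func windowedJoin(impressions, clicks []event, window int64) []joined {
	// Index impressions by key so each click only scans its own key.
	byKey := make(map[string][]event)
	for _, imp := range impressions {
		byKey[imp.AdID] = append(byKey[imp.AdID], imp)
	}
	var out []joined
	for _, c := range clicks {
		for _, imp := range byKey[c.AdID] {
			if d := c.TS - imp.TS; d >= -window && d <= window {
				out = append(out, joined{c.AdID, imp.TS, c.TS})
			}
		}
	}
	return out
}

func main() {
	const fiveMin = 5 * 60
	impressions := []event{{"ad-1", 0}, {"ad-2", 60}, {"ad-1", 1000}}
	clicks := []event{{"ad-1", 120}, {"ad-2", 900}}
	for _, j := range windowedJoin(impressions, clicks, fiveMin) {
		fmt.Printf("%s: impression@%ds click@%ds\n", j.AdID, j.ImpressionTS, j.ClickTS)
	}
}
```

Only the ad-1 pair (impression at 0s, click at 120s) is emitted; the ad-2 click arrives 840 seconds after its impression, outside the 5-minute window.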

Stream-Table Join

  • Goal: Enrich a stream with reference data.
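  • Requirement: Only records arriving on the stream trigger the join. The table is just a lookup: an update to it changes what later stream records will see but produces no output by itself.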
  • Example: Order stream joined with User table (to get user address).
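
The asymmetry of a stream-table join is easiest to see in code. In this sketch a plain Go map stands in for the table and keeps only the latest row per key, as a KTable does; `userTable`, `order`, and the other names are hypothetical. Updating the table emits nothing, and each order is enriched with whatever address is current when it arrives.

```go
package main

import "fmt"

// order and userRecord are hypothetical record types for this sketch.
type order struct {
	OrderID, UserID string
}

type userRecord struct {
	UserID, Address string
}

type enrichedOrder struct {
	OrderID, UserID, Address string
}

// userTable keeps only the latest value per key, like a KTable.
type userTable map[string]userRecord

// upsert applies a table update. It produces no output: in a stream-table
// join, table updates never trigger the join.
func (t userTable) upsert(u userRecord) { t[u.UserID] = u }

// enrich is called once per order and looks up the user's current row.
// Orders without a matching user are dropped (inner-join semantics).
func (t userTable) enrich(o order) (enrichedOrder, bool) {
	u, ok := t[o.UserID]
	if !ok {
		return enrichedOrder{}, false
	}
	return enrichedOrder{o.OrderID, o.UserID, u.Address}, true
}

func main() {
	users := userTable{}
	users.upsert(userRecord{"u1", "1 Main St"})
	users.upsert(userRecord{"u1", "9 Elm Ave"}) // newer value replaces the old one
	for _, o := range []order{{"o1", "u1"}, {"o2", "u2"}} {
		if e, ok := users.enrich(o); ok {
			fmt.Printf("%s -> %s\n", e.OrderID, e.Address)
		} else {
			fmt.Printf("%s dropped: unknown user %s\n", o.OrderID, o.UserID)
		}
	}
}
```

Here `o1` is enriched with the newer address, `9 Elm Ave`, and `o2` is dropped because user `u2` is not in the table.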

[!WARNING] Co-Partitioning is Mandatory! To join two topics, they must have the same number of partitions and be partitioned by the same key using the same partitioning strategy. Otherwise, record A might be on partition 1 of topic X while its matching record B is on partition 3 of topic Y, on a different server. Kafka Streams performs joins against local state stores, so records with the same key must be routed to the same task and therefore the same machine. Kafka Streams validates that the partition counts of joined input topics match and fails with an error if they do not, but it cannot verify that the producers keyed and partitioned the data consistently; if they did not, the join silently produces incomplete results. This is a physical constraint of distributed data placement, not just a framework quirk.
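
Why the partition count matters follows from how a key is mapped to a partition: hash the key, then take the result modulo the number of partitions. The sketch below uses FNV-1a from Go's standard library purely as a stand-in (Kafka's default producer partitioner uses murmur2), and the topic names and partition counts are hypothetical. With 6 partitions on one topic and 4 on the other, the same key can land on different partition numbers, so the task that owns partition N of one topic never sees the matching record from the other.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a key to a partition as hash(key) mod numPartitions.
// Assumption: FNV-1a stands in for Kafka's murmur2; what matters is that
// the result depends on both the key and the partition count.
func partitionFor(key string, numPartitions int) int {
	h := fnv.New32a()
	h.Write([]byte(key)) // hash.Hash.Write never returns an error
	return int(h.Sum32() % uint32(numPartitions))
}

func main() {
	// Hypothetical topics: orders has 6 partitions, users only 4.
	for _, key := range []string{"user-1", "user-2", "user-3", "user-4"} {
		o, u := partitionFor(key, 6), partitionFor(key, 4)
		status := "same partition number"
		if o != u {
			status = "MISMATCH: different partition numbers"
		}
		fmt.Printf("%s: orders p%d, users p%d (%s)\n", key, o, u, status)
	}
}
```

With equal partition counts and the same hash, every key maps to the same partition number in both topics, which is exactly the co-partitioning guarantee.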

Dealing with “Late” Data: Grace Periods

In distributed systems, networks delay and retry packets, mobile devices go offline, and server clocks drift. A critical part of windowing is deciding how long to wait for late-arriving events.

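  • Watermarks: A threshold asserting that no more events older than timestamp $T$ are expected. Engines such as Apache Flink use explicit watermarks; Kafka Streams instead tracks stream time, the highest event timestamp it has observed, and measures lateness against it.
  • Grace Period: Kafka Streams lets you define a grace period (e.g., “Wait an extra 10 minutes for late events before permanently closing the window”). Once stream time passes the window's end plus the grace period, the window is closed and any event that arrives later for it is dropped.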
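
The grace-period rule comes down to a single comparison. The sketch below is modeled on how Kafka Streams judges lateness: it tracks stream time (the largest event timestamp seen so far) and accepts a record only while its window's end plus the grace period is still ahead of stream time. The `windowAssigner` type is hypothetical, and the 5-minute window with a 10-minute grace period mirrors the example above.

```go
package main

import "fmt"

// windowAssigner is a hypothetical helper for tumbling windows with a grace
// period. streamTime is the largest event timestamp seen so far. All times
// are in seconds.
type windowAssigner struct {
	size, grace int64
	streamTime  int64
}

// accept advances stream time and reports whether the record with event
// time ts can still update its window [start, start+size). A window closes
// once stream time reaches its end plus the grace period. Assumes ts >= 0.
func (w *windowAssigner) accept(ts int64) bool {
	if ts > w.streamTime {
		w.streamTime = ts // stream time never moves backwards
	}
	end := ts - ts%w.size + w.size
	return end+w.grace > w.streamTime
}

func main() {
	w := &windowAssigner{size: 300, grace: 600} // 5-minute windows, 10-minute grace
	for _, ts := range []int64{100, 1000, 250, 1200} {
		ok := w.accept(ts)
		fmt.Printf("event@%ds accepted=%v (stream time %ds)\n", ts, ok, w.streamTime)
	}
}
```

The event at 250s is dropped because its window [0, 300) closed once stream time reached 300 + 600 = 900 seconds; had it arrived while stream time was still below 900, it would have been accepted.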

4. Code Implementation

Java (Windowed Aggregation)


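import java.time.Duration;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KStream<String, String> clicks = ...; // keyed by user ID

// Count clicks per user, per 5-minute tumbling window.
// TimeWindows.of(...) is deprecated since Kafka 3.0. ofSizeWithNoGrace closes
// each window as soon as stream time passes its end; use ofSizeAndGrace to
// accept late events for a while longer.
KTable<Windowed<String>, Long> clickCounts = clicks
  .groupByKey()
  .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
  .count();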

Go (Manual Windowing Logic)

Go has no official counterpart to Kafka Streams, so with a plain consumer client you typically implement windowing yourself, often by bucketing timestamps into fixed-size intervals.


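// Concept: tumbling window bucket.
// Rounds a Unix timestamp (seconds) down to the start of its window.
// Assumes timestamp >= 0: Go's % takes the sign of the dividend.
func getWindowStart(timestamp int64, windowSize int64) int64 {
  return timestamp - (timestamp % windowSize)
}

// Running counts keyed by "<key>-<windowStart>"; create this once, outside the loop.
counts := make(map[string]int64)

// In your consumer loop (msg is assumed to expose Key []byte and
// Timestamp time.Time, as kafka-go's kafka.Message does):
msgTime := msg.Timestamp.Unix() // record timestamp (event time if the producer set it)
windowStart := getWindowStart(msgTime, 300) // 300s = 5m

key := fmt.Sprintf("%s-%d", msg.Key, windowStart)
// An in-memory map is lost on restart; persist counts in Redis or another
// store, and evict windows once they close.
counts[key]++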