Stateless vs Stateful Operations

Stream processing operations fall into two categories: Stateless (simple, fast, isolated) and Stateful (complex, memory-intensive, context-aware).

1. Stateless Operations

A stateless operation depends only on the current record being processed. It does not need to know about any previous records.

  • Filter: Should this record pass? (e.g., amount > 100)
  • Map: Transform the key or value. (e.g., String → Integer)
  • Branch: Split one stream into multiple streams.
Characteristics:

  • No Disk I/O: Pure CPU processing.
  • Instant Recovery: If an instance fails, it simply restarts and resumes consuming from its last committed offset. There is no state to restore.
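Because a stateless operation is a pure function of the current record, it can be written as a plain predicate or mapping. A minimal Go sketch of the amount > 100 filter from above (no Kafka client involved; the Order type is a hypothetical record for illustration):

```go
package main

import "fmt"

// Order is a hypothetical record type for illustration.
type Order struct {
	User   string
	Amount int
}

// passesFilter is stateless: it inspects only the current record,
// never anything seen before.
func passesFilter(o Order) bool {
	return o.Amount > 100
}

func main() {
	orders := []Order{{"alice", 50}, {"bob", 250}, {"carol", 120}}
	for _, o := range orders {
		if passesFilter(o) {
			fmt.Println(o.User, o.Amount) // bob and carol pass; alice is dropped
		}
	}
}
```

Since the function carries no state, any instance can process any record, which is exactly why stateless operators scale and recover so easily.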

2. Stateful Operations

A stateful operation depends on previous records. To process the current record, you need “memory” of what happened before.

  • Count: “How many times have we seen user ‘Alice’?” (Needs to remember the current count).
  • Reduce/Aggregate: “What is the total sum of orders for ‘Bob’?” (Needs to remember the running sum).
  • Join: “Match this Order with the previous Payment”.
  • Windowing: “Count events in the last 5 minutes”.
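The windowing bullet can be made concrete with a toy tumbling-window count. A sketch in plain Go, assuming epoch-millisecond timestamps and 5-minute windows (no Kafka involved; Event and windowedCount are illustrative names):

```go
package main

import "fmt"

const windowMs = 5 * 60 * 1000 // 5-minute tumbling windows

// Event is a hypothetical record with an epoch-millisecond timestamp.
type Event struct {
	Key string
	Ts  int64
}

// windowedCount assigns each event to a window by integer division on
// its timestamp, then counts events per (window, key) pair. That nested
// map is the "memory" a stateful windowed aggregation must keep.
func windowedCount(events []Event) map[int64]map[string]int {
	counts := make(map[int64]map[string]int)
	for _, e := range events {
		w := e.Ts / windowMs // window bucket (0 = first 5 minutes, 1 = next, ...)
		if counts[w] == nil {
			counts[w] = make(map[string]int)
		}
		counts[w][e.Key]++
	}
	return counts
}

func main() {
	events := []Event{
		{"alice", 0}, {"alice", 100}, {"bob", 200},
		{"alice", windowMs + 50}, // falls into the next window
	}
	c := windowedCount(events)
	fmt.Println(c[0]["alice"], c[0]["bob"], c[1]["alice"]) // 2 1 1
}
```

The point of the sketch: unlike the filter, this function cannot process a record in isolation; it needs the accumulated counts from earlier records in the same window.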

3. The State Store (RocksDB)

Where does Kafka Streams store this “memory”? It uses State Stores. By default, Kafka Streams uses RocksDB, an embedded key-value store that runs locally within your application instance.

  • Why RocksDB?: It is highly optimized for fast writes (LSM trees) and runs in-process (low latency).
  • Location: The state is stored on the local disk of the machine/container running the application.

4. Fault Tolerance: The Changelog Topic

If the state is stored locally on disk, what happens if the container crashes or the disk dies?

Kafka Streams ensures durability using Changelog Topics.

  1. Dual Write: Every time the application updates the local RocksDB store (e.g., increments a count), it also produces that update to an internal Kafka topic called the Changelog Topic (named <application.id>-<store-name>-changelog).
  2. Recovery: If the application crashes and restarts on a new machine:
    • It sees that its local RocksDB is empty.
    • It replays the Changelog Topic from the beginning (or latest snapshot).
    • This reconstructs the RocksDB state exactly as it was before the crash.
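The recovery step can be simulated without Kafka at all: a slice of key/value updates stands in for the changelog topic, and replaying it in order rebuilds the store. A minimal Go sketch:

```go
package main

import "fmt"

// Update is one changelog record: a key and its new value.
type Update struct {
	Key   string
	Value int
}

// replay rebuilds a state store by applying changelog records in order.
// Later updates to the same key overwrite earlier ones, so the final
// map matches the state at the moment of the crash.
func replay(changelog []Update) map[string]int {
	store := make(map[string]int) // fresh, empty "RocksDB"
	for _, u := range changelog {
		store[u.Key] = u.Value
	}
	return store
}

func main() {
	changelog := []Update{{"A", 10}, {"B", 5}, {"A", 12}, {"A", 15}}
	store := replay(changelog)
	fmt.Println(store["A"], store["B"]) // 15 5
}
```

Because only the latest value per key matters, Kafka Streams changelog topics use log compaction, which keeps replay time bounded by the number of keys rather than the number of updates.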

[!IMPORTANT] Performance Trade-off: Stateful operations are slower than stateless ones because they involve:

  1. Local Disk I/O (RocksDB).
  2. Network I/O (Producing to the Changelog Topic).

Interactive: State Recovery Simulator

Simulate a node crash and watch the instance restore its state from the Changelog. For example, replaying a changelog containing the updates A=10, B=5, A=12, A=15 leaves the rebuilt RocksDB store with A=15 (and B=5).

5. Code Implementation

Java (Kafka Streams)


import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> lines = builder.stream("text-lines");

// 1. Stateless: Map (split lines into words)
KStream<String, String> words = lines.flatMapValues(
    value -> Arrays.asList(value.toLowerCase().split("\\W+"))
);

// 2. Stateful: Count (aggregation)
KTable<String, Long> wordCounts = words
    .groupBy((key, word) -> word) // Re-partition by word
    .count(Materialized.as("CountsStore")); // "CountsStore" names the RocksDB store

// Counts are Longs, so the output topic needs an explicit Long value serde
wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

Go (Consumer with In-Memory State)

In Go, handling state safely usually means managing a local map and periodically checkpointing it, or using a library. This example shows a simple in-memory state map (it is not fault-tolerant without persistence).


package main

import (
	"strings"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	// ... Setup Consumer/Producer ...
	// (c below is the *kafka.Consumer created during setup)

	// Local state store (in-memory, lost on restart)
	stateStore := make(map[string]int)

	for {
		msg, err := c.ReadMessage(-1)
		if err != nil {
			continue // in production: inspect the error instead of skipping
		}
		line := string(msg.Value)

		// Stateless: split the line into words
		words := strings.Fields(strings.ToLower(line))

		for _, word := range words {
			// Stateful: increment the running count
			stateStore[word]++

			// Emitting the update (stream of updates):
			// in production, you would also produce each update to a
			// "changelog" topic so the state could be rebuilt after a crash.
		}
	}
}