Stateless vs Stateful Operations
Stream processing operations fall into two categories: Stateless (simple, fast, isolated) and Stateful (complex, memory-intensive, context-aware).
1. Stateless Operations
A stateless operation depends only on the current record being processed. It does not need to know about any previous records.
- Filter: Should this record pass? (e.g., amount > 100)
- Map: Transform the key or value. (e.g., String→Integer)
- Branch: Split one stream into multiple streams.
- Characteristics:
- No Disk I/O: Pure CPU processing.
- Instant Recovery: If an instance fails, it just restarts and consumes from the last offset. No state to restore.
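To make the stateless category concrete, here is a minimal Go sketch (types and field names are illustrative, not a real API) of a filter and a map, each deciding purely from the current record:

```go
package main

import (
	"fmt"
	"strconv"
)

// A record in the stream; field names are illustrative.
type Record struct {
	Key   string
	Value string
}

// Filter: keep only records whose amount exceeds 100.
// The decision uses the current record only -- no memory needed.
func filterLargeAmounts(r Record) bool {
	amount, err := strconv.Atoi(r.Value)
	return err == nil && amount > 100
}

// Map: transform the value from String to Integer.
// Again, the output depends solely on the input record.
func mapToInt(r Record) int {
	n, _ := strconv.Atoi(r.Value)
	return n
}

func main() {
	stream := []Record{{"a", "50"}, {"b", "150"}, {"c", "200"}}
	for _, r := range stream {
		if filterLargeAmounts(r) {
			fmt.Println(r.Key, mapToInt(r)) // prints: b 150, then c 200
		}
	}
}
```

Because each record is handled in isolation, any instance can process any record, which is exactly why recovery is just "restart and resume from the last offset".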
2. Stateful Operations
A stateful operation depends on previous records. To process the current record, you need “memory” of what happened before.
- Count: “How many times have we seen user ‘Alice’?” (Needs to remember the current count).
- Reduce/Aggregate: “What is the total sum of orders for ‘Bob’?” (Needs to remember the running sum).
- Join: “Match this Order with the previous Payment”.
- Windowing: “Count events in the last 5 minutes”.
3. The State Store (RocksDB)
Where does Kafka Streams store this “memory”? It uses State Stores. By default, Kafka Streams uses RocksDB, an embedded key-value store that runs locally within your application instance.
- Why RocksDB?: It is highly optimized for fast writes (LSM trees) and runs in-process (low latency).
- Location: The state is stored on the local disk of the machine/container running the application.
4. Fault Tolerance: The Changelog Topic
If the state is stored locally on disk, what happens if the container crashes or the disk dies?
Kafka Streams ensures durability using Changelog Topics.
- Dual Write: Every time the application updates the local RocksDB store (e.g., increments a count), it also sends that update to a hidden internal Kafka topic called the Changelog Topic.
- Recovery: If the application crashes and restarts on a new machine:
- It sees that its local RocksDB is empty.
- It replays the Changelog Topic from the beginning (or latest snapshot).
- This reconstructs the RocksDB state exactly as it was before the crash.
> [!IMPORTANT]
> Performance Trade-off: Stateful operations are slower than stateless ones because they involve:
> - Local Disk I/O (RocksDB).
> - Network I/O (Producing to the Changelog Topic).
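The dual-write and replay mechanics can be sketched without Kafka by modeling the changelog as an append-only log in memory (all names hypothetical; a real changelog is a compacted internal Kafka topic):

```go
package main

import "fmt"

// One update record written to the changelog.
type changeEntry struct {
	Key   string
	Value int
}

type statefulApp struct {
	store     map[string]int // stands in for the local RocksDB store
	changelog []changeEntry  // stands in for the internal changelog topic
}

// Dual write: update local state AND append the new value to the changelog.
func (a *statefulApp) increment(key string) {
	a.store[key]++
	a.changelog = append(a.changelog, changeEntry{key, a.store[key]})
}

// Recovery: a fresh instance with an empty store replays the changelog
// from the beginning, reconstructing the exact pre-crash state.
func restoreFromChangelog(changelog []changeEntry) *statefulApp {
	a := &statefulApp{store: map[string]int{}, changelog: changelog}
	for _, e := range changelog {
		a.store[e.Key] = e.Value // later entries overwrite earlier ones
	}
	return a
}

func main() {
	app := &statefulApp{store: map[string]int{}}
	app.increment("alice")
	app.increment("alice")
	app.increment("bob")

	// Simulate a crash: local state is lost, only the changelog survives.
	restored := restoreFromChangelog(app.changelog)
	fmt.Println(restored.store["alice"], restored.store["bob"]) // 2 1
}
```

Replaying every update also shows why compaction matters: only the latest value per key is needed, which is why the changelog is a log-compacted topic.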
5. Code Implementation
Java (Kafka Streams)
```java
import java.util.Arrays;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> lines = builder.stream("text-lines");

// 1. Stateless: Map (split lines into words)
KStream<String, String> words = lines.flatMapValues(
    value -> Arrays.asList(value.toLowerCase().split("\\W+"))
);

// 2. Stateful: Count (aggregation)
KTable<String, Long> wordCounts = words
    .groupBy((key, word) -> word)            // Re-partition by word
    .count(Materialized.as("CountsStore"));  // "CountsStore" names the RocksDB store

// Counts are Longs, so write them out with a Long value serde
wordCounts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));
```
Go (Consumer with In-Memory State)
In Go, handling state safely usually means managing a local map and periodically checkpointing it, or using a library. This example shows a simple in-memory state map: it is not fault-tolerant without persistence, and the broker address and group id below are placeholders.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder broker
		"group.id":          "wordcount",      // placeholder group
		"auto.offset.reset": "earliest",
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()
	c.SubscribeTopics([]string{"text-lines"}, nil)

	// Local state store (in-memory) -- lost if this process crashes
	stateStore := make(map[string]int)

	for {
		msg, err := c.ReadMessage(-1)
		if err != nil {
			continue
		}
		line := string(msg.Value)

		// Stateless: split the line into lowercase words
		words := strings.Fields(strings.ToLower(line))
		for _, word := range words {
			// Stateful: count occurrences across all records seen so far
			stateStore[word]++
			fmt.Printf("%s -> %d\n", word, stateStore[word])
			// In production, you would also write each update to a
			// "changelog" topic (and checkpoint periodically) so the
			// state could be rebuilt after a crash.
		}
	}
}
```