Transactions (Exactly-Once)
Idempotence (Chapter 3) guarantees that a producer's retries never duplicate a write within a single partition. But what if you need to write to multiple partitions atomically?
🚨 The Dual-Write Problem (Banking Example)
A banking app reads a withdrawal from topic-A, processes it, and writes a deposit to topic-B.
- Scenario A: If the app commits the offset and then crashes before writing the deposit: Money lost (the withdrawal is never re-read).
- Scenario B: If the app crashes after writing but before committing the offset: Money duplicated (re-processed on restart).
Kafka Transactions solve this by allowing you to bundle multiple writes (and offset commits) into a single atomic unit.
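The two crash windows can be reproduced with a small simulation. This is only a sketch using the standard library, not Kafka code: the `state` type, the `run` function, and the step names "write", "commit", and "atomic" are hypothetical, and it assumes the app re-processes a withdrawal on restart only when its offset was not committed.

```go
package main

import "fmt"

// state models the two side effects the banking app must keep in sync.
type state struct {
	deposits        int  // deposits written to topic-B
	offsetCommitted bool // whether the withdrawal's offset was committed
}

// run processes one withdrawal as a sequence of steps and "crashes" once
// crashAfter steps have completed. After a crash the app restarts and
// re-processes the withdrawal only if its offset was not committed.
func run(steps []string, crashAfter int) state {
	var s state
	apply := func(step string) {
		switch step {
		case "write":
			s.deposits++
		case "commit":
			s.offsetCommitted = true
		case "atomic": // both effects happen together or not at all
			s.deposits++
			s.offsetCommitted = true
		}
	}
	for i, step := range steps {
		if i == crashAfter {
			break // simulated crash
		}
		apply(step)
	}
	if crashAfter < len(steps) && !s.offsetCommitted {
		// Restart: the withdrawal is read again and fully re-processed.
		for _, step := range steps {
			apply(step)
		}
	}
	return s
}

func main() {
	fmt.Println(run([]string{"commit", "write"}, 1).deposits) // 0: money lost
	fmt.Println(run([]string{"write", "commit"}, 1).deposits) // 2: money duplicated
	fmt.Println(run([]string{"atomic"}, 0).deposits)          // 1: exactly once
}
```

Whatever the crash point, the "atomic" step ends with exactly one deposit, which is the property a transaction provides.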
1. The Mechanics of Transactions
Kafka introduces a Transaction Coordinator (a special module inside the broker) and a Transaction Log (internal topic __transaction_state) to manage this state machine.
Think of the Transaction Coordinator as an escrow agent in a real estate deal. The buyer's money and the seller's deed are temporarily held in trust (the Transaction Log). Neither party actually receives their item until the agent officially stamps the deal as COMMIT. If any issue arises, the agent stamps ABORT, and it's as if the transaction never happened.
1.1 The Protocol (Two-Phase Commit)
- Init: Producer registers with a unique transactional.id.
- Begin: Producer starts a transaction.
- Produce: Producer sends messages to various partitions. These messages are written to the log but remain “Uncommitted” (invisible to read_committed consumers).
- Commit/Abort:
  - Commit: The Coordinator writes a COMMIT marker to the log. Consumers can now see the data.
  - Abort: The Coordinator writes an ABORT marker. Consumers ignore the data.
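The steps above can be read as a state machine driven by the Coordinator. The sketch below is a simplified model, not broker code: the state names follow the ones Kafka uses for transaction state (Empty, Ongoing, PrepareCommit, and so on), but the `txnCoordinator` type, its methods, and the in-memory `log` slice standing in for __transaction_state are assumptions made for illustration.

```go
package main

import "fmt"

type txnState string

const (
	empty          txnState = "Empty"
	ongoing        txnState = "Ongoing"
	prepareCommit  txnState = "PrepareCommit"
	prepareAbort   txnState = "PrepareAbort"
	completeCommit txnState = "CompleteCommit"
	completeAbort  txnState = "CompleteAbort"
)

// next lists the transitions this simplified sketch allows.
var next = map[txnState][]txnState{
	empty:          {ongoing},
	ongoing:        {prepareCommit, prepareAbort},
	prepareCommit:  {completeCommit},
	prepareAbort:   {completeAbort},
	completeCommit: {ongoing},
	completeAbort:  {ongoing},
}

// txnCoordinator tracks one transaction; log stands in for the Transaction Log.
type txnCoordinator struct {
	state txnState
	log   []txnState
}

// transition moves to the target state if allowed and records it in the log.
func (c *txnCoordinator) transition(to txnState) error {
	for _, allowed := range next[c.state] {
		if allowed == to {
			c.state = to
			c.log = append(c.log, to)
			return nil
		}
	}
	return fmt.Errorf("invalid transition %s -> %s", c.state, to)
}

// commit runs both phases: first persist the decision, then complete it.
func (c *txnCoordinator) commit() error {
	if err := c.transition(prepareCommit); err != nil {
		return err
	}
	// Between the phases a real coordinator would write COMMIT markers
	// to every partition that took part in the transaction.
	return c.transition(completeCommit)
}

func main() {
	c := &txnCoordinator{state: empty}
	if err := c.transition(ongoing); err != nil { // Begin + first Produce
		fmt.Println(err)
		return
	}
	if err := c.commit(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(c.log) // [Ongoing PrepareCommit CompleteCommit]
}
```

Persisting the prepare state before any marker is written is what makes the decision durable: after a crash, the Coordinator can replay the log and finish writing markers instead of leaving the transaction half-applied.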
1.2 Zombie Fencing (The Power of transactional.id)
Why is transactional.id mandatory? It solves the Zombie Producer problem.
If a producer experiences a long network partition, the system might assume it died and spin up a replacement. When the original producer wakes up (a “zombie”), it might try to continue its transaction.
Both producers share the same transactional.id, so the Transaction Coordinator tells them apart by an Epoch Number that it tracks for that ID. The replacement gets a higher epoch when it registers. When the zombie then tries to commit with the older epoch, the Coordinator rejects the request and the zombie receives a ProducerFencedException.
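The epoch check can be sketched in a few lines. This is an illustrative model only: `epochCoordinator`, `initProducer`, `commit`, and `errProducerFenced` are hypothetical names, and the real Coordinator performs this check on the broker side for every transactional request.

```go
package main

import (
	"errors"
	"fmt"
)

// errProducerFenced stands in for Kafka's ProducerFencedException.
var errProducerFenced = errors.New("producer fenced: stale epoch")

// epochCoordinator remembers the latest epoch per transactional.id.
type epochCoordinator struct {
	epochs map[string]int
}

func newEpochCoordinator() *epochCoordinator {
	return &epochCoordinator{epochs: map[string]int{}}
}

// initProducer registers a producer: every registration under the same
// transactional.id bumps the epoch and hands the new value to the caller.
func (c *epochCoordinator) initProducer(txnID string) int {
	c.epochs[txnID]++
	return c.epochs[txnID]
}

// commit accepts the request only if it carries the latest epoch.
func (c *epochCoordinator) commit(txnID string, epoch int) error {
	if epoch < c.epochs[txnID] {
		return errProducerFenced
	}
	return nil
}

func main() {
	c := newEpochCoordinator()
	zombie := c.initProducer("bank-app")      // original producer, epoch 1
	replacement := c.initProducer("bank-app") // replacement, epoch 2

	fmt.Println(c.commit("bank-app", zombie))      // producer fenced: stale epoch
	fmt.Println(c.commit("bank-app", replacement)) // <nil>
}
```

The zombie needs no special detection logic: simply registering the replacement under the same ID is enough to invalidate everything the older instance sends afterwards.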
1.3 Isolation Levels
Consumers control what they see via isolation.level:
- read_uncommitted (Default): Sees everything, including aborted transactions and in-flight data.
- read_committed: Only sees messages from committed transactions. This is required for Exactly-Once processing.
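The difference between the two levels can be shown on a toy partition log. The sketch below is a model under stated assumptions, not the consumer implementation: the `entry` type, the `read` function, and the transaction labels are hypothetical. It mirrors two behaviours of read_committed: records from aborted transactions are skipped, and reading stops at the first record of a transaction that is still open.

```go
package main

import "fmt"

// entry is one slot in a toy partition log.
type entry struct {
	txn    string // transaction label; empty for non-transactional records
	value  string // payload for data records
	marker string // "COMMIT" or "ABORT" for control markers, otherwise empty
}

// read returns the payloads a consumer with the given isolation level sees.
func read(log []entry, isolation string) []string {
	// First pass: learn each transaction's outcome from its control marker.
	outcome := map[string]string{}
	for _, e := range log {
		if e.marker != "" {
			outcome[e.txn] = e.marker
		}
	}
	var out []string
	for _, e := range log {
		if e.marker != "" {
			continue // control markers are never handed to the application
		}
		if isolation == "read_committed" && e.txn != "" {
			if outcome[e.txn] == "" {
				break // transaction still open: do not read past this point
			}
			if outcome[e.txn] == "ABORT" {
				continue // aborted data is filtered out
			}
		}
		out = append(out, e.value)
	}
	return out
}

func main() {
	log := []entry{
		{value: "plain-1"},
		{txn: "t1", value: "alice-50"},
		{txn: "t2", value: "carol-10"},
		{txn: "t1", marker: "COMMIT"},
		{txn: "t2", marker: "ABORT"},
		{txn: "t3", value: "dave-5"}, // t3 has no marker yet
	}
	fmt.Println(read(log, "read_uncommitted")) // [plain-1 alice-50 carol-10 dave-5]
	fmt.Println(read(log, "read_committed"))   // [plain-1 alice-50]
}
```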
2. Interactive: The Atomic Switch
[Interactive simulation: a transaction writes to Topic A (Partition 0) and Topic B (Partition 0); read_committed consumers ignore the data until the final COMMIT marker is written.]
3. Implementation Guide
To enable transactions, you must configure both the Producer and the Consumer correctly.
Key Configuration:
- Producer: Must set a transactional.id and initialize transactions.
- Consumer: Must set isolation.level to read_committed.
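// --- Java Example ---
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// 1. Producer Setup
Properties prodProps = new Properties();
prodProps.put("bootstrap.servers", "localhost:9092");
// Serializers are required; without them the KafkaProducer constructor throws
prodProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
prodProps.put("transactional.id", "my-transactional-id"); // Mandatory
Producer<String, String> producer = new KafkaProducer<>(prodProps);
producer.initTransactions(); // Register with coordinator
try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-A", "Alice", "$50"));
    producer.send(new ProducerRecord<>("topic-B", "Bob", "$50"));
    producer.commitTransaction(); // Atomic Commit
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal errors (e.g. Zombie fenced): this producer cannot recover, so close it
    producer.close();
} catch (KafkaException e) {
    // Any other error: abort so both writes are discarded; the transaction can be retried
    producer.abortTransaction();
}

// 2. Consumer Setup
Properties consProps = new Properties();
consProps.put("bootstrap.servers", "localhost:9092");
consProps.put("group.id", "my-group");
// Deserializers are required, mirroring the producer's serializers
consProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// CRITICAL: Only see committed data
consProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consProps);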
// --- Go Example (Franz-Go) ---
package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-go-txn-id"), // Enable Transactions
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	ctx := context.Background()

	// 1. Begin Transaction
	if err := client.BeginTransaction(); err != nil {
		log.Fatal(err)
	}

	// 2. Produce Messages (both records belong to the same transaction)
	records := []*kgo.Record{
		{Topic: "topic-A", Value: []byte("Alice $50")},
		{Topic: "topic-B", Value: []byte("Bob $50")},
	}
	if err := client.ProduceSync(ctx, records...).FirstErr(); err != nil {
		// Rollback: drop anything still buffered, then abort the transaction
		// so records that already reached a broker are marked as aborted.
		if abortErr := client.AbortBufferedRecords(ctx); abortErr != nil {
			log.Println(abortErr)
		}
		if endErr := client.EndTransaction(ctx, kgo.TryAbort); endErr != nil {
			log.Println(endErr)
		}
		log.Fatal(err)
	}

	// 3. Commit Transaction
	if err := client.EndTransaction(ctx, kgo.TryCommit); err != nil {
		log.Fatal(err)
	}

	// For Consumer, set: kgo.FetchIsolationLevel(kgo.ReadCommitted())
	log.Println("Transaction Committed Successfully!")
}
4. Summary
- Transactions enable atomic writes across multiple partitions.
- The Transaction Coordinator manages the lifecycle using a Transaction Log.
- Consumers must use read_committed to ignore aborted or incomplete transactions.
- This is the foundation of Kafka Streams Exactly-Once Processing.