Transactions (Exactly-Once)
Idempotence (Chapter 3) guarantees exactly-once writes for a single partition. But what if you need to write to multiple partitions atomically?
Example: A banking app reads a withdrawal from topic-A, processes it, and writes a deposit to topic-B.
- If the app crashes after reading but before writing: Money lost.
- If the app crashes after writing but before committing the offset: Money duplicated (re-processed).
Kafka Transactions solve this by allowing you to bundle multiple writes (and offset commits) into a single atomic unit.
1. The Mechanics of Transactions
Kafka introduces a Transaction Coordinator (a special module inside the broker) and a Transaction Log (internal topic __transaction_state) to manage this state machine.
1.1 The Protocol
- Init: The producer registers with the Coordinator using a unique transactional.id.
- Begin: The producer starts a transaction.
- Produce: The producer sends messages to various partitions. These messages are written to the log but marked as "Uncommitted" (invisible to read_committed consumers).
- Commit/Abort:
  - Commit: The Coordinator writes a COMMIT marker to each partition. Consumers can now see the data.
  - Abort: The Coordinator writes an ABORT marker. Consumers skip the data.
1.2 Isolation Levels
Consumers control what they see via isolation.level:
- read_uncommitted (default): Sees everything, including aborted transactions and in-flight data.
- read_committed: Only sees messages from committed transactions. This is required for exactly-once processing.
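You can see the difference without writing any client code: the stock console consumer accepts an isolation level flag. A quick sketch, assuming a broker at localhost:9092 and the topic-A from the earlier example:

```shell
# Default isolation: shows in-flight and aborted transactional records too
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topic-A --from-beginning

# Only records from committed transactions
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic topic-A --from-beginning \
  --isolation-level read_committed
```

Running both against a topic with an open transaction makes the "invisible until the COMMIT marker" behavior easy to observe.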
2. Interactive: The Atomic Switch
Simulate a transaction writing to two partitions. See how read_committed consumers ignore the data until the final COMMIT marker drops.
3. Implementation Guide
To enable transactions, you must configure both the Producer and the Consumer correctly.
3.1 Java Example
Producer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id"); // Mandatory; also forces enable.idempotence=true
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions(); // Register with the coordinator; fences older producers with the same id
try {
producer.beginTransaction();
producer.send(new ProducerRecord<>("topic-A", "Alice", "$50"));
producer.send(new ProducerRecord<>("topic-B", "Bob", "$50"));
producer.commitTransaction(); // Atomic Commit
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// Fatal errors: the producer cannot recover, so close it
producer.close();
} catch (KafkaException e) {
// Transient errors: abort the transaction, then retry
producer.abortTransaction();
}
Consumer:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// CRITICAL: Only see committed data
props.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
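The banking scenario from the introduction also needs the consumed offsets committed inside the same transaction as the writes; otherwise a crash still loses or duplicates money. A minimal consume-transform-produce sketch, assuming the producer and consumer configured as above and the hypothetical topic names topic-A and topic-B:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

// Consume withdrawals from topic-A, produce deposits to topic-B,
// and commit the input offsets as part of the same transaction.
consumer.subscribe(Collections.singletonList("topic-A"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> record : records) {
            // Transform: turn the withdrawal into a deposit
            producer.send(new ProducerRecord<>("topic-B", record.key(), record.value()));
            // Track the NEXT offset to consume for this partition
            offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
        }
        // Offsets ride inside the transaction: the writes to topic-B and the
        // offset commit become visible atomically, or not at all.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction(); // Offsets roll back too; records are re-read
    }
}
```

sendOffsetsToTransaction is what closes the gap in the crash scenarios: if the process dies mid-loop, neither the deposit nor the offset advance is committed, so the withdrawal is simply re-processed.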
3.2 Go Example (Franz-Go)
package main
import (
"context"
"log"
"github.com/twmb/franz-go/pkg/kgo"
)
func main() {
client, err := kgo.NewClient(
kgo.SeedBrokers("localhost:9092"),
kgo.TransactionalID("my-go-txn-id"), // Enable Transactions
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
ctx := context.Background()
// 1. Begin Transaction
if err := client.BeginTransaction(); err != nil {
log.Fatal(err)
}
// 2. Produce Messages
recordA := &kgo.Record{Topic: "topic-A", Value: []byte("Alice $50")}
if err := client.ProduceSync(ctx, recordA).FirstErr(); err != nil {
client.AbortBufferedRecords(ctx) // Rollback
log.Fatal(err)
}
recordB := &kgo.Record{Topic: "topic-B", Value: []byte("Bob $50")}
if err := client.ProduceSync(ctx, recordB).FirstErr(); err != nil {
client.AbortBufferedRecords(ctx) // Rollback
log.Fatal(err)
}
// 3. Commit Transaction
if err := client.EndTransaction(ctx, kgo.TryCommit); err != nil {
log.Fatal(err)
}
log.Println("Transaction Committed Successfully!")
}
4. Summary
- Transactions enable atomic writes across multiple partitions.
- The Transaction Coordinator manages the lifecycle using a Transaction Log.
- Consumers must use read_committed to ignore aborted or incomplete transactions.
- Transactions are the foundation of Kafka Streams' exactly-once processing.