Transactions (Exactly-Once)

Idempotence (Chapter 3) guarantees exactly-once writes to a single partition within one producer session. But what if you need to write to multiple partitions atomically?

Example: A banking app reads a withdrawal from topic-A, processes it, and writes a deposit to topic-B.

  • If the app commits its read offset, then crashes before writing the deposit: Money lost.
  • If the app writes the deposit, then crashes before committing the offset: The withdrawal is re-processed and money is duplicated.

Kafka Transactions solve this by allowing you to bundle multiple writes (and offset commits) into a single atomic unit.

1. The Mechanics of Transactions

Kafka introduces a Transaction Coordinator (a special module inside the broker) and a Transaction Log (internal topic __transaction_state) to manage this state machine.

1.1 The Protocol

  1. Init: Producer registers with a unique transactional.id.
  2. Begin: Producer starts a transaction.
  3. Produce: Producer sends messages to various partitions. These messages are written to the log immediately but remain “uncommitted” (hidden from read_committed consumers).
  4. Commit/Abort:
    • Commit: The Coordinator writes a COMMIT marker to each partition. read_committed consumers can now see the data.
    • Abort: The Coordinator writes an ABORT marker. read_committed consumers discard the data.
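A sketch of how these four steps surface in the Java producer API. The RPC names in the comments (InitProducerId, AddPartitionsToTxn, EndTxn) come from the Kafka protocol; configuration is elided here, and a complete producer setup appears in Section 3.1:

```java
// Step 1 (Init): initTransactions() sends InitProducerId to the coordinator,
// which fences any older producer using the same transactional.id.
producer.initTransactions();

// Step 2 (Begin): purely client-side bookkeeping; no request is sent yet.
producer.beginTransaction();

// Step 3 (Produce): the first send() to each partition registers it with the
// coordinator (AddPartitionsToTxn); records land in the log as uncommitted.
producer.send(new ProducerRecord<>("topic-A", "Alice", "$50"));

// Step 4 (Commit): commitTransaction() sends EndTxn; the coordinator logs
// PREPARE_COMMIT in __transaction_state, then writes COMMIT markers to every
// touched partition.
producer.commitTransaction();
```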

1.2 Isolation Levels

Consumers control what they see via isolation.level:

  • read_uncommitted (Default): Sees everything, including aborted transactions and in-flight data.
  • read_committed: Only sees messages from committed transactions. This is required for Exactly-Once processing.

2. Walkthrough: The Atomic Switch

Picture a transaction writing to two partitions, Topic A and Topic B. A read_committed consumer sees nothing from either partition until the final COMMIT marker lands; until then, its view is simply “Waiting for committed data”.


3. Implementation Guide

To enable transactions, you must configure both the Producer and the Consumer correctly.

3.1 Java Example

Producer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-transactional-id"); // Mandatory; also enables idempotence
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);

producer.initTransactions(); // Register with coordinator

try {
    producer.beginTransaction();
    producer.send(new ProducerRecord<>("topic-A", "Alice", "$50"));
    producer.send(new ProducerRecord<>("topic-B", "Bob", "$50"));
    producer.commitTransaction(); // Atomic Commit
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
    // Fatal: this producer was fenced or misconfigured and cannot recover
    producer.close();
} catch (KafkaException e) {
    // Transient: abort so the uncommitted records are discarded, then retry
    producer.abortTransaction();
}

Consumer:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // Commit offsets via the transaction instead
// CRITICAL: Only see committed data
props.put("isolation.level", "read_committed");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
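The snippets above cover produce-side atomicity, but the banking scenario is read-process-write: the consumed offset must be committed inside the same transaction as the produced records. A sketch reusing the producer and consumer configured above (the loop structure is illustrative, not a complete service):

```java
// Read-process-write loop: offsets ride in the same transaction as the
// produced records, so a crash aborts both together and nothing is lost
// or duplicated.
consumer.subscribe(java.util.List.of("topic-A"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    if (records.isEmpty()) continue;

    producer.beginTransaction();
    try {
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (ConsumerRecord<String, String> rec : records) {
            // Process the withdrawal and produce the matching deposit.
            producer.send(new ProducerRecord<>("topic-B", rec.key(), rec.value()));
            // The position to commit is the last consumed offset + 1.
            offsets.put(new TopicPartition(rec.topic(), rec.partition()),
                        new OffsetAndMetadata(rec.offset() + 1));
        }
        // Offsets join the transaction: committed or aborted with the records.
        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
        producer.commitTransaction();
    } catch (KafkaException e) {
        producer.abortTransaction();
    }
}
```

sendOffsetsToTransaction with the consumer's groupMetadata() (Kafka 2.5+) lets the group coordinator fence zombie instances during rebalances, which is why it is preferred over the older group-id overload.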

3.2 Go Example (Franz-Go)

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.TransactionalID("my-go-txn-id"), // Enable Transactions
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	ctx := context.Background()

	// 1. Begin Transaction
	if err := client.BeginTransaction(); err != nil {
		log.Fatal(err)
	}

	// 2. Produce Messages
	recordA := &kgo.Record{Topic: "topic-A", Value: []byte("Alice $50")}
	if err := client.ProduceSync(ctx, recordA).FirstErr(); err != nil {
		client.AbortBufferedRecords(ctx)         // Drop unflushed records
		client.EndTransaction(ctx, kgo.TryAbort) // Roll back the transaction
		log.Fatal(err)
	}

	recordB := &kgo.Record{Topic: "topic-B", Value: []byte("Bob $50")}
	if err := client.ProduceSync(ctx, recordB).FirstErr(); err != nil {
		client.AbortBufferedRecords(ctx)         // Drop unflushed records
		client.EndTransaction(ctx, kgo.TryAbort) // Roll back the transaction
		log.Fatal(err)
	}

	// 3. Commit Transaction
	if err := client.EndTransaction(ctx, kgo.TryCommit); err != nil {
		log.Fatal(err)
	}

	log.Println("Transaction Committed Successfully!")
}

4. Summary

  • Transactions enable atomic writes across multiple partitions.
  • The Transaction Coordinator manages the lifecycle using a Transaction Log.
  • Consumers must use read_committed to ignore aborted or incomplete transactions.
  • This is the foundation of Kafka Streams Exactly-Once Processing.