Idempotent Producers
Network errors are inevitable. When a Producer sends a message to Kafka, it might receive an error (e.g., “Network Exception”) or no response at all (timeout).
The Producer has a dilemma:
- Don’t Retry: The message might be lost. (At-most-once)
- Retry: If the original message did land but the acknowledgment was lost, retrying will cause a Duplicate. (At-least-once)
Before Kafka 0.11, users had to handle this deduplication in their own applications. Since 0.11, Kafka solves it natively with the Idempotent Producer.
1. How Idempotence Works
An idempotent operation is one that can be performed multiple times without changing the result beyond the initial application. x = 1 is idempotent. x++ is not.
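To make the definition concrete, here is a tiny Go sketch that applies each operation three times, the way a producer might retry a send. The function names `setToOne` and `increment` are illustrative only.

```go
package main

import "fmt"

// setToOne is idempotent: after the first call, repeating it changes nothing.
func setToOne(x *int) { *x = 1 }

// increment is not idempotent: every repetition changes x again.
func increment(x *int) { *x++ }

func main() {
	a, b := 0, 0
	// Simulate the same operation being "retried" three times.
	for i := 0; i < 3; i++ {
		setToOne(&a)
		increment(&b)
	}
	fmt.Println(a, b) // prints "1 3"
}
```

Retrying `setToOne` is harmless; retrying `increment` is exactly the duplicate-write problem described above.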
To make Kafka writes idempotent, the protocol introduces two invisible pieces of metadata to every batch of messages:
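- Producer ID (PID): A unique identifier that the broker assigns to the producer when it initializes. It stays the same across reconnects for the life of that producer instance.
- Sequence Number (SeqNum): A number that increases by one for each message the producer sends to a given partition. Each partition has its own counter.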
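A minimal sketch of the producer side, assuming a simplified batch with hypothetical field names (`PID`, `Partition`, `BaseSeq`): the producer keeps one counter per partition and stamps each batch with the next value. In the real protocol the PID comes from the broker and these fields live in the binary record-batch header; this Go code is only a model.

```go
package main

import "fmt"

// batch is a hypothetical, simplified view of the metadata an idempotent
// producer attaches to each batch it sends.
type batch struct {
	PID       int64
	Partition int32
	BaseSeq   int32 // sequence number of the first record in the batch
	Records   []string
}

// producer tracks the next sequence number per partition for one PID.
type producer struct {
	pid     int64
	nextSeq map[int32]int32
}

func newProducer(pid int64) *producer {
	return &producer{pid: pid, nextSeq: make(map[int32]int32)}
}

// newBatch stamps a batch with the PID and the next sequence number for its
// partition, then advances that partition's counter by the record count.
func (p *producer) newBatch(partition int32, records ...string) batch {
	b := batch{PID: p.pid, Partition: partition, BaseSeq: p.nextSeq[partition], Records: records}
	p.nextSeq[partition] += int32(len(records))
	return b
}

func main() {
	p := newProducer(42) // hypothetical PID; a real one is assigned by the broker
	b1 := p.newBatch(0, "a", "b")
	b2 := p.newBatch(1, "c")
	b3 := p.newBatch(0, "d")
	fmt.Println(b1.BaseSeq, b2.BaseSeq, b3.BaseSeq) // prints "0 0 2"
}
```

A retry resends the same batch with the same `BaseSeq`, which is what lets the broker recognize it as a repeat.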
1.1 The Deduplication Logic
The broker keeps track of the last Sequence Number it has accepted for each (PID, Partition) pair.
When a new message arrives:
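- If New SeqNum = Last SeqNum + 1: Accept and write it. (Normal case)
- If New SeqNum ≤ Last SeqNum: Discard as a Duplicate. The broker acknowledges it without writing it again, so the producer's retry succeeds.
- If New SeqNum > Last SeqNum + 1: Reject as Out of Order (OutOfOrderSequenceException). A gap means an earlier message never arrived, which indicates possible data loss.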
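The three rules above translate almost directly into code. The Go sketch below is a simplified model, not the broker's implementation: it tracks one last sequence number per hypothetical `key{PID, Partition}` and treats each message as a single sequence number, whereas the real broker works on batches.

```go
package main

import "fmt"

// key identifies one producer's stream into one partition.
type key struct {
	PID       int64
	Partition int32
}

type verdict string

const (
	accept     verdict = "accept"
	duplicate  verdict = "duplicate"
	outOfOrder verdict = "out-of-order"
)

// broker holds the last accepted sequence number for each (PID, partition).
type broker struct {
	lastSeq map[key]int32
}

// check applies the three deduplication rules. A key seen for the first
// time behaves as if its last sequence were -1, so 0 is expected next.
func (b *broker) check(k key, seq int32) verdict {
	last, ok := b.lastSeq[k]
	if !ok {
		last = -1
	}
	switch {
	case seq == last+1:
		b.lastSeq[k] = seq // write it and remember the new last sequence
		return accept
	case seq <= last:
		return duplicate // acknowledge without writing again
	default:
		return outOfOrder // gap: an earlier message never arrived
	}
}

func main() {
	b := &broker{lastSeq: make(map[key]int32)}
	k := key{PID: 42, Partition: 0}
	for _, seq := range []int32{0, 1, 1, 3} {
		fmt.Println(seq, b.check(k, seq))
	}
	// prints:
	// 0 accept
	// 1 accept
	// 1 duplicate
	// 3 out-of-order
}
```

Note that a rejected out-of-order message does not advance the counter, so sequence 2 would still be accepted if it arrived next.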
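2. Walkthrough: A Lost Acknowledgment
Consider a network failure where the acknowledgment is lost. The producer sends a message with sequence number 0, and the broker writes it, but the ack never reaches the producer. The producer retries with the same PID and the same sequence number 0. Because 0 ≤ Last SeqNum (now 0), the broker recognizes the duplicate and acknowledges it without writing it a second time.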
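The same scenario as a toy Go simulation. `partitionLog` and `deliverWithLostAck` are hypothetical names; the log models a single partition and a single PID, and the lost ack is modeled by simply sending the same record again with the same sequence number. Running it with and without the deduplication rule shows the difference.

```go
package main

import "fmt"

// partitionLog is a toy single-partition log for one PID. When dedup is
// true it applies the idempotent-producer rules to incoming sequence numbers.
type partitionLog struct {
	dedup   bool
	lastSeq int32 // -1 means nothing written yet
	records []string
}

// write returns true when the broker acknowledges the request.
func (l *partitionLog) write(seq int32, value string) bool {
	if l.dedup {
		switch {
		case seq <= l.lastSeq:
			return true // duplicate: acknowledge without writing it again
		case seq != l.lastSeq+1:
			return false // gap in the sequence: reject as out of order
		}
		l.lastSeq = seq
	}
	l.records = append(l.records, value)
	return true
}

// deliverWithLostAck models one send whose acknowledgment is lost in transit,
// followed by a single retry of the same record with the same sequence number.
func deliverWithLostAck(l *partitionLog, seq int32, value string) {
	_ = l.write(seq, value) // attempt 1: written, but the ack never arrives
	_ = l.write(seq, value) // attempt 2: the producer times out and retries
}

func main() {
	naive := &partitionLog{dedup: false, lastSeq: -1}
	idem := &partitionLog{dedup: true, lastSeq: -1}
	deliverWithLostAck(naive, 0, "Order-123")
	deliverWithLostAck(idem, 0, "Order-123")
	fmt.Println("without idempotence:", naive.records) // [Order-123 Order-123]
	fmt.Println("with idempotence:", idem.records)     // [Order-123]
}
```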
3. Enabling Idempotence
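Since Kafka 3.0, enable.idempotence defaults to true. It still helps to know the settings it depends on.
When enable.idempotence=true:
- acks defaults to all (and must be all).
- retries defaults to MAX_INT (Integer.MAX_VALUE).
- max.in.flight.requests.per.connection must be <= 5. The broker remembers the last five batches from each producer, so up to five unacknowledged requests can be retried without breaking ordering or deduplication.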
Java Configuration
```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Explicitly enable idempotence (the default since Kafka 3.0)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
// These are implied, but it is good to be explicit
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
```
Go Configuration (Franz-Go)
```go
package main

import (
	"context"
	"log"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
	// franz-go enables idempotence by default; it is turned off only by
	// passing kgo.DisableIdempotentWrite(). Idempotence requires acks=all,
	// which is also franz-go's default, so this option just makes it explicit.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.RequiredAcks(kgo.AllISRAcks()),
	)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// Produce is asynchronous; the WaitGroup keeps main alive until the
	// callback reports the result.
	var wg sync.WaitGroup
	wg.Add(1)
	record := &kgo.Record{Topic: "orders", Value: []byte("Order-123")}
	client.Produce(context.Background(), record, func(_ *kgo.Record, err error) {
		defer wg.Done()
		if err != nil {
			log.Printf("Failed to produce: %v", err)
		} else {
			log.Println("Produced successfully!")
		}
	})
	wg.Wait()
}
```
4. Summary
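- Idempotence guarantees that each message is written to its partition exactly once while the same producer (same PID) retries it. A restarted producer gets a new PID unless it uses a transactional.id, so duplicates across restarts are not caught.
- It uses PID and Sequence Numbers to detect duplicates.
- It handles network retries transparently, with no deduplication logic in the application.
- It is enabled by default since Kafka 3.0.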