Idempotent Producers

Network errors are inevitable. When a Producer sends a message to Kafka, it might receive an error (e.g., “Network Exception”) or no response at all (timeout).

The Producer has a dilemma:

  1. Don’t Retry: The message might be lost. (At-most-once)
  2. Retry: If the original message did land but the acknowledgment was lost, retrying will cause a Duplicate. (At-least-once)
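The second branch of the dilemma can be reproduced with a toy model. The sketch below is not Kafka code: FlakyLog and sendAtLeastOnce are hypothetical names for an in-memory log whose write always lands but whose acknowledgment can be dropped.

```java
import java.util.ArrayList;
import java.util.List;

class NaiveRetryDemo {
    /** Hypothetical partition log: every append lands, but the ack can be dropped. */
    static class FlakyLog {
        final List<String> records = new ArrayList<>();
        private int acksToDrop;

        FlakyLog(int acksToDrop) {
            this.acksToDrop = acksToDrop;
        }

        /** Returns true if the producer received the ack, false if it was lost. */
        boolean append(String message) {
            records.add(message);      // the write itself succeeds
            if (acksToDrop > 0) {
                acksToDrop--;
                return false;          // ack lost: the producer only sees a timeout
            }
            return true;
        }
    }

    /** At-least-once delivery: resend until an ack arrives. */
    static void sendAtLeastOnce(FlakyLog log, String message) {
        while (!log.append(message)) {
            // No ack: the producer cannot tell "lost" from "landed", so it resends.
        }
    }

    public static void main(String[] args) {
        FlakyLog log = new FlakyLog(1);    // drop the first ack only
        sendAtLeastOnce(log, "Order-123");
        System.out.println(log.records);   // [Order-123, Order-123]
    }
}
```

A single lost acknowledgment is enough to store the message twice, which is the duplicate that idempotence is meant to remove.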

For years, Kafka users had to handle this deduplication in their application. Since version 0.11, Kafka solves it natively with the Idempotent Producer.

1. How Idempotence Works

An idempotent operation is one that can be performed multiple times without changing the result beyond the initial application. x = 1 is idempotent. x++ is not.
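As a small illustration of the definition, the two operations can be written as functions and applied twice. The names assignOne and increment are chosen here for the example only.

```java
class IdempotenceDemo {
    // Models "x = 1": the result does not depend on how often it is applied.
    static int assignOne(int x) {
        return 1;
    }

    // Models "x++": every application changes the result again.
    static int increment(int x) {
        return x + 1;
    }

    public static void main(String[] args) {
        int once = assignOne(0);
        int twice = assignOne(assignOne(0));
        System.out.println(once == twice);              // true: applying it again changes nothing

        System.out.println(increment(0));               // 1
        System.out.println(increment(increment(0)));    // 2: a "retry" changed the result
    }
}
```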

To make Kafka writes idempotent, the protocol introduces two invisible pieces of metadata to every batch of messages:

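  1. Producer ID (PID): A unique identifier that the broker assigns to the producer when it initializes. A restarted producer receives a new PID, so deduplication covers retries within one producer session only.
  2. Sequence Number (SeqNum): A monotonically increasing number for each message sent to a specific partition. Each partition has its own independent sequence.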
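The producer side of this bookkeeping might look like the following minimal sketch. SequenceAssigner is a hypothetical class, not part of the Kafka client, and it assumes sequence numbers start at 0 for each partition.

```java
import java.util.HashMap;
import java.util.Map;

class SequenceAssigner {
    final long producerId;    // the PID handed out by the broker
    private final Map<Integer, Integer> nextSeqByPartition = new HashMap<>();

    SequenceAssigner(long producerId) {
        this.producerId = producerId;
    }

    /** Returns the sequence number for the next message to this partition, then advances it. */
    int next(int partition) {
        int seq = nextSeqByPartition.getOrDefault(partition, 0);
        nextSeqByPartition.put(partition, seq + 1);
        return seq;
    }

    public static void main(String[] args) {
        SequenceAssigner producer = new SequenceAssigner(100L);
        System.out.println(producer.next(0));   // 0
        System.out.println(producer.next(0));   // 1
        System.out.println(producer.next(1));   // 0 (partition 1 has its own counter)
    }
}
```

A retry must reuse the sequence number that was assigned the first time rather than calling next again. That reuse is what lets the broker recognize the resend.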

1.1 The Deduplication Logic

The Broker keeps track of the last committed Sequence Number for each (PID, Partition) pair.

When a new message arrives:

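  • If New SeqNum = Last SeqNum + 1: Accept and write. (Normal case)
  • If New SeqNum ≤ Last SeqNum: Treat as a duplicate. The broker acknowledges the message without writing it again, so the producer's retry succeeds.
  • If New SeqNum > Last SeqNum + 1: Reject as out of order. The gap means an earlier message never arrived, and the producer surfaces this as an OutOfOrderSequenceException.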
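The three rules above can be sketched as a small state machine. This is a simplified model, not the broker's actual implementation: SequenceTracker, onAppend and Result are hypothetical names, and an unseen (PID, Partition) pair is assumed to start with a last sequence of -1 so that the first message carries sequence 0.

```java
import java.util.HashMap;
import java.util.Map;

class SequenceTracker {
    enum Result { ACCEPTED, DUPLICATE, OUT_OF_ORDER }

    // Last accepted sequence number, keyed by "pid:partition".
    private final Map<String, Integer> lastSeq = new HashMap<>();

    Result onAppend(long pid, int partition, int seq) {
        String key = pid + ":" + partition;
        int last = lastSeq.getOrDefault(key, -1);   // assumption: sequences start at 0
        if (seq == last + 1) {
            lastSeq.put(key, seq);                  // normal case: write and advance
            return Result.ACCEPTED;
        }
        if (seq <= last) {
            return Result.DUPLICATE;                // already written: ack without writing
        }
        return Result.OUT_OF_ORDER;                 // gap: an earlier message is missing
    }

    public static void main(String[] args) {
        SequenceTracker broker = new SequenceTracker();
        System.out.println(broker.onAppend(100L, 0, 0));   // ACCEPTED
        System.out.println(broker.onAppend(100L, 0, 0));   // DUPLICATE (a retry)
        System.out.println(broker.onAppend(100L, 0, 2));   // OUT_OF_ORDER (seq 1 is missing)
        System.out.println(broker.onAppend(100L, 0, 1));   // ACCEPTED
    }
}
```

Keying the map by both PID and partition reflects that sequences are tracked per pair: the same producer writing to another partition starts a separate count.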

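2. Example: Duplicate Fighter

Consider a network failure in which the acknowledgment is lost, and how the Idempotent Producer prevents the duplicate write.

Initial state: the Producer (PID: 100) is ready to send Msg A with Seq: 1, and the Broker has Last Seq: 0.

  1. The Producer sends Msg A (Seq: 1). Since 1 = 0 + 1, the Broker writes it and sets Last Seq to 1.
  2. The acknowledgment is lost, so the Producer times out and resends Msg A with the same Seq: 1.
  3. The Broker sees that 1 ≤ 1, acknowledges without writing, and the log contains Msg A exactly once.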

3. Enabling Idempotence

Since Kafka 3.0, enable.idempotence defaults to true. However, it’s good to know the dependencies.

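When enable.idempotence=true, the producer requires:

  • acks=all (the default since Kafka 3.0).
  • retries greater than 0 (the default is Integer.MAX_VALUE).
  • max.in.flight.requests.per.connection <= 5 (the default is 5). Ordering is preserved for any value in this range.

If idempotence is explicitly enabled and one of these settings has a conflicting value, the producer throws a ConfigException when it is constructed.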
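These dependencies can be checked before a producer is ever built. The helper below is a hypothetical pre-flight check written against plain java.util.Properties with string values. It mirrors the rules listed above in simplified form and is not the validation code of the Kafka client itself. The fallback values it uses are the Kafka 3.0+ defaults.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

class IdempotenceConfigCheck {
    /** Returns one message per setting that conflicts with enable.idempotence=true. */
    static List<String> conflicts(Properties props) {
        List<String> problems = new ArrayList<>();

        String acks = props.getProperty("acks", "all");
        if (!acks.equals("all") && !acks.equals("-1")) {   // -1 is an alias for all
            problems.add("acks must be all, got " + acks);
        }

        int retries = Integer.parseInt(
                props.getProperty("retries", String.valueOf(Integer.MAX_VALUE)));
        if (retries <= 0) {
            problems.add("retries must be greater than 0, got " + retries);
        }

        int inFlight = Integer.parseInt(
                props.getProperty("max.in.flight.requests.per.connection", "5"));
        if (inFlight > 5) {
            problems.add("max.in.flight.requests.per.connection must be <= 5, got " + inFlight);
        }
        return problems;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("acks", "1");
        props.setProperty("max.in.flight.requests.per.connection", "10");
        for (String problem : conflicts(props)) {
            System.out.println(problem);
        }
    }
}
```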

Java Configuration

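import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Serializers are required before a KafkaProducer can be built from these properties
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

// Explicitly enable idempotence (the default since Kafka 3.0)
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

// These are implied, but good to be explicit
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));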

Go Configuration (Franz-Go)

package main

import (
	"context"
	"log"
	"sync"

	"github.com/twmb/franz-go/pkg/kgo"
)

func main() {
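	// franz-go enables idempotent writes by default, so no option is needed to turn them on.
	// They stay on unless kgo.DisableIdempotentWrite() is passed.
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"),
		kgo.RequiredAcks(kgo.AllISRAcks()), // the default, stated explicitly; idempotence requires it
	)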
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	var wg sync.WaitGroup
	wg.Add(1)
	record := &kgo.Record{Topic: "orders", Value: []byte("Order-123")}

	client.Produce(context.Background(), record, func(_ *kgo.Record, err error) {
		defer wg.Done()
		if err != nil {
			log.Printf("Failed to produce: %v", err)
		} else {
			log.Println("Produced successfully!")
		}
	})
	wg.Wait()
}

4. Summary

  • Idempotence guarantees that each message is written exactly once, in order, per partition, within a single producer session.
  • It uses PID and Sequence Numbers to detect duplicates.
  • It handles network retries transparently without application logic.
  • It is enabled by default since Kafka 3.0.