Producer Internals & Partitioning

The Kafka Producer is not just a “dumb pipe” that sends data to a socket. It is a complex, high-performance client responsible for batching, compressing, and routing data to the correct broker. Understanding its internals is the difference between a system that handles 10k msg/sec and one that handles 1M msg/sec.

1. High-Level Architecture

When you call producer.send(), the operation is asynchronous. The main thread hands the record off to a buffer, and a separate I/O thread (the Sender) is responsible for actually transmitting the bytes.

The Write Path

  Main Thread:   send(record) → Serializer (Object → Bytes) → Partitioner (Assign Partition) → Record Accumulator [Batch P0 | Batch P1 | Batch P2]
  Sender Thread: drains full batches from the accumulator and transmits them to the brokers
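The split between the calling thread and the Sender thread can be sketched as a toy model: the caller enqueues records into a shared buffer and returns immediately, while a background thread drains and "sends" them. The class name, the in-memory queue, and the poison-pill shutdown are illustrative, not Kafka APIs.

```java
import java.util.*;
import java.util.concurrent.*;

// Toy model of the producer's write path: send() only enqueues; a
// separate "sender" thread does the actual (simulated) transmission.
public class AsyncSendModel {
    private static final String POISON = "__POISON__"; // shutdown marker (toy convention)
    private final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();
    private final List<String> sent = Collections.synchronizedList(new ArrayList<>());
    private final Thread sender;

    public AsyncSendModel() {
        sender = new Thread(() -> {
            try {
                while (true) {
                    String record = accumulator.take(); // block until a record arrives
                    if (record.equals(POISON)) return;
                    sent.add(record);                   // stand-in for a network send
                }
            } catch (InterruptedException ignored) { }
        });
        sender.start();
    }

    // Returns immediately; the I/O happens on the sender thread.
    public void send(String record) { accumulator.add(record); }

    public List<String> closeAndGetSent() throws InterruptedException {
        accumulator.add(POISON);
        sender.join();
        return sent;
    }

    public static void main(String[] args) throws InterruptedException {
        AsyncSendModel p = new AsyncSendModel();
        for (int i = 0; i < 5; i++) p.send("record-" + i);
        System.out.println("sent " + p.closeAndGetSent().size() + " records");
    }
}
```

The real producer hands off *batches* rather than single records, but the threading contract is the same: send() never waits on the network.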

2. Partitioning Strategies

The Partitioner decides which partition of a topic each message is written to. That choice fixes both the ordering of your data (ordering is only guaranteed within a partition) and the parallelism available to consumers.

A. Key-Based Partitioning

If you provide a Key (e.g., UserID, OrderID), Kafka guarantees that all messages with the same key will go to the same partition.

  • Formula: murmur2(keyBytes) % num_partitions (the hash is masked to be non-negative before the modulo).
  • Use Case: Order processing, where updates to Order-123 must be processed sequentially.
  • Caveat: Adding partitions changes the key → partition mapping, so per-key ordering only holds while num_partitions stays fixed.
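The assignment rule can be sketched in a few lines. Note that Arrays.hashCode here is a stand-in for Kafka's own murmur2 implementation (an assumption for brevity, not the real hash); the invariant is the same either way: hash the key bytes, mask the sign bit, take the modulo.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of key-based partition assignment (hash is a stand-in for murmur2).
public class KeyPartitioner {
    public static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(bytes);          // stand-in for murmur2(keyBytes)
        return (hash & 0x7fffffff) % numPartitions; // mask sign bit, then modulo
    }

    public static void main(String[] args) {
        // The same key always maps to the same partition...
        System.out.println(partitionFor("Order-123", 6) == partitionFor("Order-123", 6)); // true
        // ...but the mapping is only stable while the partition count is fixed.
        System.out.println("6 partitions → P" + partitionFor("Order-123", 6)
                + ", 7 partitions → P" + partitionFor("Order-123", 7));
    }
}
```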

B. Sticky Partitioner (No Key)

If no key is provided, older versions of Kafka used Round Robin (P0 → P1 → P2). This was inefficient because it spread records thinly across many small batches. Since Kafka 2.4 (KIP-480), the producer uses the Sticky Partitioner:

  1. Pick a partition and "stick" to it until the batch is full or linger.ms expires.
  2. Send the batch.
  3. Switch to a different partition and repeat.

[!TIP] The Sticky Partitioner dramatically improves throughput by reducing the number of requests sent to brokers, even though data is evenly distributed over time.
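The effect is easy to see in a toy simulation: 30 keyless records across 3 partitions, batches that hold 5 records, and a "linger" flush of all partial batches every 6 records. All numbers are illustrative, and the sticky strategy here rotates partitions deterministically for simplicity (the real one picks randomly).

```java
// Toy comparison of round-robin vs sticky partitioning for keyless records.
public class StickySimulation {
    static final int BATCH_CAPACITY = 5, PARTITIONS = 3, RECORDS = 30, LINGER_EVERY = 6;

    // Counts batches sent: a batch goes out when full, and every
    // LINGER_EVERY records all partial batches are flushed (linger.ms expiry).
    static int simulate(boolean sticky) {
        int[] fill = new int[PARTITIONS];
        int batches = 0, current = 0;
        for (int i = 1; i <= RECORDS; i++) {
            int p = sticky ? current : (i - 1) % PARTITIONS;
            if (++fill[p] == BATCH_CAPACITY) {      // batch full: send it
                batches++;
                fill[p] = 0;
                if (sticky) current = (current + 1) % PARTITIONS;
            }
            if (i % LINGER_EVERY == 0) {            // linger expires: flush partials
                for (int q = 0; q < PARTITIONS; q++)
                    if (fill[q] > 0) { batches++; fill[q] = 0; }
                if (sticky) current = (current + 1) % PARTITIONS;
            }
        }
        return batches;
    }

    public static void main(String[] args) {
        System.out.println("round-robin batches: " + simulate(false)); // 15 small batches
        System.out.println("sticky batches:      " + simulate(true));  // 10 fuller batches
    }
}
```

Round robin flushes three 2-record batches at every linger expiry; sticky sends one full batch plus one small remainder, so the same traffic produces fewer, larger requests.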


3. Interactive: Pipeline Visualizer

Visualize how messages flow through the producer. Adjust linger.ms (simulated speed) to see how batching works.


4. Batching & Compression

The producer accumulates records in memory (Record Accumulator) before sending. This is controlled by two main parameters:

  1. batch.size: The maximum size (in bytes) of a single batch. (Default: 16KB).
  2. linger.ms: The time to wait for a batch to fill up before sending. (Default: 0 - send immediately).

Why wait? Sending 1,000 messages one at a time causes 1,000 network requests; sending them as a single batch causes 1.
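The back-of-the-envelope arithmetic, using assumed record sizes (1 KB each) against the default 16 KB batch.size:

```java
// Illustrative request-count math for batching; the message size is an
// assumption, not a measurement.
public class BatchingMath {
    // ceil(messages / recordsPerBatch): requests needed with batching.
    static int batchedRequests(int messages, int messageBytes, int batchSizeBytes) {
        int perBatch = batchSizeBytes / messageBytes;     // records that fit in one batch
        return (messages + perBatch - 1) / perBatch;      // integer ceiling division
    }

    public static void main(String[] args) {
        int messages = 1000;
        int messageBytes = 1024;      // 1 KB per record (assumed)
        int batchSize = 16 * 1024;    // default batch.size: 16 KB

        System.out.println("unbatched requests: " + messages);  // 1000
        System.out.println("batched requests:   "
                + batchedRequests(messages, messageBytes, batchSize)); // 63
    }
}
```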

Compression

Batching enables effective Compression. You cannot compress a single tiny message effectively. But compressing a batch of 100 messages can reduce size by 60-70%.

  • Types: gzip (highest compression, most CPU), snappy (Google; fast, moderate ratio), lz4 (fastest), zstd (Facebook; best ratio/speed balance).
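You can see the batch effect with gzip from the JDK (Kafka typically uses snappy, lz4, or zstd with its own batch framing; gzip is used here only because it ships with Java). A lone small record barely shrinks, and may even grow once the compressor's header overhead is added, while a batch of similar records collapses dramatically:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Compressing one record vs a batch of 100 similar records.
public class BatchCompression {
    static int gzipSize(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        }
        return out.size();
    }

    public static void main(String[] args) throws IOException {
        String record = "{\"user\":\"user_1\",\"event\":\"click\",\"page\":\"/home\"}";
        byte[] one = record.getBytes(StandardCharsets.UTF_8);

        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < 100; i++) batch.append(record);
        byte[] hundred = batch.toString().getBytes(StandardCharsets.UTF_8);

        // One tiny record: header overhead eats the gain (may even grow).
        System.out.println("1 record:    " + one.length + " → " + gzipSize(one) + " bytes");
        // 100 similar records: redundancy across records compresses away.
        System.out.println("100 records: " + hundred.length + " → " + gzipSize(hundred) + " bytes");
    }
}
```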

5. Implementation

Java Producer

```java
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class HighThroughputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Reliability
        props.put("acks", "all");

        // Batching & Throughput
        props.put("batch.size", 32 * 1024);      // 32 KB batch
        props.put("linger.ms", 20);              // Wait 20ms for batch to fill
        props.put("compression.type", "snappy"); // Compress data

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            String key = "user_" + (i % 5); // Key-based partitioning
            String value = "order_" + i;

            producer.send(new ProducerRecord<>("orders", key, value), (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("Sent to P%d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
        }
        producer.close();
    }
}
```

Go Producer

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Configure the writer (Producer)
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "orders",
		Balancer: &kafka.Hash{}, // Key-based hashing

		// Batching settings
		BatchSize:    100,                   // Number of messages
		BatchTimeout: 20 * time.Millisecond, // linger.ms equivalent
		Compression:  kafka.Snappy,          // Compression

		// Reliability
		RequiredAcks: kafka.RequireAll,
	}
	defer w.Close()

	// Send messages
	msgs := []kafka.Message{}
	for i := 0; i < 100; i++ {
		key := fmt.Sprintf("user_%d", i%5)
		val := fmt.Sprintf("order_%d", i)
		msgs = append(msgs, kafka.Message{
			Key:   []byte(key),
			Value: []byte(val),
		})
	}

	err := w.WriteMessages(context.Background(), msgs...)
	if err != nil {
		log.Fatal("failed to write messages:", err)
	}
	fmt.Println("Batch sent successfully")
}
```

6. Memory Management

The producer uses a BufferPool to recycle memory.

  • Total memory is capped by buffer.memory (Default: 32MB).
  • If the producer produces faster than the broker can accept, this buffer fills up.
  • Once full, the send() method blocks for up to max.block.ms (Default: 60s) waiting for space.
  • If it still can’t clear space, it throws a TimeoutException.
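This backpressure behavior can be modeled with a bounded queue whose insert waits up to a deadline and then fails, mirroring the producer's TimeoutException. The class name, capacities, and timeouts below are illustrative only.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of buffer.memory backpressure with a max.block.ms analogue.
public class BufferPoolModel {
    private final ArrayBlockingQueue<byte[]> buffer;
    private final long maxBlockMs;

    public BufferPoolModel(int capacity, long maxBlockMs) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
        this.maxBlockMs = maxBlockMs;
    }

    public void send(byte[] record) throws InterruptedException {
        // offer() waits for free space the way send() waits on buffer.memory
        if (!buffer.offer(record, maxBlockMs, TimeUnit.MILLISECONDS)) {
            // stands in for the producer's TimeoutException
            throw new IllegalStateException("buffer full after " + maxBlockMs + " ms");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        BufferPoolModel pool = new BufferPoolModel(2, 50); // tiny buffer, 50ms max block
        pool.send(new byte[16]);
        pool.send(new byte[16]);
        try {
            pool.send(new byte[16]); // nothing drains the buffer, so this times out
        } catch (IllegalStateException e) {
            System.out.println("blocked then failed: " + e.getMessage());
        }
    }
}
```

In the real client a healthy Sender thread is constantly freeing buffer space, so blocking only happens when production outpaces the brokers for a sustained period.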

[!WARNING] Do not set buffer.memory too low if you have high throughput, or your application will spend all its time blocked on send().