Producer Internals & Partitioning
This module covers Kafka Producer internals and partitioning: how records are batched, compressed, and routed to brokers, and how those mechanics drive throughput and ordering in production systems.
The Kafka Producer is not just a “dumb pipe” that sends data to a socket. It is a complex, high-performance client responsible for batching, compressing, and routing data to the correct broker. Understanding its internals is the difference between a system that handles 10k msg/sec and one that handles 1M msg/sec.
1. High-Level Architecture
When you call producer.send(), the operation is asynchronous. The main thread hands the record off to a buffer, and a separate I/O thread (the Sender) is responsible for actually transmitting the bytes.
The Write Path
1. `send()` serializes the key and value.
2. The Partitioner picks a target partition.
3. The record is appended to a batch in the Record Accumulator.
4. The Sender thread drains full (or expired) batches and transmits them to the partition leaders.
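The hand-off between the calling thread and the Sender can be sketched with an in-memory analogy (a toy model, not the real client code): the main thread appends records to a shared queue and returns immediately, while a background "sender" thread drains them in batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Toy model of the producer's accumulator/sender split (not real Kafka internals).
public class WritePathSketch {
    static final BlockingQueue<String> accumulator = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        // "Sender" thread: drains up to 10 records per simulated network round trip.
        Thread sender = new Thread(() -> {
            List<String> batch = new ArrayList<>();
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    // Wait briefly for a first record (stand-in for linger.ms).
                    String first = accumulator.poll(20, TimeUnit.MILLISECONDS);
                    if (first == null) continue;
                    batch.add(first);
                    accumulator.drainTo(batch, 9); // fill the rest of the batch
                    System.out.println("sent batch of " + batch.size());
                    batch.clear();
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        sender.start();

        // Main thread: "send()" just enqueues and returns immediately.
        for (int i = 0; i < 25; i++) accumulator.put("record-" + i);

        // Wait until the sender has drained everything, then shut it down.
        while (!accumulator.isEmpty()) Thread.sleep(10);
        sender.interrupt();
        sender.join();
    }
}
```

The point of the split: the caller never pays the network round-trip cost per record; it only pays the cost of an in-memory enqueue.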
2. Partitioning Strategies
The Partitioner determines which partition of a topic a message should be written to. This decision determines the ordering and parallelism of your data.
A. Key-Based Partitioning
If you provide a Key (e.g., a UserID or OrderID), Kafka guarantees that all messages with the same key will go to the same partition (as long as the partition count does not change).
- Formula: `murmur2(key) % num_partitions`
- Use Case: Order processing, where updates to `Order-123` must be processed sequentially.
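The mapping can be illustrated with a simplified hash. Kafka actually uses murmur2; `String.hashCode` stands in here purely to show the sign-masking and modulo routing:

```java
public class KeyPartitioning {
    // Simplified stand-in for Kafka's murmur2-based partitioner:
    // mask the sign bit so the hash is non-negative, then take the modulo.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int p1 = partitionFor("Order-123", 6);
        int p2 = partitionFor("Order-123", 6);
        System.out.println("Order-123 -> partition " + p1);
        // The same key always routes to the same partition...
        System.out.println("deterministic: " + (p1 == p2));
        // ...but changing the partition count can change the mapping, which is
        // why adding partitions later breaks per-key ordering guarantees.
        System.out.println("with 12 partitions: " + partitionFor("Order-123", 12));
    }
}
```

This also explains the caveat above: the modulo depends on `num_partitions`, so resizing a topic reshuffles keys.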
B. Sticky Partitioner (No Key)
If No Key is provided, older versions of Kafka used Round Robin (P0 → P1 → P2). This was inefficient because it created many small batches. Modern Kafka uses the Sticky Partitioner:
- Stick to one randomly chosen partition until the batch is full or `linger.ms` expires.
- Send the batch.
- Switch to a new partition and repeat.
> [!TIP]
> The Sticky Partitioner dramatically improves throughput by reducing the number of requests sent to brokers, while still distributing data evenly over time.
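A rough sketch of the sticky strategy (illustrative only; the real logic lives in the client's partitioner, and real batches are bounded by bytes, not record counts):

```java
import java.util.concurrent.ThreadLocalRandom;

public class StickySketch {
    static int numPartitions = 6;
    static int stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
    static int recordsInBatch = 0;
    static final int BATCH_CAPACITY = 100; // stand-in for batch.size, in records

    // Called once per keyless record: stay on the sticky partition
    // until the current batch is "full", then pick a new one.
    static int nextPartition() {
        if (recordsInBatch >= BATCH_CAPACITY) {
            int old = stickyPartition;
            do {
                stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
            } while (numPartitions > 1 && stickyPartition == old);
            recordsInBatch = 0;
        }
        recordsInBatch++;
        return stickyPartition;
    }

    // Count how often the target partition changes across `records` sends.
    static int countSwitches(int records) {
        recordsInBatch = 0;
        stickyPartition = ThreadLocalRandom.current().nextInt(numPartitions);
        int switches = 0;
        int prev = nextPartition();
        for (int i = 1; i < records; i++) {
            int p = nextPartition();
            if (p != prev) { switches++; prev = p; }
        }
        return switches;
    }

    public static void main(String[] args) {
        // 300 keyless records fill exactly 3 batches -> only 2 partition switches,
        // versus 299 switches under pure round robin.
        System.out.println("partition switches: " + countSwitches(300));
    }
}
```

Fewer switches means fewer, larger batches in flight at once, which is exactly where the throughput win comes from.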
4. Batching & Compression
The producer accumulates records in memory (Record Accumulator) before sending. This is controlled by two main parameters:
- `batch.size`: The maximum size (in bytes) of a single batch. (Default: 16 KB)
- `linger.ms`: The time to wait for a batch to fill up before sending. (Default: 0, send immediately)
Why wait? Sending 1 message 1000 times causes 1000 network requests. Sending 1000 messages in 1 batch causes 1 network request.
Compression
Batching enables effective Compression. You cannot compress a single tiny message effectively. But compressing a batch of 100 messages can reduce size by 60-70%.
- Types: `gzip`, `snappy` (Google), `lz4` (fastest), `zstd` (Facebook; best balance of speed and ratio).
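The effect is easy to demonstrate with a general-purpose compressor from the JDK (gzip via `GZIPOutputStream` here; this is not Kafka's code path, just the principle that similar records compress far better together):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

public class CompressionDemo {
    // Return the gzip-compressed size of a byte array.
    static int gzipSize(byte[] data) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(data);
        }
        return out.size();
    }

    public static void main(String[] args) throws IOException {
        String msg = "{\"user\":\"user_1\",\"action\":\"purchase\",\"amount\":42}";
        int single = gzipSize(msg.getBytes(StandardCharsets.UTF_8));

        StringBuilder batch = new StringBuilder();
        for (int i = 0; i < 100; i++) batch.append(msg);
        int batched = gzipSize(batch.toString().getBytes(StandardCharsets.UTF_8));

        System.out.println("1 message compressed:       " + single + " bytes");
        System.out.println("100-message batch:          " + batched + " bytes");
        System.out.println("bytes per message in batch: " + batched / 100);
        // Structurally similar records share dictionary entries, so the
        // per-message cost in a batch is a small fraction of the single-message cost.
    }
}
```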
5. Implementation
Java
```java
import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class HighThroughputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");

        // Serializers (required)
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Reliability
        props.put("acks", "all");

        // Batching & Throughput
        props.put("batch.size", 32 * 1024);      // 32 KB batch
        props.put("linger.ms", 20);              // Wait up to 20 ms for the batch to fill
        props.put("compression.type", "snappy"); // Compress batches

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            String key = "user_" + (i % 5); // Key-based partitioning
            String value = "order_" + i;
            producer.send(new ProducerRecord<>("orders", key, value),
                (metadata, exception) -> {
                    if (exception != null) {
                        exception.printStackTrace();
                    } else {
                        System.out.printf("Sent to P%d at offset %d%n",
                            metadata.partition(), metadata.offset());
                    }
                });
        }

        producer.close(); // flushes outstanding batches before closing
    }
}
```
Go
```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	// Configure the writer (producer)
	w := &kafka.Writer{
		Addr:     kafka.TCP("localhost:9092"),
		Topic:    "orders",
		Balancer: &kafka.Hash{}, // Key-based hashing

		// Batching settings
		BatchSize:    100,                   // Number of messages
		BatchTimeout: 20 * time.Millisecond, // linger.ms equivalent
		Compression:  kafka.Snappy,          // Compression

		// Reliability
		RequiredAcks: kafka.RequireAll,
	}
	defer w.Close()

	// Build messages
	msgs := []kafka.Message{}
	for i := 0; i < 100; i++ {
		key := fmt.Sprintf("user_%d", i%5)
		val := fmt.Sprintf("order_%d", i)
		msgs = append(msgs, kafka.Message{
			Key:   []byte(key),
			Value: []byte(val),
		})
	}

	// Send all messages in one call; the writer batches them internally.
	if err := w.WriteMessages(context.Background(), msgs...); err != nil {
		log.Fatal("failed to write messages: ", err)
	}
	fmt.Println("Batch sent successfully")
}
```
6. Memory Management
The producer uses a BufferPool to recycle memory.
- Total memory is capped by `buffer.memory` (default: 32 MB).
- If the producer produces faster than the brokers can accept, this buffer fills up.
- Once full, `send()` blocks for up to `max.block.ms` (default: 60 s).
- If space still cannot be freed in time, it throws a `TimeoutException`.
> [!WARNING]
> Do not set `buffer.memory` too low if you have high throughput, or your application will spend all its time blocked on `send()`.
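The blocking-then-failing behavior can be mimicked with a bounded queue (an analogy only; the real BufferPool hands out `ByteBuffer`s of `batch.size`, not queue slots):

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BufferPoolAnalogy {
    // A tiny "buffer.memory": room for only 4 in-flight batches.
    static final BlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(4);

    // Mimics send(): block up to maxBlockMs for buffer space, then give up.
    static void send(byte[] batch, long maxBlockMs) throws InterruptedException {
        if (!pool.offer(batch, maxBlockMs, TimeUnit.MILLISECONDS)) {
            throw new RuntimeException("TimeoutException: buffer full for " + maxBlockMs + " ms");
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 4; i++) send(new byte[1024], 100); // fits in the buffer

        try {
            // No "Sender" is draining the pool, so this blocks and then fails,
            // just like send() when the broker falls behind the producer.
            send(new byte[1024], 100);
        } catch (RuntimeException e) {
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```

In the real client the Sender continuously frees buffers as batches are acknowledged, so blocking only occurs when production outpaces the brokers for longer than `max.block.ms`.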