Consumer Groups
Reading a stream sequentially with XRANGE is fine for a single client, but what if the stream receives 10,000 events per second? A single consumer may not be able to keep up.
Just like Kafka, Redis Streams supports Consumer Groups to allow multiple clients to cooperate and consume a stream in parallel.
1. How Consumer Groups Work
A Consumer Group guarantees that each message is delivered to only one consumer in the group. This allows you to scale out processing by simply adding more consumers.
- Creation: You create a group for a stream (e.g., mygroup for mystream).
- Consumption: Consumers ask for “new” messages (>). Redis serves unread messages to them.
- Tracking: Redis remembers which message was sent to which consumer in the Pending Entries List (PEL).
- Acknowledgment: Once processed, the consumer sends an XACK. Redis then removes the message from the PEL.
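The four steps above can be sketched with a toy, in-memory model. This is only an illustration of the bookkeeping, not how Redis implements groups internally; the names Group, ReadNew, and Ack are made up for this sketch, and an integer index stands in for a real stream ID.

```go
package main

import "fmt"

// Group is a toy stand-in for one consumer group on one stream.
// Hypothetical type for illustration; Redis does not expose anything like it.
type Group struct {
	entries []string       // stream entries, in arrival order
	next    int            // index of the next never-delivered entry
	pel     map[int]string // pending entries: entry index -> owning consumer
}

func NewGroup() *Group { return &Group{pel: map[int]string{}} }

// Add appends an entry and returns its index (a stand-in for a stream ID).
func (g *Group) Add(payload string) int {
	g.entries = append(g.entries, payload)
	return len(g.entries) - 1
}

// ReadNew mimics reading with the ">" ID: it hands out the next
// never-delivered entry and records it in the PEL under the consumer's name.
func (g *Group) ReadNew(consumer string) (id int, payload string, ok bool) {
	if g.next >= len(g.entries) {
		return 0, "", false // nothing new to deliver
	}
	id = g.next
	g.next++
	g.pel[id] = consumer
	return id, g.entries[id], true
}

// Ack mimics XACK: it removes the entry from the PEL and reports
// whether the entry was actually pending.
func (g *Group) Ack(id int) bool {
	if _, pending := g.pel[id]; !pending {
		return false
	}
	delete(g.pel, id)
	return true
}

func main() {
	g := NewGroup()
	g.Add("order-1")
	g.Add("order-2")

	idA, _, _ := g.ReadNew("Alice") // Alice gets the first entry
	idB, _, _ := g.ReadNew("Bob")   // Bob gets the second, never the same one
	g.Ack(idA)                      // Alice finishes; Bob has not acknowledged yet

	// Prints: 0 1 1 Bob (one entry is still pending, and Bob owns it)
	fmt.Println(idA, idB, len(g.pel), g.pel[idB])
}
```

Note that the shared cursor (next) is what keeps two consumers from receiving the same new entry, while the PEL is what remembers deliveries that have not been acknowledged yet.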
2. Key Commands
Creating a Group (XGROUP)
# Create group 'mygroup' starting from the beginning (0); MKSTREAM creates the stream if it does not exist
XGROUP CREATE mystream mygroup 0 MKSTREAM
Reading Messages (XREADGROUP)
# Read up to 1 new message for consumer 'Alice'
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
>: A special ID meaning “give me messages never delivered to any consumer in this group”.
Acknowledging (XACK)
# Tell Redis we are done with message 1623456789123-0
XACK mystream mygroup 1623456789123-0
4. Deep Dive: Pending Entries List (PEL)
The PEL is critical for fault tolerance. It ensures at-least-once delivery.
What happens if a consumer crashes?
- Alice reads message ID:100.
- Redis adds ID:100 to the PEL associated with Alice.
- Alice crashes before sending XACK.
- Message ID:100 stays in the PEL until it is acknowledged (it is not lost).
- Recovery: Another consumer (Bob) can inspect Alice’s PEL (XPENDING) and “steal” the message (XCLAIM) to process it himself.
[!IMPORTANT] Without XACK, the PEL will grow indefinitely, consuming memory. Always acknowledge processed messages!
5. Code Examples: Consumer
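package main

import (
	"context"
	"fmt"
	"log"
	"strings"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Create the group (and the stream, if it does not exist yet).
	// "$" means the group only sees messages added from now on;
	// use "0" instead to start from the beginning of the stream.
	err := rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$").Err()
	// BUSYGROUP means the group already exists, which is fine on restart.
	if err != nil && !strings.HasPrefix(err.Error(), "BUSYGROUP") {
		log.Fatalf("create group: %v", err)
	}

	for {
		// Read one never-delivered message; Block: 0 waits indefinitely.
		streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
			Group:    "mygroup",
			Consumer: "consumer1",
			Streams:  []string{"mystream", ">"},
			Count:    1,
			Block:    0,
		}).Result()
		if err != nil {
			log.Fatalf("read group: %v", err)
		}

		for _, stream := range streams {
			for _, msg := range stream.Messages {
				fmt.Println("Processing:", msg.Values)

				// Acknowledge so Redis removes the entry from the PEL.
				if err := rdb.XAck(ctx, "mystream", "mygroup", msg.ID).Err(); err != nil {
					log.Printf("ack %s: %v", msg.ID, err)
				}
			}
		}
	}
}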
6. Summary
Consumer Groups turn Redis Streams into a distributed work queue with at-least-once delivery. Because the PEL keeps every delivered message until it is acknowledged, a job that was handed to a failed worker can still be found and claimed by another one.