Consumer Groups

Reading a stream sequentially with XRANGE is fine for a single client, but what if you have 10,000 events per second? A single consumer will be overwhelmed.

Just like Kafka, Redis Streams supports Consumer Groups to allow multiple clients to cooperate and consume a stream in parallel.

1. How Consumer Groups Work

A Consumer Group guarantees that each message is delivered to only one consumer in the group. This allows you to scale out processing by simply adding more consumers.

  1. Creation: You create a group for a stream (e.g., mygroup for mystream).
  2. Consumption: Consumers ask for “new” messages (>). Redis serves unread messages to them.
  3. Tracking: Redis remembers which message was sent to which consumer in the Pending Entries List (PEL).
  4. Acknowledgment: Once processed, the consumer sends an XACK. Redis removes it from the PEL.

2. Key Commands

Creating a Group (XGROUP)

# Create group 'mygroup' starting from the beginning (0)
XGROUP CREATE mystream mygroup 0 MKSTREAM

Reading Messages (XREADGROUP)

# Read up to 1 new message for consumer 'Alice'
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
  • >: A special ID meaning “give me messages never delivered to other consumers”.

Acknowledging (XACK)

# Tell Redis we are done with message 1623456789123-0
XACK mystream mygroup 1623456789123-0

3. Interactive: Consumer Group Balance

Simulate a high-throughput stream and watch how messages are distributed among consumers.

Stream: "mystream"

⬇️ XREADGROUP
Consumer 1

4. Deep Dive: Pending Entries List (PEL)

The PEL is critical for fault tolerance. It ensures at-least-once delivery.

What happens if a consumer crashes?

  1. Alice reads message ID:100.
  2. Redis adds ID:100 to the PEL associated with Alice.
  3. Alice crashes before sending XACK.
  4. Message ID:100 stays in the PEL forever (it is not lost).
  5. Recovery: Another consumer (Bob) can inspect Alice’s PEL (XPENDING) and “steal” the message (XCLAIM) to process it himself.

[!IMPORTANT] Without XACK, the PEL will grow indefinitely, consuming memory. Always acknowledge processed messages!

5. Code Examples: Consumer

package main

import (
    "context"
    "fmt"
    "github.com/redis/go-redis/v9"
)

func main() {
    ctx := context.Background()
    rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

    // Create Group
    rdb.XGroupCreateMkStream(ctx, "mystream", "mygroup", "$")

    for {
        // Read
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    "mygroup",
            Consumer: "consumer1",
            Streams:  []string{"mystream", ">"},
            Count:    1,
            Block:    0,
        }).Result()

        if err != nil {
            panic(err)
        }

        for _, stream := range streams {
            for _, msg := range stream.Messages {
                fmt.Println("Processing:", msg.Values)

                // ACK
                rdb.XAck(ctx, "mystream", "mygroup", msg.ID)
            }
        }
    }
}

6. Summary

Consumer Groups turn Redis Streams into a robust, distributed work queue. By leveraging the PEL and acknowledgments, you can ensure that no job is ever lost, even if workers fail.