Distributed Counter (YouTube Likes)

[!NOTE] This module explores the core principles of Distributed Counter (YouTube Likes), deriving solutions from first principles and hardware constraints to build world-class, production-ready expertise.

1. What is a Distributed Counter?

In the early days of the web, counting was simple: UPDATE posts SET likes = likes + 1 WHERE id = 123. The database handled the locking, and life was good.

Then came “Gangnam Style”. In December 2012, it became the first video to hit 1 Billion views, and by 2014 it had exceeded YouTube’s signed 32-bit integer limit (2,147,483,647), forcing an upgrade to 64-bit view counters. But the bigger problem was Velocity. When a viral event occurs, millions of users click “Like” simultaneously.

A standard relational database (Postgres/MySQL) can sustain roughly 500-1,000 writes per second to a single row before lock contention brings the system to a halt. This chapter explores how to scale from 100 to 1 Million writes per second using Sharded Counters, Redis, and Write-Behind Caching.

[!TIP] Real-World Examples:

  • YouTube: View counts, Likes.
  • Twitter: Tweet likes, Retweets.
  • Facebook: Post reactions.

2. Requirements & Goals

2.1 Functional Requirements

  1. Ingest: The system must accept “Like”, “View”, or “Upvote” events immediately.
  2. Query: Users must see the current count (approximate or exact).
  3. Deduplication: A user can only like a video once (Idempotency).
  4. Persistence: Counts must not be lost if the cache crashes.

2.2 Non-Functional Requirements

  1. Low Latency: Write operations must return < 10ms (99th percentile).
  2. High Availability: 99.99%. It is better to show an approximate count than to fail the write (AP over CP).
  3. Scalability: Handle 1 Million interactions/sec on a single object (Hot Key).
  4. Eventual Consistency: It is acceptable if the viewer count lags by a few seconds.

2.3 Extended Requirements

  1. Auditability: For “Ad Views” (which equate to money), we must have an exact, audit-proof log of every event.
  2. Bot Detection: We must filter out “fake” views from click farms.

3. Capacity Estimation

Let’s design for a platform like YouTube or Twitter.

3.1 Traffic Analysis

  • DAU (Daily Active Users): 500 Million.
  • Reads: 50 Views/User/Day = 25 Billion Views/Day.
  • Writes (Likes/Interactions): 5 Likes/User/Day = 2.5 Billion Writes/Day.
  • Average Write QPS: 2.5 × 10⁹ / 86,400 ≈ 29,000 writes/sec.
  • Peak QPS: Viral events (e.g., Super Bowl) can spike to 100x the average → 3 Million writes/sec.

3.2 Storage (The Ledger)

  • Counter Store: Tiny (Just a number).
  • Relationship Store: Storing “User X liked Video Y” is massive.
  • UserId (8 bytes) + VideoId (8 bytes) + Timestamp (8 bytes) = 24 bytes.
  • 2.5B writes/day * 24 bytes ≈ 60 GB/day.
  • 5-Year Retention: 60 GB * 365 * 5 ≈ 110 TB.
  • Conclusion: We need a sharded NoSQL database (Cassandra/DynamoDB) or a sharded SQL setup. (See Database Sharding).
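These estimates are easy to sanity-check in a few lines (the constants mirror the figures above):

```python
# Back-of-envelope check for the traffic and storage estimates above.
DAU = 500_000_000
LIKES_PER_USER_PER_DAY = 5
ROW_BYTES = 24  # user_id (8) + video_id (8) + timestamp (8)

writes_per_day = DAU * LIKES_PER_USER_PER_DAY           # 2.5 billion
avg_write_qps = writes_per_day / 86_400                 # ~29,000 writes/sec
peak_write_qps = avg_write_qps * 100                    # ~2.9M writes/sec at 100x
storage_per_day_gb = writes_per_day * ROW_BYTES / 1e9   # ~60 GB/day
five_year_tb = storage_per_day_gb * 365 * 5 / 1000      # ~110 TB

print(f"{avg_write_qps:,.0f} writes/sec avg, {five_year_tb:.0f} TB over 5 years")
```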

4. System APIs

We need a simple REST or gRPC API for clients.

  • POST /v1/likes: Records a like interaction. Payload: { videoId: "v123", userId: "u456" }
  • GET /v1/likes/{videoId}: Returns the current count. Response: { count: 1540032, approx: true }
  • DELETE /v1/likes/{videoId}: Removes a like (Unlike). Payload: { userId: "u456" }

5. Database Design

We need two distinct storage layers: one for the count (Fast) and one for the record (Durable).

5.1 Redis (Hot Storage)

Used for real-time counting.

  • Key: video:{id}:likes, Value: Integer.
  • Set (Optional): video:{id}:liked_users, Value: Set<UserId> (for deduplication, if small).

5.2 SQL / NoSQL (Cold Storage)

Used for the durable record of who liked what. Table: likes

CREATE TABLE likes (
  user_id BIGINT,
  video_id BIGINT,
  created_at TIMESTAMP,
  PRIMARY KEY (user_id, video_id) -- Ensures 1 like per user per video
);
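The composite primary key is what enforces “one like per user per video” at the storage layer. A quick sketch with SQLite standing in for the production database shows duplicate deliveries being rejected:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE likes (
        user_id    INTEGER,
        video_id   INTEGER,
        created_at TEXT DEFAULT CURRENT_TIMESTAMP,
        PRIMARY KEY (user_id, video_id)  -- Ensures 1 like per user per video
    )
""")

# INSERT OR IGNORE makes the write idempotent: replays become no-ops.
for _ in range(3):  # same event delivered three times
    conn.execute("INSERT OR IGNORE INTO likes (user_id, video_id) VALUES (456, 123)")

count = conn.execute("SELECT COUNT(*) FROM likes").fetchone()[0]
print(count)  # 1
```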

6. High-Level Architecture

We use a Write-Behind (Async) architecture to decouple the high-speed writes from the durable storage.

System Architecture: Distributed Counter
Sharded Redis Counters | Scatter-Gather Read | Async Persistence

  • Write Flow (Fast): Client → Load Balancer → API Cluster → INCR on a random Redis shard (N=100 shards, e.g., Shard 1: 1,020, Shard 2: 450, ..., Shard N: 890).
  • Read Flow (Scatter-Gather): API Cluster → read all N shards → Sum.
  • Persistence (Async): API Cluster → Kafka Stream (event: “User X liked Y”) → Batch Consumer (Worker) → Batch Insert into SQL Relationship Table.

7. Component Design (Deep Dive)

7.1 The Single Row Lock Bottleneck

The core bottleneck is the Single Row Lock. If 10,000 requests try to update key="video:123", a SQL database forces them to queue for the row lock one at a time, while Redis absorbs the same load with ease.

Why is Redis so fast? Redis operates on a Single-Threaded Event Loop. It processes commands sequentially but extremely fast (in-memory access takes nanoseconds). It avoids the context switching and lock overhead that multi-threaded systems (like MySQL) suffer from when thousands of threads fight for a single row lock. However, even Redis has a limit. At ~100k writes/sec to a single key, the single thread maxes out the CPU core. This is where we need sharding.

7.2 Solution: Counter Sharding

Instead of one key, we split the counter into N keys: video:123:0, video:123:1, ..., video:123:N-1.

The Write Path (Random Distribution)

When a write comes in, we pick a random shard. This distributes the load evenly across the Redis cluster (if using Redis Cluster) or just reduces contention on a single key.

import random

import redis

r = redis.Redis()  # assumes a local Redis instance

def increment_likes(video_id, n_shards=100):
    # A random shard spreads the hot-key write load across N keys
    shard_id = random.randint(0, n_shards - 1)
    key = f"video:{video_id}:{shard_id}"
    # INCR is atomic, so no application-level locking is needed
    r.incr(key)

The Read Path (Scatter-Gather)

To get the total likes, we sum all shards. This is slower than reading one key, but the total can be cached for a few seconds, and during a viral spike writes dwarf reads anyway, so it’s an acceptable trade-off.

import redis

r = redis.Redis()  # assumes a local Redis instance

def get_total_likes(video_id, n_shards=100):
    keys = [f"video:{video_id}:{i}" for i in range(n_shards)]
    # MGET fetches all shards in a single round-trip
    values = r.mget(keys)
    # Shards that were never written return None; treat them as 0
    return sum(int(v) for v in values if v is not None)

7.3 Adaptive Sharding

For most videos, N=1 is sufficient. For “Gangnam Style”, we need N=1000.

  • Heuristic: Start with N=10. Monitor Redis CPU. If CPU > 50% on the node holding the key, increase N.
  • Discovery: How does the reader know N? Store metadata: video:123:meta → {shards: 50}.
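The discovery flow can be sketched with a plain dict standing in for Redis (the key names follow the convention above; the shard counts are illustrative):

```python
import random

# In-memory dict standing in for Redis; keys follow the video:{id}:{shard} convention.
store = {"video:123:meta": {"shards": 50}}

def n_shards(video_id, default=10):
    """Readers and writers discover the current shard count from the meta key."""
    meta = store.get(f"video:{video_id}:meta")
    return meta["shards"] if meta else default

def increment(video_id):
    # Writers always use the *current* N, so scaling up just adds new keys.
    shard = random.randint(0, n_shards(video_id) - 1)
    key = f"video:{video_id}:{shard}"
    store[key] = store.get(key, 0) + 1

def total(video_id):
    # Readers sum whatever shards exist under the current N.
    return sum(store.get(f"video:{video_id}:{i}", 0) for i in range(n_shards(video_id)))

for _ in range(1000):
    increment(123)
print(total(123))  # 1000
```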

8. Data Partitioning & Sharding

What if we have a Multi-Region setup (US, EU, ASIA)? If we simply replicate Redis, we get write conflicts.

CRDT (Conflict-free Replicated Data Type):

  • G-Counter (Grow-Only Counter): A distributed counter that only increments.
  • Logic: Each region maintains its own entry in a counter vector. Merge(A, B) takes the element-wise Max(A[i], B[i]), and the displayed value is the Sum over all regions’ entries.
  • Redis Implementation: Redis Enterprise offers CRDT support natively for active-active replication.
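A minimal G-Counter sketch makes the merge rule concrete (the region names are illustrative):

```python
class GCounter:
    """Grow-only counter: one slot per region; value = sum, merge = element-wise max."""

    def __init__(self, region, regions=("US", "EU", "ASIA")):
        self.region = region
        self.slots = {r: 0 for r in regions}

    def increment(self, n=1):
        # Each region only ever increments its own slot, so merges never conflict.
        self.slots[self.region] += n

    def value(self):
        return sum(self.slots.values())

    def merge(self, other):
        # Max is safe because each slot only ever grows in its home region.
        for r in self.slots:
            self.slots[r] = max(self.slots[r], other.slots[r])

us, eu = GCounter("US"), GCounter("EU")
us.increment(3)
eu.increment(2)
us.merge(eu)  # replicate EU's state into US
eu.merge(us)  # and vice versa
print(us.value(), eu.value())  # 5 5
```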

9. Reliability, Caching, & Load Balancing

9.1 Handling “Approximate” Counting

For things like “Unique Views”, we don’t need an exact count. Storing 1 Billion IP addresses takes ~4GB.

  • HyperLogLog (HLL): A probabilistic data structure that estimates cardinality with 0.81% error using only ~12KB of memory.
  • Use Case: “1.2M Views” (Exact number doesn’t matter).
  • Redis Support: PFADD and PFCOUNT commands.

9.2 Thundering Herd

If a cache key expires, thousands of readers hit the DB.

  • Solution: Probabilistic Early Expiration. If a key expires in 10s, and we are at 9s, flip a coin. If heads, recompute the value early.
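The coin flip above is usually implemented with the XFetch rule: each reader recomputes early with a probability that rises sharply near expiry. A sketch (beta and the cost figures are illustrative):

```python
import math
import random

def should_recompute_early(now, expiry, recompute_cost, beta=1.0, rng=random.random):
    """XFetch-style check: the closer we are to expiry (and the costlier the
    recompute), the more likely one reader volunteers to refresh the key early."""
    # -log(U) is exponentially distributed, so only a small fraction of
    # concurrent readers fire early -- no thundering herd at expiry.
    return now - recompute_cost * beta * math.log(rng()) >= expiry

# With a 10s TTL and a 1s recompute cost, readers at t=9.5s often refresh
# early, while readers at t=2.0s almost never do.
expiry = 10.0
early = sum(should_recompute_early(9.5, expiry, 1.0) for _ in range(10_000))
late = sum(should_recompute_early(2.0, expiry, 1.0) for _ in range(10_000))
print(early, late)
```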

10. Interactive Decision Visualizer: Single Lock vs Sharded

Experience the difference between a single contended counter and a sharded approach.

  • Single Lock: Requests must wait in line. High latency.
  • Sharded: Requests are processed in parallel. High throughput.


11. System Walkthrough

Let’s trace a “Like” request from a user in New York.

Scenario: User A likes Video V123

  1. User Action: User A clicks “Like”. Client sends POST /likes.
      { "videoId": "v123", "userId": "u456", "timestamp": 1698765432 }
    
  2. API Gateway: Validates Auth Token. Routes to API Cluster.
  3. Sharding Logic:
    • API picks a random shard (matching the write path): shard_id = randint(0, 99) → 42.
    • Target Key: video:v123:42.
  4. Redis Write (Hot Path):
    • API executes INCR video:v123:42.
    • Response: 105 (Current shard count).
    • Total latency so far: a few milliseconds, within the < 10ms write budget.
  5. Event Queue (Async Path):
    • API pushes message to Kafka Topic likes-events.
        { "event": "LIKE", "video": "v123", "user": "u456", "shard": 42 }
      
  6. Persistence:
    • Worker pulls batch of 100 events.
    • Executes SQL Batch Insert: INSERT INTO likes (user_id, video_id) VALUES ....
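The async persistence path above can be sketched end to end, with a plain list standing in for the Kafka topic and SQLite for the relational store (a toy, single-process stand-in):

```python
import sqlite3

# Stand-ins: a list for the Kafka topic, SQLite for the durable store.
topic = [{"event": "LIKE", "video": 123, "user": 1000 + i} for i in range(250)]

db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE likes (user_id INTEGER, video_id INTEGER,"
    " PRIMARY KEY (user_id, video_id))"
)

BATCH = 100
while topic:
    batch, topic = topic[:BATCH], topic[BATCH:]  # worker pulls up to 100 events
    # One multi-row insert per batch amortizes commit overhead;
    # OR IGNORE keeps redelivered events idempotent.
    db.executemany(
        "INSERT OR IGNORE INTO likes (user_id, video_id) VALUES (?, ?)",
        [(e["user"], e["video"]) for e in batch],
    )
    db.commit()

persisted = db.execute("SELECT COUNT(*) FROM likes").fetchone()[0]
print(persisted)  # 250
```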

12. Low-Level Optimizations

12.1 Redis Pipelining vs Lua

  • Pipelining: We use MGET (a single multi-key command, equivalent to a pipelined read) to fetch 100 shards in 1 Round Trip Time (RTT). Reading the keys one by one would take 100 RTTs (e.g., 1ms vs 100ms at a 1ms RTT).
  • Lua Scripting: Used for Atomic “Check and Set”. E.g., “Only increment if user hasn’t liked yet”.
    if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 0 then
        redis.call("INCR", KEYS[1])
        redis.call("SADD", KEYS[2], ARGV[1])
        return 1
    else
        return 0
    end

12.2 HyperLogLog Internals

  • HLL works by hashing the input (UserID) and counting the leading zeros in the binary representation.
  • If we see a hash with 10 leading zeros, it’s likely we’ve seen ~2^10 distinct items.
  • We use 16,384 registers (buckets) to average out the variance.
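The leading-zero trick can be demonstrated with a single “register” (a deliberately crude toy: real HLL averages 16,384 registers and applies bias correction, which is exactly what tames the variance you see here):

```python
import hashlib

def rho(h, bits=32):
    """Position of the first 1-bit in a 32-bit hash (leading zeros + 1)."""
    for i in range(bits):
        if h & (1 << (bits - 1 - i)):
            return i + 1
    return bits + 1

def crude_estimate(items):
    # A single "register": track the maximum rho over all hashed items.
    # Seeing rho = k suggests ~2^k distinct items were hashed.
    max_rho = 0
    for item in items:
        h = int.from_bytes(hashlib.sha256(item.encode()).digest()[:4], "big")
        max_rho = max(max_rho, rho(h))
    return 2 ** max_rho

est = crude_estimate(f"user-{i}" for i in range(100_000))
print(est)  # a rough power-of-two estimate; very noisy with one register
```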

13. Interview Gauntlet

Q1: How do you handle “Unlikes”?

  • Answer: Send a decrement event (-1). In the sharded counter, we just DECR the random shard. In SQL, we DELETE the row.

Q2: How do you migrate from N=10 to N=100 shards?

  • Answer: We don’t need to migrate data. We keep keys 0-9 and simply start writing to the new keys (10-99). The Reader MGET just needs to know to query 0-99.

Q3: What if the API crashes before writing to Kafka?

  • Answer: We lose the “Durable Record” (User liked Video), but the Counter in Redis is already updated. The system is eventually consistent. For ads/money, we might need a distributed transaction (Saga), but for Likes, this trade-off is acceptable.

Q4: How does Instagram handle “Private Account” likes?

  • Answer: Authorization check first. The “Fanout” service checks if the viewer follows the private account before delivering the notification.

Q5: Can we use DynamoDB Atomic Counter?

  • Answer: Yes, but DynamoDB writes are expensive ($). Redis is cheaper for high-frequency updates. We usually buffer in Redis and flush to DynamoDB.

14. Summary: The Whiteboard Strategy

If asked to design this in an interview, draw this 4-Quadrant Layout:

1. Requirements

  • Func: Ingest, Query, Dedup.
  • Non-Func: High Write Throughput, Low Latency.
  • Scale: 1M+ writes/sec.

2. Architecture

[Client] → [API] → [Sharded Redis] | (Async) → [Kafka] → [Worker] → [SQL]

  • Sharding: Key: `video:123:shard_N`.
  • Write-Behind: Buffer writes in RAM, flush to disk later.

3. Data & API

  • POST /like → { videoId, userId }
  • Redis: INCR video:123:5
  • SQL: (user_id, video_id, timestamp)

4. Deep Dives

  • Sharding: Solves single-key contention.
  • Approximate: Use HyperLogLog for unique views.
  • Consistency: AP system (Eventual Consistency).