Distributed Counter (YouTube Likes)

[!IMPORTANT] Listen up: counting sounds simple until you hit 1 Million writes per second. In this lesson, we strip away the magic and see how real hyperscalers survive virality. You will master:

  1. The Single-Row Bottleneck: Why traditional SQL locks will take down your entire database during a traffic spike.
  2. Sharded Counters: How to scatter writes across multiple Redis nodes and gather them at read time.
  3. Write-Behind Caching: The exact mechanism to decouple high-speed ingestion from slow, durable persistence.

1. What is a Distributed Counter?

In the early days of the web, counting was simple: UPDATE posts SET likes = likes + 1 WHERE id = 123. The database handled the locking, and life was good.

Then came “Gangnam Style”. In December 2012, it became the first video to hit 1 Billion views, and by 2014 it had broken YouTube’s 32-bit signed integer limit (2,147,483,647). But the bigger problem was Velocity. When a viral event occurs, millions of users click “Like” simultaneously.

A standard relational database (Postgres/MySQL) can handle ~500-1000 concurrent writes to a single row before lock contention brings the system to a halt. This chapter explores how to scale from 100 to 1 Million writes per second using Sharded Counters, Redis, and Write-Behind Caching.

[!NOTE] War Story: The “Gangnam Style” Integer Overflow In 2014, Psy’s “Gangnam Style” became the first YouTube video to surpass 2,147,483,647 views — the maximum value of the 32-bit signed integer YouTube used to store view counts. Exceeding this limit would have caused an integer overflow and broken the counter, so YouTube upgraded the database schema to a 64-bit integer, which can store up to ~9.2 quintillion views. This event highlighted not just storage limits, but the massive velocity of writes that modern distributed systems must handle.

[!TIP] Real-World Examples:

  • YouTube: View counts, Likes.
  • Twitter: Tweet likes, Retweets.
  • Facebook: Post reactions.

2. Requirements & Goals

2.1 Functional Requirements

  1. Ingest: The system must accept “Like”, “View”, or “Upvote” events immediately.
  2. Query: Users must see the current count (approximate or exact).
  3. Deduplication: A user can only like a video once (Idempotency).
  4. Persistence: Counts must not be lost if the cache crashes.

2.2 Non-Functional Requirements

  1. Low Latency: Write operations must return < 10ms (99th percentile).
  2. High Availability: 99.99%. It is better to show an approximate count than to fail the write (AP over CP).
  3. Scalability: Handle 1 Million interactions/sec on a single object (Hot Key).
  4. Eventual Consistency: It is acceptable if the viewer count lags by a few seconds.

2.3 Extended Requirements

  1. Auditability: For “Ad Views” (which equate to money), we must have an exact, audit-proof log of every event.
  2. Bot Detection: We must filter out “fake” views from click farms.

3. Capacity Estimation

Let’s design for a platform like YouTube or Twitter.

3.1 Traffic Analysis

  • DAU (Daily Active Users): 500 Million.
  • Reads: 50 Views/User/Day = 25 Billion Views/Day.
  • Writes (Likes/Interactions): 5 Likes/User/Day = 2.5 Billion Writes/Day.
  • Average Write QPS: 2.5 × 10⁹ / 86,400 ≈ 29,000 writes/sec.
  • Peak QPS: Viral events (e.g., Super Bowl) can spike to 100x the average → 3 Million writes/sec.

3.2 Storage (The Ledger)

  • Counter Store: Tiny (Just a number).
  • Relationship Store: Storing “User X liked Video Y” is massive.
  • UserId (8 bytes) + VideoId (8 bytes) + Timestamp (8 bytes) = 24 bytes.
  • 2.5B writes/day * 24 bytes ≈ 60 GB/day.
  • 5-Year Retention: 60 GB * 365 * 5 ≈ 110 TB.
  • Conclusion: We need a sharded NoSQL database (Cassandra/DynamoDB) or a sharded SQL setup. (See Database Sharding).
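
As a sanity check, here is the arithmetic above as plain Python (numbers only, no external data):

    DAU = 500_000_000
    writes_per_day = DAU * 5                      # 2.5 billion likes/day
    avg_qps = writes_per_day / 86_400             # ≈ 29,000 writes/sec
    peak_qps = avg_qps * 100                      # ≈ 2.9M writes/sec (viral spike)

    row_bytes = 8 + 8 + 8                         # user_id + video_id + timestamp
    daily_gb = writes_per_day * row_bytes / 1e9   # ≈ 60 GB/day
    five_year_tb = daily_gb * 365 * 5 / 1_000     # ≈ 110 TB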

4. System APIs

We need a simple REST or gRPC API for clients.

| Method | Endpoint | Description |
| --- | --- | --- |
| POST | /v1/likes | Records a like interaction. Payload: `{ videoId: "v123", userId: "u456" }` |
| GET | /v1/likes/{videoId} | Returns the current count. Response: `{ count: 1540032, approx: true }` |
| DELETE | /v1/likes/{videoId} | Removes a like (Unlike). Payload: `{ userId: "u456" }` |
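
A minimal write-endpoint sketch, assuming FastAPI and Pydantic (the framework choice and handler body are illustrative, not part of the design):

    from fastapi import FastAPI
    from pydantic import BaseModel

    app = FastAPI()

    class LikeRequest(BaseModel):
        videoId: str
        userId: str

    @app.post("/v1/likes", status_code=202)
    def record_like(req: LikeRequest):
        # Hot path: INCR a random Redis shard (Section 7.2),
        # then publish the event to Kafka for async persistence (Section 6)
        return {"status": "accepted"}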

5. Database Design

We need two distinct storage layers: one for the count (Fast) and one for the record (Durable).

5.1 Redis (Hot Storage)

Used for real-time counting.

  • Key: video:{id}:likes → Value: Integer.
  • Set (Optional): video:{id}:liked_users → Set<UserId> (for deduplication, if the set stays small).

5.2 SQL / NoSQL (Cold Storage)

Used for the durable record of who liked what. Table: likes

CREATE TABLE likes (
  user_id BIGINT,
  video_id BIGINT,
  created_at TIMESTAMP,
  PRIMARY KEY (user_id, video_id) -- Ensures 1 like per user per video
);

6. High-Level Architecture

We use a Write-Behind (Async) architecture to decouple the high-speed writes from the durable storage.

[Figure: System Architecture — Distributed Counter. Sharded Redis counters (N=100) | Scatter-gather read | Async persistence]

  • Write Flow (fast): Client → Load Balancer → API Cluster → INCR on a random Redis shard.
  • Read Flow (scatter-gather): API → MGET all shards (Shard 1: 1,020 | Shard 2: 450 | … | Shard N: 890) → Sum.
  • Persistence (async): API → Kafka event (“User X liked Y”) → Batch Consumer (Worker) → batch insert into the SQL relationship table.

7. Component Design (Deep Dive)

7.1 The Single Row Lock Bottleneck

The core bottleneck is the Single Row Lock. If 10,000 requests try to update key="video:123", Redis absorbs them in memory, but a SQL database serializes them behind a row lock and processes them one at a time.

Why is Redis so fast? Redis operates on a Single-Threaded Event Loop. It processes commands sequentially but extremely quickly, since in-memory access takes nanoseconds. It avoids the context switching and lock overhead that multi-threaded systems (like MySQL) suffer when thousands of threads fight over a single row lock. However, even Redis has a limit: at ~100k writes/sec to a single key, the single thread maxes out its CPU core. This is where we need sharding.

7.2 Solution: Counter Sharding

Instead of one key, we split the counter into N keys: video:123:0, video:123:1, …, video:123:N-1.

The Write Path (Random Distribution)

When a write comes in, we pick a random shard. This distributes the load evenly across the Redis cluster (if using Redis Cluster) or just reduces contention on a single key.

    import random

    import redis

    r = redis.Redis()  # assumes a Redis client pointed at the cache cluster

    def increment_likes(video_id, n_shards=100):
        # Pick a random shard so concurrent writers spread across N keys
        shard_id = random.randint(0, n_shards - 1)
        key = f"video:{video_id}:{shard_id}"
        # INCR is atomic, so no application-level locking is needed
        r.incr(key)

The Read Path (Scatter-Gather)

To get the total likes, we sum all shards. This is slower than reading one key, but since Reads << Writes during a viral spike, it’s an acceptable trade-off.

    def get_total_likes(video_id, n_shards=100):
        keys = [f"video:{video_id}:{i}" for i in range(n_shards)]
        # MGET reads all shards in a single round trip (pipelined)
        values = r.mget(keys)
        # Shards never incremented come back as None; count them as 0
        return sum(int(v) for v in values if v is not None)

7.3 Adaptive Sharding

For most videos, N=1 is sufficient. For “Gangnam Style”, we need N=1000.

  • Heuristic: Start with N=10. Monitor Redis CPU. If CPU > 50% on the node holding the key, increase N.
  • Discovery: How does the reader know N? Store metadata alongside the counter, e.g. video:123:meta → {shards: 50} (see the sketch below).
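
A minimal sketch of metadata-driven discovery, assuming the shard count lives in a Redis hash at video:{id}:meta (the field name shards is illustrative) and reusing the r client from Section 7.2:

    def get_total_likes_adaptive(video_id):
        # Look up how many shards this video currently uses; default to 1
        n_shards = int(r.hget(f"video:{video_id}:meta", "shards") or 1)
        keys = [f"video:{video_id}:{i}" for i in range(n_shards)]
        values = r.mget(keys)
        return sum(int(v) for v in values if v is not None)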

8. Data Partitioning & Sharding

What if we have a Multi-Region setup (US, EU, ASIA)? If we simply replicate Redis, we get write conflicts.

CRDT (Conflict-free Replicated Data Type):

  • G-Counter (Grow-Only Counter): A distributed counter that only increments.
  • Logic: Each region maintains its own slot in a counter vector. Merge(A, B) = Max(A[i], B[i]) element-wise, and the read value is the Sum across all slots (see the sketch below).
  • Redis Implementation: Redis Enterprise offers CRDT support natively for active-active replication.
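
A toy G-Counter in plain Python to make the merge rule concrete (region names are illustrative):

    class GCounter:
        # Grow-only counter: one slot per region; merge takes the per-slot max
        def __init__(self, region, regions=("US", "EU", "ASIA")):
            self.region = region
            self.counts = {rg: 0 for rg in regions}

        def increment(self, n=1):
            self.counts[self.region] += n  # each region bumps only its own slot

        def merge(self, other):
            # Commutative, associative, idempotent: replication order doesn't matter
            for rg in self.counts:
                self.counts[rg] = max(self.counts[rg], other.counts[rg])

        def value(self):
            return sum(self.counts.values())

    us, eu = GCounter("US"), GCounter("EU")
    us.increment(3)
    eu.increment(2)
    us.merge(eu)
    assert us.value() == 5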

9. Reliability, Caching, & Load Balancing

9.1 Handling “Approximate” Counting

For things like “Unique Views”, we don’t need an exact count. Storing 1 Billion IP addresses takes ~4GB.

  • HyperLogLog (HLL): A probabilistic data structure that estimates cardinality with 0.81% error using only ~12KB of memory.
  • Use Case: “1.2M Views” (Exact number doesn’t matter).
  • Redis Support: the PFADD and PFCOUNT commands (demo below).
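
In redis-py, unique-view tracking is two commands (key and user ids illustrative):

    r.pfadd("video:v123:views", "u456")
    r.pfadd("video:v123:views", "u789")
    r.pfadd("video:v123:views", "u456")   # repeat view: estimate unaffected
    print(r.pfcount("video:v123:views"))  # ≈ 2, within ~0.81% standard error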

9.2 Thundering Herd

If a cache key expires, thousands of readers hit the DB.

  • Solution: Probabilistic Early Expiration. If a key expires in 10s and we are at 9s, flip a coin; if heads, recompute the value early (sketched below).
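
A simplified coin-flip sketch (production systems often weight the recompute probability by remaining TTL, as in the XFetch algorithm; recompute_count is a hypothetical helper that rebuilds the value from SQL):

    import random

    CACHE_TTL = 10    # seconds
    EARLY_WINDOW = 1  # consider early recompute in the final second

    def get_cached_count(key):
        value = r.get(key)
        ttl = r.ttl(key)  # seconds until expiry; negative if missing or persistent
        expiring_soon = 0 <= ttl <= EARLY_WINDOW
        if value is None or (expiring_soon and random.random() < 0.5):
            value = recompute_count(key)    # hypothetical: rebuild from SQL
            r.setex(key, CACHE_TTL, value)  # repopulate before mass expiry
        return int(value)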

[!NOTE] War Story: The Cache Stampede When a popular celebrity posts online, their data is typically cached. If that cache key suddenly expires or is evicted, millions of read requests might bypass the cache and hit the database simultaneously—a phenomenon known as a “Cache Stampede” or “Thundering Herd.” The database can immediately crash under the load. A common solution is to use “Leases”: the first request that encounters a cache miss is granted a short-term lease to fetch the data from the DB, while all subsequent requests are told to wait or use the stale value until the cache is repopulated.


10. Interactive Decision Visualizer: Single Lock vs Sharded

Experience the difference between a single contended counter and a sharded approach.

  • Single Lock: Requests must wait in line. High latency.
  • Sharded: Requests are processed in parallel. High throughput.

[Interactive simulator: fires 1,000 concurrent “Like” requests at a single locked counter vs. sharded counters, reporting elapsed time and processed count per node.]

11. System Walkthrough

Let’s trace a “Like” request from a user in New York.

Scenario: User A likes Video V123

  1. User Action: User A clicks “Like”. Client sends POST /likes.
      { "videoId": "v123", "userId": "u456", "timestamp": 1698765432 }
    
  2. API Gateway: Validates Auth Token. Routes to API Cluster.
  3. Sharding Logic:
    • API picks a random shard (matching Section 7.2): shard_id = randint(0, 99) → 42.
    • Target Key: video:v123:42.
  4. Redis Write (Hot Path):
    • API executes INCR video:v123:42.
    • Response: 105 (Current shard count).
    • Total latency so far: ~5ms, within our < 10ms write SLO.
  5. Event Queue (Async Path):
    • API pushes message to Kafka Topic likes-events.
        { "event": "LIKE", "video": "v123", "user": "u456", "shard": 42 }
      
  6. Persistence:
    • Worker pulls batch of 100 events.
    • Executes SQL Batch Insert: INSERT INTO likes (user_id, video_id) VALUES ....
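
A minimal batch-consumer sketch, assuming kafka-python and psycopg2 (topic name, table, and batch size taken from the walkthrough; connection strings are placeholders):

    import json

    import psycopg2
    from kafka import KafkaConsumer

    db = psycopg2.connect("dbname=likes")  # placeholder DSN
    cursor = db.cursor()

    consumer = KafkaConsumer(
        "likes-events",
        bootstrap_servers="kafka:9092",    # placeholder broker address
        value_deserializer=lambda b: json.loads(b),
    )

    batch = []
    for msg in consumer:
        batch.append((msg.value["user"], msg.value["video"]))
        if len(batch) >= 100:
            # One round trip for 100 rows; ON CONFLICT keeps retries idempotent
            cursor.executemany(
                "INSERT INTO likes (user_id, video_id) VALUES (%s, %s)"
                " ON CONFLICT DO NOTHING",
                batch,
            )
            db.commit()
            batch.clear()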

12. Low-Level Optimizations

12.1 Redis Pipelining vs Lua

  • Pipelining: We use MGET to read 100 shards in 1 Round Trip Time (RTT). Without pipelining, reading 100 keys would take 100 RTTs (e.g., 50ms vs 5000ms).
  • Lua Scripting: Used for atomic “Check and Set”, e.g., “only increment if the user hasn’t liked yet”:

    if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 0 then
      redis.call("INCR", KEYS[1])
      redis.call("SADD", KEYS[2], ARGV[1])
      return 1
    else
      return 0
    end
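Registering and invoking that script from redis-py might look like this (key and member names illustrative):

    like_once = r.register_script("""
    if redis.call("SISMEMBER", KEYS[2], ARGV[1]) == 0 then
      redis.call("INCR", KEYS[1])
      redis.call("SADD", KEYS[2], ARGV[1])
      return 1
    else
      return 0
    end
    """)

    # Returns 1 on the first like from u456, 0 on every repeat
    like_once(keys=["video:v123:0", "video:v123:liked_users"], args=["u456"])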

12.2 HyperLogLog Internals

  • HLL works by hashing the input (UserID) and counting the leading zeros in the binary representation.
  • If we see a hash with 10 leading zeros, it’s likely we’ve seen ~2^10 distinct items.
  • We use 16,384 registers (buckets) to average out the variance.
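
A toy single-register version of this intuition (real HLL splits the hash across 16,384 registers and averages their estimates; this sketch is illustration only):

    import hashlib

    def toy_cardinality_estimate(items, bits=64):
        max_lz = 0
        for item in items:
            # Hash each item to a 64-bit integer
            h = int.from_bytes(hashlib.sha1(item.encode()).digest()[:8], "big")
            # Leading zeros = word width minus significant bits
            max_lz = max(max_lz, bits - h.bit_length())
        # Seeing k leading zeros suggests ~2^k distinct items
        return 2 ** max_lz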

13. Interview Gauntlet

Q1: How do you handle “Unlikes”?

  • Answer: Send a decrement event (-1). In the sharded counter, we simply DECR a random shard. In SQL, we DELETE the row.

Q2: How do you migrate from N=10 to N=100 shards?

  • Answer: We don’t need to migrate data. We just start writing to the new keys (10-99). The Reader MGET just needs to know to query 0-99.

Q3: What if the API crashes before writing to Kafka?

  • Answer: We lose the “Durable Record” (User liked Video), but the Counter in Redis is already updated. The system is eventually consistent. For ads/money, we might need a distributed transaction (Saga), but for Likes, this trade-off is acceptable.

Q4: How does Instagram handle “Private Account” likes?

  • Answer: Authorization check first. The “Fanout” service checks if the viewer follows the private account before delivering the notification.

Q5: Can we use DynamoDB Atomic Counter?

  • Answer: Yes, but DynamoDB writes are expensive ($). Redis is cheaper for high-frequency updates. We usually buffer in Redis and flush to DynamoDB.

14. Summary: The Whiteboard Strategy

If asked to design this in an interview, draw this 4-Quadrant Layout:

1. Requirements

  • Func: Ingest, Query, Dedup.
  • Non-Func: High Write Throughput, Low Latency.
  • Scale: 1M+ writes/sec.

2. Architecture

[Client] → [API] → [Sharded Redis]
(Async) → [Kafka] → [Worker] → [SQL]
  • Sharding: Key: video:123:shard_N.
  • Write-Behind: Buffer writes in RAM, flush to Disk later.

3. Data & API

POST /like → { videoId, userId }
Redis: INCR video:123:5
SQL: (user_id, video_id, timestamp)

4. Deep Dives

  • Sharding: Solves single-key contention.
  • Approximate: Use HyperLogLog for unique views.
  • Consistency: AP system (Eventual Consistency).