Design a Distributed Counter System¶
Interview Time: 60 min | Difficulty: Medium
Key Focus: Atomic operations, eventual consistency, zero data loss
Step 1: Functional & Non-Functional Requirements¶
Functional Requirements¶
- Increment counter (e.g., page views, event count, likes)
- Decrement counter (where applicable)
- Read current value
- Get counter statistics (min, max, increments/sec)
- Support counters for millions of entities (videos, posts, products)
- Persistent storage (no loss on restart)
- Idempotent operations (duplicate requests = single increment)
- Partitioned counters (sharded by key for parallelism)
Non-Functional Requirements¶
| Requirement | Target | Notes |
|---|---|---|
| Scale | 1M increments/sec | Distributed counters |
| Latency | Increment <10ms, read <5ms | Very fast operations |
| Availability | 99.99% uptime | No data loss |
| Consistency | Eventual (near real-time) | Aggregated reads may lag writes by <1 second |
| Durability | Zero loss (persist to DB) | Critical for correctness |
| Idempotency | Handle duplicate requests | Network retries safe |
Step 2: API Design, Data Model & High-Level Design¶
Core API Endpoints¶
# Counter Operations
POST /counters/{counter_key}/increment
{amount: 1, request_id: uuid} // request_id for idempotency
→ {counter_value, success: true}
POST /counters/{counter_key}/decrement
{amount: 1, request_id: uuid}
→ {counter_value, success: true}
GET /counters/{counter_key}
→ {counter_value, updated_at, shard_count: N}
GET /counters/{counter_key}/stats
→ {total, increments_per_sec, updated_at}
# Batch Operations
POST /counters/batch-increment
{increments: [{counter_key, amount}, ...]}
→ {results: [{counter_key, new_value}]}
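To make the idempotency contract concrete, here is a minimal client-side sketch in Python; the host name, the use of the requests library, and the retry count are illustrative assumptions, not part of the API spec.
import uuid
import requests  # assumed HTTP client; any client works

def increment(counter_key: str, amount: int = 1) -> int:
    """Idempotent increment: retries reuse the same request_id, so they cannot double-count."""
    request_id = str(uuid.uuid4())  # unique per logical operation, not per attempt
    body = {"amount": amount, "request_id": request_id}
    for attempt in range(3):
        try:
            resp = requests.post(
                f"https://counter-api.example.com/counters/{counter_key}/increment",  # hypothetical host
                json=body,
                timeout=2,
            )
            resp.raise_for_status()
            return resp.json()["counter_value"]
        except requests.RequestException:
            continue  # retry with the SAME request_id
    raise RuntimeError(f"increment of {counter_key} failed after retries")

# Usage: new_value = increment("video:123:views")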
Entity Data Model¶
COUNTERS (main table)
├─ counter_key (VARCHAR, PK, e.g., "video:123:views")
├─ total_count (BIGINT, sum of all shards)
├─ shard_count (INT, number of shards for this counter)
├─ updated_at (TIMESTAMP)
├─ created_at (TIMESTAMP)
COUNTER_SHARDS (one shard per partition)
├─ shard_key (VARCHAR, PK, counter_key + "#" + shard_id, e.g., "video:123:views#1")
├─ counter_key (FK)
├─ shard_id (INT, 0 to N-1)
├─ shard_value (BIGINT, value for this shard)
├─ updated_at (TIMESTAMP)
IDEMPOTENT_REQUESTS (prevent duplicates)
├─ request_id (VARCHAR, PK, UUID)
├─ counter_key (FK)
├─ operation (increment, decrement)
├─ amount (INT)
├─ applied_at (TIMESTAMP)
├─ expires_at (TIMESTAMP + 24 hours)
High-Level Architecture¶
graph TB
CLIENT["Client<br/>(video app)"]
API["Counter API<br/>(increment, read)"]
SHARD_ROUTER["Shard Router<br/>(consistent hash<br/>to shard)"]
SHARD_LAYERS["Shard Layers<br/>(local counters<br/>per shard)"]
REDIS["Redis<br/>(cache, buffering<br/>before DB flush)"]
DB["PostgreSQL<br/>(durable storage)"]
AGGREGATOR["Aggregator Job<br/>(sum shards,<br/>update total)<br/>runs every 1 min"]
CLIENT --> API
API --> SHARD_ROUTER
SHARD_ROUTER --> REDIS
REDIS --> SHARD_LAYERS
SHARD_LAYERS --> DB
AGGREGATOR --> DB
Step 3: Concurrency, Consistency & Scalability¶
🔴 Problem: Counter Contention Under Load¶
Scenario: 1M requests/sec all incrementing the counter for the same video. The single counter row becomes a bottleneck: row locks pile up and latency explodes!
Solution: Sharded Counters (Partitioning)
Sharded Counter Architecture:
1. Split into N shards (reduce contention):
counter_key = "video:123:views"
shard_count = 10 -- 10 shards
Physical shards stored:
├─ video:123:views#0 = 50000
├─ video:123:views#1 = 49500
├─ video:123:views#2 = 51000
├─ ...
└─ video:123:views#9 = 48500
Total = 50000 + 49500 + 51000 + ... + 48500 ≈ 500K views (sum of all 10 shards)
2. Route increment to shard:
request_hash = hash(request_id) % shard_count
→ Route to shard_0, shard_5, shard_3, ... (distributed)
Each shard gets ~1M / 10 = 100K req/sec (manageable)
3. Increment atomically:
shard_id = hash(request_id) % shard_count
UPDATE counter_shards
SET shard_value = shard_value + 1
WHERE shard_key = '{counter_key}#{shard_id}'
Result:
- 10 different rows being updated
- Lock contention reduced 10×
- Latency: ~5ms (vs 100ms with single row)
4. Read total (aggregation):
SELECT SUM(shard_value)
FROM counter_shards
WHERE counter_key = 'video:123:views'
Adds latency: 10ms for SELECT + SUM
→ Cache this, update every 1 minute
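A minimal server-side sketch of the sharded increment and aggregated read, in Python with psycopg2 (the driver choice and connection handling are assumptions; table and column names follow the schema in Step 4):
import hashlib
import psycopg2  # assumed Postgres driver

def shard_id_for(request_id: str, shard_count: int) -> int:
    # Stable hash (unlike Python's built-in hash) so a retried request maps to the same shard.
    digest = hashlib.sha256(request_id.encode()).hexdigest()
    return int(digest, 16) % shard_count

def increment_shard(conn, counter_key: str, request_id: str, shard_count: int, amount: int = 1) -> None:
    shard_key = f"{counter_key}#{shard_id_for(request_id, shard_count)}"
    with conn, conn.cursor() as cur:  # one short transaction per increment
        # Atomic single-row update; contention is spread across shard_count rows.
        cur.execute(
            "UPDATE counter_shards "
            "SET shard_value = shard_value + %s, updated_at = NOW() "
            "WHERE shard_key = %s",
            (amount, shard_key),
        )

def read_total(conn, counter_key: str) -> int:
    with conn, conn.cursor() as cur:
        cur.execute(
            "SELECT COALESCE(SUM(shard_value), 0) FROM counter_shards WHERE counter_key = %s",
            (counter_key,),
        )
        return int(cur.fetchone()[0])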
🟡 Problem: Duplicate Requests (Network Retries)¶
Scenario: Client increments counter, gets timeout. Retries. Counter incremented twice!
Solution: Idempotent Operations via Request ID
Idempotent Increment:
1. Client sends:
POST /counters/video:123/increment
{
amount: 1,
request_id: "550e8400-e29b-41d4-a716-446655440000" -- UUID, unique per request
}
2. Server deduplicates:
-- Check if request already processed
existing = SELECT * FROM idempotent_requests
WHERE request_id = '550e8400...'
IF existing:
    RETURN {counter_value: current total (re-read), duplicate: true}  -- already applied, do not increment again
ELSE:
-- Process new request
UPDATE counter_shards SET shard_value = shard_value + 1 WHERE ...
-- Mark as processed
INSERT INTO idempotent_requests (
request_id, counter_key, amount, applied_at
) VALUES ('550e8400...', 'video:123:views', 1, now())
RETURN {counter_value: new_value, duplicate: false}
3. Cleanup (after 24 hours):
Background job:
DELETE FROM idempotent_requests
WHERE expires_at < now() -- auto-expire after 1 day
Result: table stays bounded; request IDs older than 24 hours are purged
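One subtlety not shown above: the SELECT-then-INSERT check has a small race window when two retries arrive concurrently. A common way to close it (a sketch, assuming both writes share one transaction) is to claim the request_id first and let the primary key do the deduplication:
import psycopg2  # assumed Postgres driver

def idempotent_increment(conn, counter_key: str, shard_key: str, request_id: str, amount: int = 1) -> bool:
    """Returns True if the increment was applied, False if request_id was already processed."""
    with conn, conn.cursor() as cur:  # single transaction: claim + increment commit or roll back together
        cur.execute(
            "INSERT INTO idempotent_requests "
            "  (request_id, counter_key, operation, amount, applied_at, expires_at) "
            "VALUES (%s, %s, 'increment', %s, NOW(), NOW() + INTERVAL '24 hours') "
            "ON CONFLICT (request_id) DO NOTHING",
            (request_id, counter_key, amount),
        )
        if cur.rowcount == 0:
            return False  # duplicate: the primary key already holds this request_id
        cur.execute(
            "UPDATE counter_shards "
            "SET shard_value = shard_value + %s, updated_at = NOW() "
            "WHERE shard_key = %s",
            (amount, shard_key),
        )
        return True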
🔷 Problem: Consistency Between Cache and Database¶
Scenario: Counter cached as 100 in Redis, DB also at 100. User increments 10 times, but some updates reach only one store: cache says 110 while the DB says 100. Drift!
Solution: Write-Through Cache + Periodic Sync
Cache + DB Sync:
1. Write-through (on increment):
Increment flow:
a) Write to DB (source of truth):
UPDATE counter_shards
SET shard_value = shard_value + 1
b) Update cache immediately:
INCR cache:{counter_key} -- Redis atomic increment
Wait for both to succeed:
IF both succeed:
RETURN success
ELSE:
RETURN error (rollback both if possible)
2. Periodic sync (every 1 min):
Aggregator job:
FOR each active counter:
db_total = SELECT SUM(shard_value) FROM counter_shards ...
cache_total = GET cache:{counter_key}
IF cache_total != db_total:
divergence = abs(db_total - cache_total)
IF divergence > 1000: -- threshold
ALERT "Cache drift > 1000"
-- Resync: DB is source of truth
SET cache:{counter_key} db_total
3. Read-through (on GET):
total = GET cache:{counter_key}
IF total == null (cache miss):
total = SELECT SUM(...) FROM counter_shards
SET cache:{counter_key} total EX 60 -- 1 minute TTL
RETURN total
Consistency guarantee:
- Near real-time: each write updates both a DB shard and the cached total (write-through), so reads see it almost immediately
- Periodic re-sync: aggregator reconciles cache with DB every 1 min, bounding drift
- Data loss: zero (DB shards are the durable source of truth)
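Putting the write-through and read-through paths together, a minimal sketch using redis-py and psycopg2 (library choices, connection setup, and the counter:{counter_key} key name follow the caching strategy below but are otherwise assumptions):
import psycopg2  # assumed Postgres driver
import redis     # assumed redis-py client

r = redis.Redis(host="localhost", port=6379, decode_responses=True)  # placeholder connection

def increment_write_through(conn, counter_key: str, shard_key: str, amount: int = 1) -> None:
    # a) DB shard first: it is the source of truth; a later cache failure only makes the cache stale.
    with conn, conn.cursor() as cur:
        cur.execute(
            "UPDATE counter_shards SET shard_value = shard_value + %s WHERE shard_key = %s",
            (amount, shard_key),
        )
    # b) Then bump the cached total atomically; the periodic sync repairs any drift.
    r.incrby(f"counter:{counter_key}", amount)

def get_total(conn, counter_key: str) -> int:
    # Read-through: serve from cache, fall back to summing shards on a miss.
    cached = r.get(f"counter:{counter_key}")
    if cached is not None:
        return int(cached)
    with conn, conn.cursor() as cur:
        cur.execute(
            "SELECT COALESCE(SUM(shard_value), 0) FROM counter_shards WHERE counter_key = %s",
            (counter_key,),
        )
        total = int(cur.fetchone()[0])
    r.set(f"counter:{counter_key}", total, ex=60)  # 1 minute TTL, per the read-through flow above
    return total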
Step 4: Persistence Layer, Caching & Monitoring¶
Database Design¶
CREATE TABLE counters (
counter_key VARCHAR(255) PRIMARY KEY,
total_count BIGINT DEFAULT 0,
shard_count INT DEFAULT 10,
updated_at TIMESTAMP DEFAULT NOW(),
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE counter_shards (
shard_key VARCHAR(255) PRIMARY KEY,
counter_key VARCHAR(255) NOT NULL,
shard_id INT NOT NULL,
shard_value BIGINT DEFAULT 0,
updated_at TIMESTAMP DEFAULT NOW(),
FOREIGN KEY (counter_key) REFERENCES counters(counter_key)
);
CREATE INDEX idx_counter_shards_counter_key
ON counter_shards(counter_key);
CREATE TABLE idempotent_requests (
request_id VARCHAR(36) PRIMARY KEY,
counter_key VARCHAR(255) NOT NULL,
operation VARCHAR(50),
amount INT,
applied_at TIMESTAMP DEFAULT NOW(),
expires_at TIMESTAMP -- 24 hours from applied_at
);
CREATE INDEX idx_idempotent_expires
ON idempotent_requests(expires_at);
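The schema leaves open how shard rows come into existence. One simple option (an assumption, not prescribed above) is to pre-create them when a counter is registered, so every increment is a plain UPDATE:
import psycopg2  # assumed Postgres driver

def create_counter(conn, counter_key: str, shard_count: int = 10) -> None:
    """Registers a counter and pre-creates its shard rows (safe to call twice)."""
    with conn, conn.cursor() as cur:
        cur.execute(
            "INSERT INTO counters (counter_key, shard_count) VALUES (%s, %s) "
            "ON CONFLICT (counter_key) DO NOTHING",
            (counter_key, shard_count),
        )
        for shard_id in range(shard_count):
            cur.execute(
                "INSERT INTO counter_shards (shard_key, counter_key, shard_id) "
                "VALUES (%s, %s, %s) ON CONFLICT (shard_key) DO NOTHING",
                (f"{counter_key}#{shard_id}", counter_key, shard_id),
            )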
Caching Strategy¶
Redis:
1. Counter Cache (write-through)
Key: "counter:{counter_key}"
Value: {total: 500000000, updated_at}
TTL: 1 hour (sync from DB if missing)
Purpose: Fast reads, no DB query
Hit rate: 99%+ (same counters read repeatedly)
2. Shard Local Cache (hot path)
Key: "shard:{shard_key}"
Value: shard_value (int)
TTL: 1 minute (resync from DB)
Purpose: Avoid DB lock contention on hot shards
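The aggregator job from Step 3 ties these caches back to the database. A minimal sketch of its once-a-minute pass (the Redis/Postgres clients and the print-based alert hook are assumptions):
import psycopg2  # assumed Postgres driver
import redis     # assumed redis-py client

r = redis.Redis(host="localhost", port=6379, decode_responses=True)  # placeholder connection

def sync_counters(conn, drift_alert_threshold: int = 1000) -> None:
    """Runs every minute: recompute totals from shards, refresh the cache, flag large drift."""
    with conn, conn.cursor() as cur:
        cur.execute(
            "SELECT counter_key, SUM(shard_value) FROM counter_shards GROUP BY counter_key"
        )
        for counter_key, db_total in cur.fetchall():
            cached = r.get(f"counter:{counter_key}")
            drift = abs(int(cached) - int(db_total)) if cached is not None else 0
            if drift > drift_alert_threshold:
                print(f"ALERT CounterCacheDrift {counter_key}: drift={drift}")  # wire to real alerting
            # DB is the source of truth: overwrite the cached total and the rolled-up counters row.
            r.set(f"counter:{counter_key}", int(db_total), ex=3600)  # 1 hour TTL, per Counter Cache above
            cur.execute(
                "UPDATE counters SET total_count = %s, updated_at = NOW() WHERE counter_key = %s",
                (int(db_total), counter_key),
            )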
Monitoring¶
- alert: CounterCacheDrift
expr: abs(cache_value - db_value) > 1000
annotations: "Cache drifted >1000 from DB – sync issue"
- alert: ShardHotness
expr: single_shard_qps > total_qps / shard_count * 10
annotations: "One shard getting 10× traffic – hash function issue?"
- alert: IdempotentTableSize
expr: idempotent_requests_rows > 100000000  # 100M rows
annotations: "Idempotent table bloated – cleanup job failing?"
- alert: CounterLag
expr: counter_read_latency_p95 > 100
annotations: "Counter read > 100ms – cache miss storm?"
Key Metrics:
- Increment latency (p50, p95, p99)
- Cache hit rate (should be >95%)
- Drift rate (cache vs DB divergence)
- Shard distribution (QPS across shards, should be balanced)
⚡ Quick Reference Cheat Sheet¶
Critical Decisions¶
- Sharded counters – 10-100 shards per counter, reduce lock contention
- Request ID deduplication – Idempotent via UUID, cleanup after 24h
- Write-through cache – Update DB first, then cache, on every increment (DB stays the source of truth)
- Periodic sync – Every 1 min, catch drift early
- Aggregation job – Sum shards, update main counter table
Tech Stack¶
Cache: Redis (atomic INCR, fast reads)
Database: PostgreSQL (durable, ACID)
Scheduler: Cron job (aggregation)
Monitoring: Prometheus (drift detection)
🎯 Interview Summary (5 Minutes)¶
- Sharded counters → N shards per counter, hash request to shard
- Request deduplication → UUID request_id, check + mark in table
- Write-through cache → Update DB first, then Redis, on every increment
- Periodic aggregation → Sum shards every 1 min, catch drift
- Idempotent cleanup → Auto-expire request_ids after 24 hours
- Near real-time consistency → Writes visible almost immediately (write-through); shard totals reconciled every minute
- Zero data loss → DB durable, cache is optimization only