System Design: Distributed Metrics Collection
Design a distributed metrics collection system that aggregates counters, gauges, and histograms from millions of sources with sub-minute resolution, supporting both push and pull ingestion at petabyte scale.
Requirements
Functional Requirements:
- Ingest counters, gauges, histograms, and summaries from millions of clients (services, IoT devices, mobile apps)
- Support both push (UDP/HTTP from clients) and pull (scraping) ingestion
- Aggregate metrics in real time: compute rates, percentiles, and sums over configurable windows
- Store downsampled metrics at multiple resolutions: 10s, 1m, 5m, 1h, 1d
- Query API: fetch metric values for time ranges, with filtering and aggregation by dimension/tag
- Cardinality management: limit unique time series per metric to prevent cardinality explosions
Non-Functional Requirements:
- Ingest 10 million metrics/sec (10M data points/sec) globally
- Query latency under 1 second for 24-hour window with 1-minute resolution
- Store 5 years of downsampled metric history
- Handle unreliable clients: missing data points treated as null (not as zero)
Scale Estimation
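10M data points/sec × (8 bytes for a float64 value + ~50 bytes of tags) = ~580 MB/sec raw. Over one 10-second resolution window that is 10M points/sec × 10 s = 100M raw data points. After compression (delta encoding + gzip) at an assumed 10:1 ratio, writes are ~58 MB/sec. For 5-year retention at 1-minute resolution: 10M series × 5 years × 525,600 minutes/year × 8 bytes = ~210 TB before compression, or roughly 21 TB at the same 10:1 ratio. The critical architectural challenge is the series index. A bare key-to-ID map for 10M unique time series at ~100 bytes per entry is about 1 GB; reaching 1 TB would require roughly 100 KB of metadata per series. In either case the index is consulted for every incoming data point, so it must be distributed.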
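The arithmetic behind these estimates can be checked with a few lines of Python. This is only a back-of-envelope sketch: every input is one of the figures assumed in this section (not a measurement), and the constant names are made up for illustration. Note that the 5-year figure multiplies value bytes only, so it is the size before any compression ratio is applied.

```python
# Back-of-envelope check of the scale estimate. All inputs are the
# assumptions stated in the text; the names are illustrative.
POINTS_PER_SEC = 10_000_000      # ingest rate
VALUE_BYTES = 8                  # one float64 per point
TAG_BYTES = 50                   # approximate tag overhead per point
COMPRESSION_RATIO = 10           # assumed 10:1 (delta encoding + gzip)
SERIES = 10_000_000              # unique time series
YEARS = 5
MINUTES_PER_YEAR = 525_600

raw_bytes_per_sec = POINTS_PER_SEC * (VALUE_BYTES + TAG_BYTES)
compressed_bytes_per_sec = raw_bytes_per_sec // COMPRESSION_RATIO

# One 8-byte value per series per minute for five years (values only).
retention_bytes = SERIES * YEARS * MINUTES_PER_YEAR * VALUE_BYTES
retention_compressed_bytes = retention_bytes // COMPRESSION_RATIO

print(f"raw ingest:        {raw_bytes_per_sec / 1e6:.0f} MB/s")
print(f"compressed ingest: {compressed_bytes_per_sec / 1e6:.0f} MB/s")
print(f"5y at 1m, values:  {retention_bytes / 1e12:.1f} TB")
print(f"5y at 1m, 10:1:    {retention_compressed_bytes / 1e12:.1f} TB")
```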
High-Level Architecture
The system follows an ingest → aggregate → store → query pipeline. Ingest layer: stateless receivers accept metric data from clients via UDP (StatsD protocol, best-effort), HTTP (line protocol, Prometheus remote write), or scraping (pull-based, like Prometheus). Multiple ingest nodes process data in parallel and route each data point by consistent hashing on the metric key (service + metric name + tags), so that data for the same series always lands on the same aggregation node.
Aggregation layer: each aggregation node maintains in-memory state for each assigned series: counters are summed, gauges are tracked (last value), and histograms use a streaming percentile algorithm (DDSketch or t-Digest). At the end of each resolution window (10 seconds), the aggregated values are flushed to the storage layer. This pre-aggregation is critical: without it, the storage layer would receive 10M raw writes/sec, or 100M per 10-second window. After aggregation it receives one data point per series per window, which for 10M series is 10M data points per flush: a tenfold reduction, delivered in batches rather than as a stream.
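The per-window state for counters and gauges could be sketched roughly as follows. This is a minimal single-threaded illustration, not a production aggregator: the `WindowAggregator` class, its method names, and the tuple layout returned by `flush` are all assumptions made for this example, and histogram sketches are left out.

```python
from collections import defaultdict


class WindowAggregator:
    """Hypothetical in-memory aggregator: one value per (series, window)."""

    def __init__(self, window_seconds=10):
        self.window = window_seconds
        self.counters = defaultdict(float)  # (key, window_start) -> summed delta
        self.gauges = {}                    # (key, window_start) -> last value

    def _bucket(self, ts):
        # Align a timestamp to the start of its resolution window.
        return int(ts) - int(ts) % self.window

    def add_counter(self, key, ts, delta):
        self.counters[(key, self._bucket(ts))] += delta

    def set_gauge(self, key, ts, value):
        # Last write wins within a window.
        self.gauges[(key, self._bucket(ts))] = value

    def flush(self, now):
        """Return and drop every point whose window closed before `now`."""
        cutoff = self._bucket(now)
        out = []
        for store, kind in ((self.counters, "counter"), (self.gauges, "gauge")):
            closed = [k for k in store if k[1] < cutoff]
            for key, start in closed:
                out.append((key, start, kind, store.pop((key, start))))
        return sorted(out)


agg = WindowAggregator()
for ts, delta in [(100, 1), (103, 2), (109.9, 3), (110, 5)]:
    agg.add_counter("http.requests", ts, delta)
agg.set_gauge("cpu.usage", 101, 0.5)
agg.set_gauge("cpu.usage", 108, 0.7)
print(agg.flush(now=115))  # only the window starting at t=100 has closed
```

Series that received no data in a window simply produce no point, which fits the requirement that missing data is stored as null rather than zero.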
Storage layer: a time-series database (InfluxDB, TimescaleDB, or ClickHouse) stores aggregated data points. The schema is narrow: one row per (series_id, timestamp, value). Series metadata (metric name, tags) is stored in a separate index (inverted index: tag_key → tag_value → series_id list). ClickHouse's columnar storage and vectorized query execution make it well-suited for time-series analytics: queries that aggregate millions of rows complete in seconds.
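The inverted index shape (tag_key → tag_value → series_id list) can be illustrated with a small in-memory structure. This is a sketch only: the `TagIndex` class and its methods are hypothetical, it uses sets instead of sorted posting lists, and a real index would be persisted and sharded.

```python
from collections import defaultdict


class TagIndex:
    """Hypothetical inverted index: tag_key -> tag_value -> set of series IDs."""

    def __init__(self):
        self._postings = defaultdict(lambda: defaultdict(set))

    def add(self, series_id, tags):
        for key, value in tags.items():
            self._postings[key][value].add(series_id)

    def lookup(self, **filters):
        """Series IDs matching every key=value filter (set intersection)."""
        sets = [
            self._postings.get(key, {}).get(value, set())
            for key, value in filters.items()
        ]
        return set.intersection(*sets) if sets else set()


index = TagIndex()
index.add(1, {"host": "web-01", "region": "us-east"})
index.add(2, {"host": "web-02", "region": "us-east"})
index.add(3, {"host": "web-03", "region": "eu-west"})
print(sorted(index.lookup(region="us-east")))
```

A query first resolves its tag filters to series IDs through this index, then reads the matching rows from the data table by series_id and time range.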
Core Components
Consistent Hash Ring for Ingest
The ingest layer uses consistent hashing to route metric writes to aggregation nodes. The hash function maps (metric_name + sorted_tags) to a position on the ring. Each aggregation node owns a set of ranges on the ring. When a client pushes cpu.usage{host=web-01, region=us-east}, the ingest node hashes the metric key and forwards the data point to the responsible aggregation node. Virtual nodes (150 per physical node) keep the load evenly distributed. When an aggregation node fails, each of its ring ranges is taken over by the next node clockwise, so its load spreads across the survivors. The in-flight metrics for that window may be lost, which is acceptable: a 10-second gap in metrics is better than system-wide failure.
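A hash ring with virtual nodes along these lines might look like the sketch below. The `HashRing` class, the `series_key` helper, the `agg-N` node names, and the choice of SHA-256 truncated to 64 bits are all assumptions made for illustration; the text does not specify a hash function or a key encoding.

```python
import bisect
import hashlib


def series_key(metric_name, tags):
    """Canonical key: metric name plus tags sorted by tag key (assumed format)."""
    body = ",".join(f"{k}={v}" for k, v in sorted(tags.items()))
    return f"{metric_name}{{{body}}}"


class HashRing:
    """Hypothetical consistent hash ring with virtual nodes."""

    def __init__(self, nodes, vnodes=150):
        self._ring = []  # sorted list of (position, node)
        for node in nodes:
            for i in range(vnodes):
                self._ring.append((self._hash(f"{node}#{i}"), node))
        self._rebuild()

    @staticmethod
    def _hash(key):
        # First 8 bytes of SHA-256 as a 64-bit ring position.
        return int.from_bytes(hashlib.sha256(key.encode()).digest()[:8], "big")

    def _rebuild(self):
        self._ring.sort()
        self._positions = [pos for pos, _ in self._ring]

    def node_for(self, key):
        # First virtual node clockwise from the key's position, wrapping around.
        idx = bisect.bisect_right(self._positions, self._hash(key))
        return self._ring[idx % len(self._ring)][1]

    def remove(self, node):
        # Dropping a node hands each of its ranges to the next vnode clockwise.
        self._ring = [(pos, n) for pos, n in self._ring if n != node]
        self._rebuild()


ring = HashRing(["agg-1", "agg-2", "agg-3"])
key = series_key("cpu.usage", {"host": "web-01", "region": "us-east"})
print(key, "->", ring.node_for(key))
```

Because removing a node only deletes its own virtual nodes, every series owned by a surviving node keeps its owner; only the failed node's series move.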
DDSketch for Histogram Aggregation
Percentiles from histograms (e.g., HTTP request latency distribution) cannot be averaged across instances: the average of per-instance p99 values is not the global p99. DDSketch (Distributed Distribution Sketch) is a mergeable data structure: each client computes a local DDSketch, and sketches from all instances can be merged to compute accurate global percentiles. DDSketch divides the value range into logarithmically spaced buckets and counts each value in its bucket. To compute p99, walk the buckets in ascending order and return the first bucket at which the cumulative count reaches 99% of the total. The merge operation sums the counts in matching buckets from multiple sketches. Error guarantee: the relative error on percentile estimates is bounded (e.g., 1% relative error for p99).
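The bucket, merge, and quantile steps can be sketched in a few lines. This is a simplified illustration of the DDSketch idea, not the reference implementation: it handles positive values only, never collapses buckets to cap memory, and the class and method names are chosen for this example. It assumes the usual construction where a relative accuracy α gives γ = (1 + α) / (1 − α), a value x goes in bucket ⌈log_γ x⌉, and bucket i is reported as 2γ^i / (γ + 1).

```python
import math
from collections import Counter


class DDSketch:
    """Simplified DDSketch-style quantile sketch for positive values."""

    def __init__(self, alpha=0.01):
        self.alpha = alpha                        # target relative accuracy
        self.gamma = (1 + alpha) / (1 - alpha)    # bucket growth factor
        self.buckets = Counter()                  # bucket index -> count
        self.count = 0

    def add(self, x):
        if x <= 0:
            raise ValueError("this sketch only handles positive values")
        # Bucket i covers (gamma**(i-1), gamma**i].
        self.buckets[math.ceil(math.log(x, self.gamma))] += 1
        self.count += 1

    def merge(self, other):
        if other.alpha != self.alpha:
            raise ValueError("sketches must share the same alpha")
        self.buckets.update(other.buckets)        # sums counts per bucket
        self.count += other.count

    def quantile(self, q):
        if self.count == 0:
            raise ValueError("empty sketch")
        rank = q * (self.count - 1)
        seen = 0
        for i in sorted(self.buckets):
            seen += self.buckets[i]
            if seen > rank:
                # Representative value with relative error <= alpha for the bucket.
                return 2 * self.gamma ** i / (self.gamma + 1)


# Two instances record latencies locally; their sketches are merged centrally.
a, b = DDSketch(), DDSketch()
for ms in range(1, 501):
    a.add(ms)
for ms in range(501, 1001):
    b.add(ms)
a.merge(b)
print(f"merged p99 estimate: {a.quantile(0.99):.1f} ms")
```

Merging is just per-bucket addition, so the merged sketch is identical to one built from all the values directly, which is what makes the structure safe to combine across instances and across downsampling levels.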
Downsampling and Retention
Raw 10-second resolution data is retained for 7 days. Downsampling jobs run continuously, aggregating 10s data into 1m, 1m into 5m, 5m into 1h, and 1h into 1d. Downsampling uses configurable aggregation functions per metric type: counters use SUM (rate can be recomputed from summed counts), gauges use AVG/MIN/MAX, and histograms use MERGE (DDSketch merge). Downsampling is implemented with ClickHouse materialized views, which act as streaming queries: new raw data is automatically rolled up into the lower-resolution (coarser) tables by the database engine, without separate batch jobs.
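The per-type rollup rules can be illustrated outside the database with a small function. This is a sketch of the aggregation logic only, not of how ClickHouse materialized views execute it; the `downsample` function, its argument names, and the output layout are assumptions made for this example. A count is kept alongside the gauge average so that a later rollup (say 1m into 5m) could weight averages correctly.

```python
from collections import defaultdict


def downsample(points, bucket_seconds, kind):
    """Roll (timestamp, value) points up into coarser buckets.

    kind="counter" sums values; kind="gauge" keeps avg/min/max/count.
    Buckets with no input points are not emitted (null, not zero).
    """
    groups = defaultdict(list)
    for ts, value in points:
        groups[ts - ts % bucket_seconds].append(value)

    out = {}
    for bucket, values in sorted(groups.items()):
        if kind == "counter":
            out[bucket] = sum(values)
        elif kind == "gauge":
            out[bucket] = {
                "avg": sum(values) / len(values),
                "min": min(values),
                "max": max(values),
                "count": len(values),
            }
        else:
            raise ValueError(f"unsupported metric kind: {kind}")
    return out


# Six 10-second counter points in the first minute, one in the second.
counter_points = [(t, 1) for t in range(0, 60, 10)] + [(60, 5)]
print(downsample(counter_points, 60, "counter"))
```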
Database Design
ClickHouse schema: metrics_raw (metric_id UInt64, timestamp DateTime, value Float64) with a primary key of (metric_id, timestamp) and a MergeTree engine ordered by (metric_id, timestamp) — optimal for time-range queries per series. A separate series_index table: (metric_id, metric_name, tags Map(String, String), created_at) maps human-readable metric identifiers to numeric IDs. Tag lookup (find all series with region=us-east) uses ClickHouse's array and map data types with bloom filter indexes for efficient filtering.
Metrics older than 7 days (raw) and older than 30 days (1m resolution) are expired via TTL policies: TTL timestamp + INTERVAL 7 DAY DELETE on the raw table. This automatic expiry avoids manual partition management.
API Design
Scaling & Bottlenecks
The series index (mapping metric keys to series IDs) is a bottleneck at high cardinality. At 10M unique series, the index must handle 10M lookups/sec (one per incoming metric). Mitigation: cache the metric_key → series_id mapping in an in-memory hash map on each ingest node (populated at startup from the database); invalidate entries on series expiry. Memory usage: 10M entries × 100 bytes = 1 GB per ingest node — manageable.
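The cache described above might be sketched as follows. It is a minimal illustration under stated assumptions: the `SeriesIdCache` class is hypothetical, and the two callables passed to it stand in for whatever database access the real system uses (bulk load at startup, and lookup-or-create on a miss).

```python
class SeriesIdCache:
    """Hypothetical per-ingest-node cache of metric_key -> series_id."""

    def __init__(self, load_all, lookup_or_create):
        # Warm the cache once at startup from the backing store.
        self._ids = dict(load_all())
        self._lookup_or_create = lookup_or_create
        self.hits = 0
        self.misses = 0

    def get(self, metric_key):
        series_id = self._ids.get(metric_key)
        if series_id is None:
            # Unknown series: fall back to the backing store, then remember it.
            self.misses += 1
            series_id = self._lookup_or_create(metric_key)
            self._ids[metric_key] = series_id
        else:
            self.hits += 1
        return series_id

    def expire(self, metric_key):
        # Invalidate an entry when its series expires.
        self._ids.pop(metric_key, None)


# A plain dict stands in for the database in this example.
backing = {"cpu.usage{host=web-01}": 1}


def lookup_or_create(key):
    return backing.setdefault(key, len(backing) + 1)


cache = SeriesIdCache(lambda: backing.items(), lookup_or_create)
print(cache.get("cpu.usage{host=web-01}"), cache.get("mem.used{host=web-01}"))
```

After warm-up, only genuinely new series reach the database, so the steady-state lookup rate against the index is driven by series churn rather than by the 10M points/sec ingest rate.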
ClickHouse write throughput scales horizontally: add shards and distribute time series across shards by series_id hash. The bottleneck shifts to the coordination layer (routing writes to the right shard). At 10M data points per 10-second flush, each shard receives 10M / N data points per flush — at 100 shards, each shard handles 100,000 data points per 10-second flush, trivially within ClickHouse's write capacity.
Key Trade-offs
- Push vs. pull ingestion: Push (clients send metrics) works for ephemeral processes and mobile clients; pull (scraping) is better for server-side services but can't reach clients behind NAT
- Pre-aggregation vs. raw storage: Pre-aggregation (at ingest time, sum counters over 10s windows) dramatically reduces storage but permanently discards the raw data, so nothing finer than the 10-second window can be recomputed retrospectively
- Exact vs. approximate percentiles: Exact percentile computation requires storing all raw values (O(n) memory per series); approximate algorithms (DDSketch, t-Digest) use O(1/ε) memory with bounded error — the right trade-off for high-cardinality metric systems
- Cardinality limits: Enforcing per-metric cardinality limits (reject new series if a metric already has >10,000 unique label combinations) prevents cardinality explosions that can OOM the series index, but may reject legitimate high-cardinality use cases
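The cardinality limit in the last trade-off can be sketched as a small admission check at ingest. This is an illustration only: the `CardinalityLimiter` class is hypothetical, it keeps every label set in memory (a real system might use an approximate counter instead), and it admits at most `max_series_per_metric` distinct label combinations per metric.

```python
from collections import defaultdict


class CardinalityLimiter:
    """Hypothetical per-metric limit on distinct label combinations."""

    def __init__(self, max_series_per_metric=10_000):
        self.limit = max_series_per_metric
        self._seen = defaultdict(set)  # metric name -> set of label sets

    def admit(self, metric_name, tags):
        """True if the point belongs to a known series or a new one fits."""
        label_set = tuple(sorted(tags.items()))
        seen = self._seen[metric_name]
        if label_set in seen:
            return True            # existing series are always accepted
        if len(seen) >= self.limit:
            return False           # new series would exceed the limit
        seen.add(label_set)
        return True


limiter = CardinalityLimiter(max_series_per_metric=2)
for user in ("u1", "u2", "u3", "u1"):
    print(user, limiter.admit("login.count", {"user_id": user}))
```

Existing series keep flowing once the limit is hit; only new label combinations are rejected, which is also why a legitimate high-cardinality metric would see its newest series dropped.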