System Design: Log Aggregation Pipeline

Design a log aggregation pipeline like the ELK Stack or Datadog that collects, processes, stores, and searches logs from thousands of services at petabyte scale with real-time querying.

16 min read · Updated Jan 15, 2025
Tags: system-design, log-aggregation, elasticsearch, kafka, developer-tools

Requirements

Functional Requirements:

  • Collect logs from thousands of services via agents (Filebeat, Fluentd, OpenTelemetry Collector)
  • Parse and enrich logs: extract structured fields from unstructured text, add metadata (service name, environment, region)
  • Index logs for full-text search and field-based filtering
  • Provide a query interface: search by keyword, filter by time range, aggregate by field
  • Set up alerts based on log patterns (error rate exceeds threshold, specific exception appears)
  • Retain logs for configurable periods (hot: 7 days, warm: 30 days, cold: 1 year)

Non-Functional Requirements:

  • Ingest 10 TB of logs per day (100,000+ events/sec) with under 5 seconds end-to-end latency from emission to searchability
  • Query response time under 2 seconds for a 1-hour time window search
  • Durability: no log loss during pipeline component failures (buffering + acknowledgment)
  • 99.9% availability for the query tier; ingest can tolerate brief degradation

Scale Estimation

10 TB/day ≈ 116 MB/sec of raw log ingestion (10 TB ÷ 86,400 sec), which agrees with the event-rate view: 100,000 events/sec × 1 KB average ≈ 100 MB/sec before compression. Assuming logs compress roughly 10:1 with gzip, a compressed raw archive is written at about 11.6 MB/sec. Indexing overhead in Elasticsearch: ~3-4x the raw data size for inverted indexes and doc values, so 10 TB/day → 30-40 TB/day of index storage. For the 7-day hot tier: 30-40 TB × 7 days = 210-280 TB of hot storage. Keeping a full 30 days in Elasticsearch across hot and warm takes 900 TB to 1.2 PB. This drives the design toward tiered storage with warm/cold tiers on cheaper media (HDDs or object storage).
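The arithmetic in this section can be reproduced in a few lines, which makes it easy to re-run with different inputs. This is a rough sketch: the helper names are illustrative, decimal units (1 TB = 10^12 bytes, 1 KB = 1,000 bytes) are an assumption, and the 7-day and 30-day windows are taken from the retention requirement.

```python
# Back-of-the-envelope sizing for the figures quoted in this section.
# Assumptions: decimal units, 3-4x Elasticsearch index overhead (from the text).

SECONDS_PER_DAY = 86_400
TB = 1e12
MB = 1e6


def ingest_rate_mb_per_sec(tb_per_day):
    """Average ingest rate in MB/sec for a daily volume in TB."""
    return tb_per_day * TB / SECONDS_PER_DAY / MB


def index_storage_tb(raw_tb_per_day, overhead, retention_days):
    """Index storage in TB for a retention window, given an overhead factor."""
    return raw_tb_per_day * overhead * retention_days


raw_rate = ingest_rate_mb_per_sec(10)        # about 115.7 MB/sec
event_rate = 100_000 * 1_000 / MB            # 100 MB/sec at 1 KB per event
hot_low = index_storage_tb(10, 3, 7)         # 7-day window, 3x overhead
hot_high = index_storage_tb(10, 4, 7)        # 7-day window, 4x overhead
month_low = index_storage_tb(10, 3, 30)      # 30-day window, 3x overhead
month_high = index_storage_tb(10, 4, 30)     # 30-day window, 4x overhead

print(f"raw ingest: {raw_rate:.1f} MB/sec, by events: {event_rate:.0f} MB/sec")
print(f"7-day index storage: {hot_low}-{hot_high} TB")
print(f"30-day index storage: {month_low}-{month_high} TB")
```

The two ingest figures differ by about 15% because 100,000 events/sec at 1 KB is closer to 8.6 TB/day than 10 TB/day; for capacity planning the larger number is the safer one.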

High-Level Architecture

The pipeline has four stages: collection, buffering, processing, and storage/query. Log agents (Filebeat, Fluentd, or the OpenTelemetry Collector) run as DaemonSets on every Kubernetes node, tailing log files and shipping events to a message queue. Kafka serves as the durable buffer between collection and processing — it absorbs traffic spikes, provides replay capability for reprocessing, and decouples producer throughput from consumer processing speed.

A stream processing layer (Logstash, Flink, or Vector) consumes from Kafka, enriches and parses logs (grok patterns for unstructured text, JSON parsing for structured logs), and writes to Elasticsearch (for search) and optionally to object storage (S3/GCS for long-term cold storage). Elasticsearch clusters are sized for hot data (NVMe SSDs, 7-day retention); older data is moved to warm nodes (HDDs) via ILM (Index Lifecycle Management) policies, and eventually snapshotted to object storage and deleted from Elasticsearch.
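The hot-to-warm-to-delete flow described above could be expressed as an ILM policy along these lines. This is a sketch, not a tuned configuration: the node attribute `data: warm`, the SLM policy name `nightly-snapshots`, and the rollover thresholds are assumptions chosen for illustration, and the body is shown as a Python dict that would be serialized to JSON for the ILM API.

```python
import json

# Sketch of an ILM policy body. Ages follow the retention figures in this
# article (7 days hot, removed from Elasticsearch after 30 days).
ilm_policy = {
    "policy": {
        "phases": {
            "hot": {
                "min_age": "0ms",
                "actions": {
                    # Roll over to a new index daily or when shards get large.
                    "rollover": {
                        "max_age": "1d",
                        "max_primary_shard_size": "50gb",
                    }
                },
            },
            "warm": {
                "min_age": "7d",
                "actions": {
                    # Assumption: warm nodes are tagged node.attr.data=warm.
                    "allocate": {"require": {"data": "warm"}},
                    "forcemerge": {"max_num_segments": 1},
                },
            },
            "delete": {
                "min_age": "30d",
                "actions": {
                    # Hypothetical SLM policy name; delete only after a
                    # snapshot to object storage has completed.
                    "wait_for_snapshot": {"policy": "nightly-snapshots"},
                    "delete": {},
                },
            },
        }
    }
}

print(json.dumps(ilm_policy, indent=2))
```

After rollover, each phase's `min_age` is measured from the rollover time rather than index creation, so the effective ages are approximate.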

The query tier (Kibana or a custom query API) translates user queries into Elasticsearch DSL queries. Full-text search and exact-match filters on keyword fields use Elasticsearch's inverted index; sorting and aggregations use doc values (a columnar on-disk structure). Time-range queries take advantage of time-based indices (one index per day or hour): only the indices that overlap the requested window need to be searched.
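As an illustration of that translation step, the sketch below builds a query body and the list of daily indices for a time window. The function names are hypothetical; the field names (`message`, `service`, `timestamp`) and the `logs-YYYY-MM-DD` index naming come from elsewhere in this article.

```python
from datetime import datetime, timedelta, timezone


def build_log_query(keyword, service, start, end, size=100):
    """Build a search body: full-text match plus exact and time filters."""
    return {
        "size": size,
        "sort": [{"timestamp": {"order": "desc"}}],
        "query": {
            "bool": {
                # Scored full-text match on the analyzed message field.
                "must": [{"match": {"message": keyword}}],
                # Filters are not scored and can be cached.
                "filter": [
                    {"term": {"service": service}},
                    {"range": {"timestamp": {
                        "gte": start.isoformat(),
                        "lt": end.isoformat(),
                    }}},
                ],
            }
        },
    }


def daily_indices(start, end, prefix="logs-"):
    """Names of the daily indices that overlap the window [start, end)."""
    day = start.date()
    last = (end - timedelta(microseconds=1)).date()
    names = []
    while day <= last:
        names.append(f"{prefix}{day.isoformat()}")
        day += timedelta(days=1)
    return names


start = datetime(2025, 1, 14, 23, 30, tzinfo=timezone.utc)
end = datetime(2025, 1, 15, 0, 30, tzinfo=timezone.utc)
body = build_log_query("timeout", "checkout", start, end)
targets = daily_indices(start, end)
print(targets)
```

A one-hour window that crosses midnight touches two daily indices; one that ends exactly at midnight touches only the earlier one.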

Core Components

Log Collection Agent

Agents handle tail-based collection (following log files with inotify/kqueue), journal scraping (systemd journal), and direct SDK integration (structured logging libraries emit directly to the agent). Key challenges: log rotation (agents must track inode, not filename, to avoid missing logs during rotation), backpressure (if Kafka is slow, agents buffer to disk rather than dropping), and multi-line log handling (stack traces span multiple lines — agents buffer lines with a regex pattern to detect log boundaries). Agents add metadata: hostname, pod name, namespace, container image, environment — reducing query latency by enabling field-based pre-filtering.
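The multi-line handling can be sketched as a small generator. This assumes every new event starts with a `YYYY-MM-DD HH:MM:SS` style timestamp; that pattern, and the function name, are illustrative rather than taken from any particular agent.

```python
import re

# Assumption: a line starting with a timestamp begins a new log event.
EVENT_START = re.compile(r"^\d{4}-\d{2}-\d{2}[T ]\d{2}:\d{2}:\d{2}")


def join_multiline(lines):
    """Group continuation lines (e.g. stack traces) with their first line."""
    buffer = []
    for line in lines:
        line = line.rstrip("\n")
        # A new event boundary flushes whatever has been buffered so far.
        if EVENT_START.match(line) and buffer:
            yield "\n".join(buffer)
            buffer = []
        buffer.append(line)
    if buffer:
        yield "\n".join(buffer)


sample = [
    "2025-01-15 10:00:00 ERROR request failed",
    "Traceback (most recent call last):",
    '  File "app.py", line 1, in <module>',
    "2025-01-15 10:00:01 INFO recovered",
]
events = list(join_multiline(sample))
print(len(events))
```

A production agent would also flush the buffer after a timeout, since the last event in a quiet file otherwise waits for the next line to arrive.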

Kafka Buffer Layer

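Kafka topics are partitioned by service name or log source for parallelism; because a few chatty services can dominate volume, keying purely by service name risks hot partitions, so high-volume services may need their own topic or a finer-grained key. Replication factor 3 ensures durability (logs are not lost if a Kafka broker dies). Log agents write with acks=all (wait for all in-sync replicas) to prevent data loss; pairing this with min.insync.replicas=2 on the topic ensures that an acknowledged write exists on at least two brokers. Retention on Kafka: 24 hours (acts as a buffer and replay source if the processing layer needs to reprocess due to a bug in parsing logic). Topic throughput: at 100,000 events/sec with 1 KB average, 10 partitions each handling 10,000 events/sec (about 10 MB/sec) cover the baseline write rate, though the partition count also caps consumer parallelism within a group, so provisioning more partitions leaves headroom for spikes and slow processors. Consumer groups allow multiple processors (one for Elasticsearch, one for alerting, one for S3 archival) to consume the same stream independently.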
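The partition and retention numbers can be sanity-checked with a short calculation. The helper names are hypothetical, and the per-partition throughput target is an input to be measured for a given cluster, not a Kafka constant.

```python
import math


def partitions_needed(events_per_sec, avg_event_bytes, per_partition_mb_per_sec):
    """Minimum partitions to carry the write rate at a per-partition target."""
    total_mb_per_sec = events_per_sec * avg_event_bytes / 1e6
    return math.ceil(total_mb_per_sec / per_partition_mb_per_sec)


def retained_tb(tb_per_day, retention_hours, replication_factor):
    """Broker disk used by retained data across all replicas, uncompressed."""
    return tb_per_day * retention_hours / 24 * replication_factor


baseline = partitions_needed(100_000, 1_000, 10)      # 10 MB/sec per partition
conservative = partitions_needed(100_000, 1_000, 5)   # 5 MB/sec per partition
disk_tb = retained_tb(10, 24, 3)                      # 24 h retention, RF 3

print(baseline, conservative, disk_tb)
```

Under these inputs, 24 hours of retention at replication factor 3 needs roughly 30 TB of broker disk before producer-side compression is taken into account.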

Elasticsearch Indexing

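Elasticsearch distributes data across shards (horizontal partitions). Time-based index strategy: create one index per day (logs-2025-01-15). Shard sizing: target 30-50 GB per shard for optimal performance. A daily index with 5 primary shards + 1 replica meets that target only up to about 150-250 GB of index data per day; at the 30-40 TB/day estimated earlier, five shards would each hold 6-8 TB, so the shard count must grow to several hundred per day, or indices must roll over by size (ILM rollover) or be split per service. Time-based rotation enables simple lifecycle management: moving an old index to warm nodes is a single settings update. Elasticsearch ingest pipelines (run on ingest nodes) can handle parsing and enrichment inside the cluster as an alternative to doing all of it in the stream processing layer. Field mappings are explicitly defined (not dynamic) to prevent mapping explosions, where arbitrary log keys each create a new indexed field.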
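The relationship between daily index size and shard count is simple enough to compute directly. The sketch below uses the 30-40 TB/day index estimate from the Scale Estimation section and treats it as primary data only, which is an assumption; the function names are illustrative.

```python
import math


def primary_shards_for(index_gb, target_shard_gb):
    """Primary shards needed to keep each shard at or below the target size."""
    return max(1, math.ceil(index_gb / target_shard_gb))


def shard_size_gb(index_gb, primary_shards):
    """Average primary shard size for a given index size and shard count."""
    return index_gb / primary_shards


five_shard_size = shard_size_gb(30_000, 5)       # 30 TB/day over 5 shards
fewest = primary_shards_for(30_000, 50)          # low estimate, 50 GB shards
most = primary_shards_for(40_000, 30)            # high estimate, 30 GB shards
fits_five = 5 * 50                               # GB/day that 5 shards can hold

print(five_shard_size, fewest, most, fits_five)
```

With five primaries, a 30 TB daily index would average 6,000 GB per shard, far beyond the 30-50 GB target; staying inside the target takes roughly 600 to 1,334 primaries per day at this volume.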

Database Design

Elasticsearch is the primary query store. Each log document: {timestamp, service, level, message, trace_id, span_id, host, k8s_namespace, k8s_pod, [parsed_fields...]}. The message field uses a text mapping (full-text inverted index). Other fields use keyword mapping (exact match, aggregations) or numeric types. High-cardinality fields like trace_id use keyword (for exact lookup) but are not aggregated.
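An explicit mapping for this document shape might look like the following, written as a Python dict that would be sent as JSON when creating the index or an index template. Treat it as a sketch: `"dynamic": False` (unknown fields are kept in the stored source but not indexed) and `"doc_values": False` on trace_id are illustrative choices, and the helper `unmapped_fields` is hypothetical.

```python
import json

log_mapping = {
    "mappings": {
        # Do not auto-create fields for unknown keys.
        "dynamic": False,
        "properties": {
            "timestamp": {"type": "date"},
            "service": {"type": "keyword"},
            "level": {"type": "keyword"},
            "message": {"type": "text"},
            # Exact lookup only; skipping doc values saves disk but
            # disables sorting and aggregations on this field.
            "trace_id": {"type": "keyword", "doc_values": False},
            "span_id": {"type": "keyword"},
            "host": {"type": "keyword"},
            "k8s_namespace": {"type": "keyword"},
            "k8s_pod": {"type": "keyword"},
        },
    }
}


def unmapped_fields(doc, mapping=log_mapping):
    """Return the top-level keys of doc that the mapping does not index."""
    known = mapping["mappings"]["properties"]
    return sorted(key for key in doc if key not in known)


doc = {"timestamp": "2025-01-15T10:00:00Z", "service": "checkout",
       "message": "payment failed", "user_agent": "curl/8.0"}
extra = unmapped_fields(doc)
print(json.dumps(log_mapping)[:40], extra)
```

Setting `"dynamic": "strict"` instead would reject documents carrying unknown fields outright, which is safer for the index but pushes schema errors back to the processing layer.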

For long-term storage, logs are archived to object storage in a compressed columnar format (Parquet) for cost-efficient analytics queries, for example via Athena on S3 or BigQuery external tables on GCS. The Parquet schema matches the Elasticsearch document structure, enabling analytics without Elasticsearch.

API Design

Scaling & Bottlenecks

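Elasticsearch indexing throughput is the primary bottleneck. Mitigation: increase bulk indexing batch size (send 1,000-5,000 documents per bulk request rather than one at a time) and tune the refresh interval (default 1 second; raising it to 5-10 seconds reduces I/O overhead at the cost of query freshness, and values at or above 5 seconds conflict with the 5-second emission-to-searchability target unless that target is relaxed). Index sorting pulls the other way: it improves query performance at the cost of indexing speed, so it is worth enabling only where the query gains justify slower ingest.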
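Batching for the bulk API amounts to grouping documents and emitting newline-delimited JSON, one action line followed by one source line per document. The sketch below builds request bodies only and leaves out the HTTP call; the function names and the batch size of 1,000 are illustrative.

```python
import json
from itertools import islice


def batched(iterable, size):
    """Yield lists of up to `size` items from any iterable."""
    it = iter(iterable)
    while batch := list(islice(it, size)):
        yield batch


def bulk_body(docs, index):
    """Build an NDJSON bulk body: an action line, then the document."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))
        lines.append(json.dumps(doc))
    # The bulk format requires a trailing newline after the last line.
    return "\n".join(lines) + "\n"


docs = [{"service": "checkout", "message": f"event {i}"} for i in range(2500)]
batches = list(batched(docs, 1000))
bodies = [bulk_body(batch, "logs-2025-01-15") for batch in batches]
print([len(batch) for batch in batches])
```

In practice batches are often capped by bytes as well as by document count, since a few very large log lines can push a request past the size the cluster handles comfortably.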

Query performance degrades with large time windows scanning many shards. Mitigation: always include a time range in queries to limit shard selection; use shard routing (index each service's documents with a custom routing value, such as the service name, so a query for that service is sent only to the shard holding its data); consider separate indices per high-volume service to avoid cross-service query interference.

Key Trade-offs

  • Push vs. pull log collection: Push (agents send to Kafka) is simpler and lower latency; pull (central scraper reads from service endpoints) is more resilient to agent failures but harder to implement for file-based logs
  • Structured vs. unstructured logs: Structured (JSON) logs are faster to parse and query but require application changes; unstructured logs with grok parsing work with existing apps but parsing is brittle and CPU-intensive
  • Index now vs. search-time parsing: Pre-indexing all fields enables fast queries but consumes 3-4x storage; storing raw logs and parsing at query time (like Loki) saves storage but increases query latency
  • Hot/warm/cold tiering vs. flat storage: Tiering dramatically reduces storage costs (object storage can be an order of magnitude or more cheaper per GB than NVMe SSDs, depending on provider and storage class) but adds operational complexity and increases cold query latency from seconds to minutes
