Design Metrics Aggregation System

Two Sigma's talk on their monitoring system; very well explained!

This video is essentially modeled on Two Sigma's system.

This article explains the underlying principles well.

This article summarizes and walks through the material above; also well explained.


Requirements

Functional Requirements

  • add instrumentation to services that run on hundreds of servers in the cloud

  • collect stats about the servers where the applications run

  • visualize & monitor the metrics

Non-Functional Requirements

  • Reliable

  • Scalable

  • Low end-to-end latency

  • Flexible for different schemas


High-level design


Detailed design

Query Design

  • we can collect query-specific information like what’s shown below

  • by storing the data in its raw form, we can do exploratory analysis later and answer ad hoc “what if?” questions; a small capture sketch follows the sample record below.

{
    query_time: "20190117 09:00:05",
    user: "user1",
    dataset: "x",
    date_range: {
        begin: "20180203",
        end: "20180204"
    },
    duration: 100,
    bytes: 350000000,
    query_param_1: 1.0,
    query_param_2: "log",
    ...
}
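
As an illustration, here is a minimal Python sketch (the wrapper name is hypothetical, and it assumes the caller-supplied run_query callable returns the bytes scanned) of how a service could capture such a raw record around each query:

import time
from datetime import datetime, timezone

def run_with_usage_record(user, dataset, date_range, run_query, **query_params):
    # Run the query via the caller-supplied callable, which is assumed to
    # return (result, bytes_scanned), and build a raw usage record shaped
    # like the sample above.
    start = time.time()
    result, bytes_scanned = run_query()
    record = {
        "query_time": datetime.now(timezone.utc).strftime("%Y%m%d %H:%M:%S"),
        "user": user,
        "dataset": dataset,
        "date_range": date_range,
        "duration": int((time.time() - start) * 1000),  # assumed milliseconds
        "bytes": bytes_scanned,
        **query_params,  # arbitrary query_param_* fields kept in raw form
    }
    return result, record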

Database selection - Where should the metrics data be stored?

  • Time-series databases: InfluxDB, Graphite

  • Elasticsearch

Solution: Elasticsearch

  • comes as part of the well-known ELK stack: Elasticsearch + Logstash + Kibana

  • supports easy searching and analysis, and companion tools such as Kibana make data analysis and visualization straightforward (a query sketch follows this list)
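
For illustration only, a sketch using Python requests against the standard Elasticsearch REST API (the host, the daily index name, and the user.keyword field mapping are assumptions) that writes one raw record and runs an exploratory aggregation:

import requests

ES = "https://metrics.elasticsearch.host:443"  # assumed endpoint, matching the Logstash output below

# Index one raw usage record into a daily index.
record = {"user": "user1", "dataset": "x", "duration": 100, "bytes": 350000000}
requests.post(f"{ES}/metrics-2019-01-17/_doc", json=record).raise_for_status()

# Exploratory question: total bytes scanned per user across all daily indices.
query = {
    "size": 0,
    "aggs": {
        "bytes_per_user": {
            "terms": {"field": "user.keyword"},
            "aggs": {"total_bytes": {"sum": {"field": "bytes"}}},
        }
    },
}
resp = requests.post(f"{ES}/metrics-*/_search", json=query)
print(resp.json()["aggregations"]["bytes_per_user"]["buckets"])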


Middleware - How do we handle the high throughput?

Solution: Kafka

  • Elasticsearch on its own wasn’t built to ingest 50,000 messages per second (Two Sigma verified this with internal benchmarks).

  • Use a buffer in front of Elasticsearch, which is analogous to a water tank (see the producer sketch after this list).

  • You can think of a Kafka topic as a partitioned queue where the partitions can be written to and read from simultaneously.
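
A minimal producer sketch, assuming the kafka-python client and the usage-topic / broker names from the Logstash configuration below; services write raw usage records to Kafka instead of hitting Elasticsearch directly:

import json
from kafka import KafkaProducer  # assumes the kafka-python package

producer = KafkaProducer(
    bootstrap_servers="kafka-server.host:9095",  # same broker as in the Logstash config
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Producers write at their own rate; Logstash consumers drain the topic
# at a rate Elasticsearch can sustain, so the topic acts as the buffer.
producer.send("usage-topic", {"user": "user1", "dataset": "x", "bytes": 350000000})
producer.flush()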


How is the metrics data transferred from Kafka to Elasticsearch?

  • Reading data from Kafka and writing it to Elasticsearch ourselves would require a lot of application code to handle edge cases and errors.

  • Prefer an existing solution that already connects Kafka to Elasticsearch.

Solution: use logstash as a pipeline

  • configure Logstash as follows:

input {
  # Consume raw usage records from the Kafka buffer
  kafka {
    bootstrap_servers => "kafka-server.host:9095"
    topic_id => "usage-topic"
    codec => "json"
    group_id => "consumer-group-a"
  }
}
filter {
  # Use the record's query_time as the event timestamp
  date {
    match => ["query_time", "UNIX_MS"]
    remove_field => ["query_time"]
  }
}
output {
  # Write events into a daily Elasticsearch index
  elasticsearch {
    index => "metrics-%{+YYYY-MM-dd}"
    hosts => ["metrics.elasticsearch.host:443"]
  }
}

Ensure high availability

  • Use Kubernetes to maintain multiple Logstash instances

  • Two Sigma uses Marathon, since they already maintain it in-house


Data partition

Data storage

Read this doc.

  • Use a write-ahead log (WAL) to record incoming writes

  • Compact old files using a log-structured merge tree (LSM tree); a toy sketch follows below.
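
A toy Python sketch of the idea only (not any specific engine's implementation; file names, thresholds, and the JSON segment format are all made up): points are appended to a WAL for crash safety, buffered in memory, flushed to sorted segments, and old segments are merged LSM-style:

import json, os

class TinyMetricsStore:
    # Toy WAL + LSM-style store; for illustration only.

    def __init__(self, data_dir):
        self.data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)
        self.wal = open(os.path.join(data_dir, "wal.log"), "a")
        self.memtable = []   # in-memory buffer of recent points
        self.segments = []   # immutable, sorted on-disk segment files
        self.next_segment = 0

    def write(self, point):
        # 1. Append to the WAL first so the point survives a crash.
        self.wal.write(json.dumps(point) + "\n")
        self.wal.flush()
        # 2. Buffer in memory; flush to a sorted segment when large enough.
        self.memtable.append(point)
        if len(self.memtable) >= 1000:
            self._flush_memtable()

    def _flush_memtable(self):
        path = os.path.join(self.data_dir, f"segment-{self.next_segment}.json")
        self.next_segment += 1
        with open(path, "w") as f:
            json.dump(sorted(self.memtable, key=lambda p: p["timestamp"]), f)
        self.segments.append(path)
        self.memtable = []

    def compact(self):
        # Merge all old segments into one larger sorted file (LSM-style compaction).
        merged = []
        for path in self.segments:
            with open(path) as f:
                merged.extend(json.load(f))
            os.remove(path)
        merged.sort(key=lambda p: p["timestamp"])
        path = os.path.join(self.data_dir, f"segment-{self.next_segment}.json")
        self.next_segment += 1
        with open(path, "w") as f:
            json.dump(merged, f)
        self.segments = [path]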

Sharding

  • Don’t use the service name alone as the sharding key; otherwise there will be a hot-entity issue.

  • Because most of the time we query by time range to list service instances’ API methods (see the sample query below), the sharding key can be “time bucket” + “service name”, where a time bucket is a timestamp rounded down to a fixed interval.

  • sample query:

SELECT * FROM metric_readings
WHERE service_name = 'Service A'
AND time_bucket IN (1411840800, 1411844400)
AND timestamp >= 1411841700
AND timestamp <= 1411845300;

  • If there is still a hot-entity issue, we can append a sequence number such as 1, 2, 3, … to the sharding key (see the sketch below).
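
A sketch of how the shard key could be derived; the one-hour bucket size and the key format are assumptions chosen to match the hour-aligned bucket values in the sample query:

BUCKET_SECONDS = 3600  # assumed 1-hour buckets

def time_bucket(ts: int) -> int:
    # Round a Unix timestamp down to its bucket boundary.
    return ts - (ts % BUCKET_SECONDS)

def shard_key(service_name: str, ts: int, seq=None) -> str:
    # Combine time bucket + service name; optionally append a sequence
    # number to spread a still-hot service across more shards.
    key = f"{time_bucket(ts)}:{service_name}"
    return f"{key}:{seq}" if seq is not None else key

def buckets_for_range(start_ts: int, end_ts: int) -> list:
    # All buckets a [start, end] query touches, i.e. the values that go
    # into the IN (...) clause of the sample query above.
    return list(range(time_bucket(start_ts), time_bucket(end_ts) + 1, BUCKET_SECONDS))

# The sample query's range 1411841700..1411845300 touches two buckets.
assert buckets_for_range(1411841700, 1411845300) == [1411840800, 1411844400]
assert shard_key("Service A", 1411841700) == "1411840800:Service A"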
