# Design Metrics Aggregation System

> Two Sigma's [monitoring system](https://www.twosigma.com/articles/building-a-high-throughput-metrics-system-using-open-source-software/) write-up explains this design very well!

> This [video](https://www.youtube.com/watch?v=UEJ6xq4frEw) essentially reproduces the Two Sigma design.

> [This article](https://betterprogramming.pub/a-tricky-system-design-interview-question-explain-server-monitoring-c5be0ce54a30) gives a good explanation of the underlying principles.

> [This post](https://1o24bbs.com/t/design-distributed-metrics-logging-system/27161/5) summarizes and walks through the article above; also worth reading.

***

## Requirement

### Functional Requirement

* add instrumentation to applications running on hundreds of servers in the cloud
* collect stats about the servers where the applications run
* visualize & monitor the metrics

### Non-Functional Requirements

* Reliable
* Scalable
* Low end-to-end latency
* Flexible for different schema

***

## High-level design

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-625be4ffea8f554c37dab820508eaddb3965eed7%2Fmetrics_aggregation.png?alt=media)

***

## Detailed design

### Query Design

* we can collect query-specific information like what's shown below
* by storing the data in its raw form, we can later run exploratory analyses with ad hoc "what if" questions

```json
{
    "query_time": "20190117 09:00:05",
    "user": "user1",
    "dataset": "x",
    "date_range": {
        "begin": "20180203",
        "end": "20180204"
    },
    "duration": 100,
    "bytes": 350000000,
    "query_param_1": 1.0,
    "query_param_2": "log",
    ...
}
```
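As a sketch of how a service might produce such an event, here is a hypothetical Python decorator that times a query and hands the resulting JSON to any `emit` callable (a real deployment would pass a Kafka producer's send function). All names and the `bytes` calculation are illustrative, not from the source:

```python
import json
import time
from datetime import datetime, timezone
from functools import wraps

def record_query_metrics(emit):
    """Wrap a query function and emit one metrics event per call.

    `emit` is any callable that ships the JSON event downstream;
    it is left abstract here (a list, a Kafka producer, ...).
    """
    def decorator(fn):
        @wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.monotonic()
            result = fn(*args, **kwargs)
            event = {
                # same shape as the raw event above
                "query_time": datetime.now(timezone.utc).strftime("%Y%m%d %H:%M:%S"),
                "user": kwargs.get("user", "unknown"),
                "dataset": kwargs.get("dataset", "unknown"),
                "duration": int((time.monotonic() - start) * 1000),  # ms
                # illustrative only; real instrumentation would report bytes scanned
                "bytes": len(result) if result is not None else 0,
            }
            emit(json.dumps(event))
            return result
        return wrapper
    return decorator

# Usage: collect events in a list instead of a real message bus.
events = []

@record_query_metrics(events.append)
def run_query(user, dataset):
    return b"x" * 350  # stand-in for a query result payload

run_query(user="user1", dataset="x")
```

Because the raw event is emitted as-is, every field stays available for later exploratory queries.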

***

### Database selection - Where should the metrics data be stored?

* Time series database: InfluxDB, graphite
* Elasticsearch

#### Solution: Elasticsearch

* the classic ELK stack: **Elasticsearch + Logstash + Kibana**
* easy searching and analysis, and the ecosystem comes with companion tools, like Kibana, that make data analysis and visualization easy

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-8fbc9db7b5f7ef190ae7be27546b908a2e95fd08%2F1.png?alt=media)
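For a concrete feel of how documents get into Elasticsearch, the sketch below builds a `_bulk` request body in the NDJSON format the Bulk API expects: one action line followed by one document line per event. The index name and events here are made up:

```python
import json

def to_bulk_ndjson(events, index):
    """Build an Elasticsearch _bulk request body (NDJSON)."""
    lines = []
    for ev in events:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(ev))                            # document line
    return "\n".join(lines) + "\n"  # _bulk bodies must end with a newline

payload = to_bulk_ndjson(
    [{"user": "user1", "duration": 100}, {"user": "user2", "duration": 42}],
    index="metrics-2019-01-17",
)
# POST the payload to https://<es-host>/_bulk with
# Content-Type: application/x-ndjson
```

In practice Logstash's elasticsearch output (shown later) does this batching for you.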

***

### Middleware - How do we handle the high throughput?

#### Solution: Kafka

* Elasticsearch wasn't built to ingest 50,000 messages per second directly (Two Sigma verified this through internal benchmarks).
* Put a buffer in front of it; the buffer is analogous to a water tank that absorbs bursts.

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-e0f097882e3ac05a0f0626e1eda38a4a2d71eb6e%2F2.png?alt=media)

* You can think of a Kafka topic as a **partitioned queue** where the partitions can be written to and read from simultaneously.
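A minimal simulation of that partitioned-queue idea, assuming messages are keyed by service name. The real Kafka client hashes keys with murmur2; `crc32` is used here purely for illustration:

```python
import zlib
from collections import defaultdict

NUM_PARTITIONS = 3

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    # Kafka's default partitioner conceptually does hash(key) % partitions;
    # crc32 stands in for murmur2 in this sketch.
    return zlib.crc32(key) % num_partitions

# A topic is a set of independent append-only logs ("partitions").
topic = defaultdict(list)

for i in range(9):
    key = f"service-{i % 2}".encode()  # key by service name
    topic[partition_for(key)].append(f"metric-{i}")

# Each consumer in a group owns a disjoint subset of partitions, so
# writes and reads proceed in parallel; ordering is guaranteed only
# within one partition (i.e. per key here).
```

This is why adding partitions (and matching consumers) scales throughput almost linearly.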

***

### How is the metrics data transferred from Kafka to Elasticsearch?

* reading data from Kafka and writing it to Elasticsearch by hand requires a lot of application code to handle edge cases and errors
* instead, use an existing messaging solution that connects Kafka and Elasticsearch

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-0a8a92f1382c958c8c530cb05bf10552f55ba35e%2F3.png?alt=media)

#### Solution: use Logstash as the pipeline

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-56b5759e9f655d5888069676a115bff2aa8b2848%2F4.png?alt=media)

* Logstash pipeline configuration:

```
input {
  kafka {
    bootstrap_servers => "kafka-server.host:9095"
    topic_id => "usage-topic"
    codec => "json"
    group_id => "consumer-group-a"
  }
}
filter {
  date {
    match => ["query_time", "UNIX_MS"]
    remove_field => ["query_time"]
  }
}
output {
  elasticsearch {
    index => "metrics-%{+YYYY-MM-dd}"
    hosts => ["metrics.elasticsearch.host:443"]
  }
}
```
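To make the filter and output stages concrete, here is a Python sketch of the same transform, assuming `query_time` arrives as epoch milliseconds (which is what the `UNIX_MS` pattern implies): the field is parsed into `@timestamp` and removed, and the daily index name is derived from the event time.

```python
from datetime import datetime, timezone

def transform(event):
    # `date` filter: parse query_time (epoch ms) into @timestamp,
    # then drop the original field, as remove_field does.
    ts = datetime.fromtimestamp(int(event.pop("query_time")) / 1000,
                                tz=timezone.utc)
    event["@timestamp"] = ts.isoformat()
    # output stage: metrics-%{+YYYY-MM-dd} resolves to a daily index name.
    return f"metrics-{ts:%Y-%m-%d}", event

index, doc = transform({"query_time": "1547715605000", "user": "user1"})
# index == "metrics-2019-01-17"; doc carries @timestamp instead of query_time
```

Daily indices keep each index small and make retention trivial: expiring old metrics is just deleting old indices.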

***

### Ensure high availability

* use Kubernetes to keep multiple Logstash instances running
* Two Sigma uses Marathon (a Mesos framework) instead, since they already run it in-house

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-50fd8e8112ba54ea03aa7fb51cff2ea843580169%2F5.png?alt=media)
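A hypothetical Kubernetes manifest for this setup might look as follows; the names, image tag, and ConfigMap are illustrative, not from the talk:

```yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: metrics-logstash          # illustrative name
spec:
  replicas: 3                     # several instances in one Kafka consumer group
  selector:
    matchLabels:
      app: metrics-logstash
  template:
    metadata:
      labels:
        app: metrics-logstash
    spec:
      containers:
        - name: logstash
          image: docker.elastic.co/logstash/logstash:7.17.0  # example tag
          volumeMounts:
            - name: pipeline
              mountPath: /usr/share/logstash/pipeline
      volumes:
        - name: pipeline
          configMap:
            name: logstash-pipeline   # holds the pipeline config shown earlier
```

Because all replicas join the same Kafka consumer group, partitions are rebalanced automatically when a pod dies and is rescheduled, so no messages are lost (they wait in Kafka).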

***

### Data partition

#### Data storage

![](https://2407442552-files.gitbook.io/~/files/v0/b/gitbook-x-prod.appspot.com/o/spaces%2F-Lpv9LvHzpublmUWisvz%2Fuploads%2Fgit-blob-31c6492502fe3245b9abb96ac7eb69962978e0c1%2F6.png?alt=media)

> read this [doc](https://betterprogramming.pub/a-tricky-system-design-interview-question-explain-server-monitoring-c5be0ce54a30).

* use a write-ahead log (WAL) to persist incoming writes durably
* compact old files using a log-structured merge (LSM) tree
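A toy sketch of those two ideas together: a file-backed append-only log, plus a compaction pass that keeps only the latest value per key. Real LSM engines merge sorted segment files; this only shows the shape of the idea:

```python
import json
import os
import tempfile

class TinyWal:
    """Toy write-ahead log with LSM-style compaction."""

    def __init__(self, path):
        self.path = path

    def append(self, key, value):
        # Every write is appended (and fsync'd) before it is acknowledged,
        # so a crash can lose at most the in-flight write.
        with open(self.path, "a") as f:
            f.write(json.dumps({"k": key, "v": value}) + "\n")
            f.flush()
            os.fsync(f.fileno())

    def compact(self):
        # Rewrite the file keeping only the newest entry per key,
        # the way old LSM segments get merged away.
        latest = {}
        with open(self.path) as f:
            for line in f:
                rec = json.loads(line)
                latest[rec["k"]] = rec["v"]  # later entries win
        with open(self.path, "w") as f:
            for k, v in latest.items():
                f.write(json.dumps({"k": k, "v": v}) + "\n")
        return latest

wal = TinyWal(os.path.join(tempfile.mkdtemp(), "metrics.wal"))
wal.append("svc-a.cpu", 0.30)
wal.append("svc-a.cpu", 0.55)  # supersedes the previous reading
state = wal.compact()           # file now holds one entry per key
```

Sequential appends are what make this write path fast; compaction trades background I/O for smaller files and faster reads.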

#### Sharding

* don't use the service name alone as the sharding key; otherwise a popular service becomes a hot entity
* Because most of the time we query by time range to list a service instance's API methods (sample query below), the **sharding key** can be **"time bucket" + "service name"**, where the `time bucket` is a timestamp rounded down to some interval.
* sample query:

```sql
SELECT * FROM metric_readings
WHERE service_name = 'Service A'
AND time_bucket IN (1411840800, 1411844400)
AND timestamp >= 1411841700
AND timestamp <= 1411845300;
```

* if a hot entity still shows up, we can append a sequence number such as 1, 2, 3 … to the key to spread the load further
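The bucket arithmetic behind the sample query can be sketched as follows, assuming hourly buckets (the interval length is a design choice):

```python
BUCKET_SECONDS = 3600  # assumed hourly buckets

def time_buckets(start_ts, end_ts, interval=BUCKET_SECONDS):
    """All bucket timestamps a [start_ts, end_ts] query must touch."""
    first = start_ts - start_ts % interval  # round down to bucket start
    last = end_ts - end_ts % interval
    return list(range(first, last + 1, interval))

def shard_key(service, ts, interval=BUCKET_SECONDS):
    """Sharding key = "time bucket" + "service name", as in the text."""
    return f"{ts - ts % interval}:{service}"

# The timestamp range from the sample query spans two hourly buckets:
buckets = time_buckets(1411841700, 1411845300)
# buckets == [1411840800, 1411844400], matching the IN (...) clause above
```

The query planner turns the timestamp range into this small bucket list, so each request fans out to only the few shards that can hold matching rows.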
