System Design Series: Design a Distributed Metrics Logging System

https://www.twosigma.com/articles/building-a-high-throughput-metrics-system-using-open-source-software/

A Tricky System Design Interview Question: Explain Server Monitoring

Test your system design abilities before the next interview


I read this system design interview question online and wanted to write about it. When I initially tried to answer this question, I embarrassingly didn’t have a good way to structure my thoughts. I had a couple of ideas floating around, but I didn’t know where to start. I later realized that it was probably because server monitoring was so prevalent in modern web development that I had been taking it for granted. So I seemed to know a good deal about it, but I couldn’t really connect the dots. I had to take the time to do a bit of research and flesh out my thoughts.

If you got stuck when you first saw this question, don’t worry. It’s normal.

Usually, the first step in a system design interview is to ask for use case clarification. I don’t think that helps much here since we all know what a server monitoring system looks like. Asking for confirmation and directional guidance as we progress through the discussion may be a better strategy. It shows our knowledge depth and enables us to ask more concrete and intelligent questions.

What To Monitor

There are literally thousands of things we can measure and monitor on a server.

We can define application layer metrics. For example, how many POST requests on a certain path has the server received? How many concurrent requests is the server dealing with? What’s the latency distribution of a particular method? How long is the unprocessed message queue? What’s the cache hit rate? These metrics are defined by the application and also tracked by the application in in-memory counters.
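
As a rough illustration, here is a minimal Go sketch of such an in-memory counter (the requestCounter type and the path names are made up for this example, not taken from any particular library):

package main

import (
    "fmt"
    "sync"
)

// requestCounter tracks per-path request counts in memory.
type requestCounter struct {
    mu     sync.Mutex
    counts map[string]int64
}

func newRequestCounter() *requestCounter {
    return &requestCounter{counts: make(map[string]int64)}
}

// Inc increments the counter for a given path.
func (c *requestCounter) Inc(path string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.counts[path]++
}

// Snapshot returns a copy of the current counts for scraping or reporting.
func (c *requestCounter) Snapshot() map[string]int64 {
    c.mu.Lock()
    defer c.mu.Unlock()
    out := make(map[string]int64, len(c.counts))
    for k, v := range c.counts {
        out[k] = v
    }
    return out
}

func main() {
    c := newRequestCounter()
    c.Inc("/api/v1/orders")
    c.Inc("/api/v1/orders")
    fmt.Println(c.Snapshot()) // map[/api/v1/orders:2]
}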

We can monitor language- and runtime-specific metrics. I like Golang, so I'll use that for illustration. For example, we want to know:

  • Which Go version it is.
  • How many concurrent Goroutines the process has.
  • When the last garbage collection was.
  • How many bytes of memory the process has allocated and freed.
  • What the heap size is.

Go's standard runtime package provides functions for obtaining this kind of information.
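
A minimal sketch of reading these values with the runtime package (the set of fields printed here is just a sample):

package main

import (
    "fmt"
    "runtime"
)

func main() {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    fmt.Println("go version:", runtime.Version())
    fmt.Println("goroutines:", runtime.NumGoroutine())
    fmt.Println("heap alloc bytes:", m.HeapAlloc)
    fmt.Println("total alloc bytes:", m.TotalAlloc)
    fmt.Println("frees:", m.Frees)
    fmt.Println("last GC (unix ns):", m.LastGC)
}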

We can also inspect the host system. For example, what's the CPU/memory/disk usage? How many processes are running on the host? Are the filesystem, network, and system timer working properly? These kinds of measurements are surfaced by system calls and vary across platforms. We usually don't collect them in the application process because they're common to the host and accessing them requires higher privileges. That's where a daemon process comes in. It's also worth noting that these kinds of metrics are quite expensive to obtain, so we should be careful about the polling frequency.
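
As a hedged, Linux-only illustration, a daemon might sample some host-level stats by reading the /proc filesystem, as in the sketch below (the paths are Linux-specific; production agents use more robust, platform-aware collectors):

package main

import (
    "fmt"
    "os"
)

// A Linux-only sketch: a daemon can sample host-level stats from /proc
// instead of instrumenting every application process.
func main() {
    loadavg, err := os.ReadFile("/proc/loadavg") // CPU load averages
    if err != nil {
        fmt.Println("read /proc/loadavg:", err)
        return
    }
    fmt.Printf("loadavg: %s", loadavg)

    meminfo, err := os.ReadFile("/proc/meminfo") // memory totals and usage
    if err != nil {
        fmt.Println("read /proc/meminfo:", err)
        return
    }
    fmt.Println("meminfo bytes read:", len(meminfo))
}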

Now is a good time to pause and ask if there is anything in particular that the interviewer wants us to monitor. The answer is likely no because the gist of the interview question is to test our system design ability. It's unlikely that the interviewer wants us to talk about the details of, for example, the filesystem cache stats.

How To Get the Metrics Out — Push vs. Pull

The metrics are now all available on the server through in-memory counters, language runtime, and system calls. How do we get them out of the server?

Before proceeding, we should confirm with the interviewer: Do we need to get the metrics out of the server? Maybe we don't. In that case, the server may expose an endpoint serving the metrics, or it may just save the metrics to local disks and we can SSH onto it to inspect them. The response from the interviewer is, however, very likely to be yes. We do want to get the metrics out. Otherwise, we'll lose access to the metrics when the server is slow or down — just when we need them the most. We'd also want to put the metrics in a centralized place for better global monitoring and alerting.

A fundamental design question we face now is whether we should use push or pull, meaning whether the server should proactively send the metrics out or it should just expose an endpoint and wait reactively for inquiry.

Prometheus (a famous monitoring system) chose to pull, and there is a good blog post about the design choice. It advocates for pulling because it is more convenient. Each server being monitored only needs to gather the metrics in memory and serve them through an endpoint. It doesn't need to care where the monitoring system is. It doesn't need to worry about overloading the monitoring system by sending too much or too frequently. A global configuration of what to collect and how often to collect it can be tuned centrally in the monitoring system.
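
To make the pull model concrete, here is a minimal sketch of a server exposing its counters over a plain HTTP endpoint that a monitoring system could scrape (the metric names and the /metrics path are illustrative assumptions; a real setup would typically use a client library and a standard exposition format):

package main

import (
    "fmt"
    "log"
    "net/http"
    "runtime"
    "sync/atomic"
)

var postRequests int64 // incremented by the application's request handlers

// metricsHandler exposes current counter values as plain text so a
// monitoring system can scrape (pull) them on its own schedule.
func metricsHandler(w http.ResponseWriter, _ *http.Request) {
    fmt.Fprintf(w, "http_post_requests_total %d\n", atomic.LoadInt64(&postRequests))
    fmt.Fprintf(w, "goroutines %d\n", runtime.NumGoroutine())
}

func main() {
    atomic.AddInt64(&postRequests, 1) // simulate some traffic
    http.HandleFunc("/metrics", metricsHandler)
    log.Fatal(http.ListenAndServe(":8080", nil))
}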

On the other hand, pushing may be useful in some scenarios (e.g. when a firewall prevents the monitoring system from accessing the server directly).

One disadvantage of pulling that the blog post didn’t mention is that it’s challenging to offer high availability and scalability with a pull-only model. If we’re using push, we can put a load balancer in front of a set of monitoring system replicas and have the servers being monitored send metrics through the load balancer. But with a pull-only model, the metrics are collected directly by a monitoring system instance. We’ll have to shard the metrics deliberately when pulling and deploy backup instances explicitly to support replication and failover. Depending on the environment, this may or may not be a big problem.

Another noteworthy point is that modern monitoring systems usually have more than one layer of push/pull to aggregate metrics in a hierarchy. Sometimes, a hybrid push/pull model will be used between the hierarchical layers to address the shortcomings of a homogeneous retrieval mechanism.

Persisting the Metrics

Now that we’ve managed to get the metrics from the servers being monitored into another system, the next thing is how to store them. Again, always ask for confirmation first to see if it’s necessary. Maybe the requirement is just to have a centralized in-memory metrics repository. In that case, we don’t need to worry about designing a time series database. Nevertheless, time series databases are interesting. Let’s assume that durability is a requirement. It often is, as a monitoring system without a history view isn’t that useful.

We want to store data samples — essentially a value with a timestamp — in chronological order for all time series. A time series is the full timeline view of a particular metric. For example, the P50 latency of POST /myMethod from server instance1, collected at a 15-second interval, is a time series.
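
A minimal sketch of how a data sample and a time series might be modeled (the Sample and Series types and the label names are assumptions for illustration):

package main

import (
    "fmt"
    "time"
)

// Sample is a single measurement: a value observed at a point in time.
type Sample struct {
    Timestamp time.Time
    Value     float64
}

// Series is one time series: a metric name plus identifying labels
// (e.g. method, path, instance) and the ordered samples collected for it.
type Series struct {
    Metric  string
    Labels  map[string]string
    Samples []Sample
}

func main() {
    s := Series{
        Metric:  "request_latency_p50_ms",
        Labels:  map[string]string{"method": "POST", "path": "/myMethod", "instance": "instance1"},
        Samples: []Sample{{Timestamp: time.Now(), Value: 42}},
    }
    fmt.Println(s.Metric, len(s.Samples))
}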

Our first intuition should be that we absolutely cannot just write individual data samples to files as they arrive. That’ll be prohibitively inefficient because the monitoring system may end up collecting a million data samples or more every minute. So some form of batching is crucial. With batching in memory, there comes the risk of losing data. It may not be a big deal if we’re only batching for a short period of time, as typical time series use cases can tolerate the loss of a few data samples. But if we want to batch for a longer period of time, a durability safeguard should be put in place. After all, the most recent data samples are usually the most valuable. We don’t want to be in a position where we lose the last 30 minutes of metrics.

A write-ahead log (WAL) is the canonical solution to complement in-memory batching for durability. For more details, you can check out the detailed article I wrote on the topic. At a high level, a WAL appends the changes to a log file on disk that can be used to restore the system state after a crash. A WAL doesn't incur a big IO penalty because sequential file access is relatively fast.

Now we can buffer the data samples in memory for a while, which opens doors for better write efficiency. The next thing we should decide is how we structure the data in files. One time series per file sounds like a simple solution, but unfortunately, it won't scale. There are just too many time series to create individual files for. We have to store multiple time series in a file. On the other hand, we can't just put everything in a monolithic file. That file will be too big to manage. We need to cut the file in some way. The natural choice here is to cut along the time dimension. We can create one file per day or per some other configurable time window. Let's call the data in a time window a block. If the data volume in a block is too big for one file, we can shard it across a few files. We also need an index file in each block to tell us which file, and which offset within it, holds a particular time series. See Figure 1 for an illustration.

Figure 1
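
A rough sketch of what a block and its index might look like (the types, field names, and chunk-file naming are hypothetical, loosely inspired by how time series databases lay out blocks):

package main

import "fmt"

// IndexEntry records where one series' data lives inside a block:
// which chunk file and at what byte offset.
type IndexEntry struct {
    File   string
    Offset int64
}

// Block holds all series data for one time window, plus the index
// that maps a series ID to its location in the block's files.
type Block struct {
    MinTime int64 // start of the window (unix ms), inclusive
    MaxTime int64 // end of the window (unix ms), exclusive
    Index   map[uint64]IndexEntry
}

func main() {
    b := Block{
        MinTime: 1700000000000,
        MaxTime: 1700086400000,
        Index:   map[uint64]IndexEntry{42: {File: "chunk-0001", Offset: 8192}},
    }
    fmt.Println(b.Index[42]) // {chunk-0001 8192}
}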

As data samples arrive in memory, we buffer them until we need to flush them to disk. When we flush them to disk, we organize them in blocks. The block for the most recent data samples typically represents a small time window. As the blocks grow older, we compact them to form longer time windows. This keeps the overall number of blocks in check. What's more, the old data samples are queried less frequently, so we can put them in larger files for easier management. We can also down-sample the data as part of the compaction to reduce the overall data volume. The idea of compacting young files into bigger old files is not new. It's the core idea behind the log-structured merge tree (LSM tree), and LevelDB is probably its most famous implementation.
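
As one concrete piece of such a compaction job, here is a small sketch of down-sampling by averaging raw samples into larger fixed intervals (the sample type and the averaging policy are illustrative assumptions, not a particular database's behavior):

package main

import "fmt"

type sample struct {
    ts  int64 // unix seconds
    val float64
}

// downsample averages raw samples into buckets of step seconds.
// It is one simple reduction a compaction job could apply to old blocks.
func downsample(in []sample, step int64) []sample {
    var out []sample
    i := 0
    for i < len(in) {
        bucket := in[i].ts - in[i].ts%step
        sum, n := 0.0, 0
        for i < len(in) && in[i].ts < bucket+step {
            sum += in[i].val
            n++
            i++
        }
        out = append(out, sample{ts: bucket, val: sum / float64(n)})
    }
    return out
}

func main() {
    raw := []sample{{0, 1}, {15, 3}, {30, 5}, {60, 7}}
    fmt.Println(downsample(raw, 60)) // [{0 3} {60 7}]
}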

There are also other small but important details. Compression should be employed to reduce the overall data volume. Time series data is an excellent candidate for compression: each data sample can be expressed as a delta from the preceding one. Facebook published a paper that describes two particular tricks in time series compression that led to a roughly 10x saving, taking advantage of the fact that adjacent data points are close. In their paper, a timestamp is encoded as a delta of deltas; because samples usually arrive at a regular interval, this second-order difference is very often zero. The floating-point value is encoded as an XOR against the previous value and can be restored by (a XOR b) XOR a = b. When two floating-point values are close, their XOR result has a lot of zero bits.
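
A minimal sketch of the two tricks (the function names are made up; a real encoder such as Gorilla also bit-packs the leading/trailing zero counts, which is omitted here):

package main

import (
    "fmt"
    "math"
    "math/bits"
)

// xorEncode captures the floating-point trick: when consecutive values are
// close, prev XOR curr has long runs of zero bits, which a real encoder
// stores compactly as leading/trailing zero counts plus the middle bits.
func xorEncode(prev, curr float64) uint64 {
    return math.Float64bits(prev) ^ math.Float64bits(curr)
}

// xorDecode restores curr using (a XOR b) XOR a = b.
func xorDecode(prev float64, xored uint64) float64 {
    return math.Float64frombits(math.Float64bits(prev) ^ xored)
}

func main() {
    // Timestamps at a regular interval: the delta of deltas is zero.
    ts := []int64{1000, 1015, 1030}
    fmt.Println("delta of deltas:", (ts[2]-ts[1])-(ts[1]-ts[0])) // 0

    prev, curr := 12.0, 12.25
    x := xorEncode(prev, curr)
    fmt.Println("leading zeros:", bits.LeadingZeros64(x), "trailing zeros:", bits.TrailingZeros64(x))
    fmt.Println("decoded:", xorDecode(prev, x)) // 12.25
}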

So far, we’ve only been concerned with data in local disks. To grow beyond a single node, we need to scale the storage out. We can either back the data by using a distributed file system (such as HDFS) or redesign the storage logic to adapt to a more natively distributed infrastructure. We won’t have time to go into details here.

Supporting Querying and Alerting

If we have the storage laid out as in Figure 1, querying is easy to envision. Time is a universal filtering criterion in all metrics searches. We use a time range to narrow the search down to a set of contiguous blocks. If we know the time series ID, we can retrieve it from the files in those blocks. Otherwise, we'd need an inverted index to go from the search criteria to time series IDs, and then follow the index files in the blocks to locate the time series data. Once we retrieve the time series data, we can graph it for display.
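
A rough sketch of that lookup path, using small hypothetical block/index types (names and layout are illustrative; reading the chunks and the label-to-ID inverted index are omitted):

package main

import "fmt"

type entry struct {
    file   string
    offset int64
}

type block struct {
    minTime, maxTime int64            // window covered by the block (unix ms)
    index            map[uint64]entry // series ID -> data location
}

// findSeries returns the locations of one series in every block that
// overlaps [start, end). Reading and merging the actual chunk data follows.
func findSeries(blocks []block, seriesID uint64, start, end int64) []entry {
    var out []entry
    for _, b := range blocks {
        if b.maxTime <= start || b.minTime >= end {
            continue // block lies entirely outside the query range
        }
        if e, ok := b.index[seriesID]; ok {
            out = append(out, e)
        }
    }
    return out
}

func main() {
    blocks := []block{
        {minTime: 0, maxTime: 100, index: map[uint64]entry{7: {"chunk-a", 0}}},
        {minTime: 100, maxTime: 200, index: map[uint64]entry{7: {"chunk-b", 64}}},
    }
    fmt.Println(findSeries(blocks, 7, 50, 150)) // [{chunk-a 0} {chunk-b 64}]
}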

Alerts can be registered inside the monitoring server. While it sniffs through the incoming data samples, it can fire alerts on any abnormal pattern. We can also decouple alerting from the main monitoring server and deploy dedicated downstream monitoring servers that collect and inspect only a more selective set of metrics.
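
A toy sketch of a threshold rule the monitoring server could evaluate as samples stream in (the rule shape and the metric name are made up for illustration):

package main

import "fmt"

type rule struct {
    metric    string
    threshold float64
}

// check returns an alert message when a sample breaches the rule.
func (r rule) check(metric string, value float64) (string, bool) {
    if metric == r.metric && value > r.threshold {
        return fmt.Sprintf("ALERT: %s=%.2f exceeds %.2f", metric, value, r.threshold), true
    }
    return "", false
}

func main() {
    r := rule{metric: "p99_latency_ms", threshold: 500}
    if msg, fired := r.check("p99_latency_ms", 750); fired {
        fmt.Println(msg)
    }
}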

Closing

That’ll conclude this system design interview. We’ve definitely covered more than what a typical system design interview would have time for. I hope that you’ve gotten a little bit of inspiration from this article.

References

https://eileen-code4fun.medium.com/building-an-append-only-log-from-scratch-e8712b49c924

An append-only log, sometimes called a write-ahead log, is a fundamental building block in all modern databases. It's used to persist mutation commands for recovery purposes. A command to change the database's state is first recorded in such a log before being applied to the database. Should the database crash and lose updates, its state can be restored by replaying the commands recorded since the most recent snapshot backup. The log's append-only nature is partly functional design (we only ever need to add new commands to the end) and partly a performance need (sequential file access is orders of magnitude faster than random file access).

While there are too many blog posts about this already, I hope to look at it from an implementation point of view. Let’s see how we could build an append-only log from scratch and some of the practical considerations in real production systems. I have the full implementation on Github [1]. It’s implemented in Golang. Feel free to check out the repo and play around with the code.

Record Format

First and foremost, we need to define our record structure in the log. Without loss of generality, our record is just a byte array. We need to store the length as well to aid reading. To detect data corruption, which happens in real life, we should introduce a checksum field.

type Record struct {
    len  uint32 // length of data in bytes
    data []byte // the payload supplied by the caller
    crc  uint32 // checksum over len and data, to detect corruption
}

The next thing is to marshal the record into bytes so that we can write it out to the file. The len occupies 4 bytes, the data occupies len bytes, and the crc occupies the final 4 bytes. It's also straightforward to transform the marshaled bytes back into the structure: we read in the first 4 bytes to find out the len, then read in the next len bytes for the data. In the end, we verify that the len and data together yield the same checksum as the stored crc.
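
A minimal sketch of that marshal/unmarshal logic (the little-endian layout and CRC-32 checksum are assumptions for illustration; the repo's actual implementation may differ):

package main

import (
    "encoding/binary"
    "errors"
    "fmt"
    "hash/crc32"
)

type Record struct {
    len  uint32
    data []byte
    crc  uint32
}

// marshal lays out the record as [len][data][crc] using little-endian integers.
func marshal(data []byte) []byte {
    buf := make([]byte, 4+len(data)+4)
    binary.LittleEndian.PutUint32(buf[0:4], uint32(len(data)))
    copy(buf[4:], data)
    binary.LittleEndian.PutUint32(buf[4+len(data):], crc32.ChecksumIEEE(buf[:4+len(data)]))
    return buf
}

// unmarshal reads the length and the payload, then verifies the checksum.
func unmarshal(buf []byte) (Record, error) {
    if len(buf) < 8 {
        return Record{}, errors.New("record too short")
    }
    n := binary.LittleEndian.Uint32(buf[0:4])
    if uint32(len(buf)) < 8+n {
        return Record{}, errors.New("truncated record")
    }
    crc := binary.LittleEndian.Uint32(buf[4+n : 8+n])
    if crc32.ChecksumIEEE(buf[:4+n]) != crc {
        return Record{}, errors.New("checksum mismatch: corrupted record")
    }
    return Record{len: n, data: buf[4 : 4+n], crc: crc}, nil
}

func main() {
    b := marshal([]byte("hello"))
    r, err := unmarshal(b)
    fmt.Println(string(r.data), err) // hello <nil>
}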

Upon start, the program needs to open a dedicated log file. It can either create a new one or open an existing one in append mode. The append-only log has an AppendRecord method, by which callers can submit log records.

Durability vs Throughput

Once a log record is passed in through the AppendRecord method, we could immediately write it to the log file. However, real-world systems typically have an application-level buffer to batch the log records and only flush them to the file when enough have accumulated. This raises a question of durability. If the server crashes before the log records are flushed to disk, updates will be lost. But if we flush too often, the throughput will suffer. It's a trade-off that is typically exposed as a system configuration option.

The possibility of losing updates may sound scary. But in fact, even if we immediately write every record to the file, there are still chances of losing updates, because the operating system often caches data in memory before actually writing it to disk. The caching window in a modern OS is around 30 seconds, unless we explicitly invoke fsync on the file descriptor, which forces the OS to flush the data to disk. Calling fsync for every record is extremely expensive; the throughput could be thousands of times worse. I did a quick benchmark on my laptop. The difference is indeed astounding.

Test Iterations Cost
BenchmarkSync 1000000000 0.363 ns/op
BenchmarkAsync 1000000000 0.000796 ns/op

Even if we're willing to pay that performance penalty, our updates are still not 100% safe. Disk drives often have their own layer of cache, and they don't usually offer a control interface to applications. Thus, there is no such thing as absolutely safe. Even if we use a custom OS that surfaces a control interface to force a sync in the disk drive, the disk itself could fail. This is not to say we shouldn't care just because there is no absolute guarantee; instead, we should evaluate our use cases and analyze what's at stake and how reliable the database needs to be against the remote chance of failure. Real-world systems often end up doing fsync every N writes to balance durability guarantees against throughput.
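
A minimal sketch of the fsync-every-N idea (the wal type, the file handling, and the value 100 are illustrative assumptions, not the repo's actual implementation):

package main

import (
    "log"
    "os"
)

// wal appends records and calls Sync (fsync) only every syncEvery appends,
// trading a bounded window of possible loss for much higher throughput.
type wal struct {
    f         *os.File
    pending   int
    syncEvery int
}

func openWAL(path string, syncEvery int) (*wal, error) {
    f, err := os.OpenFile(path, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644)
    if err != nil {
        return nil, err
    }
    return &wal{f: f, syncEvery: syncEvery}, nil
}

func (w *wal) AppendRecord(rec []byte) error {
    if _, err := w.f.Write(rec); err != nil {
        return err
    }
    w.pending++
    if w.pending >= w.syncEvery {
        w.pending = 0
        return w.f.Sync() // force the OS to flush its cache to disk
    }
    return nil
}

func main() {
    w, err := openWAL("demo.wal", 100) // fsync every 100 records (illustrative value)
    if err != nil {
        log.Fatal(err)
    }
    defer w.f.Close()
    for i := 0; i < 250; i++ {
        if err := w.AppendRecord([]byte("record\n")); err != nil {
            log.Fatal(err)
        }
    }
}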

Log File Structure

One other thing to note is that so far we just keep writing records consecutively in the log. In reality, we'd want to write in a way that respects page boundaries. File content is loaded into memory in chunks for reads and writes, and paging is the canonical approach in many operating systems. Aligning with page boundaries enables the OS to better operate on the file. The page size should be a configurable parameter in our append-only log for portability. If there is not enough space at the end of a page to even store the len, just don't bother: pad the remaining bytes with 0 and start a new page. This avoids the awkward situation where we need to keep two pages in memory just to read an integer. Now, you may wonder what happens if a record is longer than a page. That's fine. We can go across pages. A smart trick LevelDB uses is to have a record type that designates whether the record in the page is a FULL record, a FIRST record, a MIDDLE record, or a LAST record. I have another post that talks about many other features in LevelDB [2].

type Record struct {
    kind uint8  // record type: FULL, FIRST, MIDDLE, or LAST ("type" is a Go keyword, so the field needs another name)
    len  uint32 // length of data in bytes
    data []byte // the payload, or one segment of it
    crc  uint32 // checksum over the preceding fields
}

When a record fits in a page, its type is always FULL. When we need to go across pages, we break the record into segments, each represented by a standalone record. We can then use the record type to denote the start (FIRST), the end (LAST), and the parts in between (MIDDLE).
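
A small sketch of that splitting logic (header sizes are ignored and the names are illustrative, to keep the idea visible):

package main

import "fmt"

const (
    full = iota
    first
    middle
    last
)

// segment is the per-page piece of a logical record.
type segment struct {
    kind int
    data []byte
}

// split breaks a payload into page-sized segments and tags each one so the
// reader can stitch them back together.
func split(data []byte, pageSize int) []segment {
    if len(data) <= pageSize {
        return []segment{{kind: full, data: data}}
    }
    var segs []segment
    for start := 0; start < len(data); start += pageSize {
        end := start + pageSize
        if end > len(data) {
            end = len(data)
        }
        kind := middle
        if start == 0 {
            kind = first
        } else if end == len(data) {
            kind = last
        }
        segs = append(segs, segment{kind: kind, data: data[start:end]})
    }
    return segs
}

func main() {
    segs := split(make([]byte, 2500), 1024) // a 2500-byte record with 1 KB pages
    for _, s := range segs {
        fmt.Println(s.kind, len(s.data)) // 1 1024, then 2 1024, then 3 452
    }
}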

Log Cleanup

One last thing to mention is that we don't want the log to grow indefinitely. That's just a waste of disk space. We need to truncate the log when we know the earlier part is no longer needed. In practice, we don't go back and modify the existing log. We just start a new log file after a new snapshot backup. Then we can discard the old log files.

References

[1] GitHub - eileen-code4fun/SimpleWAL: A demo implementation of write ahead log.

[2] Log Structured Merge Tree (LSM-tree) Implementations (a Demo and LevelDB) | by Eileen Pangu | Medium

The metrics data includes “service” (service name), “instance” (instance ID or IP address), “code” (HTTP status code), “method” (HTTP method), “path” (URL path), “duration” (how long the API request takes), and “count” (number of requests in an interval). Because most of the time we query by time range to list a service instance’s API methods, the sharding key can be “time bucket” + “service”, where the time bucket is a timestamp rounded down to some interval.
This gives us a known, time-based value to use as a means of further partitioning our data, and also allows us to easily find keys that can be safely archived. The time bucket is an example of a sentinel, and is a useful construct in a number of models where you need better distribution than your natural key provides. When choosing values for your time buckets, a rule of thumb is to select an interval that allows you to perform the bulk of your queries using only two buckets. The more buckets you query, the more nodes will be involved to produce your result. It’s also worth noting that this would be an excellent time to use time-window compaction.
For the query, we want a range from time x to time y for a given service. Since our timestamp is a clustering column, this is possible:

SELECT * FROM metric_readings
WHERE service_name = 'Service A'
AND time_bucket IN (1411840800, 1411844400)
AND timestamp >= 1411841700
AND timestamp <= 1411845300;

For the hot spot issue (even though it's very unlikely), we can append a sequence number such as 1, 2, 3 ... to the sharding key.

When time is all that matters

In the previous example, we were looking for time-ordered data for a given object, in this case a service. But there are cases when what we really need is simply a list of the latest readings from all services. We need a different model to address this, because our previous model required that we know which service we were querying. It would be tempting to simply remove the service name from the primary key, using only time_bucket as the partition key. The problem with this strategy is that all writes and most reads would go against a single partition key. This would create a single hotspot that moves around the cluster as the interval changes. Keep in mind that a materialized view would result in the same problem, since the view itself would contain hotspots. As a result, it is imperative that you determine some sentinel value that can be used in place of the service name and that is not time oriented. For example, the API method, the URL path, or the instance IP address could be a good value. In practice, I have found that this use case is rare, or that the real use case requires a queue or cache.


@Xavier If timestamp + metrics is used as the sharding key,

  1. Wouldn't some shard still become a hotspot during certain time periods?
  2. For time ranges not covered by the in-memory bloom filter, wouldn't searching for a particular API's metrics still require scanning every shard?

Please read above