Key Takeaways
- In streaming architectures, any gaps in non-functional requirements can be very unforgiving. Data engineers spend much of their time fighting fires and keeping systems up if they don't build these systems with the “ilities” as first-class citizens; by “ilities,” I mean non-functional requirements such as scalability, reliability, and operability. If you don't build your systems with the “ilities” as first-class citizens, you pay a steep operational tax.
- To build a reliable streaming data pipeline, conceptualize the pipeline as a series of links in a chain, where each link is transactional. Links are connected via Kafka topics, each of which provides transactional guarantees. Once the links are combined, the pipeline as a whole is transactional.
- The two most important top-level metrics for any streaming data pipeline are lag and loss. Lag expresses the amount of message delay in a system. Loss measures how many messages are lost as they transit the system.
- Most streaming data use-cases require low latency (i.e. low end-to-end message lag), but they also require low or zero loss. It is important to understand that building a lossless pipeline carries performance penalties: to build for zero loss, you need to give up some speed. However, there are strategies to minimize lag over an aggregate of messages, i.e. to increase throughput via parallel processing. By doing this, we may have a lag floor, but we can still maximize throughput.
- Auto-scaling is key to maximizing throughput. When picking a metric for autoscaling, make sure it increases with increased traffic and decreases with increased scale-out (a.k.a. increased parallel processing). Average CPU is often good enough for this.
At QCon Plus 2021 last November, Sid Anand, Chief Architect at Datazoom and PMC Member at Apache Airflow, presented on building high-fidelity nearline data streams as a service within a lean team. In this talk, Sid provides a master class on building high-fidelity data streams from the ground up.
Introduction
In our world today, machine intelligence and personalization drive engaging experiences online, whether that's a learn-to-rank system that improves search quality at your favorite search engine, a recommender system that suggests music or movies, a recommender system that suggests who you should follow, or a ranking system that re-ranks the feed on your social platform of choice. Disparate data is constantly being connected to drive predictions that keep us engaged. While it may seem that some magical SQL join powers these connections, the reality is that data growth has made it impractical to store all of this data in a single DB. Ten years ago, we used a single monolithic DB to store data, but today, the picture below is more representative of the modern data architectures we see.
It is a collection of point solutions tied together by one or more data movement infrastructure services. How do companies manage this complexity? A key piece of the puzzle is data movement, which usually comes in two forms: batch processing or stream processing.
What makes streams hard?
There are lots of moving parts in the picture above. It has a large surface area, meaning there are many places where errors can occur. In streaming architectures, any gaps in non-functional requirements can be very unforgiving. Data engineers spend much of their time fighting fires and keeping systems up if they don't build these systems with the “ilities” as first-class citizens; by “ilities,” I mean non-functional requirements such as scalability, reliability, observability, operability, and the like. To underscore this point, you will pay a steep operational tax in the form of data pipeline outages and disruptions if your team does not build data systems with the “ilities” as first-class citizens. Disruptions and outages typically translate into unhappy customers and burnt-out team members who eventually leave your team. Let's dive in and build a streaming system.
Start Simple
This is an ambitious endeavor. As with any large project, I like to start simple. Let's start by defining a goal:
We aim to build a system that delivers messages from source S to destination D
First, let's decouple S and D by putting a message broker between them. This is not a controversial decision; it's conventional and used worldwide. In this case, I've picked Kafka as my technology, but you can use any message broker. In this system, I've created a single topic called E to signify the events that will flow through it. I've decoupled S and D with this event topic. This means that if D fails, S can continue to publish messages to E. If S fails, D can continue to consume messages from E.
Let's make a few more implementation decisions about this system. Let's run our system on a cloud platform. For many, that just means running it on a publicly available cloud platform like AWS or GCP. It can also mean running it on Kubernetes on-prem, or some mix of the two. Additionally, to start with, let's operate at a low scale. This means that we can run Kafka with a single partition in our topic. However, since we want things to be reliable, let's run three brokers split across three availability zones, and set the RF or replication factor to 3. Additionally, we will run S and D on single but separate EC2 instances to increase the availability of our system. Let's provide our stream as a service to make things more interesting. This means we can accept inbound messages at an API endpoint hosted at process S. When messages arrive, S will process them and send them to event topic E. Process D will consume from event topic E and send the message to some third-party endpoint on the internet.
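As a minimal sketch (assuming Kafka's Java AdminClient and placeholder broker addresses), topic E with a single partition and a replication factor of 3 could be created like this:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateEventTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap servers: one broker per availability zone.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
                  "broker-az1:9092,broker-az2:9092,broker-az3:9092");

        try (Admin admin = Admin.create(props)) {
            // Topic E: 1 partition (low scale), replication factor 3 (one replica per AZ).
            NewTopic eventTopic = new NewTopic("E", 1, (short) 3);
            admin.createTopics(Collections.singletonList(eventTopic)).all().get();
        }
    }
}
```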
Reliability
The first question I ask is: “Is this system reliable?” Let's revise our goal:
We want to build a system that delivers messages reliably from S to D.
To make this goal more concrete, I'd like to add the following requirement:
I want zero message loss
What does this requirement mean for our design? It means that once process S has acknowledged a message to a remote sender, D must deliver that message to a remote receiver. How do we build reliability into our system? Let's first generalize our system. Instead of just S and D, let's say we have three processes, A, B, and C, all of which are connected via Kafka topics. To make this system reliable, let's treat this linear topology as a chain. Like any chain, it's only as strong as its weakest link. If each process or link is transactional in nature, this chain will be transactional. My definition of “transactionality” is at least once delivery since this is the most common way that Kafka is used today. How do we make each link transactional? Let's first break this chain into its component processing links. First, we have A: A is an ingest node. Then we have B: B is an internal node. It reads from Kafka and writes to Kafka. Finally, we have C: C is an expel or egest node. It reads from Kafka and sends messages out to the internet.
How do we make A reliable? A will receive a request via its REST endpoint, process the message m1, and reliably send the data to Kafka as a Kafka event. Then, A will send an HTTP response back to its caller. To reliably send data to Kafka, A will need to call kProducer.send with two arguments: a topic and a message. A will then immediately need to call flush, which will flush internal Kafka buffers and force m1 to be sent to all three brokers. Since we have producer config acks=all, A will wait for an acknowledgment of success from all three brokers before it can respond to its caller.
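A minimal sketch of A's ingest path might look like the following; the class and method names are illustrative, and with acks=all the producer waits for every in-sync replica (all three brokers here) to acknowledge the write:

```java
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReliableIngest {
    private final KafkaProducer<String, String> kProducer;

    public ReliableIngest(String bootstrapServers) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // wait for all in-sync replicas
        this.kProducer = new KafkaProducer<>(props);
    }

    /** Respond with success to the caller only after Kafka has acknowledged the write. */
    public boolean handleRequest(String message) {
        try {
            Future<RecordMetadata> ack = kProducer.send(new ProducerRecord<>("E", message));
            kProducer.flush();  // push buffered records to the brokers now
            ack.get();          // surfaces any broker-side failure before we ack the caller
            return true;        // caller receives a success response
        } catch (Exception e) {
            return false;       // caller receives an error and may retry
        }
    }
}
```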
How about C? What does C need to do to be reliable? C needs to read data, typically a batch, from Kafka, do some processing on it, and then reliably send the data out. In this case, reliably means that it will need to wait for a 200 OK response code from some external service. Upon receiving that, process C will manually move its Kafka checkpoint forward. If there are any problems, process C will negatively ACK (i.e. NACK) Kafka, forcing C to reread the same data. Finally, what does B need to do? B is a combination of A and C. B needs to act like a reliable Kafka producer like A, and it also needs to act like a reliable Kafka consumer like C.
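A minimal sketch of C's consume-deliver-commit loop might look like this; deliverToRemote is a stand-in for the HTTP call that must return a 200 OK, the topic and group names are illustrative, and auto-commit is disabled so the checkpoint only moves forward after successful delivery:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReliableEgest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker-az1:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "egest-c");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // checkpoints move manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("E")); // input topic (E in the simple S-to-D system)
            while (true) {
                ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(1));
                boolean allDelivered = true;
                for (ConsumerRecord<String, String> record : batch) {
                    if (!deliverToRemote(record.value())) { // must see a 200 OK
                        allDelivered = false;
                        break;
                    }
                }
                if (allDelivered) {
                    consumer.commitSync(); // manually move the checkpoint forward
                }
                // If delivery failed, we do not commit: the same records are re-read
                // once the consumer seeks back or restarts (at-least-once delivery).
            }
        }
    }

    private static boolean deliverToRemote(String message) {
        return true; // placeholder for the real HTTP POST + 200 OK check
    }
}
```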
What can we say about the reliability of our system now? What happens if a process crashes? If A crashes, we will have a complete outage at ingest. That means our system will not accept any new messages. If instead C crashes, the service will stop delivering messages to external consumers; however, A will continue to receive messages and save them to Kafka, and B will continue to process them. C simply will not deliver them until it recovers. The most common solution to this problem is to wrap each service in an autoscaling group of size T. If we do so, each of the groups can handle T-1 concurrent failures. While the term “autoscaling group” was coined by Amazon, autoscaling groups are available today on all cloud platforms and in Kubernetes.
Lag
For now, we appear to have a pretty reliable data stream. How do we measure its reliability? This brings us to observability. In streaming systems, there are two primary quality metrics that we care about: lag and loss. You might be familiar with these metrics if you've worked with streams before. If you're new to them, I will give you an introduction.
Firstly, let's start with lag.
Lag is simply a measure of message delay in a system.
The longer a message takes to transit a system, the greater its lag. The greater the lag, the greater the impact on a business, especially one that depends on low-latency insights. Our goal is to minimize lag in order to deliver insights as quickly as possible. How do we compute lag? To start with, let's discuss a concept called event time. Event time is the creation time of a message or event. Event time is typically stored within the message body and travels with the message as it transits our system. Lag can be calculated for any message m1 at any node N in the system using the equation shown below:
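```latex
\text{Lag}_{\text{in}}(m_1, N) = T_{\text{arrival}}(m_1, N) - T_{\text{event}}(m_1)
```

Here, the arrival time is the wall-clock time at which m1 arrives at node N, and the event time travels with the message; the notation is illustrative, but the relationship matches the worked example that follows.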
Let's look at a real example.
Let's say we have a message created at noon (T0). That message arrives at our system at node A at 12:01 p.m. (T1). Node A processes the message and sends it to node B, where it arrives at 12:04 p.m. (T3). B processes it and sends it to C, where it arrives at 12:10 p.m. (T5). In turn, C processes the message and sends it on its way. Using the equation above, we see that T1-T0 is one minute, T3-T0 is four minutes, and so on; the arrival lag of message m1 at node C is 10 minutes.
In practice, lag in these systems is not on the order of minutes but on the order of milliseconds.
One thing I will mention is that we've been talking about message arrival at these nodes; hence, this is called arrival lag or lag-in. Another thing to observe is that lag is cumulative. That means the lag computed at node C accounts for the lag upstream of it in both nodes A and B. Similarly, the lag computed at node B accounts for the lag upstream of it at node A.
Aside from arrival lag, there is another type of lag called departure lag. Departure lag is calculated when messages leave nodes. Similar to how we calculated arrival lag, we can compute departure lag. Referring to the image below, we compute departure lag at nodes A, B, and C using the departure times T2, T4, and T6, respectively.
The single most important metric for lag in any streaming system is called the end-to-end lag (a.k.a. E2E Lag). E2E lag is the total time a message spends in the system. E2E lag is straightforward to compute as it is simply the departure lag at the last node in the system. Hence, it's the departure lag at node C (i.e. 10ms).
While knowing the lag for a particular message (m1) is interesting, it's of little use when we deal with billions or trillions of messages. Instead, we leverage statistics to capture population behavior. I prefer to use the 95th (or 99th) percentile (a.k.a. P95). Many people prefer Max or P99.
Let's look at some of the statistics we can build. We can compute the end-to-end lag over the population at P95. We can also compute the lag-in or lag-out at any node. With lag-out and lag-in, we can compute what's called the process duration: the time spent at any node in the chain. Why is that useful?
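Written out (using illustrative notation), for any node N:

```latex
\text{ProcessDuration}(N) = \text{Lag}_{\text{out}}(N) - \text{Lag}_{\text{in}}(N)
```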
Let's have a look at a real example. Imagine a linear topology as shown in the image above; we have a message that hits a system with four nodes, the red node, green node, blue node, and finally, an orange node. This is actually a system we run in production. The graph above is taken from CloudWatch for our production service. As you can see, we took the process duration for each node and put it in a pie chart. This gives us the lag contribution of each node in the system. The lag contribution is approximately equal for each node. No single node stands out. This is a very well-tuned system. If we take this pie chart and spread it out over time, we get the graph on the right, which shows us that the performance is consistent over time. Hence, we have a well-tuned system that performs consistently over time.
Loss
Now that we've talked about lag, what about loss? Loss is simply a measure of messages lost while transiting a system. Messages can be lost for various reasons, most of which we can mitigate. The greater the loss, the lower the data quality. Hence, our goal is to minimize loss in order to deliver high-quality insights. You may be asking yourself, “how do we compute loss in a streaming system?” Loss can be computed as the set difference of messages between any two points in the system. If we look at our topology from before, the one difference you see here is that a collection of ten messages is transiting the system instead of a single message.
We can use the following loss table to compute loss. Each row in the loss table is a message; each column is a node in the chain. As a message transits the system, we tally a 1. For example, message one is successfully transited through the entire system, so there's a 1 in each column. Message 2 also transits each node successfully in our system. However, message 3, while successfully processed at the red node, does not make it to the green node. Therefore, it doesn't reach the blue or the yellow node. At the bottom, we can compute the loss per node. Then, on the lower right, as you can see, we can compute the end-to-end loss in our system, which in this case is 50%.
In a streaming data system, messages never stop flowing, so how do we know when to count? The key is to allocate messages to 1-minute-wide time buckets using the message event time. For example, for the 12:34 minute of a day, we can compute a loss table that includes all the messages whose event times fall in that minute. Similarly, we can do this for other minutes in the day. Let's imagine that right now, the time is 12:40 p.m. As we know, in these systems, messages may arrive late. We can see that four of the tables are still getting updates to their tallies. However, we may notice that the 12:35 p.m. table is no longer changing; therefore, all messages that will arrive have arrived. At this point, we can compute loss. Any table before this time can be aged out and dropped. This allows us to scale the loss calculation by trimming tables we no longer need for computation.
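A minimal sketch of this bookkeeping might look like the following; the class, the bucket width, and the age-out policy are illustrative:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Illustrative loss accounting: messages are tallied into one-minute buckets by
 * event time; once a bucket has stopped changing, end-to-end loss is computed as
 * the set difference between the messages seen at the first and last nodes.
 */
public class LossTable {
    // eventTimeMinute -> node name -> set of message IDs tallied at that node
    private final Map<Long, Map<String, Set<String>>> buckets = new ConcurrentHashMap<>();

    public void tally(long eventTimeMillis, String node, String messageId) {
        long minute = eventTimeMillis / 60_000;
        buckets.computeIfAbsent(minute, m -> new ConcurrentHashMap<>())
               .computeIfAbsent(node, n -> ConcurrentHashMap.newKeySet())
               .add(messageId);
    }

    /** End-to-end loss for a closed bucket, e.g. between the ingest node and the egest node. */
    public double endToEndLossPercent(long minute, String firstNode, String lastNode) {
        Map<String, Set<String>> bucket = buckets.getOrDefault(minute, Map.of());
        Set<String> in  = bucket.getOrDefault(firstNode, Set.of());
        Set<String> out = bucket.getOrDefault(lastNode, Set.of());
        long lost = in.stream().filter(id -> !out.contains(id)).count();
        return in.isEmpty() ? 0.0 : 100.0 * lost / in.size();
    }

    /** Once a bucket is old enough that no late messages are expected, drop it. */
    public void ageOut(long minute) {
        buckets.remove(minute);
    }
}
```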
To summarize, we wait a few minutes for messages to transit and then compute loss. We then raise a loss alarm if loss exceeds a configured threshold, for example, 1%. With lag and loss explained, we have a way to measure the latency and reliability of our system.
Performance
Have we tuned our system for performance yet? Let's revise our goal: we want to build a system that can deliver messages reliably from S to D with low latency. To understand streaming system performance, we need to understand the components of end-to-end lag. The first component is the ingest time: the time from receiving the last byte of the request to sending the first byte of the response. This time includes any overhead we incur in reliably sending messages to Kafka. At the end of our pipeline, we have the expel time (or egest time): the time to process and egest a message at D. Any time in between is called transit time. All three times together make up the end-to-end lag.
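Putting the three components together:

```latex
\text{E2E Lag} = \text{Ingest Time} + \text{Transit Time} + \text{Expel Time}
```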
Performance Penalties
When discussing performance, we must acknowledge that there are certain performance penalties we must incur in order to build a lossless system. In other words, we need to trade off latency for reliability. Let's look at some of these penalties.
The first penalty is the ingest penalty. In the name of reliability, S needs to call kProducer.flush on every inbound API request. S also needs to wait for three acks from Kafka before sending its API response to clients. While we cannot remove these ingest performance penalties, we can amortize their cost by leveraging batching. In other words, we can maximize throughput over a constant per-API-call latency by supporting and promoting batch APIs, so that we get multiple messages per web request.
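A minimal sketch of a batch ingest handler might look like this; the class and method names are illustrative, and the point is that one flush and one round of acknowledgments is amortized across every message in the request:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class BatchIngest {
    private final KafkaProducer<String, String> kProducer;

    public BatchIngest(KafkaProducer<String, String> kProducer) {
        this.kProducer = kProducer;
    }

    /** One flush and one wait for acks is paid for the whole batch, not per message. */
    public boolean handleBatchRequest(List<String> messages) {
        List<Future<RecordMetadata>> pending = new ArrayList<>();
        for (String message : messages) {
            pending.add(kProducer.send(new ProducerRecord<>("E", message)));
        }
        kProducer.flush(); // single flush for the entire batch
        try {
            for (Future<RecordMetadata> ack : pending) {
                ack.get(); // surfaces any per-record failure before we ack the caller
            }
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```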
Similarly, we have something called the expel penalty. There's an observation we need to consider: Kafka is very fast, many orders of magnitude faster than typical HTTP round trip times (RTTs) over the public internet. In fact, the majority of the expel time is the HTTP round trip time. Again, we will use an amortization approach. In each D node, we will add batching and parallelism. We will read a batch from Kafka; then, we will re-batch it into smaller batches and use parallel threads to send these out to their destinations. This way, we can maximize throughput and minimize the expel cost or penalty per message.
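A minimal sketch of this re-batching and fan-out might look like the following; the thread-pool size, sub-batch size, and deliver method are illustrative:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelExpel {
    private final ExecutorService senders = Executors.newFixedThreadPool(8); // illustrative size

    /** Split a Kafka batch into smaller sub-batches and deliver them over parallel connections. */
    public boolean expel(List<String> kafkaBatch, int subBatchSize) {
        List<CompletableFuture<Boolean>> inFlight = new ArrayList<>();
        for (int i = 0; i < kafkaBatch.size(); i += subBatchSize) {
            List<String> subBatch =
                kafkaBatch.subList(i, Math.min(i + subBatchSize, kafkaBatch.size()));
            inFlight.add(CompletableFuture.supplyAsync(() -> deliver(subBatch), senders));
        }
        boolean allOk = true;
        for (CompletableFuture<Boolean> result : inFlight) {
            allOk &= result.join(); // wait for every sub-batch before committing the Kafka offset
        }
        return allOk;
    }

    private boolean deliver(List<String> subBatch) {
        return true; // placeholder for the HTTP POST to the remote endpoint + 200 OK check
    }
}
```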
Last but not least, we have something called a retry penalty. In order to run a zero-loss pipeline, we need to retry messages at D that will succeed given enough attempts. We have no control over the remote endpoints called by D: these endpoints can suffer from transient failures, they may throttle D from time to time, and other things may go on that we have no control over. We have to determine whether a message can succeed through retries. We call these types of failures recoverable failures. However, some types of errors are not recoverable. For example, if we receive a 4xx HTTP response code, except for 429 (a common throttling response code), we should not retry, because such requests will not succeed even with retries. To summarize, to handle the retry penalty, we have to pay some latency penalty on retry. We need to be smart about what we retry: we will not retry any non-recoverable failures. We also have to be smart about how we retry.
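A minimal sketch of this classification might look like the following; treating 429 and 5xx responses as recoverable is an assumption that fits the description above:

```java
/**
 * Illustrative classification of delivery failures at D: retry only failures
 * that could plausibly succeed on a later attempt.
 */
public class FailureClassifier {

    public static boolean isRecoverable(int httpStatus) {
        if (httpStatus == 429) {
            return true;  // throttled: back off and retry later
        }
        if (httpStatus >= 400 && httpStatus < 500) {
            return false; // other 4xx errors will not succeed no matter how often we retry
        }
        return httpStatus >= 500; // 5xx responses are treated as transient and retryable
    }
}
```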
Performance – Tiered Retries
One idea that I use is called tiered retries; refer to the architecture below. In this approach, we have two tiers: a local retry tier and a global retry tier. In the local tier, we try to send a message a configurable number of times at D, with short pauses between attempts. The goal of this tier is to deliver messages through short-lived, transient outages at the remote destination.
If we exhaust local retries, D transfers the message to a global retrier service (gr). The global retrier then retries the message over a longer span of time. The goal of this tier is to try to outlast longer-lived outages with the hope of successfully delivering messages. By handing off this responsibility to the global retrier, service D can focus on delivering messages that are not facing issues. Note that service D may send messages to different remote services or endpoints. Hence, while one of the remote destinations may be facing an outage, others may be fully functional. Since this is a streaming system, the global retrier and service D are separated by Kafka topics, RI (Retry_In) and RO (Retry_Out).
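A minimal sketch of the local retry loop and its hand-off to the global retrier might look like this; the attempt count, pause, and deliver method are illustrative, while the RI topic name follows the description above:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TieredRetry {
    private static final int LOCAL_ATTEMPTS = 3;   // illustrative configuration
    private static final long PAUSE_MILLIS = 200;  // small inter-attempt pause

    private final KafkaProducer<String, String> retryProducer;

    public TieredRetry(KafkaProducer<String, String> retryProducer) {
        this.retryProducer = retryProducer;
    }

    public void deliverWithRetries(String message) throws InterruptedException {
        for (int attempt = 1; attempt <= LOCAL_ATTEMPTS; attempt++) {
            if (deliver(message)) {
                return;                 // delivered during a short-lived blip
            }
            Thread.sleep(PAUSE_MILLIS);
        }
        // Local retries exhausted: hand the message to the global retrier via topic RI,
        // so D can keep draining messages bound for healthy destinations.
        retryProducer.send(new ProducerRecord<>("RI", message));
    }

    private boolean deliver(String message) {
        return false; // placeholder for the HTTP call to the remote destination
    }
}
```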
The beauty of this approach is that, in practice, we typically see far less than 1% of messages going through global retries, often less than 0.1%. Therefore, even though these retries take longer, they don't affect our P95 end-to-end lag.
Scalability
At this point, we have a system that works well at a low scale. How does this system scale with increasing traffic?

First, let's dispel a myth: there is no such thing as a system that can handle infinite scale. Many believe that you can achieve this by moving to AWS or some other hosted platform. The reality is that each account has limits on it, so your traffic and throughput will be capped. In essence, every system is traffic-rated, no matter where it runs. The traffic rating is measured by running a load test against the system, and we only achieve higher scale by iteratively running load tests and removing bottlenecks.

When autoscaling, especially for data streams, we usually have two goals. Goal 1 is to automatically scale out to maintain low latency, for example, to minimize end-to-end lag. Goal 2 is to scale in to reduce cost. For now, I'm going to focus on Goal 1.

When autoscaling, there are a few things to consider. Firstly, what can we auto-scale? For at least the last ten years or so, we have been able to auto-scale compute, at least in Amazon; all of our compute nodes are auto-scalable. What about Kafka? Kafka currently does not support autoscaling, but it is something that is being worked on.
The most crucial part of autoscaling is picking the right metric to trigger autoscaling actions. To preserve low latency, we have to select a metric that goes up as traffic increases and goes down as the microservice scales out. In my experience, average CPU is the best measure. There are a few things to be wary of, however. If your code has locks, code synchronization, or I/O waits, you may not be able to saturate the CPU on your box. As traffic increases, your CPU will reach a plateau; when that happens, autoscaling will stop, and your latency will increase. If you see this in your system, the simple way around it is to lower your autoscaling threshold below the CPU plateau.
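As an illustrative sketch (in practice this decision is made by the cloud platform's autoscaling policy, not hand-rolled code), the scale-out logic amounts to something like this, with made-up thresholds chosen to sit below any observed CPU plateau:

```java
public class CpuScalingPolicy {
    private static final double SCALE_OUT_THRESHOLD = 55.0; // percent, below the CPU plateau
    private static final double SCALE_IN_THRESHOLD  = 25.0; // percent

    /** Average CPU rises with traffic and falls as the group scales out, so it drives both goals. */
    public int desiredInstanceCount(double avgCpuPercent, int currentCount, int maxCount) {
        if (avgCpuPercent > SCALE_OUT_THRESHOLD && currentCount < maxCount) {
            return currentCount + 1; // Goal 1: scale out to keep end-to-end lag low
        }
        if (avgCpuPercent < SCALE_IN_THRESHOLD && currentCount > 1) {
            return currentCount - 1; // Goal 2: scale in to reduce cost
        }
        return currentCount;
    }
}
```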
Conclusion
At this point, we have a system with the non-functional requirements we desire. While we've covered many key elements, we've left out many more: isolation, multi-level autoscaling with containers, streaming operators, and the cache architecture.