Key Takeaways
- Stream processors need to scale to keep up with the growing stream processing demands of a business.
- Scaling a stream processor in a containerized environment involves balancing a trade-off between quality-of-service attributes and the associated cost.
- To exploit this trade-off, the stream processor should be able to scale horizontally in a containerized environment such as Kubernetes running on a cloud service provider.
- Running a stream processing application successfully in a containerized environment depends on the amount of hardware resources provisioned for each stream processor worker.
- Adding more hardware resources than required does not increase performance.
Data stream processing has become one of the main paradigms for data analytics in recent times because of the growing demand for event stream processing (i.e., stream processing) applications. Notable applications of stream processing can be found in areas such as telecommunications, vehicle traffic management, crowd management, health informatics, cybersecurity, and finance.
Stream processors are software platforms that enable users to process and respond to incoming data streams faster (see What is Stream Processing?). There are a number of stream processors available in the market to choose from (see How to Choose a Stream Processor?). Flink, Heron, Kafka Streams, Samza, Spark Streaming, Storm, and WSO2 Stream Processor are some examples of open source stream processors.
Real-time operation of stream processors is critical for providing a high-quality service in terms of system performance. Most modern stream processors can handle 90% of streaming use cases with a few compute nodes. However, with time, most profitable businesses have to handle increasing workloads due to business expansion. Hence the chosen stream processor needs to be capable of scaling to handle larger workloads with ease.
Increasingly, stream processors have been deployed as Software as a Service (SaaS) in cloud computing systems; notable examples include Amazon Kinesis Data Analytics, Microsoft Azure Stream Analytics, and Google Cloud Dataflow. Cloud-based stream processors provide the ability to elastically scale the stream processing applications running on them. Container-centric management environments (i.e., containerized environments) such as Kubernetes, used by cloud service providers, allow scalable execution of stream processing applications. However, deciding when and how to scale a distributed stream processor in a containerized environment is non-trivial, mainly because of the heterogeneity of the hardware/software environment and the characteristics of the data streams being processed. This article presents a practical, real-world example of a data-intensive stream processing application, explains how it can be systematically scaled in Kubernetes, and discusses the associated trade-offs. We use WSO2 Stream Processor (WSO2 SP) as the example stream processor in this article since it is an open source, cloud-native stream processor suitable for implementing such applications. However, we believe the same concepts apply equally to any other cloud-native stream processor available in the market.
We take a real-world stream processing example: detecting malicious attacks that make unauthorized login attempts to a web server, creating a denial-of-service (DoS) situation. Once such a DoS attack has been detected, the system sends alerts to the system administrators so that the necessary security measures can be taken.
Detecting Malicious Attacks using Web Server Logs
The operational status of a web server can be monitored by using the HTTP log events captured in the web server's log. The continuous appearance of HTTP status codes such as 401 (Unauthorized) or 403 (Forbidden) indicates that malicious login attempts are being made against the web server. A 401 response indicates that authorization has been refused for the credentials supplied by the external party trying to access the web server. A 403 response indicates that the server understood the request but refuses to fulfill it.
Figure 1: Detecting Malicious Attacks to a Web Server.
There are different approaches to handling this scenario. In this use case, however, the information security expert prefers to receive the information in real time as alerts. An alert needs to be raised if the malicious request count exceeds 30 within a three-second period, with an access ratio (defined as (unauthorized count + forbidden count)/total access count) of 1.0. We developed the alert generation application shown in Figure 1 using a stream processor that receives and processes log events from the web server. With system scalability in mind, the system has been deployed in a Kubernetes cluster running on Google Compute Engine (GCE). Listing 1 shows the streaming SQL code written in the Siddhi query language. We call such an application a Siddhi application.
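Expressed as a formula over each three-second window, the alert fires when both of the following hold:

\[
\text{accessRatio} = \frac{\text{unauthorizedCount} + \text{forbiddenCount}}{\text{totalAccessCount}} = 1.0,
\qquad
\text{totalAccessCount} > 30
\]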
Listing 1: Malicious attacks detection application developed with Siddhi streaming SQL
@App:name("MaliciousAttacksDetection")
@App:description("HTTP Log Processor for detecting malicious DoS attacks")
@source(type = 'kafka', partition.no.list='0', threading.option='single.thread', group.id="group", bootstrap.servers='kafka-service:9092', topic.list = 'attackDetectionTopic',
@map(type = 'json'))
define stream inputStream ( iij_timestamp long, ip string, timestamp long, zone float, cik double, accession string, doc string, code float, size double, idx float, norefer float, noagent float, find float, crawler float, groupID int, browser string);
--Output of Query 1: the IPs of the malicious hosts which make unauthorized login attempts in short intervals.
@sink(type='log')
define stream outputStreamDoSAlert(ip string ,groupID int);
--Latency measurement of the parallel Siddhi apps starts at this point (via the injected timestamp).
@info(name = "Query1")
@dist(execGroup='group11' ,parallel ='1')
from inputStream
select iij_timestamp, ip, timestamp, zone, cik, accession, doc, code, size, idx, norefer, noagent, find, crawler, groupID, browser, convert(time:timestampInMilliseconds(),'long') as injected_iijtimestamp
insert into interimInputStream;
--Query 2: all accesses are either 401 or 403, and more than 30 of them occur within a 3-second period.
@info(name = "Query2")
@dist(execGroup='group3', parallel ='12')
partition with (groupID of interimInputStream)
begin
from interimInputStream#window.timeBatch(timestamp, 3 sec)
select ip, count() as totalAccessCount, sum(ifThenElse(code == 401F, 1, 0)) as unauthorizedCount, sum(ifThenElse(code == 403F, 1, 0)) as forbiddenCount,injected_iijtimestamp as iijtimestamp,groupID,timestamp
insert into #interimStream3;
from #interimStream3#throughput:throughput(iijtimestamp,"throughput",3,6,"outputStreamDoSAlert",30)
select ip, totalAccessCount, (unauthorizedCount + forbiddenCount)/totalAccessCount as accessPercentage ,groupID
insert into #interimStream5;
from #interimStream5 [totalAccessCount > 30L and accessPercentage == 1.0]
select ip ,groupID
insert into outputStreamDoSAlert;
end;
We deploy this application in a distributed stream processor as shown in the deployment architecture diagram in Figure 2. Here, each component (Worker-1, Worker-2, etc.) is deployed as a single container in a single Kubernetes pod. The container categories and the tasks they perform are listed in Table 1. Figure 2 shows a scenario where the entire system is deployed on six Kubernetes nodes.
Table 1: The list of tasks performed by different types of containers in the Kubernetes environment
Container Category | Task Performed by the Container Category
Manager | The two managers in the manager cluster are two SP instances configured to run in high-availability mode.
Worker | Workers are the containers that do the actual processing of the data streams. Each worker is an SP instance configured to run using the Worker profile.
Zookeeper | We used three ZooKeeper nodes for implementing the high-availability functionality.
Producer | In these experiments, the Producer container generates varied workloads. In the current system architecture, the Producer container reads the log files and generates events which are sent to the Kafka broker, which collects the input events to the system.
SP-RDBMS | We use a separate relational database server to collect the performance information of the stream processor applications that run on the system. Performance numbers are collected at three levels: node level, container level, and Siddhi application level.
Kafka | We used a separate Kafka broker which coordinates the communication between the components in the cluster.
NFS | We used NFS (Network File System) for storing and accessing the configuration details of the distributed stream processor.
We deploy this application to run on the Kubernetes environment of Google Compute Engine. For a specific workload P, the system should operate and provide a specific quality-of-service (QoS) value Q. We measure Q using the latency numbers observed at the workers, where latency is the time difference between the moment an event enters a worker and the moment it exits that worker (see the sketch after Figure 2). The following bullet points describe how each component is deployed in the cluster.
- Node 1 hosts the web server.
- Node 2 hosts the producer component, which is the workload producer for the scenario. It reads the web server logs and publishes them to the Kafka instance running on Node 4.
- Nodes 2 and 4 run the two managers of the stream processor.
- Node 3 hosts the NFS, which is provisioned automatically by gcloud.
- Nodes 5 and 6 run the workers, which handle the real workload. They read data from the Kafka instance, process it by applying the stream processing operations, and write the results back to the Kafka instance.
Figure 2: Deployment Architecture of the Kubernetes Cluster.
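As a concrete illustration of the latency metric defined above, the following is a minimal sketch in Python (not WSO2 SP's actual instrumentation) of how per-event worker latency can be derived from the injected timestamp of Listing 1 and aggregated into an average latency figure of the kind reported later in Table 2.

import time
from statistics import mean

def current_millis() -> int:
    # Wall-clock time in milliseconds, comparable to time:timestampInMilliseconds() in Listing 1.
    return int(time.time() * 1000)

def worker_latency_ms(injected_iijtimestamp: int) -> int:
    # Latency of one event: time it exits the worker minus the time it was
    # stamped on entry (the injected_iijtimestamp attribute in Listing 1).
    return current_millis() - injected_iijtimestamp

# Hypothetical usage: collect per-event latencies as events leave the worker,
# then report the average over the measurement window.
latencies = [worker_latency_ms(ts) for ts in (current_millis() - 380,
                                              current_millis() - 395,
                                              current_millis() - 400)]
print(f"average latency: {mean(latencies):.1f} ms")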
However, as time passes, the workload on the web server increases. This is a typical characteristic of the web servers of most business organizations. The workload may grow, for example, from P to 2P, 4P, …, 16P. In such a scenario, the organization operating the web server monitoring system faces the problem of maintaining the observed QoS value Q' such that it stays close to Q. In such situations, the stream processor should be able to scale enough to maintain the expected level of QoS. Note that scalability is the ability of a system to handle increasing amounts of workload, i.e., to expand to support workload growth. In this article, our specific focus is on load scalability, which is the ability of a system to run gracefully as the offered traffic increases.
There are two approaches to load scalability: strong scaling and weak scaling. Strong scaling keeps the total problem size fixed while increasing the number of processors. Weak scaling increases the number of processors while keeping the problem size per processor fixed. In this article, we perform weak scaling because we face the phenomenon of workload increase.
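Using the standard definitions (not specific to this article), if T(1) is the completion time on one processor and T(N) the time on N processors, the two regimes are typically quantified as:

\[
S_{\text{strong}}(N) = \frac{T(1)}{T(N)} \;\text{(fixed total problem size)}, \qquad
E_{\text{weak}}(N) = \frac{T(1)}{T(N)} \;\text{(fixed problem size per processor)}
\]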
Experiments
We used a Kubernetes cluster provisioned on Google Compute Engine (GCE). The experiment nodes had 2 CPUs and 7.5 GB of memory (n1-standard-2 machine type) to host the pods. Each pod has one container, and each container hosts a single stream processor (SP) component. We used JDK 1.8, Kafka 2.0.1, WSO2 SP 4.3.0, and MySQL 5.7.4 to build the Docker images, and all of them were deployed as containerized applications in the cluster. We conducted each experiment for 40 minutes, including a warm-up period of 10 minutes. Note that we use the notation x-y-z to denote (number of nodes)-(number of workers)-(number of instances).
We used the EDGAR Log File Data Set as a representative real-world data set for the experiments since it is a web server log data set in CSV (comma-separated values) format. The data set we used was one of the log files, log20170325.csv, from the EDGAR Log File data sets. The CSV file had 22,146,382 events, a total file size of approximately 2.4 GB, and an average message size of 144 bytes. Each record had 15 fields. Listing 2 shows the header row and the first record from the EDGAR data set.
Listing 2: The header row and the first record from the EDGAR Log File data set
ip,date,time,zone,cik,accession,extention,code,size,idx,norefer,noagent,find,crawler,browser
100.14.44.eca,2017-03-25,00:00:00,0.0,1031093.0,0001137171-10-000013,-index.htm,200.0,7926.0,1.0,0.0,0.0,10.0,0.0,
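The Producer container in Table 1 reads such log files and publishes each record to the Kafka topic consumed by the Siddhi application in Listing 1. The following is a minimal sketch of such a producer, assuming the kafka-python client and Siddhi's default JSON event envelope; the field mapping, the groupID assignment (hashing the IP into 12 groups), and the file path are our assumptions for illustration, not the exact implementation used in the experiments.

import csv
import json
import time
from datetime import datetime

from kafka import KafkaProducer  # assumption: kafka-python client

producer = KafkaProducer(
    bootstrap_servers="kafka-service:9092",          # broker address from Listing 1
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

NUM_GROUPS = 12  # number of unique groupID values (see Table 2)

with open("log20170325.csv") as f:
    for row in csv.DictReader(f):
        # Combine the date and time columns into an epoch timestamp in milliseconds.
        epoch_ms = int(datetime.strptime(
            f"{row['date']} {row['time']}", "%Y-%m-%d %H:%M:%S").timestamp() * 1000)
        event = {
            "iij_timestamp": int(time.time() * 1000),  # producer-side timestamp
            "ip": row["ip"],
            "timestamp": epoch_ms,
            "zone": float(row["zone"]),
            "cik": float(row["cik"]),
            "accession": row["accession"],
            "doc": row["extention"],                   # 'extention' column maps to 'doc'
            "code": float(row["code"]),
            "size": float(row["size"]),
            "idx": float(row["idx"]),
            "norefer": float(row["norefer"]),
            "noagent": float(row["noagent"]),
            "find": float(row["find"]),
            "crawler": float(row["crawler"]),
            "groupID": hash(row["ip"]) % NUM_GROUPS,   # assumed group assignment
            "browser": row["browser"],
        }
        # Siddhi's JSON mapper expects events wrapped in an "event" object by default.
        producer.send("attackDetectionTopic", {"event": event})

producer.flush()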
We measured performance at three levels: node level, container level, and stream processor level. However, the conclusions made in this article are based on stream-processor-level latency and throughput numbers. We measured these values at the group3 Siddhi execution group, which corresponds to the partial Siddhi application deployed in each of the SP workers. The six experiments we conducted on the Kubernetes cluster produced the results shown in Table 2.
In Table 2, ID is the unique experiment identifier. No. of Nodes is the total number of Kubernetes worker nodes. Instance count is the number of Siddhi execution group instances used for the experiment. Producer Data Rate (No. of Threads) is the number of producers which simultaneously generate streaming events by reading the HTTP log events from the EDGAR data files.
Results
The two-node, 6-worker, 6-instance case (2-6-6) resulted in a 390 ms average latency for a single workload producer P. This is the base case, which resembles the real-world deployment of this use case. With increasing workload, the latency increases significantly while the throughput drops. We take the special case of the workload being increased significantly by raising the number of workload generator threads to 16; scenario ID 2 shows this situation. When we increase the generator threads 16 times, the latency increases by 28.2% while the throughput drops by about 29%. As shown in scenario 3, reducing the number of unique group IDs in the input data makes things worse (especially in terms of throughput), because fewer unique group IDs means less parallelism for the application to exploit.
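These percentages follow directly from the scenario 1 and scenario 2 rows of Table 2:

\[
\frac{500 - 390}{390} \approx 28.2\% \;\text{latency increase}, \qquad
\frac{900 - 640}{900} \approx 28.9\% \;\text{throughput reduction}
\]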
If we double the number of workers per node, as shown in scenario ID 4, we halve the amount of memory per worker. This reduces the throughput by two-thirds compared with case 1, and the latency increases roughly nine-fold. Therefore, we observe that the amount of memory per worker plays an important role in determining the latency of the application. If a sufficient amount of memory is not provided, the workers may not even get deployed.
In the fifth scenario, we increase the number of Kubernetes worker nodes to three, with 12 workers, which eliminates the performance bottleneck. This adds another node (Node 7) to Figure 2, with each node running four stream processor workers. Each worker requests 1 GB of memory, and each node can sustain 4-5 workers with that amount of memory. Hence, even a 3-6-6 setup (i.e., three nodes, six workers, and six instances) would operate with 2-6-6's performance. However, the performance characteristics of the 2-12-12 and 3-12-12 setups are not the same.
Adding more hardware resources than required does not help either, as we can observe from experiment scenario 6. Although we doubled the number of nodes compared to scenario 5, we get a similar average latency to scenario 5. The approach followed by scenario 6 might be useful if more partial Siddhi apps were added to the workers, since that would require additional memory which three nodes cannot provide.
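The 1 GB memory request per worker is the key resource knob in these experiments. Below is a minimal sketch, using the Kubernetes Python client, of how such a worker Deployment with an explicit memory request could be declared; the image name, labels, and replica count are hypothetical and not the exact manifests used in our cluster.

from kubernetes import client, config

config.load_kube_config()

# One SP worker container per pod, with an explicit 1 GB memory request/limit.
container = client.V1Container(
    name="sp-worker",
    image="wso2sp-worker:4.3.0",  # hypothetical image name
    resources=client.V1ResourceRequirements(
        requests={"memory": "1Gi", "cpu": "500m"},
        limits={"memory": "1Gi"},
    ),
)
template = client.V1PodTemplateSpec(
    metadata=client.V1ObjectMeta(labels={"app": "sp-worker"}),
    spec=client.V1PodSpec(containers=[container]),
)
spec = client.V1DeploymentSpec(
    replicas=12,  # e.g., the 12-worker scenarios in Table 2
    selector=client.V1LabelSelector(match_labels={"app": "sp-worker"}),
    template=template,
)
deployment = client.V1Deployment(
    api_version="apps/v1",
    kind="Deployment",
    metadata=client.V1ObjectMeta(name="sp-worker"),
    spec=spec,
)
client.AppsV1Api().create_namespaced_deployment(namespace="default", body=deployment)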
Table 2: Performance Numbers for Different Kubernetes Cluster Settings
ID | No. of Nodes | No. of Workers | No. of Instances | Producer Data Rate (No. of Threads) | Worker Memory Allocated (GB) | Throughput (per instance) | Entire Average Latency (ms) | Unique Group IDs
1 | 2 | 6 | 6 | P | 1 | 900 | 390 | 12
2 | 2 | 6 | 6 | 16P | 1 | 640 | 500 | 12
3 | 2 | 6 | 6 | 16P | 1 | 310 | 420 | 6
4 | 2 | 12 | 12 | 16P | 0.5 | 300 | 3500 | 12
5 | 3 | 12 | 12 | 16P | 1 | 660 | 300 | 12
6 | 6 | 12 | 12 | 16P | 1 | 640 | 310 | 12
Because we use partitions in the Siddhi application, the application gets divided into partial Siddhi applications, each of which receives workload according to the distribution of the unique values of the partitioning attribute. The rightmost column of Table 2 shows the number of unique values used for the groupID field, which is the partition attribute in our Siddhi application. Using 6 unique values means only 6 partial Siddhi applications can receive workload according to Siddhi's hash-based mapping. We therefore increase the parallelism by increasing the number of unique group IDs to 12, so that the stream is directed to 12 partial Siddhi applications. This increased parallelism is why scenario 1 achieves better latency than scenario 3.
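The following is an illustrative sketch in plain Python (not Siddhi's actual hash function) of why the number of unique partition-key values caps the number of partial Siddhi applications that can receive work.

# Illustrative hash-based assignment of partition key values to partial
# Siddhi applications; Siddhi's real mapping may use a different hash.
def assign_partial_app(group_id: int, num_partial_apps: int) -> int:
    return hash(str(group_id)) % num_partial_apps

NUM_PARTIAL_APPS = 12  # parallel='12' in Listing 1

for unique_groups in (6, 12):
    used = {assign_partial_app(g, NUM_PARTIAL_APPS) for g in range(unique_groups)}
    print(f"{unique_groups} unique groupIDs -> at most {len(used)} of "
          f"{NUM_PARTIAL_APPS} partial Siddhi apps receive events")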
Summary
Scalability is a significant challenge faced by stream processors in container environments because application workloads grow over time. This article presented our experience of scaling a distributed stream processor in a Kubernetes environment. To do this, the stream processor should provide programming language/query constructs for maintaining an optimal level of parallelism irrespective of the initial scale of the application. Running stream processor workers with multiple partitions enables scaling of the stream processor. With increasing workloads, a sufficient amount of hardware resources needs to be provisioned so that the system can maintain the level of QoS agreed upon with the operators. Adding more resources incurs additional cost, and it does not guarantee performance improvements. Instead, the stream processor should be able to identify the level of resource requirements and scale up to the level where an optimal performance-to-cost ratio can be maintained.
About the Authors
Sarangan Janakan is an Intern Software Engineer at WSO2. He is currently a third year undergraduate at the Department of Computer Science and Engineering, University of Moratuwa, Sri Lanka. His research interests include data stream processing and cloud computing.
Miyuru Dayarathna is a Senior Technical Lead at WSO2. He is a computer scientist with multiple research interests and contributions in stream computing, graph data management and mining, cloud computing, performance engineering, information security, etc. He is also a consultant at the Department of Computer Science and Engineering, University of Moratuwa, Sri Lanka. His recent research focus at WSO2 has been on Stream Processor and Identity Server. He has published technical papers in reputed international journals and conferences.