How can I protect my Apache Kafka cluster from traffic spikes based on specific scenarios without setting quotas on the cluster?
How can I validate requests adhere to a JSON Schema?
How can I make sure parameters are included in the URI, query string, and headers?
How can Amazon MSK ingest messages from lightweight clients without using an agent or the native Apache Kafka protocol?
These tasks are achievable using custom proxy servers or gateways, but these options can be difficult to implement and manage. On the other hand, API Gateway has these features and is a fully managed AWS service.
In this blog post we will show you how Amazon API Gateway can answer these questions as a component between your Amazon MSK cluster and your clients.
Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with just a few clicks without the need to provision servers, manage storage, or configure Apache Zookeeper manually. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications.
Some use cases include ingesting messages from lightweight IoT devices that don’t have support for native Kafka protocol and orchestrating your streaming services with other backend services including third-party APIs.
This pattern also comes with the following trade-offs:
Cost and complexity due to another service to run and maintain.
Performance overhead, because the proxy adds extra processing to construct and make HTTP requests. Additionally, REST Proxy needs to parse requests and transform data between formats for both produce and consume requests.
When you implement this architecture in a production environment, you should consider these points with your business use case and SLA needs.
Solution overview
To implement the solution, complete the following steps:
Create a Kafka topic and configure the REST Proxy on a Kafka client machine
Create an API with REST Proxy integration via API Gateway
Test the end-to-end processes by producing and consuming messages to Amazon MSK
The following diagram illustrates the solution architecture.
Within this architecture, you create an MSK cluster and set up an Amazon EC2 instance with the REST Proxy and Kafka client. You then expose the REST Proxy through Amazon API Gateway and also test the solution by producing messages to Amazon MSK using Postman.
For a production implementation, make sure to set up the REST Proxy behind a load balancer with an Auto Scaling group.
Prerequisites
Before you get started, you must have the following prerequisites:
An AWS account that provides access to AWS services
An IAM user with an access key and secret access key to configure the AWS CLI
An Amazon EC2 keypair
Creating an MSK cluster, Kafka client, and REST Proxy
AWS CloudFormation provisions all the required resources, including VPC, subnets, security groups, Amazon MSK cluster, Kafka client, and Kafka REST Proxy. To create these resources, complete the following steps:
Launch the CloudFormation stack in the us-east-1 or us-west-2 Region. The stack takes approximately 15 to 20 minutes to complete.
From the AWS CloudFormation console, choose AmzonMSKAPIBlog.
Under Outputs, get the MSKClusterARN, KafkaClientEC2InstancePublicDNS, and MSKSecurityGroupID details.
Get the ZooKeeperConnectionString and other information about your cluster by entering the following code (provide your Region, cluster ARN, and AWS named profile):
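The exact commands aren't reproduced here, so the following is a minimal sketch; the Kafka tool path, Region, and placeholder values are assumptions to replace with your own. The last command is what produces the "Created topic" message in the next step.

aws kafka describe-cluster --region us-east-1 --cluster-arn <Your_Cluster_ARN> --profile <Your_Named_Profile>
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn <Your_Cluster_ARN> --profile <Your_Named_Profile>

# On the Kafka client EC2 instance, create the topic (the install path is an assumption)
/home/ec2-user/kafka/bin/kafka-topics.sh --create --zookeeper <Your_ZookeeperConnectString> \
  --replication-factor 3 --partitions 1 --topic amazonmskapigwblog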
If the command is successful, you see the following message: Created topic amazonmskapigwblog.
To connect the Kafka REST server to the Amazon MSK cluster, modify kafka-rest.properties in the directory (/home/ec2-user/confluent-5.3.1/etc/kafka-rest/) to point to your Amazon MSK’s ZookeeperConnectString and BootstrapserversConnectString information. See the following code:
sudo vi /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties
zookeeper.connect=<Replace_With_Your_ZookeeperConnectString>
bootstrap.servers=<Replace_With_Your_BootstrapserversConnectString>
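After updating the properties file, start the REST Proxy so that it can serve requests. A minimal sketch, assuming the Confluent 5.3.1 install path used above:

# Start the Kafka REST Proxy with the modified configuration
/home/ec2-user/confluent-5.3.1/bin/kafka-rest-start /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties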
As an additional, optional step, you can configure SSL to secure communication between REST clients and the REST Proxy (HTTPS). If SSL is not required, you can skip steps 5 and 6.
You have now created a Kafka topic and configured Kafka REST Proxy to connect to your Amazon MSK cluster.
Creating an API with Kafka REST Proxy integration
To create an API with Kafka REST Proxy integration via API Gateway, complete the following steps:
On the API Gateway console, choose Create API.
For API type, choose REST API.
Choose Build.
Choose New API.
For API Name, enter a name (for example, amazonmsk-restapi).
As an optional step, for Description, enter a brief description.
Choose Create API.
The next step is to create a child resource.
Under Resources, choose a parent resource item.
Under Actions, choose Create Resource.
The New Child Resource pane opens.
Select Configure as proxy resource.
For Resource Name, enter proxy.
For Resource Path, enter /{proxy+}.
Select Enable API Gateway CORS.
Choose Create Resource.
After you create the resource, the Create Method window opens.
For Integration type, select HTTP Proxy.
For Endpoint URL, enter an HTTP backend resource URL (your Kafka client Amazon EC2 instance public DNS; for example, http://KafkaClientEC2InstancePublicDNS:8082/{proxy} or https://KafkaClientEC2InstancePublicDNS:8085/{proxy}).
Use the default settings for the remaining fields.
Choose Save.
For SSL, use the HTTPS endpoint for Endpoint URL.
In the API you just created, the API's proxy resource path of {proxy+} becomes the placeholder for any of the backend endpoints under http://YourKafkaClientPublicIP:8082/.
Choose the API you just created.
Under Actions, choose Deploy API.
For Deployment stage, choose New Stage.
For Stage name, enter the stage name (for example, dev, test, or prod).
Choose Deploy.
Record the Invoke URL after you have deployed the API.
Your external Kafka REST Proxy, which was exposed through API Gateway, now looks like https://YourAPIGWInvokeURL/dev/topics/amazonmskapigwblog. You use this URL in the next step.
Testing the end-to-end processes
To test the end-to-end process by producing and consuming messages to Amazon MSK, complete the following steps:
SSH into the Kafka Client Amazon EC2 instance. See the following code:
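The exact commands aren't shown above, so here is a sketch of the test; the key pair name, tool paths, and sample payload are placeholders, and the curl request uses the standard REST Proxy v2 JSON format.

ssh -i <Your_EC2_Keypair.pem> ec2-user@<KafkaClientEC2InstancePublicDNS>

# Start a console consumer on the topic (the install path is an assumption)
/home/ec2-user/kafka/bin/kafka-console-consumer.sh --bootstrap-server <Your_BootstrapserversConnectString> \
  --topic amazonmskapigwblog --from-beginning

# From Postman or curl, produce a message through the API Gateway invoke URL
curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
  -d '{"records":[{"value":{"deviceid":"sensor-1","temperature":72}}]}' \
  https://<YourAPIGWInvokeURL>/dev/topics/amazonmskapigwblog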
The following screen shot shows messages coming to the Kafka consumer from the API Gateway Kafka REST endpoint.
Conclusion
This post demonstrated how easy it is to set up REST API endpoints for Amazon MSK with API Gateway. This solution can help you produce and consume messages to Amazon MSK from any IoT device or programming language without depending on native Kafka protocol or clients.
If you have questions or suggestions, please leave your thoughts in the comments.
About the Author
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.
Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.
AWS Glue is an increasingly popular way to develop serverless ETL (extract, transform, and load) applications for big data and data lake workloads. Organizations that transform their ETL applications to cloud-based, serverless ETL architectures need a seamless, end-to-end continuous integration and continuous delivery (CI/CD) pipeline: from source code, to build, to deployment, to product delivery. Having a good CI/CD pipeline can help your organization discover bugs before they reach production and deliver updates more frequently. It can also help developers write quality code and automate the ETL job release management process, mitigate risk, and more.
AWS Glue is a fully managed data catalog and ETL service. It simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. AWS Glue crawls your data sources and constructs a data catalog using pre-built classifiers for popular data formats and data types, including CSV, Apache Parquet, JSON, and more.
When you are developing ETL applications using AWS Glue, you might come across some of the following CI/CD challenges:
Iterative development with unit tests
Continuous integration and build
Pushing the ETL pipeline to a test environment
Pushing the ETL pipeline to a production environment
Testing ETL applications using real data (live test)
The following diagram shows the pipeline workflow:
This solution uses AWS CodePipeline, which lets you orchestrate and automate the test and deploy stages for ETL application source code. The solution consists of a pipeline that contains the following stages:
1.) Source Control: In this stage, the AWS Glue ETL job source code and the AWS CloudFormation template file for deploying the ETL jobs are both committed to version control. I chose to use AWS CodeCommit for version control.
2.) LiveTest: In this stage, all resources—including AWS Glue crawlers, jobs, S3 buckets, roles, and other resources that are required for the solution—are provisioned, deployed, live tested, and cleaned up.
The LiveTest stage includes the following actions:
Deploy: In this action, all the resources that are required for this solution (crawlers, jobs, buckets, roles, and so on) are provisioned and deployed using an AWS CloudFormation template.
AutomatedLiveTest: In this action, all the AWS Glue crawlers and jobs are executed and data exploration and validation tests are performed. These validation tests include, but are not limited to, record counts in both raw tables and transformed tables in the data lake and any other business validations. I used AWS CodeBuild for this action.
LiveTestApproval: This action is included for the cases in which a pipeline administrator approval is required to deploy/promote the ETL applications to the next stage. The pipeline pauses in this action until an administrator manually approves the release.
LiveTestCleanup: In this action, all the LiveTest stage resources, including test crawlers, jobs, roles, and so on, are deleted using the AWS CloudFormation template. This action helps minimize cost by ensuring that the test resources exist only for the duration of the AutomatedLiveTest and LiveTestApproval actions.
3.) DeployToProduction: In this stage, all the resources are deployed using the AWS CloudFormation template to the production environment.
Try it out
This code pipeline takes approximately 20 minutes to complete the LiveTest test stage (up to the LiveTest approval stage, in which manual approval is required).
To get started with this solution, choose Launch Stack:
This creates the CI/CD pipeline with all of its stages, as described earlier. It performs an initial commit of the sample AWS Glue ETL job source code to trigger the first release change.
In the AWS CloudFormation console, choose Create. After the template finishes creating resources, you see the pipeline name on the stack Outputs tab.
After that, open the CodePipeline console and select the newly created pipeline. Initially, your pipeline’s CodeCommit stage shows that the source action failed.
Allow a few minutes for your new pipeline to detect the initial commit applied by the CloudFormation stack creation. As soon as the commit is detected, your pipeline starts. You will see the successful stage completion status as soon as the CodeCommit source stage runs.
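If you prefer the command line, you can also poll the pipeline state with the AWS CLI; the pipeline name below is a placeholder for the name shown on the stack Outputs tab.

aws codepipeline get-pipeline-state --name <your-glue-cicd-pipeline-name> \
  --query 'stageStates[].{stage:stageName,status:latestExecution.status}'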
In the CodeCommit console, choose Code in the navigation pane to view the solution files.
Next, you can watch how the pipeline goes through the LiveTest stage of the deploy and AutomatedLiveTest actions, until it finally reaches the LiveTestApproval action.
At this point, if you check the AWS CloudFormation console, you can see that a new template has been deployed as part of the LiveTest deploy action.
At this point, make sure that the AWS Glue crawlers and the AWS Glue job ran successfully. Also check whether the corresponding databases and external tables have been created in the AWS Glue Data Catalog. Then verify that the data is validated using Amazon Athena, as shown following.
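You can spot-check the crawler and job runs from the AWS CLI as well; the crawler and job names below are placeholders for the resources created by the LiveTest stack.

aws glue get-crawler --name <your-livetest-crawler> --query 'Crawler.{state:State,lastStatus:LastCrawl.Status}'
aws glue get-job-runs --job-name <your-livetest-job> --max-results 1 --query 'JobRuns[0].JobRunState'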
Open the AWS Glue console, and choose Databases in the navigation pane. You will see the following databases in the Data Catalog:
Open the Amazon Athena console, and run the following queries. Verify that the record counts are matching.
SELECT count(*) FROM "nycitytaxi_gluedemocicdtest"."data";
SELECT count(*) FROM "nytaxiparquet_gluedemocicdtest"."datalake";
The following shows the raw data:
The following shows the transformed data:
The pipeline pauses the action until the release is approved. After validating the data, manually approve the revision on the LiveTestApproval action on the CodePipeline console.
Add comments as needed, and choose Approve.
The LiveTestApproval stage now appears as Approved on the console.
After the revision is approved, the pipeline proceeds to use the AWS CloudFormation template to destroy the resources that were deployed in the LiveTest deploy action. This helps reduce cost and ensures a clean test environment on every deployment.
Production deployment is the final stage. In this stage, all the resources—AWS Glue crawlers, AWS Glue jobs, Amazon S3 buckets, roles, and so on—are provisioned and deployed to the production environment using the AWS CloudFormation template.
After successfully running the whole pipeline, feel free to experiment with it by changing the source code stored on AWS CodeCommit. For example, if you modify the AWS Glue ETL job to generate an error, it should make the AutomatedLiveTest action fail. Or if you change the AWS CloudFormation template to make its creation fail, it should affect the LiveTest deploy action. The objective of the pipeline is to ensure that all changes deployed to production work as expected.
Conclusion
In this post, you learned how easy it is to implement CI/CD for serverless AWS Glue ETL solutions with AWS developer tools like AWS CodePipeline and AWS CodeBuild at scale. Implementing such solutions can help you accelerate ETL development and testing at your organization.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.
Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improving the value of their solutions when using AWS.
This post was written in partnership with Intuit to share learnings, best practices, and recommendations for running an Apache Kafka cluster on AWS. Thanks to Vaishak Suresh and his colleagues at Intuit for their contribution and support.
Intuit, in their own words: Intuit, a leading enterprise customer for AWS, is a creator of business and financial management solutions. For more information on how Intuit partners with AWS, see our previous blog post, Real-time Stream Processing Using Apache Spark Streaming and Apache Kafka on AWS. Apache Kafka is an open-source, distributed streaming platform that enables you to build real-time streaming applications.
The best practices described in this post are based on our experience in running and operating large-scale Kafka clusters on AWS for more than two years. Our intent for this post is to help AWS customers who are currently running Kafka on AWS, and also customers who are considering migrating on-premises Kafka deployments to AWS.
Running your Kafka deployment on Amazon EC2 provides a high performance, scalable solution for ingesting streaming data. AWS offers many different instance types and storage option combinations for Kafka deployments. However, given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.
In this blog post, we cover the following aspects of running Kafka clusters on AWS:
Deployment considerations and patterns
Storage options
Instance types
Networking
Upgrades
Performance tuning
Monitoring
Security
Backup and restore
Note: While implementing Kafka clusters in a production environment, make sure also to consider factors like your number of messages, message size, monitoring, failure handling, and any operational issues.
Deployment considerations and patterns
In this section, we discuss various deployment options available for Kafka on AWS, along with pros and cons of each option. A successful deployment starts with thoughtful consideration of these options. Considering availability, consistency, and operational overhead of the deployment helps when choosing the right option.
Single AWS Region, Three Availability Zones, All Active
One typical deployment pattern (all active) is in a single AWS Region with three Availability Zones (AZs). One Kafka cluster is deployed in each AZ along with Apache ZooKeeper and Kafka producer and consumer instances as shown in the illustration following.
In this pattern, this is the Kafka cluster deployment:
Kafka producers and Kafka cluster are deployed on each AZ.
Data is distributed evenly across three Kafka clusters by using Elastic Load Balancer.
Kafka consumers aggregate data from all three Kafka clusters.
Kafka cluster failover occurs this way:
Mark down all Kafka producers
Stop consumers
Debug and restack Kafka
Restart consumers
Restart Kafka producers
Following are the pros and cons of this pattern.
Pros
● Highly available
● Can sustain the failure of two AZs
● No message loss during failover
● Simple deployment

Cons
● Very high operational overhead:
  ○ All changes need to be deployed three times, once for each Kafka cluster
  ○ Maintaining and monitoring three Kafka clusters
  ○ Maintaining and monitoring three consumer clusters
● A restart is required for patching and upgrading brokers in a Kafka cluster. In this approach, a rolling upgrade is done separately for each cluster.
Single Region, Three Availability Zones, Active-Standby
Another typical deployment pattern (active-standby) is in a single AWS Region with a single Kafka cluster and Kafka brokers and Zookeepers distributed across three AZs. Another similar Kafka cluster acts as a standby as shown in the illustration following. You can use Kafka mirroring with MirrorMaker to replicate messages between any two clusters.
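MirrorMaker ships with Kafka. A minimal sketch of replicating all topics from the active cluster to the standby follows; the configuration file names and their contents (pointing the consumer at the active cluster and the producer at the standby) are assumptions.

bin/kafka-mirror-maker.sh --consumer.config consumer.properties \
  --producer.config producer.properties --whitelist ".*"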
In this pattern, this is the Kafka cluster deployment:
Kafka producers are deployed on all three AZs.
Only one Kafka cluster is deployed across three AZs (active).
ZooKeeper instances are deployed on each AZ.
Brokers are spread evenly across all three AZs.
Kafka consumers can be deployed across all three AZs.
Standby Kafka producers and a Multi-AZ Kafka cluster are part of the deployment.
Kafka cluster failover occurs this way:
Switch traffic to standby Kafka producers cluster and Kafka cluster.
Restart consumers to consume from standby Kafka cluster.
Following are the pros and cons of this pattern.
Pros
● Less operational overhead when compared to the first option
● Only one Kafka cluster to manage and consume data from
● Can handle single-AZ failures without activating a standby Kafka cluster

Cons
● Added latency due to cross-AZ data transfer among Kafka brokers
● For Kafka versions before 0.10, replicas for topic partitions have to be assigned manually so that they're distributed to brokers in different AZs; later versions support rack awareness (see the broker.rack sketch after this list)
● The cluster can become unavailable in case of a network glitch, where ZooKeeper does not see the Kafka brokers
● Possibility of in-transit message loss during failover
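For the rack-awareness point above, a minimal sketch of mapping each broker to its Availability Zone; the server.properties path is an assumption.

# On a broker running in us-east-1a, for example
echo "broker.rack=us-east-1a" | sudo tee -a /opt/kafka/config/server.properties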
Intuit recommends using a single Kafka cluster in one AWS Region, with brokers distributed across three AZs (single Region, three AZs). This approach offers stronger fault tolerance, because a failed AZ won't cause Kafka downtime.
Storage options
There are two storage options for file storage in Amazon EC2: ephemeral storage (instance store) and Amazon Elastic Block Store (Amazon EBS).
Ephemeral storage is local to the Amazon EC2 instance. It can provide high IOPS based on the instance type. On the other hand, Amazon EBS volumes offer higher resiliency and you can configure IOPS based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. Your choice of storage is closely related to the type of workload supported by your Kafka cluster.
Kafka provides built-in fault tolerance by replicating data partitions across a configurable number of instances. If a broker fails, you can recover it by fetching all the data from other brokers in the cluster that host the other replicas. Depending on the size of the data transfer, it can affect recovery process and network traffic. These in turn eventually affect the cluster’s performance.
The following sections contrast the benefits of using an instance store versus using Amazon EBS for storage.

Instance store

Instance storage is recommended for large and medium-sized Kafka clusters. For a large cluster, read/write traffic is distributed across a high number of brokers, so the loss of a broker has less of an impact. For smaller clusters, however, quick recovery of a failed node is important, and recovering a failed broker takes longer and generates more network traffic.

Storage-optimized instances like h1, i3, and d2 are an ideal choice for distributed applications like Kafka.

EBS

The primary advantage of using EBS in a Kafka deployment is that it significantly reduces data-transfer traffic when a broker fails or must be replaced. The replacement broker joins the cluster much faster.
Data stored on EBS is persisted in case of an instance failure or termination. The broker’s data stored on an EBS volume remains intact, and you can mount the EBS volume to a new EC2 instance. Most of the replicated data for the replacement broker is already available in the EBS volume and need not be copied over the network from another broker. Only the changes made after the original broker failure need to be transferred across the network. That makes this process much faster.
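A sketch of that recovery path with the AWS CLI; the volume, instance, and device identifiers are placeholders.

# Detach the data volume from the failed broker and attach it to the replacement instance
aws ec2 detach-volume --volume-id <vol-broker-data>
aws ec2 attach-volume --volume-id <vol-broker-data> --instance-id <i-replacement-broker> --device /dev/sdf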
Intuit chose EBS because of their frequent instance restacking requirements and also other benefits provided by EBS.
Generally, Kafka deployments use a replication factor of three. Because Amazon EBS replicates data within the service, Intuit chose a replication factor of two instead of three.
Instance types
The choice of instance types is generally driven by the type of storage required for your streaming applications on a Kafka cluster. If your application requires ephemeral storage, h1, i3, and d2 instances are your best option.
Intuit used r3.xlarge instances for their brokers and r3.large for ZooKeeper, with ST1 (throughput optimized HDD) EBS for their Kafka cluster.
Here are sample benchmark numbers from Intuit tests.
Configuration: r3.xlarge brokers with ST1 EBS, 12 brokers, 12 partitions
Broker bytes (MB/s): 346.9 (aggregate)
If you need EBS storage, then AWS has a newer-generation r4 instance. The r4 instance is superior to the r3 in many ways:
It has a faster processor (Broadwell).
EBS is optimized by default.
It features networking based on Elastic Network Adapter (ENA), with up to 10 Gbps on smaller sizes.
Networking
The network plays a very important role in a distributed system like Kafka. A fast and reliable network ensures that nodes can communicate with each other easily. The available network throughput controls the maximum amount of traffic that Kafka can handle. Network throughput, combined with disk storage, is often the governing factor for cluster sizing.
If you expect your cluster to receive high read/write traffic, select an instance type that offers 10-Gb/s performance.
In addition, choose an option that keeps interbroker network traffic on the private subnet, because this approach allows clients to connect to the brokers. Communication between brokers and clients uses the same network interface and port. For more details, see the documentation about IP addressing for EC2 instances.
If you are deploying in more than one AWS Region, you can connect the two VPCs in the two AWS Regions using cross-region VPC peering. However, be aware of the networking costs associated with cross-AZ deployments.
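A sketch of setting up that peering with the AWS CLI; the VPC IDs and Regions are placeholders.

aws ec2 create-vpc-peering-connection --vpc-id <vpc-in-us-east-1> \
  --peer-vpc-id <vpc-in-us-west-2> --peer-region us-west-2 --region us-east-1
aws ec2 accept-vpc-peering-connection --vpc-peering-connection-id <pcx-id> --region us-west-2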
Upgrades
Kafka has a history of not being backward compatible, but its support of backward compatibility is getting better. During a Kafka upgrade, you should keep your producer and consumer clients on a version equal to or lower than the version you are upgrading from. After the upgrade is finished, you can start using a new protocol version and any new features it supports. There are three upgrade approaches available, discussed following.
Rolling or in-place upgrade
In a rolling or in-place upgrade scenario, upgrade one Kafka broker at a time. Take into consideration the recommendations for doing rolling restarts to avoid downtime for end users.
Downtime upgrade
If you can afford the downtime, you can take your entire cluster down, upgrade each Kafka broker, and then restart the cluster.
Blue/green upgrade
Intuit followed the blue/green deployment model for their workloads, as described following.
If you can afford to create a separate Kafka cluster and upgrade it, we highly recommend the blue/green upgrade scenario. In this scenario, we recommend that you keep your clusters up-to-date with the latest Kafka version. For additional details on Kafka version upgrades, see the Kafka upgrade documentation.
The following illustration shows a blue/green upgrade.
In this scenario, the upgrade plan works like this:
Create a new Kafka cluster on AWS.
Create a new Kafka producers stack to point to the new Kafka cluster.
Create topics on the new Kafka cluster.
Test the green deployment end to end (sanity check).
Using Amazon Route 53, change the new Kafka producers stack on AWS to point to the new green Kafka environment that you have created.
The roll-back plan works like this:
Switch Amazon Route 53 to the old Kafka producers stack on AWS to point to the old Kafka environment.
Performance tuning
You can tune Kafka performance in multiple dimensions. Following are some best practices for performance tuning.
These are some general performance tuning techniques:
If throughput is less than network capacity, try the following:
Add more threads
Increase batch size
Add more producer instances
Add more partitions
To improve latency when acks = -1, increase your num.replica.fetchers value.
For cross-AZ data transfer, tune your buffer settings for sockets and for OS TCP.
Make sure that num.io.threads is greater than the number of disks dedicated for Kafka.
Adjust num.network.threads based on the number of producers plus the number of consumers plus the replication factor.
Your message size affects your network bandwidth. To get higher performance from a Kafka cluster, select an instance type that offers 10 Gb/s performance.
For Java and JVM tuning, try the following:
Minimize GC pauses by using the Oracle JDK, which uses the new G1 garbage-first collector.
Try to keep the Kafka heap size below 4 GB.
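A sketch of how these settings might look for a broker; the values are illustrative starting points rather than recommendations, and the file paths are assumptions.

# server.properties (broker) -- illustrative values
# Keep num.io.threads greater than the number of disks dedicated to Kafka
num.io.threads=16
# Scale num.network.threads with producers + consumers + replication factor
num.network.threads=8
# Raise num.replica.fetchers to improve latency when acks = -1
num.replica.fetchers=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576

# Shell environment picked up by kafka-server-start.sh
export KAFKA_HEAP_OPTS="-Xms4g -Xmx4g"
export KAFKA_JVM_PERFORMANCE_OPTS="-XX:+UseG1GC -XX:MaxGCPauseMillis=20"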
Monitoring
Knowing whether a Kafka cluster is working correctly in a production environment is critical. Sometimes, just knowing that the cluster is up is enough, but Kafka applications have many moving parts to monitor. In fact, it can easily become confusing to understand what’s important to watch and what you can set aside. Items to monitor range from simple metrics about the overall rate of traffic, to producers, consumers, brokers, controller, ZooKeeper, topics, partitions, messages, and so on.
For monitoring, Intuit used several tools, including New Relic, Wavefront, Amazon CloudWatch, and AWS CloudTrail. Our recommended monitoring approach follows.
For system metrics, we recommend that you monitor:
CPU load
Network metrics
File handle usage
Disk space
Disk I/O performance
Garbage collection
ZooKeeper
For producers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
For consumers, we recommend that you monitor:
Batch-size-avg
Compression-rate-avg
Waiting-threads
Buffer-available-bytes
Record-queue-time-max
Record-send-rate
Records-per-request-avg
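Consumer lag is the most direct signal that consumers are keeping up. Whichever metrics system you use, you can also check lag ad hoc with the tool that ships with Kafka; the broker list and group name below are placeholders.

bin/kafka-consumer-groups.sh --bootstrap-server <broker1:9092,broker2:9092> \
  --describe --group <your-consumer-group>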
Security
Like most distributed systems, Kafka provides the mechanisms to transfer data with relatively high security across the components involved. Depending on your setup, security might involve different services such as encryption, Kerberos, Transport Layer Security (TLS) certificates, and advanced access control list (ACL) setup in brokers and ZooKeeper. The following tells you more about the Intuit approach. For details on Kafka security not covered in this section, see the Kafka documentation.
Encryption at rest
For EBS-backed EC2 instances, you can enable encryption at rest by using Amazon EBS volumes with encryption enabled. Amazon EBS uses AWS Key Management Service (AWS KMS) for encryption. For more details, see Amazon EBS Encryption in the EBS documentation. For instance store–backed EC2 instances, you can enable encryption at rest by using Amazon EC2 instance store encryption.
Encryption in transit
Kafka uses TLS for client and internode communications.
Authentication
Authentication of connections to brokers from clients (producers and consumers), other brokers, and tools uses either Secure Sockets Layer (SSL) or Simple Authentication and Security Layer (SASL).
Kafka supports Kerberos authentication. If you already have a Kerberos server, you can add Kafka to your current configuration.
Authorization
In Kafka, authorization is pluggable and integration with external authorization services is supported.
Backup and restore
The type of storage used in your deployment dictates your backup and restore strategy.
The best way to back up a Kafka cluster based on instance storage is to set up a second cluster and replicate messages using MirrorMaker. Kafka’s mirroring feature makes it possible to maintain a replica of an existing Kafka cluster. Depending on your setup and requirements, your backup cluster might be in the same AWS Region as your main cluster or in a different one.
For EBS-based deployments, you can enable automatic snapshots of EBS volumes to back up volumes. You can easily create new EBS volumes from these snapshots to restore. We recommend storing backup files in Amazon S3.
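A sketch of taking an on-demand snapshot of a broker's data volume with the AWS CLI; the volume ID is a placeholder, and in practice you would schedule snapshots, for example with Amazon Data Lifecycle Manager.

aws ec2 create-snapshot --volume-id <vol-broker-data> \
  --description "Kafka broker data volume backup"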
For more information on how to back up in Kafka, see the Kafka documentation.
Conclusion
In this post, we discussed several patterns for running Kafka in the AWS Cloud. AWS also provides an alternative managed solution, Amazon Kinesis Data Streams: there are no servers to manage or scaling cliffs to worry about, you can scale your streaming pipeline in seconds without downtime, data replication across Availability Zones is automatic, and you benefit from security out of the box. Kinesis Data Streams is tightly integrated with a wide variety of AWS services such as AWS Lambda, Amazon Redshift, and Amazon Elasticsearch Service, and it supports open-source frameworks like Storm, Spark, and Flink. To move data between Kafka and Kinesis Data Streams, you can use the Kafka-Kinesis Connector.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.
Apache Cassandra is a commonly used, high performance NoSQL database. AWS customers that currently maintain Cassandra on-premises may want to take advantage of the scalability, reliability, security, and economic benefits of running Cassandra on Amazon EC2.
Amazon EC2 and Amazon Elastic Block Store (Amazon EBS) provide secure, resizable compute capacity and storage in the AWS Cloud. When combined, you can deploy Cassandra, allowing you to scale capacity according to your requirements. Given the number of possible deployment topologies, it’s not always trivial to select the most appropriate strategy suitable for your use case.
In this post, we outline three Cassandra deployment options, as well as provide guidance about determining the best practices for your use case in the following areas:
Cassandra resource overview
Deployment considerations
Storage options
Networking
High availability and resiliency
Maintenance
Security
Before we jump into best practices for running Cassandra on AWS, we should mention that we have many customers who decided to use DynamoDB instead of managing their own Cassandra cluster. DynamoDB is fully managed, serverless, and provides multi-master cross-region replication, encryption at rest, and managed backup and restore. Integration with AWS Identity and Access Management (IAM) enables DynamoDB customers to implement fine-grained access control for their data security needs.
Several customers who have been using large Cassandra clusters for many years have moved to DynamoDB to eliminate the complications of administering Cassandra clusters and maintaining high availability and durability themselves. Gumgum.com is one customer who migrated to DynamoDB and observed significant savings. For more information, see Moving to Amazon DynamoDB from Hosted Cassandra: A Leap Towards 60% Cost Saving per Year.
AWS provides options, so you’re covered whether you want to run your own NoSQL Cassandra database, or move to a fully managed, serverless DynamoDB database.
Cassandra resource overview
Here’s a short introduction to standard Cassandra resources and how they are implemented with AWS infrastructure. If you’re already familiar with Cassandra or AWS deployments, this can serve as a refresher.
Resource: Cluster
Cassandra: A single Cassandra deployment. This typically consists of multiple physical locations, keyspaces, and physical servers.
AWS: A logical deployment construct in AWS that maps to an AWS CloudFormation StackSet, which consists of one or many CloudFormation stacks to deploy Cassandra.

Resource: Datacenter
Cassandra: A group of nodes configured as a single replication group.
AWS: A logical deployment construct in AWS. A datacenter is deployed with a single CloudFormation stack consisting of Amazon EC2 instances, networking, storage, and security resources.

Resource: Rack
Cassandra: A collection of servers. A datacenter consists of at least one rack. Cassandra tries to place the replicas on different racks.
AWS: A single Availability Zone.

Resource: Server/node
Cassandra: A physical or virtual machine running Cassandra software.
AWS: An EC2 instance.

Resource: Token
Cassandra: Conceptually, the data managed by a cluster is represented as a ring. The ring is then divided into ranges equal to the number of nodes, with each node responsible for one or more ranges of the data. Each node is assigned a token, which is essentially a random number from the range. The token value determines the node's position in the ring and its range of data.
AWS: Managed within Cassandra.

Resource: Virtual node (vnode)
Cassandra: Responsible for storing a range of data. Each vnode receives one token in the ring. A cluster (by default) consists of 256 tokens, which are uniformly distributed across all servers in the Cassandra datacenter.
AWS: Managed within Cassandra.

Resource: Replication factor
Cassandra: The total number of replicas across the cluster.
AWS: Managed within Cassandra.
Deployment considerations
One of the many benefits of deploying Cassandra on Amazon EC2 is that you can automate many deployment tasks. In addition, AWS includes services, such as CloudFormation, that allow you to describe and provision all your infrastructure resources in your cloud environment.
We recommend orchestrating each Cassandra ring with one CloudFormation template. If you are deploying in multiple AWS Regions, you can use a CloudFormation StackSet to manage those stacks. All the maintenance actions (scaling, upgrading, and backing up) should be scripted with an AWS SDK. These may live as standalone AWS Lambda functions that can be invoked on demand during maintenance.
You can get started by following the Cassandra Quick Start deployment guide. Keep in mind that this guide does not address the requirements to operate a production deployment and should be used only for learning more about Cassandra.
Deployment patterns
In this section, we discuss various deployment options available for Cassandra in Amazon EC2. A successful deployment starts with thoughtful consideration of these options. Consider the amount of data, network environment, throughput, and availability.
Single AWS Region, 3 Availability Zones
Active-active, multi-Region
Active-standby, multi-Region
Single region, 3 Availability Zones
In this pattern, you deploy the Cassandra cluster in one AWS Region and three Availability Zones. There is only one ring in the cluster. By using EC2 instances in three zones, you ensure that the replicas are distributed uniformly in all zones.
To ensure the even distribution of data across all Availability Zones, we recommend that you distribute the EC2 instances evenly in all three Availability Zones. The number of EC2 instances in the cluster is a multiple of three (the replication factor).
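To illustrate, a keyspace created for this pattern typically uses NetworkTopologyStrategy with a replication factor of three; the keyspace name, seed node address, and datacenter name below are placeholders that depend on your snitch configuration.

cqlsh <seed-node-ip> -e "CREATE KEYSPACE IF NOT EXISTS myapp \
  WITH replication = {'class': 'NetworkTopologyStrategy', '<your-datacenter-name>': 3};"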
This pattern is suitable in situations where the application is deployed in one Region or where deployments in different Regions should be constrained to the same Region because of data privacy or other legal requirements.
Pros
Cons
● Highly available, can sustain failure of one Availability Zone.
● Simple deployment
● Does not protect in a situation when many of the resources in a Region are experiencing intermittent failure.
Active-active, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between two rings.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster are deployed in more than one Region.
Pros
Cons
● No data loss during failover.
● Highly available, can sustain when many of the resources in a Region are experiencing intermittent failures.
● Read/write traffic can be localized to the closest Region for the user for lower latency and higher performance.
● High operational overhead
● The second Region effectively doubles the cost
Active-standby, multi-Region
In this pattern, you deploy two rings in two different Regions and link them. The VPCs in the two Regions are peered so that data can be replicated between two rings.
However, the second Region does not receive traffic from the applications. It only functions as a secondary location for disaster recovery reasons. If the primary Region is not available, the second Region receives traffic.
We recommend that the two rings in the two Regions be identical in nature, having the same number of nodes, instance types, and storage configuration.
This pattern is most suitable when the applications using the Cassandra cluster require low recovery point objective (RPO) and recovery time objective (RTO).
Pros
Cons
● No data loss during failover.
● Highly available, can sustain failure or partitioning of one whole Region.
● High operational overhead.
● High latency for writes for eventual consistency.
● The second Region effectively doubles the cost.
Storage options
On premises, Cassandra deployments use local disks to store data. There are two storage options for EC2 instances: ephemeral storage (instance store) and Amazon EBS.
Your choice of storage is closely related to the type of workload supported by the Cassandra cluster. Instance store works best for most general purpose Cassandra deployments. However, in certain read-heavy clusters, Amazon EBS is a better choice.
The choice of instance type is generally driven by the type of storage:
If ephemeral storage is required for your application, a storage-optimized (I3) instance is the best option.
If your workload requires Amazon EBS, it is best to go with compute-optimized (C5) instances.
Burstable instance types (T2) don’t offer good performance for Cassandra deployments.
Instance store
Ephemeral storage is local to the EC2 instance. It can provide high input/output operations per second (IOPS) based on the instance type. An SSD-based instance store can support up to 3.3M IOPS on I3 instances. This high performance makes it an ideal choice for transactional or write-intensive applications such as Cassandra.
In general, instance storage is recommended for transactional, large, and medium-size Cassandra clusters. For a large cluster, read/write traffic is distributed across a higher number of nodes, so the loss of one node has less of an impact. However, for smaller clusters, a quick recovery for the failed node is important.
As an example, for a cluster with 100 nodes, the loss of 1 node means a 3.33% loss of capacity (with a replication factor of 3). Similarly, for a cluster with 10 nodes, the loss of 1 node means a 33% loss of capacity (with a replication factor of 3).
The following comparison summarizes ephemeral storage versus Amazon EBS.

IOPS (translates to higher query performance)
Ephemeral storage: up to 3.3M on I3.
Amazon EBS: 80K per instance; 10K per gp2 volume; 32K per io1 volume.
Comments: Higher IOPS results in higher query performance on each host. However, Cassandra implicitly scales well horizontally. In general, we recommend scaling horizontally first, then scaling vertically to mitigate specific issues. Note: 3.3M IOPS is observed with 100% random reads with a 4-KB block size on Amazon Linux.

AWS instance types
Ephemeral storage: I3.
Amazon EBS: compute optimized, C5.
Comments: Being able to choose between different instance types is an advantage in terms of CPU, memory, and so on, for horizontal and vertical scaling.

Backup/recovery
Ephemeral storage: custom.
Amazon EBS: basic building blocks are available from AWS.
Comments: Amazon EBS offers a distinct advantage here. It takes only a small engineering effort to establish a backup/restore strategy. a) In case of an instance failure, the EBS volumes from the failing instance are attached to a new instance. b) In case of an EBS volume failure, the data is restored by creating a new EBS volume from the last snapshot.
Amazon EBS
EBS volumes offer higher resiliency, and IOPS can be configured based on your storage needs. EBS volumes also offer some distinct advantages in terms of recovery time. EBS volumes can support up to 32K IOPS per volume and up to 80K IOPS per instance in RAID configuration. They have an annualized failure rate (AFR) of 0.1–0.2%, which makes EBS volumes 20 times more reliable than typical commodity disk drives.
The primary advantage of using Amazon EBS in a Cassandra deployment is that it reduces data-transfer traffic significantly when a node fails or must be replaced. The replacement node joins the cluster much faster. However, Amazon EBS could be more expensive, depending on your data storage needs.
Cassandra has built-in fault tolerance by replicating data to partitions across a configurable number of nodes. It can not only withstand node failures but if a node fails, it can also recover by copying data from other replicas into a new node. Depending on your application, this could mean copying tens of gigabytes of data. This adds additional delay to the recovery process, increases network traffic, and could possibly impact the performance of the Cassandra cluster during recovery.
Data stored on Amazon EBS is persisted in case of an instance failure or termination. The node’s data stored on an EBS volume remains intact and the EBS volume can be mounted to a new EC2 instance. Most of the replicated data for the replacement node is already available in the EBS volume and won’t need to be copied over the network from another node. Only the changes made after the original node failed need to be transferred across the network. That makes this process much faster.
EBS volumes are snapshotted periodically. So, if a volume fails, a new volume can be created from the last known good snapshot and attached to a new instance. This is faster than creating a new volume and copying all the data to it.
Most Cassandra deployments use a replication factor of three. However, Amazon EBS does its own replication under the covers for fault tolerance. In practice, EBS volumes are about 20 times more reliable than typical disk drives. So, it is possible to go with a replication factor of two. This not only saves cost, but also enables deployments in a region that has two Availability Zones.
EBS volumes are recommended in case of read-heavy, small clusters (fewer nodes) that require storage of a large amount of data. Keep in mind that the Amazon EBS provisioned IOPS could get expensive. General purpose EBS volumes work best when sized for required performance.
Networking
If your cluster is expected to receive high read/write traffic, select an instance type that offers 10–Gb/s performance. As an example, i3.8xlarge and c5.9xlarge both offer 10–Gb/s networking performance. A smaller instance type in the same family leads to a relatively lower networking throughput.
Cassandra generates a universally unique identifier (UUID) for each node, based on the IP address of the instance. This UUID is used for distributing vnodes on the ring.
In an AWS deployment, an IP address is assigned automatically when an EC2 instance is created, so a replacement instance gets a new IP address. With the new IP address, the data distribution changes and the whole ring has to be rebalanced. This is not desirable.
To preserve the assigned IP address, use a secondary elastic network interface with a fixed IP address. Before swapping an EC2 instance with a new one, detach the secondary network interface from the old instance and attach it to the new one. This way, the UUID remains same and there is no change in the way that data is distributed in the cluster.
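A sketch of that swap with the AWS CLI; the attachment, interface, and instance identifiers are placeholders.

# Detach the secondary ENI from the old instance, then attach it to the replacement node
aws ec2 detach-network-interface --attachment-id <eni-attach-id>
aws ec2 attach-network-interface --network-interface-id <eni-id> \
  --instance-id <i-replacement-node> --device-index 1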
If you are deploying in more than one region, you can connect the two VPCs in two regions using cross-region VPC peering.
High availability and resiliency
Cassandra is designed to be fault-tolerant and highly available during multiple node failures. In the patterns described earlier in this post, you deploy Cassandra to three Availability Zones with a replication factor of three. Even though it limits the AWS Region choices to the Regions with three or more Availability Zones, it offers protection for the cases of one-zone failure and network partitioning within a single Region. The multi-Region deployments described earlier in this post protect when many of the resources in a Region are experiencing intermittent failure.
Resiliency is ensured through infrastructure automation. The deployment patterns all require a quick replacement of the failing nodes. In the case of a regionwide failure, when you deploy with the multi-Region option, traffic can be directed to the other active Region while the infrastructure is recovering in the failing Region. In the case of unforeseen data corruption, the standby cluster can be restored with point-in-time backups stored in Amazon S3.
Maintenance
In this section, we look at ways to ensure that your Cassandra cluster is healthy:
Scaling
Upgrades
Backup and restore
Scaling
Cassandra is horizontally scaled by adding more instances to the ring. We recommend doubling the number of nodes in a cluster to scale up in one scale operation. This leaves the data homogeneously distributed across Availability Zones. Similarly, when scaling down, it’s best to halve the number of instances to keep the data homogeneously distributed.
Cassandra is vertically scaled by increasing the compute power of each node. Larger instance types have proportionally bigger memory. Use deployment automation to swap instances for bigger instances without downtime or data loss.
Upgrades
All three types of upgrades (Cassandra, operating system patching, and instance type changes) follow the same rolling upgrade pattern.
In this process, you start with a new EC2 instance and install software and patches on it. Thereafter, remove one node from the ring. For more information, see Cassandra cluster Rolling upgrade. Then, you detach the secondary network interface from one of the EC2 instances in the ring and attach it to the new EC2 instance. Restart the Cassandra service and wait for it to sync. Repeat this process for all nodes in the cluster.
Backup and restore
Your backup and restore strategy is dependent on the type of storage used in the deployment. Cassandra supports snapshots and incremental backups. When using instance store, a file-based backup tool works best. Customers use rsync or other third-party products to copy data backups from the instance to long-term storage. For more information, see Backing up and restoring data in the DataStax documentation. This process has to be repeated for all instances in the cluster for a complete backup. These backup files are copied back to new instances to restore. We recommend using S3 to durably store backup files for long-term storage.
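A sketch of one such file-based backup on an instance-store node; the keyspace name, data path, and bucket name are assumptions, and the copy has to run on every node.

# Take a point-in-time snapshot with nodetool, then copy it to S3 for long-term storage
nodetool snapshot -t backup-$(date +%Y%m%d) <your_keyspace>
aws s3 sync /var/lib/cassandra/data/<your_keyspace>/ s3://<your-backup-bucket>/$(hostname)/ \
  --exclude "*" --include "*snapshots/backup-*"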
For Amazon EBS based deployments, you can enable automated snapshots of EBS volumes to back up volumes. New EBS volumes can be easily created from these snapshots for restoration.
Security
We recommend that you think about security in all aspects of deployment. The first step is to ensure that the data is encrypted at rest and in transit. The second step is to restrict access to unauthorized users. For more information about security, see the Cassandra documentation.
Encryption at rest
Encryption at rest can be achieved by using EBS volumes with encryption enabled. Amazon EBS uses AWS KMS for encryption. For more information, see Amazon EBS Encryption.
Instance store–based deployments require using an encrypted file system or an AWS partner solution. If you are using DataStax Enterprise, it supports transparent data encryption.
Encryption in transit
Cassandra uses Transport Layer Security (TLS) for client and internode communications.
Authentication
The security mechanism is pluggable, which means that you can easily swap out one authentication method for another. You can also provide your own method of authenticating to Cassandra, such as a Kerberos ticket, or if you want to store passwords in a different location, such as an LDAP directory.
Authorization
The authorizer that's plugged in by default is org.apache.cassandra.auth.AllowAllAuthorizer. Cassandra also provides a role-based access control (RBAC) capability, which allows you to create roles and assign permissions to these roles.
Conclusion
In this post, we discussed several patterns for running Cassandra in the AWS Cloud. This post describes how you can manage Cassandra databases running on Amazon EC2. AWS also provides managed offerings for a number of databases. To learn more, see Purpose-built databases for all your application needs.
If you have questions or suggestions, please comment below.
Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.
Provanshu Dey is a Senior IoT Consultant with AWS Professional Services. He works on highly scalable and reliable IoT, data and machine learning solutions with our customers. In his spare time, he enjoys spending time with his family and tinkering with electronics & gadgets.
Prasad Alle is a consultant with AWS Professional Services
Intuit, a creator of business and financial management solutions, is a leading enterprise customer for AWS. The Intuit Data team (IDEA) at Intuit is responsible for building platforms and products that enable a data-driven personalized experience across Intuit products and services.
One dimension of this platform is the streaming data pipeline that enables event-based data to be available for both analytic and real time applications. These include—but are not limited to—applications for personalization, product discovery, fraud detection, and more.
The challenge is building a platform that can support and integrate with more than 50 products and services across Intuit, and one that further considers seasonality and the evolution of use cases. Intuit requires a data platform that can scale and abstract the underlying complexities of a distributed architecture, allowing users to focus on leveraging the data rather than managing ingestion.
Amazon EMR, Amazon Kinesis, and Amazon S3 were among the initial considerations to build out this architecture at scale. Given that Intuit had existing infrastructure leveraging Kafka on AWS, the first version was designed using Apache Kafka on Amazon EC2, EMR, and S3 for persistence. Amazon Kinesis provides an alternative managed solution for streaming, which reduces the amount of administration and monitoring required. For more information about Amazon Kinesis reference architectures, see Amazon Kinesis Streams Product Details.
This post demonstrates how to set up Apache Kafka on EC2, use Spark Streaming on EMR to process data coming in to Apache Kafka topics, and query streaming data using Spark SQL on EMR.
Note: This is an example and should not be implemented in a production environment without considering additional operational issues about Apache Kafka and EMR, including monitoring and failure handling.
Intuit’s application architecture
Before detailing Intuit’s implementation, it is helpful to consider the application architecture and physical architecture in the AWS Cloud. The following application architecture can launch via a public subnet or within a private subnet.
Apache Kafka and Amazon EMR in VPC public subnets
The following architecture diagram represents an EMR and Kafka cluster in a VPC public subnet and accesses them through a bastion host to control access and security.
Apache Kafka and Amazon EMR in VPC private subnets
The following architecture diagram represents an EMR cluster in a VPC private subnet with an S3 endpoint and NAT instance; Kafka can also be installed in VPC private subnets. Private subnets allow you to limit access to deployed components, and to control security and routing of the system. You access EMR and Kafka clusters through a bastion host.
By now, you should have a good understanding of the architecture involved and the deployment model you might like to implement from this post.
Stream processing walkthrough
The entire pattern can be implemented in a few simple steps:
Set up Kafka on AWS.
Spin up an EMR 5.0 cluster with Hadoop, Hive, and Spark.
Create a Kafka topic.
Run the Spark Streaming app to process clickstream events.
Use the Kafka producer app to publish clickstream events into Kafka topic.
Explore clickstream events data with SparkSQL.
Prerequisite
To implement the architecture, establish an AWS account, then download and configure the AWS CLI.
Step 1: Set up Kafka on AWS
An AWS CloudFormation template can be used to deploy an Apache Kafka cluster:
Choose either Upload a template to Amazon S3 or Specify an Amazon S3 template URL.
Choose Next.
Specify a stack name and enter the following parameters:
Optionally, specify a tag for the instance. Choose Next.
Review choices, check the IAM acknowledgement, and then choose Create.
The stack takes several minutes to complete as it creates the EC2 instance and provisions Apache Kafka and its prerequisites.
Return to the CloudFormation console. When the CloudFormation stack status returns CREATE_COMPLETE, your EC2 instance is ready. On the Outputs tab, note the DNS names for Kafka ZooKeeper and broker.
Step 2: Spin up an EMR 5.0 cluster with Hadoop, Hive, and Spark
This step allows the creation of the EMR cluster. You may use the following sample command to create an EMR cluster with AWS CLI tools or you can create the cluster on the console.
If you decide to create the cluster using the CLI, remember to replace myKeyName, myLogBucket, myRegion, and mySubnetId with your EC2 key pair name, logging bucket, region, and public/private subnets.
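The exact command isn't reproduced here; a minimal sketch with the same placeholders follows, and the instance type and count are assumptions you can adjust.

aws emr create-cluster --name "kafka-spark-streaming-demo" \
  --release-label emr-5.0.0 \
  --applications Name=Hadoop Name=Hive Name=Spark \
  --ec2-attributes KeyName=myKeyName,SubnetId=mySubnetId \
  --instance-type m4.xlarge --instance-count 3 \
  --use-default-roles --log-uri s3://myLogBucket --region myRegion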
The cluster is created in approximately 10 minutes and changes to the “Waiting” state.
Step 3: Create a Kafka topic
Kafka maintains feeds of messages in topics. A topic is a category or feed name to which messages are published. To create a Kafka topic, use the following instructions.
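A minimal sketch of the topic creation, assuming the Kafka install directory on the instance and a placeholder topic name.

# Run from the Kafka installation directory on the EC2 instance created by the CloudFormation stack
bin/kafka-topics.sh --create --zookeeper <ZooKeeperInstanceDNS>:2181 \
  --replication-factor 1 --partitions 1 --topic <your_topic_name>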
Note: Change the ZooKeeper instance DNS address based on your environment.
Step 4: Run the Spark Streaming app to process clickstream events
The Spark Streaming app is able to consume clickstream events as soon as the Kafka producer starts publishing events (as described in Step 5) into the Kafka topic. For this post, I used the Direct Approach (No Receivers) method of Spark Streaming to receive data from Kafka.
After the Kafka producer starts publishing, the Spark Streaming app processes clickstream events, extracts metadata, and stores it in Apache Hive for interactive analysis. Below code explains.
package com.awsproserv.kafkaandsparkstreaming
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._
object ClickstreamSparkstreaming {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println(s"""
|Usage: ClickstreamSparkstreaming <brokers> <topics>
| <brokers> is a list of one or more Kafka brokers
| <brokers> is a list of one or more Kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaClickstreams")
// Create context with 10-second batch intervals
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create direct Kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicsSet)
val lines = messages.map(_._2)
val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
val spark = SparkSession
.builder
.config(sparkConf)
.config("spark.sql.warehouse.dir", warehouseLocation)
.enableHiveSupport()
.getOrCreate()
// Drop the table if it already exists
spark.sql("DROP TABLE IF EXISTS csmessages_hive_table")
// Create the table to store your streams
spark.sql("CREATE TABLE csmessages_hive_table ( recordtime string, eventid string, url string, ip string ) STORED AS TEXTFILE")
// Convert RDDs of the lines DStream to DataFrame and run a SQL query
lines.foreachRDD { (rdd: RDD[String], time: Time) =>
import spark.implicits._
// Convert RDD[String] to RDD[case class] to DataFrame
val messagesDataFrame = rdd.map(_.split(",")).map(w => Record(w(0), w(1), w(2), w(3))).toDF()
// Creates a temporary view using the DataFrame
messagesDataFrame.createOrReplaceTempView("csmessages")
//Insert continuous streams into Hive table
spark.sql("INSERT INTO TABLE csmessages_hive_table SELECT * FROM csmessages")
// Select the parsed messages from the table using SQL and print it (since it runs on drive display few records)
val messagesqueryDataFrame =
spark.sql("SELECT * FROM csmessages")
println(s"========= $time =========")
messagesqueryDataFrame.show()
}
// Start the computation
ssc.start()
ssc.awaitTermination()
}
}
/** Case class for converting RDD to DataFrame */
case class Record(recordtime: String,eventid: String,url: String,ip: String)
To run the Spark streaming application, use the following instructions.
Get the source code from the aws-blog-sparkstreaming-from-kafka GitHub repo. Run “mvn clean install” to generate the JAR file and copy the kafkaandsparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar file from your /target folder to an S3 bucket.
Note: Modify the above command to reflect the ClusterId (for example, j-17DQ5BN6HWKAC), S3 bucket, and KafkaBrokerDNS value.
Step 5: Use the Kafka producer app to publish clickstream events into the Kafka topic
A Kafka producer application written in Scala ingests random clickstream data into the Kafka topic “blog-replay”. To run the Kafka producer application, use the following instructions:
Get the source code from the aws-blog-sparkstreaming-from-kafka GitHub repo. Run “mvn clean install” to generate the JAR file and copy the kafkaandsparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar file from your /target folder to the Kafka broker instance.
Data has been published to the Kafka topic in CSV format as shown below:
recordtime,eventid,url,ip
Step 6: Explore clickstream event data with SparkSQL
In the previous steps, clickstream events were generated using the Kafka producer and published to the Kafka topic. These events have been processed with Spark Streaming.
Next, log in to the EMR master node, launch a SparkSQL session, run a few SQL commands to explore the processed events that you have published.
Type spark-sql to launch the spark-sql CLI session:
select * from csmessages_hive_table limit 10;
Conclusion
These are the lessons learned from Intuit’s experience:
Running broker processes does not guarantee that the cluster is in a good state. The Intuit team experienced problems with undetected Kafka even though the Kafka broker process was running. Monitor the ZooKeeper path (/broker/ids) to make sure that brokers are registered under its path.
Be aware that a different machine image could affect the functionality or performance of Kafka.
Make sure to use separate instances for Kafka brokers and zookeepers. This makes debugging problems easier.
Having more than one Kafka cluster in different Availability Zones can help zone failover issues and also help in upgrading, as compared to having one cluster that requires one-on-one upgrades.
I would like to thank the Intuit Data team (IDEA) for their contributions to this post.
Lucian Lita, Director of Data Engineering – “Apache Kafka on Amazon EC2 and Apache Spark on Amazon EMR turned out to be the right combination for its scalability, reliability and security. This service is key to how Intuit captures data and serves as an inter-service communication backbone. With this in place, Intuit’s team can now focus on higher-order financial service and product needs.”
Tilmann Bruckhaus, Group Manager of Data Engineering – “Intuit has been transitioning from a traditional on-premises data platform to a cloud-based environment where we invest in technologies including Apache Kafka on Amazon EC2, Spark Streaming on Amazon EMR.”
Mita Mahadevan, Group Manager of Data Engineering – “A scalable, elastic data pipeline and stream processing platform is key to delivering real time personalization and predictive analytics within our products.”
If you have questions or suggestions, please comment below.
Prasad Alle is a consultant with AWS Professional Services.
Intuit, a creator of business and financial management solutions, is a leading enterprise customer for AWS. The Intuit Data team (IDEA) at Intuit is responsible for building platforms and products that enable a data-driven personalized experience across Intuit products and services.
One dimension of this platform is the streaming data pipeline that makes event-based data available to both analytic and real-time applications. These include, but are not limited to, applications for personalization, product discovery, and fraud detection.
The challenge is building a platform that can support and integrate with more than 50 products and services across Intuit, while also accounting for seasonality and the evolution of use cases. Intuit requires a data platform that can scale and abstract the underlying complexities of a distributed architecture, allowing users to focus on leveraging the data rather than managing ingestion.
Amazon EMR, Amazon Kinesis, and Amazon S3 were among the initial considerations to build out this architecture at scale. Given that Intuit had existing infrastructure leveraging Kafka on AWS, the first version was designed using Apache Kafka on Amazon EC2, EMR, and S3 for persistence. Amazon Kinesis provides an alternative managed solution for streaming, which reduces the amount of administration and monitoring required. For more information about Amazon Kinesis reference architectures, see Amazon Kinesis Streams Product Details.
This post demonstrates how to set up Apache Kafka on EC2, use Spark Streaming on EMR to process data coming into Apache Kafka topics, and query streaming data using Spark SQL on EMR.
Note: This is an example and should not be implemented in a production environment without considering additional operational concerns for Apache Kafka and EMR, including monitoring and failure handling.
Intuit’s application architecture
Before detailing Intuit’s implementation, it is helpful to consider the application architecture and physical architecture in the AWS Cloud. The following application architecture can be deployed in either a public subnet or a private subnet.
Apache Kafka and Amazon EMR in VPC public subnets
The following architecture diagram shows EMR and Kafka clusters in VPC public subnets, accessed through a bastion host to control access and security.
Apache Kafka and Amazon EMR in VPC private subnets
The following architecture diagram represents an EMR cluster in a VPC private subnet with an S3 endpoint and NAT instance; Kafka can also be installed in VPC private subnets. Private subnets allow you to limit access to deployed components, and to control security and routing of the system. You access EMR and Kafka clusters through a bastion host.
By now, you should have a good understanding of the architecture involved and the deployment model you might like to implement from this post.
Stream processing walkthrough
The entire pattern can be implemented in a few simple steps:
Set up Kafka on AWS.
Spin up an EMR 5.0 cluster with Hadoop, Hive, and Spark.
Create a Kafka topic.
Run the Spark Streaming app to process clickstream events.
Use the Kafka producer app to publish clickstream events into the Kafka topic.
Explore clickstream event data with SparkSQL.
Prerequisites
To implement the architecture, establish an AWS account, then download and configure the AWS CLI.
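For reference, the AWS CLI can be configured with the aws configure command, which prompts for an access key, secret access key, default Region, and output format. The profile name below is only an illustrative example:

# Configure a named AWS CLI profile (the profile name is an arbitrary example)
aws configure --profile kafka-blog
# Confirm the settings that the CLI will use for this profile
aws configure list --profile kafka-blog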
Step 1: Set up Kafka on AWS
An AWS CloudFormation template can be used to deploy an Apache Kafka cluster:
Choose Upload a template to Amazon S3, or specify an Amazon S3 template URL.
Choose Next.
Specify a stack name and enter the following parameters:
Optionally, specify a tag for the instance. Choose Next.
Review choices, check the IAM acknowledgement, and then choose Create.
The stack takes several minutes to complete as it creates the EC2 instance and provisions Apache Kafka and its prerequisites.
Return to the CloudFormation console. When the CloudFormation stack status returns CREATE_COMPLETE, your EC2 instance is ready. On the Output tab, note the DNS names for Kafka ZooKeeper and broker.
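If you prefer to launch the stack from the CLI instead of the console, a command along the following lines can be used. The stack name, template URL, and parameter key are placeholders rather than the template's actual values:

aws cloudformation create-stack \
  --stack-name kafka-on-ec2 \
  --template-url https://s3.amazonaws.com/<your-bucket>/<kafka-template>.json \
  --parameters ParameterKey=KeyName,ParameterValue=myKeyName \
  --capabilities CAPABILITY_IAM \
  --region us-east-1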
Step 2: Spin up an EMR 5.0 cluster with Hadoop, Hive, and Spark
This step creates the EMR cluster. You can use the following sample command to create the cluster with the AWS CLI, or you can create it from the console.
If you decide to create the cluster using the CLI, remember to replace myKeyName, myLogBucket, myRegion, and mySubnetId with your EC2 key pair name, logging bucket, Region, and public or private subnet ID.
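As a sketch, assuming EMR release 5.0.0, the default EMR roles, and a three-node cluster, such a command might look like the following:

aws emr create-cluster \
  --name "ClickstreamStreamingCluster" \
  --release-label emr-5.0.0 \
  --applications Name=Hadoop Name=Hive Name=Spark \
  --ec2-attributes KeyName=myKeyName,SubnetId=mySubnetId \
  --instance-type m4.xlarge --instance-count 3 \
  --log-uri s3://myLogBucket/ \
  --use-default-roles \
  --region myRegion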
The cluster is created in approximately 10 minutes and changes to the “Waiting” state.
Step 3: Create a Kafka topic
Kafka maintains feeds of messages in topics. A topic is a category or feed name to which messages are published. To create a Kafka topic, use the following instructions.
Note: Change the ZooKeeper instance DNS address based on your environment.
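The topic used throughout this walkthrough is blog-replay. Assuming a Kafka 0.9/0.10-era installation on the broker instance, a typical create command looks like the following (the partition and replication values are example settings):

bin/kafka-topics.sh --create \
  --zookeeper <ZooKeeperInstanceDNS>:2181 \
  --replication-factor 1 \
  --partitions 2 \
  --topic blog-replay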
Step 4: Run the Spark Streaming app to process clickstream events
The Spark Streaming app is able to consume clickstream events as soon as the Kafka producer starts publishing events (as described in Step 5) into the Kafka topic. For this post, I used the Direct Approach (No Receivers) method of Spark Streaming to receive data from Kafka.
After the Kafka producer starts publishing, the Spark Streaming app processes clickstream events, extracts metadata, and stores it in Apache Hive for interactive analysis. The following code shows the application.
package com.awsproserv.kafkaandsparkstreaming

import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{ Seconds, StreamingContext, Time }
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql._

object ClickstreamSparkstreaming {

  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(s"""
        |Usage: ClickstreamSparkstreaming <brokers> <topics>
        |  <brokers> is a list of one or more Kafka brokers
        |  <topics> is a list of one or more Kafka topics to consume from
        |
        """.stripMargin)
      System.exit(1)
    }

    val Array(brokers, topics) = args
    val sparkConf = new SparkConf().setAppName("DirectKafkaClickstreams")

    // Create context with 10-second batch intervals
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    // Create a direct Kafka stream from the given brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet)
    val lines = messages.map(_._2)

    val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
    val spark = SparkSession
      .builder
      .config(sparkConf)
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .enableHiveSupport()
      .getOrCreate()

    // Drop the table if it already exists
    spark.sql("DROP TABLE IF EXISTS csmessages_hive_table")

    // Create the table to store the streamed records
    spark.sql("CREATE TABLE csmessages_hive_table ( recordtime string, eventid string, url string, ip string ) STORED AS TEXTFILE")

    // Convert each RDD of the lines DStream to a DataFrame and run SQL queries
    lines.foreachRDD { (rdd: RDD[String], time: Time) =>
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val messagesDataFrame = rdd.map(_.split(",")).map(w => Record(w(0), w(1), w(2), w(3))).toDF()

      // Create a temporary view using the DataFrame
      messagesDataFrame.createOrReplaceTempView("csmessages")

      // Insert the current micro-batch into the Hive table
      spark.sql("INSERT INTO TABLE csmessages_hive_table SELECT * FROM csmessages")

      // Select the parsed messages and print a few records (this runs on the driver)
      val messagesqueryDataFrame = spark.sql("SELECT * FROM csmessages")
      println(s"========= $time =========")
      messagesqueryDataFrame.show()
    }

    // Start the computation
    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting RDD to DataFrame */
case class Record(recordtime: String, eventid: String, url: String, ip: String)
To run the Spark streaming application, use the following instructions.
Get the source code from the aws-blog-sparkstreaming-from-kafka GitHub repo. Run "mvn clean install" to generate the JAR file and copy the kafkaandsparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar file from your /target folder to an S3 bucket.
Note: Modify the above command to reflect the ClusterId (for example, j-17DQ5BN6HWKAC), S3 bucket, and KafkaBrokerDNS value.
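One way to submit the streaming application is as an EMR Spark step. The following is a sketch in which the cluster ID, S3 bucket, and broker DNS are placeholders:

aws emr add-steps \
  --cluster-id j-17DQ5BN6HWKAC \
  --steps Type=Spark,Name=ClickstreamSparkstreaming,ActionOnFailure=CONTINUE,Args=[--class,com.awsproserv.kafkaandsparkstreaming.ClickstreamSparkstreaming,s3://<your-bucket>/kafkaandsparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar,<KafkaBrokerDNS>:9092,blog-replay]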
Step 5: Use the Kafka producer app to publish clickstream events into the Kafka topic
A Kafka producer application written in Scala ingests random clickstream data into the Kafka topic “blog-replay”. To run the Kafka producer application, use the following instructions:
Get the source code from the aws-blog-sparkstreaming-from-kafka GitHub repo. Run "mvn clean install" to generate the JAR file and copy the kafkaandsparkstreaming-0.0.1-SNAPSHOT-jar-with-dependencies.jar file from your /target folder to the Kafka broker instance.
Data has been published to the Kafka topic in CSV format as shown below:
recordtime,eventid,url,ip
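To confirm that events are flowing into the topic, you can optionally run the console consumer that ships with Kafka on the broker instance; the broker DNS is a placeholder:

bin/kafka-console-consumer.sh \
  --bootstrap-server <KafkaBrokerDNS>:9092 \
  --topic blog-replay \
  --from-beginning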
Step 6: Explore clickstream event data with SparkSQL
In the previous steps, clickstream events were generated using the Kafka producer and published to the Kafka topic. These events have been processed with Spark Streaming.
Next, log in to the EMR master node, launch a SparkSQL session, and run a few SQL commands to explore the processed events that you published.
Type spark-sql to launch the spark-sql CLI session:
select * from csmessages_hive_table limit 10;
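Aggregate queries over the same table also work; for example, the following illustrative query counts events per URL:

select url, count(*) as events
from csmessages_hive_table
group by url
order by events desc
limit 10;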
Conclusion
These are the lessons learned from Intuit’s experience:
Running broker processes does not guarantee that the cluster is in a good state. The Intuit team experienced undetected problems with Kafka even though the broker processes were running. Monitor the ZooKeeper path (/brokers/ids) to make sure that brokers are registered under it.
Be aware that a different machine image could affect the functionality or performance of Kafka.
Make sure to use separate instances for Kafka brokers and ZooKeeper nodes. This makes debugging problems easier.
Running more than one Kafka cluster across different Availability Zones can help with zone failover and also simplifies upgrades, compared to a single cluster whose brokers must be upgraded one by one.
I would like to thank the Intuit Data team (IDEA) for their contributions to this post.
Lucian Lita, Director of Data Engineering – “Apache Kafka on Amazon EC2 and Apache Spark on Amazon EMR turned out to be the right combination for its scalability, reliability and security. This service is key to how Intuit captures data and serves as an inter-service communication backbone. With this in place, Intuit’s team can now focus on higher-order financial service and product needs.”
Tilmann Bruckhaus, Group Manager of Data Engineering – “Intuit has been transitioning from a traditional on-premises data platform to a cloud-based environment where we invest in technologies including Apache Kafka on Amazon EC2, Spark Streaming on Amazon EMR.”
Mita Mahadevan, Group Manager of Data Engineering – “A scalable, elastic data pipeline and stream processing platform is key to delivering real time personalization and predictive analytics within our products.”
If you have questions or suggestions, please comment below.