Tag Archives: Amazon Kinesis Data Firehose

Optimizing downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge.  Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis.  Amazon S3 is a natural landing spot for data of all types.  However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing.  Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files.  Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3.  This results in faster processing with Amazon EMR running Spark.

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB.  If a single message is greater than 1 MB, it can be compressed before placing it on the stream.  However, at large volumes, a message or file size of 1 MB or less is usually too small.  Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.

This post also shows how to read the compressed files using Apache Spark that are in Amazon S3, which does not have a proper file name extension and store back in Amazon S3 in parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, append additional data to them, and compress them with gzip before sending to Amazon Kinesis Data Firehose. The reason for this is most customers need some enrichment to the data before arriving to Amazon S3.

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose  also offers compression of messages after they are written to the Kinesis Data Firehose data stream. Unfortunately, this does not overcome the message size limitation, because this compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3 it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing because a “.gz” extension is required.

We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without any modifications.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values, and you can’t edit these. These predefined names are hardcoded in the code. For some of the parameters, you must provide the values. The following table provides additional details.

For this parameterProvide this
StackNameProvide the stack name.

ClientIP

 

The IP address range of the client that is allowed to connect to the cluster using SSH.
FirehoseDeliveryStreamNameThe name of the Amazon Firehose delivery stream. Default value is set to “AWSBlogs-LambdaToFireHose”.
InstanceTypeThe EC2 instance type.
KeyNameThe name of an existing EC2 key pair to enable access to login.
KinesisStreamNameThe name of the Amazon Kinesis Stream. Default value is set to “AWS-Blog-BaseKinesisStream”
RegionAWS Region – By default it is us-east-1 — US East (N. Virginia). Do not change this as the scripts are developed to work in this Region only.

EMRClusterName

 

A name for the EMR cluster.
S3BucketNameThe name of the bucket that is created in your account. Provide some unique name to this bucket. This bucket is used for storing the messages and output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names and for I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. And then click on the Create button.

If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to the TCP port 22 (SSH) from the provided client IP CIDR range and the second inbound rule allows access to any TCP port from any host with in the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we will also create a standard Amazon S3 bucket with a provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.
S3BucketNameProvide a unique Amazon S3 bucket. This bucket is created in your account.
ClientIpProvide a CIDR IP address range that is added to inbound rule of the security group. You can get your current IP address from “checkip.amazon.com” web url.

After you specify the template details, choose Next. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
StackNameName
VPCIDVpc-xxxxxxx
SubnetIDsubnet-xxxxxxxx
SecurityGroupsg-xxxxxxxxxx
S3BucketDomain<S3_BUCKET_NAME>.s3.amazonaws.com
S3BucketARNarn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  Use the AWS CloudFormation template to create necessary IAM Roles

In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances.  The second IAM role is used by the Amazon Kinesis Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
LambdaRoleArnarn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
FirehoseRoleArnarn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

When the stack launch is complete, it returns the output with information about the resources that were created. Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose data stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as destination for the incoming messages. We select the Uncompressed option for compression format, buffering options with 128 MB size and interval seconds of 300. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
StackNameProvide the stack name.
FirehoseDeliveryStreamNameProvide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose”
RoleProvide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
S3BucketARNSelect the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We can use the AWS Lambda function to process incoming messages in a Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards and the Lambda function is created with a Java 8 runtime. We allocate memory size of 1920 MB and timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides details.

For this parameterDo this
StackNameProvide the stack name.
KinesisStreamNameProvide the name of the Amazon Kinesis stream. Default value is set to ‘AWS-Blog-BaseKinesisStream’
RoleProvide the IAM Role created for Lambda function as part of the second AWS CloudFormation template. Get the value from the output of second AWS CloudFormation template.
S3BucketProvide the existing Amazon S3 bucket name that was created using first AWS CloudFormation template. Do not use the domain name. Provide the bucket name only.
RegionSelect the AWS Region. By default it is us-east-1 — US East (N. Virginia).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with “Spark”, “Ganglia” and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR hive metastore. This Amazon EMR cluster is used to process the messages in Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose data stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
EMRClusterNameProvide the name for the EMR cluster.
ClusterSecurityGroupSelect the security group ID that was created as part of the first AWS CloudFormation template.
ClusterSubnetIDSelect the subnet ID that was created as part of the first AWS CloudFormation template.
AllowedCIDRProvide the IP address range of the client that is allowed to connect to the cluster.
KeyNameProvide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
EMRClusterMasterssh [email protected] -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data

In this step, we set up an Amazon EC2 instance and install open-jdk version 1.8. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs open-jdk version 1.8. Second, it downloads a Java program jar file on to the EC2 instance’s ec2-user home directory. We use this Java program to generate test data messages with an approximate size of ~900 KB. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java jar file name is: “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.

You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

For this parameterDo this
EC2SecurityGroupSelect the security group ID that was created from the first AWS CloudFormation template.
EC2SubnetSelect the subnet that was created from the first AWS CloudFormation template.
InstanceTypeSelect the provided instance type. By default, it selects r4.4xlarge instance.
KeyNameName of an existing EC2 key pair to enable SSH access to the EC2 instance.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page select “I acknowledge that AWS CloudFormation might create IAM resources with custom names” option and, click Create button.

When the stack launch is complete, it should return outputs similar to the following.

KeyValue
EC2Instancessh [email protected]<Public-IP> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7. Generate the test dataset and load into Kinesis Data Streams

After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of the step 6. Use the “ssh” command as shown in the CloudFormation stack template output. This template copies the “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar” file, which we use to generate the test data and send to Amazon Kinesis Data Streams. You can find the code corresponding to this sample Kinesis producer in this Git repository.

Make sure your EC2 instance’s security group allows ssh port 22 (Inbound) from your IP address. If not, update your security group inbound access.

Run the following commands to generate some test data.

$ cd;

 

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

 

$java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

 

This java program uses PutRecords API method that allows many records to be sent with a single HTTP request. For more information on this you can check this AWS blog. Once you run the above java program, you will see the below output that shows messages are in the process of sending to Kinesis Data Stream.

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream. Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042729 KB”

When running the sample Kinesis producer jar, notice that the number of messages is 10,000. This program generates the test data messages and is not a replacement for your load testing tool. This is created to demonstrate the use case presented in this post.

After all of the messages generated and sent to Amazon Kinesis Data Streams, program will exit gracefully.

The sample JSON input message format is shown as follows:

   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   }

 

Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of the step 4.  Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 mins to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda

As part of the previously-described setup, we also use an AWS Lambda function (name:LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads each message content and appends “additional data.” This demonstrates that the incoming message from Kinesis data stream is read, and appended with some additional information to make the message size more than 1 MB.  Several customers have a use case like this to enrich the incoming messages by adding additional information. After the AWS Lambda function appends additional data to incoming messages, it sends them to Amazon Kinesis Data Firehose. Because Kinesis Data Firehose accepts only messages that are less than 1 MB, we must compress the messages before sending to it. In the Lambda function, we are compressing the message using gzip compression before sending it to Kinesis Data Firehose. In addition to compressing each message, we are also appending a new line character (“/n”) to each message after compressing it to separate the messages.

We set the buffer size to 128 MB and duration of the buffer is 900 seconds while creating the Kinesis Data Firehose. This helps merge the incoming compressed messages into larger messages and sends to the provided Amazon S3 bucket.

The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  }
  {
    …
  },
  {
    …
  },
  :
  :
]

If we do not compress the message before sending to Kinesis Data Firehose, it throws this error message in the Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
    return res.getRecordId();
}

You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

You see the files generated from Kinesis Data Firehose that do not have any extension. By default, Kinesis Data Firehose does not provide any extension to the files that are generated in Amazon S3 bucket unless you select a compression option. But in our use case, since the size of the uncompressed input message is greater than 1 MB, we are compressing it before sending to Kinesis Data Firehose. As the message is already compressed, we are not selecting any compression option in Kinesis Data Firehose, as it double-compresses the message and the downstream Spark application cannot process this.

9. Reading and converting the data into parquet format using Apache Spark program with Amazon EMR

As we noted down from the previous screen shot, Kinesis Data Firehose by default does not generate any file extensions to the files that are written into Amazon S3 bucket. This creates a problem while reading the files using Apache Spark. Apache Spark, by default, checks for a valid file name extension if the file is compressed. In this case for gzip compression, it looks for <filename>.gz to successfully read it.

To overcome this issue, we can use Amazon S3 API operations, particularly AmazonS3Client class, to list all the Amazon S3 keys and use Spark’s parallelize method to read the contents of the files. After reading the file content, we can uncompress it using GZipInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap
{ key => Source.fromInputStream
  (
   new GZipInputStream(s3Client.getObject(bucketName, key).getObjectContent:   InputStream)
  ).getLines 
}

var finalDF = spark.read.json(allLinesRDD).toDF()

Once the Amazon EMR cluster creation is completed successfully, login to the Amazon EMR master machine using the following command. You can get the “ssh” login command from the AWS CloudFormation stack 5 (step 5) outputs parameter “EMRClusterMaster”.

  • ssh [email protected] -i <KEYPAIR_NAME>.pem
  • Make sure the security port 22 is opened to connect to the Amazon EMR master machine.

Run the Spark program using the following Spark submit command.

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

Change the S3_BUCKET_NAME and YEAR values from the previous Spark command.

Argument #PropertyValue
1–classcom.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
2–masteryarn
3–deploy-modeclient
4s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
5S3_BUCKET_NAMEThe Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket.
6<INPUT S3 LOCATION>“fromfirehose/<YYYY>/”. The input files are created in this Amazon S3 key location under the bucket that was created. “YYYY” represents the current year. For example, “fromfirehose/2018/”
7<OUTPUT S3 LOCATION>Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example: “output-emr-parquet/”

 

When the program finishes running, you can check the Amazon S3 output location to see the files that are written in parquet format.

Cleaning up after the migration

After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if you have any files in the created Amazon S3 bucket. Make sure that you cleaned up the Amazon S3 bucket that was created before deleting the AWS CloudFormation templates.

Conclusion

In this post, we described the process of avoiding small file creation in Amazon S3 by sending the incoming messages to Amazon Kinesis Data Firehose. We also went through the process of reading and storing the data in parquet format using Apache Spark with an Amazon EMR cluster.

 


About the Author

Photo of Srikanth KodaliSrikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Universal Coordinated Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”.  However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”.  Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders.  This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time. It also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form: !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace or a date pattern for the timestamp namespace in the Java DateTimeFormatter format. In a single expression, you can use a combination of the two namespaces although the !{firehose: error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use python code to generate sample data.  We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included here in GitHub.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called LambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step3. Delivery Stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In the previous configuration, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that will result in a folder structure compatible with hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to year, month, day and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549754078390 in UNIX epoch time, which is 2019-02-09T16:13:01.000000Z in UTC would evaluate to “year=2019”, “month=02”, “day=09” and “hour=16”.  Therefore, the location in Amazon S3 where data records that are delivered evaluate to “fhbase/year=2019/month=02/day=09/hour=16/”.

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11 character random string like “ztWxkdg3Thg”.  If you use this more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for an Amazon ES destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Or Create the delivery stream using the AWS Console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose if you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.

  1. Choose the destination. I chose the Amazon S3 destination.

  1. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.

  1. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. This corresponds to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.

  1. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records – the base folder is different but the folder structure below that is the same. This would make it more efficient to crawl this location using an AWS Glue crawler or create external tables in Athena or Redshift Spectrum pointing to this location.

  1. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  2. Choose the S3 Compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  3. Choose whether you want to enable Error Logging in Cloudwatch. I chose Enabled.
  4. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose either Create new or Choose to display a new screen. Choose Create a new IAM role, name the role, and then choose Allow.
  5. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

 Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the Lambda code in my Lambda transform set the status failed for 10 percent of the records, those showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add Crawler.

  1. Add information about your crawler, then choose Next.
  2. In the Include Path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  3. Choose Next.

  1. Choose Next, No, Next.
  2. Specify the IAM role that AWS Glue would use. I chose to create a new IAM Role. Choose Next.
  3. Specify a schedule to run the crawler. I chose to Run it on Demand. Choose Next.
  4. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.

  1. Choose Finish.
  1. The crawler has been created and is ready to be run. Choose Run crawler.

  1. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"

where year = '2019' and day = '12' and hour = '17'

order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.

Conclusion

As this post illustrates, Custom Prefixes for Amazon S3 objects provides much flexibility to customize the folder structure, where Kinesis Data Firehose delivers the data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps get insight more expediently and helps you better manage the cost of your queries.

 


About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

 

 

 

 

Trimming AWS WAF logs with Amazon Kinesis Firehose transformations

Post Syndicated from Tino Tran original https://aws.amazon.com/blogs/security/trimming-aws-waf-logs-with-amazon-kinesis-firehose-transformations/

In an earlier post, Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight, published on March 28, 2019, the authors showed you how to stream WAF logs with Amazon Kinesis Firehose for visualization using QuickSight. This approach used no filtering of the logs so that you could visualize the full data set. However, you are often only interested in seeing specific events. Or you might be looking to minimize log size to save storage costs. In this post, I show you how to apply rules in Amazon Kinesis Firehose to trim down logs. You can then apply the same visualizations you used in the previous solution.

AWS WAF is a web application firewall that supports full logging of all the web requests it inspects. For each request, AWS WAF logs the raw HTTP/S headers along with information on which AWS WAF rules were triggered. Having complete logs is useful for compliance, auditing, forensics, and troubleshooting custom and Managed Rules for AWS WAF. However, for some use cases, you might not want to log all of the requests inspected by AWS WAF. For example, to reduce the volume of logs, you might only want to log the requests blocked by AWS WAF, or you might want to remove certain HTTP header or query string parameter values from your logs. In many cases, unblocked requests are often already stored in your CloudFront access logs or web server logs and, therefore, using AWS WAF logs can result in redundant data for these requests, while logging blocked traffic can help you to identify bad actors or root cause false positives.

In this post, I’ll show you how to create an Amazon Kinesis Data Firehose stream to filter out unneeded records, so that you only retain log records for requests that were blocked by AWS WAF. From here, the logs can be stored in Amazon S3 or directed to SIEM (Security information and event management) and log analysis tools.

To simplify things, I’ll provide you with a CloudFormation template that will create the resources highlighted in the diagram below:
 

Figure 1: Solution architecture

Figure 1: Solution architecture

  1. A Kinesis Data Firehose delivery stream is used to receive log records from AWS WAF.
  2. An IAM role for the Kinesis Data Firehose delivery stream, with permissions needed to invoke Lambda and write to S3.
  3. A Lambda function used to filter out WAF records matching the default action before the records are written to S3.
  4. An IAM role for the Lambda function, with the permissions needed to create CloudWatch logs (for troubleshooting).
  5. An S3 bucket where the WAF logs will be stored.

Prerequisites and assumptions

  • In this post, I assume that the AWS WAF default action is configured to allow requests that don’t explicitly match a blocking WAF rule. So I’ll show you how to omit any records matching the WAF default action.
  • You need to already have a AWS WAF WebACL created. In this example, you’ll use a WebACL generated from the AWS WAF OWASP 10 template. For more information on deploying AWS WAF to a CloudFront or ALB resource, see the Getting Started page.

Step 1: Create a Kinesis Data Firehose delivery stream for AWS WAF logs

In this step, you’ll use the following CloudFormation template to create a Kinesis Data Firehose delivery stream that writes logs to an S3 bucket. The template also creates a Lambda function that omits AWS WAF records matching the default action.

Here’s how to launch the template:

  1. Open CloudFormation in the AWS console.
  2. For WAF deployments on Amazon CloudFront, select region US-EAST-1. Otherwise, create the stack in the same region in which your AWS WAF Web ACL is deployed.
  3. Select the Create Stack button.
  4. In the CloudFormation wizard, select Specify an Amazon S3 template URL and copy and paste the following URL into the text box, then select Next:
    https://s3.amazonaws.com/awsiammedia/public/sample/TrimAWSWAFLogs/KinesisWAFDeliveryStream.yml
  5. On the options page, leave the default values and select Next.
  6. Specify the following and then select Next:
    1. Stack name: (for example, kinesis-waf-logging). Make sure to note your stack name, as you’ll need to provide it later in the walkthrough.
    2. Buffer size: This value specifies the size in MB for which Kinesis will buffer incoming records before processing.
    3. Buffer interval: This value specifies the interval in seconds for which Kinesis will buffer incoming records before processing.

    Note: Kinesis will trigger data delivery based on which buffer condition is satisfied first. This CloudFormation sets the default buffer size to 3MB and interval size to 900 seconds to match the maximum transformation buffer size and intervals which is set by this template. To learn more about Kinesis Data Firehose buffer conditions, read this documentation.

     

    Figure 2: Specify the stack name, buffer size, and buffer interval

    Figure 2: Specify the stack name, buffer size, and buffer interval

  7. Select the check box for I acknowledge that AWS CloudFormation might create IAM resources and choose Create.
  8. Wait for the template to finish creating the resources. This will take a few minutes. On the CloudFormation dashboard, the status next to your stack should say CREATE_COMPLETE.
  9. From the AWS Management Console, open Amazon Kinesis and find the Data Firehose delivery stream on the dashboard. Note that the name of the stream will start with aws-waf-logs- and end with the name of the CloudFormation. This prefix is required in order to configure AWS WAF to write logs to the Kinesis stream.
  10. From the AWS Management Console, open AWS Lambda and view the Lambda function created from the CloudFormation template. The function name should start with the Stack name from the CloudFormation template. I included the function code generated from the CloudFormation template below so you can see what’s going on.

    Note: Through CloudFormation, the code is deployed without indentation. To format it for readability, I recommend using the code formatter built into Lambda under the edit tab. This code can easily be modified for custom record filtering or transformations.

    
        'use strict';
    
        exports.handler = (event, context, callback) => {
            /* Process the list of records and drop those containing Default_Action */
            const output = event.records.map((record) => {
                const entry = (new Buffer(record.data, 'base64')).toString('utf8');
                if (!entry.match(/Default_Action/g)){
                    return {
                        recordId: record.recordId,
                        result: 'Ok',
                        data: record.data,
                    };
                } else {
                    return {
                        recordId: record.recordId,
                        result: 'Dropped',
                        data: record.data,
                    };
                }
            });
        
            console.log(`Processing completed.  Successful records ${output.length}.`);
            callback(null, { records: output });
        };"        
        

You now have a Kinesis Data Firehose stream that AWS WAF can use for logging records.

Cost Considerations

This template sets the Kinesis transformation buffer size to 3MB and buffer interval to 900 seconds (the maximum values) in order to reduce the number of Lambda invocations used to process records. On average, an AWS WAF record is approximately 1-1.5KB. With a buffer size of 3MB, Kinesis will use 1 Lambda invocation per 2000-3000 records. Visit the AWS Lambda website to learn more about pricing.

Step 2: Configure AWS WAF Logging

Now that you have an active Amazon Kinesis Firehose delivery stream, you can configure your AWS WAF WebACL to turn on logging.

  1. From the AWS Management Console, open WAF & Shield.
  2. Select the WebACL for which you would like to enable logging.
  3. Select the Logging tab.
  4. Select the Enable Logging button.
  5. Next to Amazon Kinesis Data Firehose, select the stream that was created from the CloudFormation template in Step 1 (for example, aws-waf-logs-kinesis-waf-stream) and select Create.

Congratulations! Your AWS WAF WebACL is now configured to send records of requests inspected by AWS WAF to Kinesis Data Firehose. From there, records that match the default action will be dropped, and the remaining records will be stored in S3 in JSON format.

Below is a sample of the logs generated from this example. Notice that there are only blocked records in the logs.
 

Figure 3: Sample logs

Figure 3: Sample logs

Conclusion

In this blog, I’ve provided you with a CloudFormation template to generate a Kinesis Data Firehose stream that can be used to log requests blocked by AWS WAF, omitting requests matching the default action. By omitting the default action, I have reduced the number of log records that must be reviewed to identify bad actors, tune new WAF rules, and/or root cause false positives. For unblocked traffic, consider using CloudFront’s access logs with Amazon Athena or CloudWatch Logs Insights to query and analyze the data. To learn more about AWS WAF logs, read our developer guide for AWS WAF.

If you have feedback about this blog post, , please submit them in the Comments section below. If you have issues with AWS WAF, start a thread on the AWS SSO forum or contact AWS Support.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Tino Tran

Tino is a Senior Edge Specialized Solutions Architect based out of Florida. His main focus is to help companies deliver online content in a secure, reliable, and fast way using AWS Edge Services. He is a experienced technologist with a background in software engineering, content delivery networks, and security.

Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight

Post Syndicated from Umesh Ramesh original https://aws.amazon.com/blogs/security/enabling-serverless-security-analytics-using-aws-waf-full-logs/

Traditionally, analyzing data logs required you to extract, transform, and load your data before using a number of data warehouse and business intelligence tools to derive business intelligence from that data—on top of maintaining the servers that ran behind these tools.

This blog post will show you how to analyze AWS Web Application Firewall (AWS WAF) logs and quickly build multiple dashboards, without booting up any servers. With the new AWS WAF full logs feature, you can now log all traffic inspected by AWS WAF into Amazon Simple Storage Service (Amazon S3) buckets by configuring Amazon Kinesis Data Firehose. In this walkthrough, you’ll create an Amazon Kinesis Data Firehose delivery stream to which AWS WAF full logs can be sent, and you’ll enable AWS WAF logging for a specific web ACL. Then you’ll set up an AWS Glue crawler job and an Amazon Athena table. Finally, you’ll set up Amazon QuickSight dashboards to help you visualize your web application security. You can use these same steps to build additional visualizations to draw insights from AWS WAF rules and the web traffic traversing the AWS WAF layer. Security and operations teams can monitor these dashboards directly, without needing to depend on other teams to analyze the logs.

The following architecture diagram highlights the AWS services used in the solution:

Figure 1: Architecture diagram

Figure 1: Architecture diagram

AWS WAF is a web application firewall that lets you monitor HTTP and HTTPS requests that are forwarded to an Amazon API Gateway API, to Amazon CloudFront or to an Application Load Balancer. AWS WAF also lets you control access to your content. Based on conditions that you specify—such as the IP addresses from which requests originate, or the values of query strings—API Gateway, CloudFront, or the Application Load Balancer responds to requests either with the requested content or with an HTTP 403 status code (Forbidden). You can also configure CloudFront to return a custom error page when a request is blocked.

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. With Kinesis Data Firehose, you don’t need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

AWS Glue can be used to run serverless queries against your Amazon S3 data lake. AWS Glue can catalog your S3 data, making it available for querying with Amazon Athena and Amazon Redshift Spectrum. With crawlers, your metadata stays in sync with the underlying data (more details about crawlers later in this post). Amazon Athena and Amazon Redshift Spectrum can directly query your Amazon S3 data lake by using the AWS Glue Data Catalog. With AWS Glue, you access and analyze data through one unified interface without loading it into multiple data silos.

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon QuickSight is a business analytics service you can use to build visualizations, perform one-off analysis, and get business insights from your data. It can automatically discover AWS data sources and also works with your data sources. Amazon QuickSight enables organizations to scale to hundreds of thousands of users and delivers responsive performance by using a robust in-memory engine called SPICE.

SPICE stands for Super-fast, Parallel, In-memory Calculation Engine. SPICE supports rich calculations to help you derive insights from your analysis without worrying about provisioning or managing infrastructure. Data in SPICE is persisted until it is explicitly deleted by the user. SPICE also automatically replicates data for high availability and enables Amazon QuickSight to scale to hundreds of thousands of users who can all simultaneously perform fast interactive analysis across a wide variety of AWS data sources.

Step one: Set up a new Amazon Kinesis Data Firehose delivery stream

  1. In the AWS Management Console, open the Amazon Kinesis Data Firehose service and choose the button to create a new stream.
    1. In the Delivery stream name field, enter a name for your new stream that starts with aws-waf-logs- as shown in the screenshot below. AWS WAF filters all streams starting with the keyword aws-waf-logs when it displays the delivery streams. Note the name of your stream since you’ll need it again later in the walkthrough.
    2. For Source, choose Direct PUT, since AWS WAF logs will be the source in this walkthrough.

      Figure 2: Select the delivery stream name and source

      Figure 2: Select the delivery stream name and source

  2. Next, you have the option to enable AWS Lambda if you need to transform your data before transferring it to your destination. (You can learn more about data transformation in the Amazon Kinesis Data Firehose documentation.) In this walkthrough, there are no transformations that need to be performed, so for Record transformation, choose Disabled.
    Figure 3: Select "Disabled" for record transformations

    Figure 3: Select “Disabled” for record transformations

    1. You’ll have the option to convert the JSON object to Apache Parquet or Apache ORC format for better query performance. In this example, you’ll be reading the AWS WAF logs in JSON format, so for Record format conversion, choose Disabled.

      Figure 4: Choose "Disabled" to not convert the JSON object

      Figure 4: Choose “Disabled” to not convert the JSON object

  3. On the Select destination screen, for Destination, choose Amazon S3.
    Figure 5: Choose the destination

    Figure 5: Choose the destination

    1. For the S3 destination, you can either enter the name of an existing S3 bucket or create a new S3 bucket. Note the name of the S3 bucket since you’ll need the bucket name in a later step in this walkthrough.
    2. For Source record S3 backup, choose Disabled, because the destination in this walkthrough is an S3 bucket.

      Figure 6: Enter the S3 bucket name, and select "Disabled" for the Source record S3 backup

      Figure 6: Enter the S3 bucket name, and select “Disabled” for the source record S3 backup

  4. On the next screen, leave the default conditions for Buffer size, Buffer interval, S3 compression and S3 encryption as they are. However, we recommend that you set Error logging to Enabled initially, for troubleshooting purposes.
    1. For IAM role, select Create new or choose. This opens up a new window that will prompt you to create firehose_delivery_role, as shown in the following screenshot. Choose Allow in this window to accept the role creation. This grants the Kinesis Data Firehose service access to the S3 bucket.

      Figure 7: Select "Create new or choose" for IAM Role

      Figure 7: Select “Allow” to create the IAM role “firehose_delivery_role”

  5. On the last step of configuration, review all the options you’ve chosen, and then select Create delivery stream. This will cause the delivery stream to display as “Creating” under Status. In a couple of minutes, the status will change to “Active,” as shown in the below screenshot.

    Figure 8: Review the options you selected

    Figure 8: Review the options you selected

Step two: Enable AWS WAF logging for a specific Web ACL

  1. From the AWS Management Console, open the AWS WAF service and choose Web ACLs. Open your Web ACL resource, which can either be deployed on a CloudFront distribution or on an Application Load Balancer.
    1. Choose the Web ACL for which you want to enable logging. (In the below screenshot, we’ve selected a Web ACL in the US East Region.)
    2. On the Logging tab, choose Enable Logging.

      Figure 9: Choose "Enable Logging"

      Figure 9: Choose “Enable Logging”

  2. The next page displays all the delivery streams that start with aws-waf-logs. Choose the Amazon Kinesis Data Firehose delivery stream that you created for AWS WAF logs at the start of this walkthrough. (In the screenshot below, our example stream name is “aws-waf-logs-us-east-1)
    1. You can also choose to redact certain fields that you wish to exclude from being captured in the logs. In this walkthrough, you don’t need to choose any fields to redact.
    2. Select Create.

      Figure 10: Choose your delivery stream, and select "Create"

      Figure 10: Choose your delivery stream, and select “Create”

After a couple of minutes, you’ll be able to inspect the S3 bucket that you defined in the Kinesis Data Firehose delivery stream. The log files are created in directories by year, month, day, and hour.

Step three: Set up an AWS Glue crawler job and Amazon Athena table

The purpose of a crawler within your Data Catalog is to traverse your data stores (such as S3) and extract the metadata fields of the files. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. When the crawler runs, the first classifier in your list to successfully recognize your data store is used to create a schema for your table. AWS Glue provides built-in classifiers to infer schemas from common files with formats that include JSON, CSV, and Apache Avro.

  1. In the AWS Management Console, open the AWS Glue service and choose Crawler to setup a crawler job.
  2. Choose Add crawler to launch a wizard to setup the crawler job. For Crawler name, enter a relevant name. Then select Next.

    Figure 11: Enter "Crawler name," and select "Next"

    Figure 11: Enter “Crawler name,” and select “Next”

  3. For Choose a data store, select S3 and include the path of the S3 bucket that stores your AWS WAF logs, which you made note of in step 1.3. Then choose Next.

    Figure 12: Choose a data store

    Figure 12: Choose a data store

  4. When you’re given the option to add another data store, choose No.
  5. Then, choose Create an IAM role and enter a name. The role grants access to the S3 bucket for the AWS Glue service to access the log files.

    Figure 13: Choose "Create an IAM role," and enter a name

    Figure 13: Choose “Create an IAM role,” and enter a name

  6. Next, set the frequency to Run on demand. You can also schedule the crawler to run periodically to make sure any changes in the file structure are reflected in your data catalog.

    Figure 14: Set the "Frequency" to "Run on demand"

    Figure 14: Set the “Frequency” to “Run on demand”

  7. For output, choose the database in which the Athena table is to be created and add a prefix to identify your table name easily. Select Next.

    Figure 15: Choose the database, and enter a prefix

    Figure 15: Choose the database, and enter a prefix

  8. Review all the options you’ve selected for the crawler job and complete the wizard by selecting the Finish button.
  9. Now that the crawler job parameters are set up, on the left panel of the console, choose Crawlers to select your job and then choose Run crawler. The job creates an Amazon Athena table. The duration depends on the size of the log files.

    Figure 16: Choose "Run crawler" to create an Amazon Athena table

    Figure 16: Choose “Run crawler” to create an Amazon Athena table

  10. To see the Amazon Athena table created by the AWS Glue crawler job, from the AWS Management Console, open the Amazon Athena service. You can filter by your table name prefix.
      1. To view the data, choose Preview table. This displays the table data with certain fields showing data in JSON object structure.
    Figure 17: Choose "Preview table" to view the data

    Figure 17: Choose “Preview table” to view the data

Step four: Create visualizations using Amazon QuickSight

  1. From the AWS Management Console, open Amazon QuickSight.
  2. In the Amazon QuickSight window, in the top left, choose New Analysis. Choose New Data set, and for the data source choose Athena. Enter an appropriate name for the data source name and choose Create data source.

    Figure 18: Enter the "Data source name," and choose "Create data source"

    Figure 18: Enter the “Data source name,” and choose “Create data source”

  3. Next, choose Use custom SQL to extract all the fields in the JSON object using the following SQL query:
    
        ```
        with d as (select
        waf.timestamp,
            waf.formatversion,
            waf.webaclid,
            waf.terminatingruleid,
            waf.terminatingruletype,
            waf.action,
            waf.httpsourcename,
            waf.httpsourceid,
            waf.HTTPREQUEST.clientip as clientip,
            waf.HTTPREQUEST.country as country,
            waf.HTTPREQUEST.httpMethod as httpMethod,
            map_agg(f.name,f.value) as kv
        from sampledb.jsonwaflogs_useast1 waf,
        UNNEST(waf.httprequest.headers) as t(f)
        group by 1,2,3,4,5,6,7,8,9,10,11)
        select d.timestamp,
            d.formatversion,
            d.webaclid,
            d.terminatingruleid,
            d.terminatingruletype,
            d.action,
            d.httpsourcename,
            d.httpsourceid,
            d.clientip,
            d.country,
            d.httpMethod,
            d.kv['Host'] as host,
            d.kv['User-Agent'] as UA,
            d.kv['Accept'] as Acc,
            d.kv['Accept-Language'] as AccL,
            d.kv['Accept-Encoding'] as AccE,
            d.kv['Upgrade-Insecure-Requests'] as UIR,
            d.kv['Cookie'] as Cookie,
            d.kv['X-IMForwards'] as XIMF,
            d.kv['Referer'] as Referer
        from d;
        ```        
        

  4. To extract individual fields, copy the previous SQL query and paste it in the New custom SQL box, then choose Edit/Preview data.
    Figure 19: Paste the SQL query in "New custom SQL query"

    Figure 19: Paste the SQL query in “New custom SQL query”

    1. In the Edit/Preview data view, for Data source, choose SPICE, then choose Finish.

      Figure 20: Choose "Spice" and then "Finish"

      Figure 20: Choose “Spice” and then “Finish”

  5. Back in the Amazon Quicksight console, under the Fields section, select the drop-down menu and change the data type to Date.

    Figure 21: In the Amazon Quicksight console, change the data type to "Date"

    Figure 21: In the Amazon Quicksight console, change the data type to “Date”

  6. After you see the Date column appear, enter an appropriate name for the visualizations at the top of the page, then choose Save.

    Figure 22: Enter the name for the visualizations, and choose "Save"

    Figure 22: Enter the name for the visualizations, and choose “Save”

  7. You can now create various visualization dashboards with multiple visual types by using the drag-and-drop feature. You can drag and drop combinations of fields such as Action, Client IP, Country, Httpmethod, and User Agents. You can also add filters on Date to view dashboards for a specific timeline. Here are some sample screenshots:
    Figure 23: Visualization dashboard samples

    Figure 23a: Visualization dashboard samples

    Figure 23: Visualization dashboard samples

    Figure 23b: Visualization dashboard samples

    Figure 23: Visualization dashboard samples

    Figure 23c: Visualization dashboard samples

    Figure 23: Visualization dashboard samples

    Figure 23d: Visualization dashboard samples

Conclusion

You can enable AWS WAF logs to Amazon S3 buckets and analyze the logs while they are being streamed by configuring Amazon Kinesis Data Firehose. You can further enhance this solution by automating the streaming of data and using AWS Lambda for any data transformations based on your specific requirements. Using Amazon Athena and Amazon QuickSight makes it easy to analyze logs and build visualizations and dashboards for executive leadership teams. Using these solutions, you can go serverless and let AWS do the heavy lifting for you.

Author photo

Umesh Kumar Ramesh

Umesh is a Cloud Infrastructure Architect with Amazon Web Services. He delivers proof-of-concept projects, topical workshops, and lead implementation projects to various AWS customers. He holds a Bachelor’s degree in Computer Science & Engineering from National Institute of Technology, Jamshedpur (India). Outside of work, Umesh enjoys watching documentaries, biking, and practicing meditation.

Author photo

Muralidhar Ramarao

Muralidhar is a Data Engineer with the Amazon Payment Products Machine Learning Team. He has a Bachelor’s degree in Industrial and Production Engineering from the National Institute of Engineering, Mysore, India. Outside of work, he loves to hike. You will find him with his camera or snapping pictures with his phone, and always looking for his next travel destination.

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. More, it was about having a mind shift to use cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying level of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, only switching to another relational database would mean that our customers would have the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. Things like the following were considerations:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. After doing this, it converges with the other process and outputs the data to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;”

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV. This approach conflicted with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

"glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl”)”

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.

We also migrated complex jobs used to create reporting tables with business metrics:

  • The AWS Glue use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

For additional information, see these similar AWS Big Data blog posts:


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

 

 

 

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.