Tag Archives: Amazon Kinesis Data Firehose

Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-into-amazon-elasticsearch-service-within-the-privacy-of-your-vpc-with-amazon-kinesis-data-firehose/

Today we are adding a new Amazon Kinesis Data Firehose feature that lets you set up delivery from Kinesis Data Firehose to your Amazon Elasticsearch Service domain inside a VPC. If you have been managing a custom application on Amazon Kinesis Data Streams to keep traffic private, you can now use Kinesis Data Firehose to load your data into an Amazon Elasticsearch Service endpoint in a VPC without having to build, operate, and scale your own ingestion and delivery infrastructure. You can start using this new feature from the Kinesis Data Firehose console, AWS CLI, or API by selecting Amazon Elasticsearch Service as the destination, choosing a domain with VPC access, and setting the VPC configuration with subnets and optional security groups.
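
If you prefer the API, the following boto3 sketch shows roughly what such a call could look like. All names, ARNs, subnet IDs, and security group IDs are placeholders, and you should verify the parameters against the current CreateDeliveryStream documentation for your setup.

import boto3

firehose = boto3.client('firehose')

# Sketch only: create a delivery stream that delivers to an Amazon ES domain in a VPC.
# Every name, ARN, subnet ID, and security group ID below is a placeholder.
firehose.create_delivery_stream(
    DeliveryStreamName='my-es-vpc-stream',
    DeliveryStreamType='DirectPut',
    ElasticsearchDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'DomainARN': 'arn:aws:es:us-east-1:123456789012:domain/my-vpc-domain',
        'IndexName': 'stockdata',
        'S3BackupMode': 'FailedDocumentsOnly',
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
            'BucketARN': 'arn:aws:s3:::my-backup-bucket',
        },
        'VpcConfiguration': {
            'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
            'SubnetIds': ['subnet-0123456789abcdef0'],
            'SecurityGroupIds': ['sg-0123456789abcdef0'],
        },
    },
)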

Before this feature

Amazon Elasticsearch Service domains can have public or private endpoints. Public endpoints are backed by IP addresses on the public internet. Private endpoints are backed by IP addresses within the IP space of your VPC.

If you have been using an Amazon Elasticsearch Service VPC endpoint, you most likely use Kinesis Data Streams or a similar solution to ingest streaming data. This means running a custom application on the stream that reads the data and delivers it to the Amazon Elasticsearch Service VPC domain. You likely had to perform the following actions:

  • Implement buffering
  • Perform format conversions
  • Apply compression
  • Apply transformations
  • Manage backups
  • Handle transient delivery failures

Additionally, you have to build, scale, monitor, update, and maintain this custom application.

Kinesis Data Firehose delivery to Amazon Elasticsearch Service VPC endpoint

Kinesis Data Firehose can now deliver data into an Amazon Elasticsearch Service VPC endpoint. This provides a secure and easy way to ingest, transform, and deliver streaming data. You don’t need to worry about managing your data ingestion and delivery infrastructure. With this new feature, Kinesis Data Firehose enables additional secure communication to Amazon Elasticsearch Service VPC endpoints. Amazon Elasticsearch Service endpoints that live within a VPC give you an extra layer of security.

How it works

When you create a Kinesis Data Firehose delivery stream that delivers data to an Amazon Elasticsearch Service VPC endpoint, Kinesis Data Firehose creates an Elastic Network Interface (ENI) in each subnet you select. If you only use one Availability Zone, Kinesis Data Firehose places an ENI into only one subnet. Similarly, when you create an Amazon Elasticsearch Service VPC endpoint, it creates endpoints in the subnets you chose. Kinesis Data Firehose uses these ENIs to deliver the data to your Amazon Elasticsearch Service ENIs, all inside your VPC. The following screenshot outlines the resulting architecture with a single subnet.

For this walkthrough, you have two security groups:

  • kdf-sec-grp for your Kinesis Data Firehose endpoint
  • es-sec-grp for your Amazon Elasticsearch Service endpoint

To let Kinesis Data Firehose access your Amazon Elasticsearch Service VPC endpoint, security group es-sec-grp needs to allow the ENI that Kinesis Data Firehose created to make HTTPS calls. Kinesis Data Firehose scales the ENIs automatically to meet the throughput requirements. As Kinesis Data Firehose scales ENIs, the outbound rules of the enclosing security group kdf-sec-grp control the data stream. You should configure the Amazon Elasticsearch Service security group (es-sec-grp) to allow HTTPS traffic from the Kinesis Data Firehose security group (kdf-sec-grp). The Kinesis Data Firehose security group needs to allow outbound HTTPS traffic, and its destination is the Amazon Elasticsearch Service security group. With Kinesis Data Firehose VPC delivery, you do not need to make the Firehose security group open to outside traffic.

You can also use the same security group for Kinesis Data Firehose and Amazon Elasticsearch Service endpoints. If you use the same security group for both, make sure the security group inbound rule allows HTTPS traffic.

For your existing delivery streams, you can change the destination endpoint. The new destination must be accessible within the same VPC, subnets, and security groups. Changing the VPC, subnets, or security groups requires you to recreate the delivery stream.

All existing Kinesis Data Firehose limits apply to this capability. For example, you can increase the default 50 delivery streams per account by submitting a quota increase request. Also, Kinesis Data Firehose creates one or more ENIs per VPC destination subnet per delivery stream. Kinesis Data Firehose automatically scales the number of ENIs as needed based on the actual throughput. The default throughput limit per delivery stream is 5 MB/second (dependent on Region). You can request an increase to this limit by submitting a support case.

You need to make sure you have enough ENIs available. By default, VPC has a quota of 5000 ENIs per Region. For more information, see Amazon VPC Quotas.

The advantage of using a managed service like Kinesis Data Firehose is that you can focus on the value of your data and not the underlying plumbing. You can configure the frequency of data delivery from your delivery stream to your Amazon Elasticsearch Service domain. Kinesis Data Firehose buffers incoming data before delivering it to Amazon ES. You can configure the buffer size (1 MB–100 MB) or buffer interval (60–900 seconds), and whichever condition is satisfied first triggers data delivery to Amazon Elasticsearch Service. In case data delivery fails for an Amazon Elasticsearch Service destination, you can specify a retry duration between 0 and 7,200 seconds when you create the delivery stream. If data delivery to your Amazon Elasticsearch Service endpoint fails, Kinesis Data Firehose retries data delivery for the specified duration. After the retry period, Kinesis Data Firehose skips the current batch of data and moves on to the next batch. Skipped documents go to the elasticsearch_failed folder in your Amazon S3 bucket, which you can use for manual backfill.

For more information about sizing, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

Solution overview

To show you how to use this new feature, this post uses stock demo data available on the Kinesis Data Firehose console to deliver to an Amazon Elasticsearch Service endpoint in VPC. The following diagram illustrates the workflow.

This use case simulates a producer sending stock ticker data to the delivery stream (A). You use an AWS Lambda function (B) to add a timestamp to the stock records so that you can create Kibana visualization. Kinesis Data Firehose streams the stock records to the Amazon Elasticsearch Service endpoint (C) in your VPC. Finally, you can visualize the data using Kibana (D).

This post uses the AWS Management Console to implement this solution, but you can also use the AWS CLI.

Creating security groups

Start by creating two security groups: one for the Amazon Elasticsearch Service VPC endpoint (es-sec-grp) and another for the delivery stream (kdf-sec-grp). Create security groups without any rules first. After you have created them, set the inbound and outbound rules. The following table summarizes these rules.
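
As an illustration, the same rules could also be created programmatically. In the following hedged boto3 sketch, the security group IDs are placeholders: es-sec-grp allows inbound HTTPS from kdf-sec-grp, and kdf-sec-grp allows outbound HTTPS to es-sec-grp.

import boto3

ec2 = boto3.client('ec2')

# Placeholder security group IDs for es-sec-grp and kdf-sec-grp
es_sec_grp = 'sg-0aaaaaaaaaaaaaaaa'
kdf_sec_grp = 'sg-0bbbbbbbbbbbbbbbb'

# es-sec-grp: allow inbound HTTPS from the Kinesis Data Firehose security group
ec2.authorize_security_group_ingress(
    GroupId=es_sec_grp,
    IpPermissions=[{
        'IpProtocol': 'tcp', 'FromPort': 443, 'ToPort': 443,
        'UserIdGroupPairs': [{'GroupId': kdf_sec_grp}],
    }],
)

# kdf-sec-grp: allow outbound HTTPS to the Amazon Elasticsearch Service security group
ec2.authorize_security_group_egress(
    GroupId=kdf_sec_grp,
    IpPermissions=[{
        'IpProtocol': 'tcp', 'FromPort': 443, 'ToPort': 443,
        'UserIdGroupPairs': [{'GroupId': es_sec_grp}],
    }],
)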

Creating an Amazon Elasticsearch Service VPC endpoint

To create an Amazon Elasticsearch Service endpoint in VPC, complete the following steps:

  1. On the Amazon Elasticsearch Service console, choose Create a new Domain.
  2. For Deployment Type and Latest Version, choose Development and Testing.
  3. Choose Next.
  4. Give your Amazon Elasticsearch Service endpoint a name.
  5. Select your instance type.

This post uses m5.xlarge.elasticsearch. For production environments, select the appropriately sized instance type. For this post, leave the number of nodes at 1, though best practice is to set it to 2.

  6. Set EBS storage size per node to 100 GiB.
  7. Leave the rest of the settings at their defaults and choose Next.
  8. Select the VPC and private subnet for your Amazon Elasticsearch Service endpoint and the security group for Amazon Elasticsearch Service that you created previously (es-sec-grp).
  9. To access Kibana, choose fine-grained access.
  10. Choose Create Master User.

In this post, we use the internal user database with HTTP basic authentication enabled. For production environments, use IAM roles and configure the appropriate fine-grained access. For more information, see Fine-Grained Access Control in Amazon Elasticsearch Service.

  11. Choose Allow open access to Domain.

Security groups already enforce IP-based access policies. This step opens access to your Amazon Elasticsearch Service endpoint to resources in your VPC, and your Amazon Elasticsearch Service endpoint is not accessible to the internet. For an additional layer of security in your Amazon Elasticsearch Service endpoint, use access policies that specify IAM users or roles. For more information about controlling access to your domains, see Identity and Access Management in Amazon Elasticsearch Service.

  12. Choose Next.
  13. Review your settings and choose Confirm.

The following screenshot shows an example of what your Amazon Elasticsearch Service endpoint VPC settings should look like.

Creating a Lambda function for record transformation

Create a Lambda function to add a timestamp to the data feed. Complete the following steps:

  1. On the Lambda console, choose Create Function.
  2. Choose Author from scratch.
  3. Name your function; for example, tmakAddTSToStream.
  4. Choose Python 3.7 as your runtime.
  5. Choose Create.

The following code is for your Lambda function (under Basic settings, change the timeout from 3 seconds to 45 seconds):

import base64
import json
from datetime import datetime

def lambda_handler(event, context):

    send_back = []
    now = datetime.utcnow().isoformat()

    for record in event['records']:
        # Decode the incoming Kinesis Data Firehose record and add a timestamp field
        stock_rec = json.loads(base64.b64decode(record['data']))
        stock_rec["timestamp"] = now

        # Re-encode the transformed record and mark it as successfully processed
        record_w_ts = {
                'recordId': record['recordId'],
                'result': 'Ok',
                'data': base64.b64encode(json.dumps(stock_rec).encode('utf-8') + b'\n').decode('utf-8')
        }
        send_back.append(record_w_ts)

    return {'records': send_back}

Creating a Kinesis Data Firehose delivery stream

To create your delivery stream, complete the following steps:

  1. On the Kinesis Data Firehose console, under Data Firehose, choose Create Delivery Stream.
  2. Enter a name for your stream; for example, tmak-kdf-stock-delivery-stream.
  3. For source, choose Direct PUT or other sources.
  4. Choose Next.
  5. For Data transformation, choose Enabled.
  6. Choose the Lambda function you created.
  7. Choose Next.
  8. Choose Amazon Elasticsearch Service as the destination for your delivery stream.
  9. For Index, enter stockdata.

The VPC section populates automatically. Make sure you use the security group you created for Kinesis Data Firehose (kdf-sec-grp).

  10. For Backup Mode, choose Failed records only.

You can select an existing S3 bucket or create a new one. The following screenshot shows an example of your delivery stream settings.

  11. Choose Next.
  12. Review the buffering settings and set any tags to identify your stream.

A delivery stream that delivers to VPC destinations needs permissions to manage ENIs, list VPCs, and subnets. The console gives you the option to create a new role based on a template that includes all the needed permissions. You can also use an existing role if you already created one.
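
For reference, the EC2 permissions that such a role typically includes look like the following hedged sketch. The role and policy names are placeholders, and you should verify the exact action list against the current Kinesis Data Firehose documentation for VPC delivery.

import json
import boto3

iam = boto3.client('iam')

# Hypothetical inline policy for a delivery role that writes to a VPC destination;
# verify the required actions against the current Kinesis Data Firehose documentation.
vpc_delivery_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Action': [
            'ec2:DescribeVpcs',
            'ec2:DescribeVpcAttribute',
            'ec2:DescribeSubnets',
            'ec2:DescribeSecurityGroups',
            'ec2:DescribeNetworkInterfaces',
            'ec2:CreateNetworkInterface',
            'ec2:CreateNetworkInterfacePermission',
            'ec2:DeleteNetworkInterface',
        ],
        'Resource': '*',
    }],
}

iam.put_role_policy(
    RoleName='firehose_stream_role_name',   # placeholder role name
    PolicyName='firehose-vpc-delivery',     # placeholder policy name
    PolicyDocument=json.dumps(vpc_delivery_policy),
)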

  13. Choose Next.
  14. Review the settings and choose Create Stream.

It may take up to a few minutes to see the stream status show as Active. See the following screenshot.

On the Amazon EC2 console, under Network and Security, you can see the endpoints created in your VPC by Kinesis Data Firehose and Amazon ES. See the following screenshot.

Configuring Kibana fine-grained access for Kinesis Data Firehose

You need to give Kinesis Data Firehose permissions to deliver stock data to your Amazon Elasticsearch Service endpoint. You can accomplish this via the Kibana console or API. For more information, see API on the Open Distro for Elasticsearch website.

For more information about controlling access to your Amazon Elasticsearch Service endpoint, see How to Control Access to Your Amazon Elasticsearch Service Domain.

Because your Amazon Elasticsearch Service endpoint is in the VPC, to access Kibana you must first connect to the VPC. This process varies by network configuration, but likely involves connecting to a VPN or corporate network. For this post, create a remote desktop EC2 instance in a public subnet of your VPC. The newly created security group (rdp-sec-grp) protects the instance. You can modify the es-sec-grp security group to allow inbound HTTPS traffic from rdp-sec-grp so you can access the Kibana URL from that instance. The following diagram illustrates this architecture.

Kinesis Data Firehose uses the delivery role to sign HTTP (Signature Version 4) requests before sending the data to the Amazon Elasticsearch Service endpoint. You manage Amazon Elasticsearch Service fine-grained access control permissions using roles, users, and mappings. This section describes how to create roles and set permissions for Kinesis Data Firehose.

The roles you create in this section are different from IAM roles. For more information, see Key Concepts.

Complete the following steps:

  1. Navigate to Kibana (you can find the URL on the Amazon Elasticsearch Service console).
  2. Enter the master user and password that you set up when you created the Amazon Elasticsearch Service endpoint.
  3. Under Security, choose Roles.
  4. Choose Add New Role.
  5. Name your role; for example, firehose-role.
  6. For cluster permissions, add cluster_composite_ops and cluster_monitor.
  7. Under Index permissions, choose Index Patterns and enter stockdata*.
  8. Under Permissions, add three action groups: crud, create_index, and manage.
  9. Choose Save Role Definition.

In the next step, you map the IAM role that Kinesis Data Firehose uses to the role you just created.

  10. Under Security, choose Role Mappings.
  11. Choose the role you just created (firehose-role).
  12. For Backend Roles, choose Add Backend Role.
  13. Enter the IAM ARN of the role Kinesis Data Firehose uses: arn:aws:iam::123456789012:role/firehose_stream_role_name.

You can find the IAM role that your delivery stream uses on the Kinesis Data Firehose console, in the delivery stream details.
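
If you prefer to script the mapping instead of using the Kibana UI, a sketch like the following (run from a host inside the VPC) calls the Open Distro for Elasticsearch security REST API. The domain endpoint and credentials are placeholders; the backend role ARN is the same delivery role used above.

import requests

# Placeholder VPC domain endpoint and master user credentials
es_endpoint = 'https://vpc-my-domain-abc123.us-east-1.es.amazonaws.com'

# Map the Firehose delivery role to the firehose-role security role
response = requests.put(
    f'{es_endpoint}/_opendistro/_security/api/rolesmapping/firehose-role',
    auth=('master-user', 'master-password'),
    json={'backend_roles': ['arn:aws:iam::123456789012:role/firehose_stream_role_name']},
)
response.raise_for_status()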

Streaming stock data through Kinesis Data Firehose

To stream your stock data, complete the following steps:

  1. On the Kinesis Data Firehose console, choose the stream you created.
  2. Choose Test with demo data.
  3. Choose Start sending demo data.

If everything is working, you see the message “Demo data is being sent to your delivery stream.” Wait a few minutes before you choose Stop sending demo data.

Analyzing and visualizing data

To analyze and visualize your data, complete the following steps:

  1. On the Kibana console, choose Management.
  2. Choose Index patterns.
  3. For Index pattern, enter stockdata*.
  4. Choose Next.
  5. For the Time filter field, choose timestamp.
  6. Choose Visualize.
  7. Create a new visualization and choose Line.
  8. For Index pattern, choose stockdata*.
  9. For Y-Axis, choose Aggregation=Average and Field=price.
  10. For X-Axis, choose Aggregation=Date Histogram, Field=timestamp, and Interval=seconds.
  11. Under X-Axis, choose Add Sub-buckets.
  12. Choose Split Series.
  13. Set Sub-Aggregation=Terms and Field=ticker_symbol.keyword.
  14. Choose Apply Changes.

The following screenshot shows an example visualization.

You can see the raw data by choosing Discover on the Kibana dashboard. See the following screenshot.

Summary

This post demonstrated how you can use Kinesis Data Firehose to deliver streaming data to an Amazon Elasticsearch Service endpoint inside your VPC. You do not need to enable and secure public access to your Amazon Elasticsearch Service endpoint. If you have been reluctant to expose your Amazon Elasticsearch Service endpoint to the internet but want to stream data to it, you can now do so with Kinesis Data Firehose.

 


About the Authors

Tarik Makota is a Principal Solutions Architect with Amazon Web Services. He provides technical guidance, design advice, and thought leadership to AWS customers across the US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

Serverless Stream-Based Processing for Real-Time Insights

Post Syndicated from Justin Pirtle original https://aws.amazon.com/blogs/architecture/serverless-stream-based-processing-for-real-time-insights/

Building on our previous posts regarding messaging patterns and queue-based processing, we now explore stream-based processing and how it helps you achieve low-latency, near real-time data processing in your applications. AWS offers two managed services for streaming: Amazon Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

What is streaming data?

At AWS, we define streaming data as data that is emitted at high volume in a continuous, incremental manner with the goal of low-latency processing. Whereas traditional batch-oriented business intelligence would offer insights in retrospect after months, days, or hours have passed, stream-based processing can offer actionable insights in real time. Stream-based processing is commonly used to respond to clickstream events, rapidly ingest various types of logs, and extract, transform, and load (ETL) data in real-time into data lakes and data warehouses.

Amazon Kinesis is the AWS service that makes it easy to collect, process, and analyze such real-time, streaming data with four different capabilities:

  • Kinesis Data Streams
  • Kinesis Data Firehose
  • Kinesis Data Analytics
  • Kinesis Video Streams

For this blog post, we focus on Kinesis Data Streams and Kinesis Data Firehose, since both of these services are foundational for streaming, ingestion, buffering, and processing in your streaming data pipeline.

Kinesis Data Streams

Amazon Kinesis Data Streams is a massively scalable service that can continuously capture gigabytes of data per second from hundreds of thousands of sources. Like many distributed systems, Kinesis Data Streams achieves this level of scalability by partitioning or sharding your data, where records are simultaneously written to and read from different shards in parallel. Every Kinesis data stream requires the allocation of at least one shard, and you choose how many shards to allocate to a given stream.

When writing to a shard in a Kinesis Data Stream, each shard supports ingestion of up to 1 MB of data per second or 1,000 records written per second. When reading from a shard, each shard supports output of 2 MB of data per second. You choose an initial number of shards to allocate for your Kinesis Data Stream, then can update your shard allocation over time. Increasing your shard allocation enables your application to easily scale from thousands of records to millions of records written per second.
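
For illustration, the following hedged boto3 sketch resizes a hypothetical stream to 10 shards, which would support roughly 10 MB/s of ingest and 20 MB/s of read throughput based on the per-shard limits above. The stream name is a placeholder.

import boto3

kinesis = boto3.client('kinesis')

# Hypothetical example: size a stream for ~10 MB/s of writes by allocating 10 shards
# (each shard supports 1 MB/s or 1,000 records/s of ingest and 2 MB/s of reads).
kinesis.update_shard_count(
    StreamName='my-stream',       # placeholder stream name
    TargetShardCount=10,
    ScalingType='UNIFORM_SCALING',
)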

Producing streaming data

Streaming data producers are processes that put records onto a Kinesis stream by calling the putRecord API to write a single record or the putRecords API to write multiple records in a single invocation. Common approaches for producing messages include direct use of AWS tools, including:

  • AWS SDK, which simplifies authentication and other semantics of invoking AWS service APIs
  • Amazon Kinesis Agent, which enables local file/log monitoring and rotation sending in real time
  • Amazon Kinesis Producer Library, which simplifies aggregating records into larger payloads to improve throughput

Additionally, several AWS services natively integrate with Amazon Kinesis as a data producer:

There are also several third-party services that offer native integration as data producers, including:

Regardless of the producer service/tool of choice, all data producers put records onto a stream by providing a partition key, stream name, and the data itself, which altogether must not exceed 1 MB in size. The partition key is used to determine which shard the data is written to on the stream. Amazon Kinesis Data Streams offers ordering guarantees and maintains message ordering within a given shard in a stream, using sequence numbers to track the unique position of each message sent.
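
The following minimal boto3 sketch puts a hypothetical stock tick onto a stream to illustrate these parameters; the stream name, Region, and record contents are placeholders.

import json
import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')  # placeholder Region

# Hypothetical stock tick; the partition key determines the target shard,
# and the serialized record plus key must stay under 1 MB.
record = {'ticker_symbol': 'AMZN', 'price': 1987.32}

kinesis.put_record(
    StreamName='my-stream',                        # placeholder stream name
    Data=json.dumps(record).encode('utf-8'),
    PartitionKey=record['ticker_symbol'],
)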

Consuming streaming data

Once records are written to a Kinesis Data Stream, they are buffered in their respective shards for consumption. Unlike queue-based processing, the records are buffered until the data retention period set on the stream elapses, enabling one or more consumers to replay all the messages in the shards of the stream. If your application must deliver your records to a data lake, data warehouse, Amazon Elasticsearch Service cluster, or Splunk, Kinesis Data Firehose can natively deliver your records to the following without you needing to write any custom code:

  • Amazon S3
  • Amazon Redshift
  • Amazon Elasticsearch Service
  • Splunk

You simply indicate the desired delivery destination and configuration regarding how to batch and deliver the messages. Kinesis Data Firehose can also use your desired S3 object naming, Amazon Redshift table name, Amazon Elasticsearch Service index name, and more.

For custom processing or destinations outside of the Amazon Kinesis Data Firehose supported services above, you need to write and run custom code to consume data from the stream. Though you can use the Kinesis Client Library (KCL) to run your own custom processing application on persistent virtual machines or container instances, AWS Lambda offers serverless computing with native event source integration with Amazon Kinesis Data Streams. AWS Lambda as a stream consumer takes care of the operational overhead of reading shards, maintaining record order, checkpointing as records are processed, and parallelizing processing.

Serverless stream processing with AWS Lambda

When configured with a Kinesis Stream as its event source, AWS Lambda continuously polls every shard in your stream at no extra charge and only invokes your Lambda code if and when there are messages in the stream. It additionally scales up the number of concurrent executions to parallelize reading all shards of a stream at the same time (and can have multiple executions reading the same shard simultaneously for a higher parallelization factor, if desired). AWS Lambda automatically checkpoints which records were successfully processed and handles retries and any failures automatically according to your desired configuration.

Best of all, there is no additional cost for the Lambda service to handle all of these operational needs for you. You only pay for compute time when your function is invoked and messages are available on the stream for processing. You’re able to focus on processing your data with your business logic directly in your code, since your records are sent as an array to your Lambda code. There is no additional code to author or manage regarding checkpointing, shard splits/merges, or other complexities.
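
As a minimal sketch (not tied to any particular workload), a Python handler for a Kinesis Data Streams event source looks roughly like the following; it assumes the producer wrote JSON payloads.

import base64
import json

# Minimal sketch of a Lambda handler for a Kinesis Data Streams event source.
# Lambda delivers a batch of records; each record's data arrives base64-encoded.
def lambda_handler(event, context):
    for record in event['Records']:
        payload = json.loads(base64.b64decode(record['kinesis']['data']))
        # Business logic goes here; this sketch only logs each decoded payload.
        print(json.dumps(payload))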

Conclusion

In this blog post, we defined streaming data and explored the Amazon Kinesis service and its various capabilities. We then reviewed the various options available for producing and consuming real-time streaming data with Amazon Kinesis, including using AWS Lambda for serverless streaming data processing. Please refer to the following resources for further learning on AWS streaming data processing:

Build a cloud-native network performance analytics solution on AWS for wireless service providers

Post Syndicated from Angelo Sampietro original https://aws.amazon.com/blogs/big-data/build-a-cloud-native-network-performance-analytics-solution-on-aws-for-wireless-service-providers/

This post demonstrates a serverless, cloud-based approach to building a network performance analytics solution using AWS services, providing flexibility and performance while keeping costs under control with pay-per-use pricing.

Without good network performance, you may struggle to meet the challenges of real-time, low-latency services and the increase in the total bandwidth your customers consume.

Considering the large volume of data that you need to ingest, store, and process every second for optimal performance monitoring, standard on-premises monitoring approaches are no longer efficient.

A cloud-native approach allows you to invest in the solutions that generate business value and move from the typical capital expenditure model to an operational expenditure model by avoiding upfront costs and over-provisioning of infrastructure.

Data and voice network complexity for mobile service providers

According to Cisco’s global mobile data traffic forecast, there will be 13.1 billion mobile-connected devices by 2023, and 1.4 billion of those will be 5G capable.

As a mobile service provider, you must understand how to perform accurate network planning and sizing for your access and core networks.

The increase in the global demand for network throughput and the number of services such as VoLTE, IoT, and video streaming on mobile networks is forcing mobile service providers to implement new architectures to match the desired quality of service (QoS).

Addressing optimal QoS when a multitude of services are running on a converged network is not an easy task. The workflow is complex, starting from collecting counters and statistics data from a multitude of different network elements to transforming the collected data into key performance indicators (KPIs) that you can link to the quality of one of the multiple services delivered over the network.

Modern mobile networks, with the deployment of 4G, 5G, and IoT services, have given rise to an increased number of cells deployed across the coverage area, so you must collect counters and generate KPIs for thousands of different network elements.

Considering that every network element can generate a few thousand counters, the network performance system has to manage millions of measurements at every collection cycle.

This is difficult to manage at scale with on-premises deployments without a high-cost solution. Instead, you can use AWS services to design a modern network performance analytics solution that covers all the requirements of different departments of telecommunications service providers (TSPs).

Data and voice network architecture

The main problem that you may face as a service provider is the complexity of modern mobile networks, which originate from several evolutions of communication standards in the last few decades (from 2G to 5G for data core, from CS to VoLTE for voice core) and the hardware and capabilities of the network elements.

The following diagram illustrates a simplified schema of a mobile wireless network element currently deployed, where you can find:

  • Access network, with the network elements needed for coverage from 2G to 5G
  • Core network, which includes the network elements for all the functions needed to deliver the services, authentication, and the database of all users on the network
  • Services delivered by the network, which include voice (PSTN/PLMN), internet (data service), and application services

IoT network architecture and service differentiation

The IoT traffic paradigm is completely different from the traffic patterns of smartphones and tablets. These mobile devices normally open an always-on session (PDP context or bearer) and generate a considerable amount of user plane traffic. An IoT device opens a session, sends a few bytes, and closes the session to limit power consumption and avoid allocating resources on cells when not necessary.

The following diagram depicts a real-world use case in which a service provider deploys IoT services. In most cases, IoT traffic is delivered to a different 4G core for the following reasons:

  • EPC optimization is needed to manage IoT traffic
  • For security reasons, IoT traffic should not be managed by the same core network as customer traffic
  • IoT devices generate huge volumes of control plane traffic that can impact customer performance if managed by the same network

This usage pattern requires a different traffic model and forces mobile service providers to perform firmware upgrades to the current cells or deploy new cells into the network. This has an impact on the QoS due to the increase in the number of counters and cells.

What is network performance, and why is it important?

You can use network performance monitoring systems to generate reports and insights and track network performance. Multiple organizations like Operations, Network Planning, and Engineering use these tools to view the overall quality of networks and services and control important KPIs like availability, response times, and download and upload speeds.

Network performance is strictly connected to the QoS that, by definition, is the mechanism to control the performance, reliability, and usability of a telecommunications service.

Pain points for service providers

Every vendor of network elements usually also provides the element managers for the hardware. These systems are proprietary, but most of the time, they comply with the standard provided by 3rd Generation Partnership Project (3GPP) for the XML file format of the performance measurement (PM) files to export the counters you need to measure network performance accurately.

Service providers normally also use multiple vendors for core and access network elements, so monitoring a non-heterogenous environment is one of the main challenges in today’s deployments.

Data types and 3GPP standards

Counters and statistics about the performance on a network element are exported in PM files. 3GPP standardized the format in a specific document, which you can download from the 3GPP website.

The network vendor is not guaranteed to follow this standard, but even if there are some customizations, the structure and format in most cases remain very similar to the open standard.

The following schema from 3GPP describes the XML format for PM that is exported from network elements.

The XML schema root element is "measCollecFile" and it has three child elements:

  1. "fileHeader": contains information such as the file sender and the beginning time of the measurements.
  2. "fileFooter": contains the end time of the measurements.
  3. "measData": contains all the measurement information, such as measurement types and their values.

The most important tags are:

  • measInfo – Contains the family of the measurements, the granularity, and the counters list for each measValue
  • measValue – Contains multiple measResults with the results of the measurements

Building a network performance solution on AWS

In this section we describe a possible architecture that can be used to implement the solution on AWS following a serverless approach.

Prerequisites

To implement this solution, you must have the following prerequisites:

  • An AWS account in the same Region
  • The AdministratorAccess policy granted to your AWS account (for production, you should restrict access as needed)

This post uses the EU (Ireland) Region. However, you can choose another Region of your choice where the following services are available:

For more information about AWS Regions and where AWS services are available, see Region Table.

The following diagram illustrates the high-level, end-to-end solution and the AWS services it uses. The workflow begins with the ingestion of the files using SFTP or Kinesis Data Firehose; the data is stored in Amazon S3 and processed with a Lambda function and AWS Glue to create a data catalog. Data querying is done with Amazon Athena, and the visualization with Amazon QuickSight.

Collecting data using Kinesis Data Firehose or AWS SFTP

For information about collecting PM files via element managers, see chapter 5 of Technical Specification 3GPP TS 31.432 on the 3GPP website.

Element managers act as collectors of XML measurement files that the network elements provide. They can send the files to an S3 bucket via AWS SFTP or Kinesis Data Firehose.

For the data transfer, you can refer to the following documentation about creating a Kinesis Data Firehose delivery stream and sending data to a Kinesis Data Firehose delivery stream. If you plan to use SFTP transfer, you can read about how AWS Transfer for SFTP works.
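
As an illustrative sketch (the stream name and file name are hypothetical), an element manager or collector script could forward a PM file to the delivery stream with boto3. Keep in mind that a single Firehose record is limited to 1,000 KiB, so larger files are better transferred to Amazon S3 directly, for example via AWS Transfer for SFTP.

import boto3

firehose = boto3.client('firehose')

# Hypothetical example: forward one collected PM file to the delivery stream.
# Single Firehose records are limited to 1,000 KiB.
with open('A20000301.1400+0200-1415+0200_RNC-Gbg-1.xml', 'rb') as pm_file:
    firehose.put_record(
        DeliveryStreamName='wireless-pm-ingest',   # placeholder stream name
        Record={'Data': pm_file.read()},
    )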

Transforming data using a Lambda function

This post provides a Lambda function that is associated with the ObjectCreated event type of the destination S3 bucket rawxml prefix (s3://wireless-pm/rawxml). The function runs every time a new XML file is saved in this location.

The Lambda function is written in Python 3.7. You can download it from the GitHub repository for this blog post. The function uses a layer to resolve the dependency for the xmltodict library used in the code.

The following screenshot of the AWS Management Console shows some of the main properties of the Lambda function:

  1. The Lambda name and its associated Lambda Layer.
  2. The S3 bucket trigger event on the ObjectCreated event.

The function transposes the XML files, converts them into CSV or JSON (depending on the value set in the function’s output_format environment variable), and formats the files with one record per measurement (measData), measure type, and value (measInfo). The technical specification 3GPP TS 32.435 document on the 3GPP website provides three example XML files in the Annex section.

The following code is an example A.1 XML file:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="MeasDataCollection.xsl"?>
<measCollecFile xmlns="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec http://www.3gpp.org/ftp/specs/archive/32_series/32.435#measCollec">
    <fileHeader fileFormatVersion="32.435 V7.0" vendorName="Company NN" dnPrefix="DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1">
        <fileSender localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" elementType="RNC"/>
        <measCollec beginTime="2000-03-01T14:00:00+02:00"/>
    </fileHeader>
    <measData>
        <managedElement localDn="SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1" userLabel="RNC Telecomville"/>
        <measInfo>
            <job jobId="1231"/>
            <granPeriod duration="PT900S" endTime="2000-03-01T14:14:30+02:00"/>
            <repPeriod duration="PT1800S"/>
            <measTypes>attTCHSeizures succTCHSeizures attImmediateAssignProcs succImmediateAssignProcs</measTypes>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-997">
                <measResults>234 345 567 789</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-998">
                <measResults>890 901 123 234</measResults>
            </measValue>
            <measValue measObjLdn="RncFunction=RF-1,UtranCell=Gbg-999">
                <measResults>456 567 678 789</measResults>
                <suspect>true</suspect>
            </measValue>
        </measInfo>
    </measData>
    <fileFooter>
        <measCollec endTime="2000-03-01T14:15:00+02:00"/>
    </fileFooter>
</measCollecFile>

The Lambda function transforms the preceding file into the following JSON format (this code example only shows the first record of the transformed and transposed dataset):

{
    "fh_file_format_version": "32.435 V7.0",
    "fh_vendor_name": "Company NN",
    "fh_dn_prefix": "DC=a1.companyNN.com,SubNetwork=1,IRPAgent=1",
    "fh_fs_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "fh_fs_element_type": "RNC",
    "fh_mc_begin_time": "2000-03-01T14:00:00+02:00",
    "ff_mc_end_time": "2000-03-01T14:15:00+02:00",
    "md_me_local_dn": "SubNetwork=CountryNN,MeContext=MEC-Gbg-1,ManagedElement=RNC-Gbg-1",
    "md_me_user_label": "RNC Telecomville",
    "md_me_sw_version": "",
    "md_mi_meas_info_id": "",
    "md_mi_job_jobid": "1231",
    "md_mi_gp_duration": "PT900S",
    "md_mi_gp_end_time": "2000-03-01T14:14:30+02:00",
    "md_mi_rp_duration": "PT1800S",
    "md_mi_meas_obj_ldn": "RncFunction=RF-1,UtranCell=Gbg-997",
    "md_mi_meas_name": "attTCHSeizures",
    "md_mi_meas_value": 234,
    "md_mi_meas_p": null,
    "md_mi_meas_suspect": null
}

You can change the output field names in the get_record_header static method of the GPPXml class defined in the Lambda function. The fields md_mi_meas_name and md_mi_meas_value contain the measure name and measure value, respectively.

The transformed CSV and JSON files are saved in the raw_transform_csv and raw_transform_json prefixes, respectively, in the S3 bucket (only one format is created for each execution, depending on the value of the output_format environment variable). The following screenshot shows the S3 bucket overview on the Amazon S3 console.
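
The actual transformation code is in the GitHub repository; the following simplified sketch only illustrates the general idea of flattening a PM file with xmltodict, assuming a single measData element and the field names shown above.

import xmltodict

# Simplified sketch (not the repository code): flatten one 3GPP PM XML file into
# one record per measurement name and value, assuming a single measData element.
def flatten_pm_file(xml_string):
    coll = xmltodict.parse(xml_string)['measCollecFile']
    records = []
    meas_infos = coll['measData']['measInfo']
    if not isinstance(meas_infos, list):
        meas_infos = [meas_infos]
    for info in meas_infos:
        # measTypes and measResults are whitespace-separated and ordered consistently
        names = info['measTypes'].split()
        values = info['measValue']
        if not isinstance(values, list):
            values = [values]
        for value in values:
            for name, result in zip(names, value['measResults'].split()):
                records.append({
                    'md_mi_meas_obj_ldn': value['@measObjLdn'],
                    'md_mi_meas_name': name,
                    'md_mi_meas_value': result,
                })
    return records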

For this use case, a Lambda function is triggered for the transformation task every time a new XML file arrives. Depending on the volume and size of the XML files, you can choose other compute options on AWS based on the right fit, such as in containers using AWS Batch, Amazon ECS, or Amazon EKS. For more information about configuring and running processes using Amazon ECS, see Building, deploying, and operating containerized applications with AWS Fargate.

Alternatively, you could ingest the raw XML file using a Kinesis Firehose stream with a Lambda function attached to it to transform the data to JSON and output directly to Amazon S3 in Parquet or ORC format using the record format conversion feature of Kinesis Firehose. You would need to predefine a table in the Data Catalog whose schema, serializer, and deserializer you use to convert the data.
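
The following hedged boto3 sketch shows roughly how that alternative could be configured with the record format conversion feature; all names, ARNs, and the Data Catalog table are placeholders that you would need to create first.

import boto3

firehose = boto3.client('firehose')

# Sketch only: convert incoming JSON records to Parquet using the schema of a
# pre-existing Data Catalog table. Names and ARNs are placeholders.
firehose.create_delivery_stream(
    DeliveryStreamName='wireless-pm-parquet',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
        'BucketARN': 'arn:aws:s3:::npm-blog',
        'Prefix': 'raw_transform_parquet/',
        'DataFormatConversionConfiguration': {
            'Enabled': True,
            'InputFormatConfiguration': {'Deserializer': {'OpenXJsonSerDe': {}}},
            'OutputFormatConfiguration': {'Serializer': {'ParquetSerDe': {}}},
            'SchemaConfiguration': {
                'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
                'DatabaseName': 'wireless_pm',
                'TableName': 'wireless_pm_table',   # placeholder table name
            },
        },
    },
)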

Building a Data Catalog using AWS Glue

To create an AWS Glue table in the Data Catalog, first create an AWS Glue crawler. Complete the following steps:

  1. Select AWS Glue from the services in the AWS Console.
  2. Select Crawlers in the left side menu.
  3. Choose Add crawler.
  4. For Crawler name, enter a name for your crawler; for example, Wireless_PM_crawler.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path to the input files in CSV format.
  9. Choose Next.
  10. In the Choose an IAM role section, select Choose an existing IAM role.
  11. For IAM role, enter the name of your role to grant read permission to access the S3 bucket.
  12. Choose Next. The following code is the AWSGlueServiceRole-S3 IAM policy for this post:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::npm-blog/raw_transform_csv/Parquet*",
                    "arn:aws:s3:::npm-blog/raw_transform_csv/*"
                ]
            }
        ]
    }

  13. In the Choose the crawler’s output section, for Database, choose the database in which the Data Catalog table is created; for example, wireless_pm.
  14. Choose Next. In these next steps, you create an ETL job that converts from CSV to Parquet (a columnar file format) and writes the Parquet files to an S3 bucket. The Parquet file format helps with performance of the downstream consumption of the data. To convert to Parquet, you can either use an AWS Glue generated script (which this post uses) or use your own PySpark or Scala script.
  15. In the Configure the job properties section, for Name, enter wireless_pm_parquet_conversion.
  16. For IAM role, enter AWSGlueServiceRole-wirelesspm.
  17. For Type, choose Spark.
  18. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  19. For This job runs, select A proposed script generated by AWS Glue.
  20. For Script file name, enter wireless_pm_parquet_conversion.
  21. In the Choose a data target section, select Create tables in your data target.
  22. For Data store, choose Amazon S3.
  23. For Format, choose Parquet.
  24. For Target path, enter the path to your S3 bucket.
  25. Choose Next.

You can configure a trigger to start the job by creating a workflow in AWS Glue that runs the job after the crawler. When the job is complete, a new set of Parquet files are stored in the destination S3 bucket.

You can analyze the processed data directly with Athena by selecting the database created in AWS Glue, and analyzing the data from the AWS Glue table.

The following screenshot shows the AWS Glue database and the two tables created.

The following screenshot shows an example Athena query, which you can run by selecting a few columns to validate the data from the Data Catalog table you created.
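
A validation query like the one in the screenshot can also be submitted programmatically. In this hedged sketch, the table name and results location are placeholders; the column names follow the transformed record layout shown earlier.

import boto3

athena = boto3.client('athena')

# Placeholder table name and query results location
athena.start_query_execution(
    QueryString="""
        SELECT md_mi_meas_obj_ldn, md_mi_meas_name, md_mi_meas_value
        FROM wireless_pm_table
        LIMIT 10
    """,
    QueryExecutionContext={'Database': 'wireless_pm'},
    ResultConfiguration={'OutputLocation': 's3://npm-blog/athena-results/'},
)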

You can also use other AWS services for analytics such as Amazon Redshift or Amazon EMR for further processing, analysis, and KPI calculations from the data.

Visualizing data using Amazon QuickSight

Amazon QuickSight is a business analytics service you can use to build visualizations, perform ad hoc analysis, and get business insights from your data. For more information, see What Is Amazon QuickSight?

Amazon QuickSight provides out-of-the-box integration with Athena, which lets you run SQL queries on top of the metadata in your Data Catalog. For more information, see Creating a Data Set Using Amazon Athena Data.

The following screenshot shows an Amazon QuickSight analysis created from an Amazon QuickSight dataset. The dataset is based on the table that AWS Glue defined, and you query it via Athena.

You can create a visualization by loading the example dataset of PM counters. For example, the following screenshot shows the measurement values for the GPRS SuccessAttach and AbortedAttach and highlights a problem.

Summary

This post discussed the main pain points for wireless service providers and how AWS services can help you build a serverless solution that scales according to your network growth with no upfront costs.

This solution also helps you visualize and analyze data. Additionally, it provides insights that can help operations and network planning departments manage and evolve their networks according to current and future standards and services.

As always, AWS welcomes feedback. Please submit comments or questions in the comments section.

 


About the Authors

Angelo Sampietro is a senior partner solutions architect at Amazon Web Services. Angelo has a strong background in cloud computing, with 20 years of experience in the Telecommunications industry, working in the United States and Europe. With 5 AWS Certifications, including Big Data and Machine Learning Specialties, he currently works as Senior Partner Solutions Architect, helping Global System Integrators to be successful in the partnership with AWS. He loves to share new ideas with colleagues and friends and propose new solutions thinking out of the box.

 

 

Francesco Marelli is a senior solutions architect at Amazon Web Services. He lived and worked in London for 10 years, and has since worked in Italy, Switzerland, and other countries in EMEA. He specializes in the design and implementation of analytics, data management, and big data systems, mainly for enterprise and FSI customers. Francesco also has strong experience in systems integration and the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

 

 

 

Streaming ETL with Apache Flink and Amazon Kinesis Data Analytics

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

Most businesses generate data continuously in real time and at ever-increasing volumes. Data is generated as users play mobile games, load balancers log requests, customers shop on your website, and temperature changes on IoT sensors. You can capitalize on time-sensitive events, improve customer experiences, increase efficiency, and drive innovation by analyzing this data quickly. The speed at which you get these insights often depends on how quickly you can load data into data lakes, data stores, and other analytics tools. As data volume and velocity increases, it becomes more important to not only load the incoming data, but also to transform and analyze it in near-real time.

This post looks at how to use Apache Flink as a basis for sophisticated streaming extract-transform-load (ETL) pipelines. Apache Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Kinesis Data Analytics, which enables you to build and run sophisticated streaming applications quickly, easily, and with low operational overhead.

This post discusses the concepts that are required to implement powerful and flexible streaming ETL pipelines with Apache Flink and Kinesis Data Analytics. It also looks at code examples for different sources and sinks. For more information, see the GitHub repo. The repo also contains an AWS CloudFormation template so you can get started in minutes and explore the example streaming ETL pipeline.

Architecture for streaming ETL with Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It supports a wide range of highly customizable connectors, including connectors for Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, and Amazon Simple Storage Service (Amazon S3). Moreover, Apache Flink provides a powerful API to transform, aggregate, and enrich events, and supports exactly-once semantics. Apache Flink is therefore a good foundation for the core of your streaming architecture.

To deploy and run the streaming ETL pipeline, the architecture relies on Kinesis Data Analytics. Kinesis Data Analytics enables you to run Flink applications in a fully managed environment. The service provisions and manages the required infrastructure, scales the Flink application in response to changing traffic patterns, and automatically recovers from infrastructure and application failures. You can combine the expressive Flink API for processing streaming data with the advantages of a managed service by using Kinesis Data Analytics to deploy and run Flink applications. It allows you to build robust streaming ETL pipelines and reduces the operational overhead of provisioning and operating infrastructure.

The architecture in this post takes advantage of several capabilities that you can achieve when you run Apache Flink with Kinesis Data Analytics. Specifically, the architecture supports the following:

  • Private network connectivity – Connect to resources in your Amazon Virtual Private Cloud (Amazon VPC), in your data center with a VPN connection, or in a remote region with a VPC peering connection
  • Multiple sources and sinks – Read and write data from Kinesis data streams, Apache Kafka clusters, and Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters
  • Data partitioning – Determine the partitioning of data that is ingested into Amazon S3 based on information extracted from the event payload
  • Multiple Elasticsearch indexes and custom document IDs – Fan out from a single input stream to different Elasticsearch indexes and explicitly control the document ID
  • Exactly-once semantics – Avoid duplicates when ingesting and delivering data between Apache Kafka, Amazon S3, and Amazon Elasticsearch Service (Amazon ES)

The following diagram illustrates this architecture.

The remainder of this post discusses how to implement streaming ETL architectures with Apache Flink and Kinesis Data Analytics. The architecture persists streaming data from one or multiple sources to different destinations and is extensible to your needs. This post does not cover additional filtering, enrichment, and aggregation transformations, although that is a natural extension for practical applications.

This post shows how to build, deploy, and operate the Flink application with Kinesis Data Analytics, without further focusing on these operational aspects. It is only relevant to know that you can create a Kinesis Data Analytics application by uploading the compiled Flink application jar file to Amazon S3 and specifying some additional configuration options with the service. You can then execute the Kinesis Data Analytics application in a fully managed environment. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics developer guide.

Exploring a streaming ETL pipeline in your AWS account

Before you consider the implementation details and operational aspects, you should get a first impression of the streaming ETL pipeline in action. To create the required resources, deploy the following AWS CloudFormation template:

The template creates a Kinesis data stream and an Amazon Elastic Compute Cloud (Amazon EC2) instance to replay a historic data set into the data stream. This post uses data based on the public dataset obtained from the New York City Taxi and Limousine Commission. Each event describes a taxi trip made in New York City and includes timestamps for the start and end of a trip, information on the boroughs the trip started and ended in, and various details on the fare of the trip. A Kinesis Data Analytics application then reads the events and persists them to Amazon S3 in Parquet format and partitioned by event time.

Connect to the instance by following the link next to ConnectToInstance in the output section of the CloudFormation template that you executed previously. You can then start replaying a set of taxi trips into the data stream with the following code:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

You can obtain this command with the correct parameters from the output section of the AWS CloudFormation template. The output section also points you to the S3 bucket to which events are persisted and an Amazon CloudWatch dashboard that lets you monitor the pipeline.

For more information about enabling the remaining combinations of sources and sinks, for example, Apache Kafka and Elasticsearch, see the GitHub repo.

Building a streaming ETL pipeline with Apache Flink

Now that you have seen the pipeline in action, you can dive into the technical aspects of how to implement the functionality with Apache Flink and Kinesis Data Analytics.

Reading and writing to private resources

Kinesis Data Analytics applications can access resources on the public internet and resources in a private subnet that is part of your VPC. By default, a Kinesis Data Analytics application only enables access to resources that you can reach over the public internet. This works well for resources that provide a public endpoint, for example, Kinesis data streams or Amazon Elasticsearch Service.

If your resources are private to your VPC, either for technical or security-related reasons, you can configure VPC connectivity for your Kinesis Data Analytics application. For example, MSK clusters are private; you cannot access them from the public internet. You may run your own Apache Kafka cluster on premises that is not exposed to the public internet and is only accessible from your VPC through a VPN connection. The same is true for other resources that are private to your VPC, such as relational databases or AWS PrivateLink-powered endpoints.

To enable VPC connectivity, configure the Kinesis Data Analytics application to connect to private subnets in your VPC. Kinesis Data Analytics creates elastic network interfaces in one or more of the subnets provided in your VPC configuration for the application, depending on the parallelism of the application. For more information, see Configuring Kinesis Data Analytics for Java Applications to access Resources in an Amazon VPC.

The following screenshot shows an example configuration of a Kinesis Data Analytics application with VPC connectivity:

The application can then access resources that have network connectivity from the configured subnets. This includes resources that are not directly contained in these subnets, which you can reach over a VPN connection or through VPC peering. This configuration also supports endpoints that are available over the public internet if you have a NAT gateway configured for the respective subnets. For more information, see Internet and Service Access for a VPC-Connected Kinesis Data Analytics for Java application.

Configuring Kinesis and Kafka sources

Apache Flink supports various data sources, including Kinesis Data Streams and Apache Kafka. For more information, see Streaming Connectors on the Apache Flink website.

To connect to a Kinesis data stream, first configure the Region and a credentials provider. As a general best practice, choose AUTO as the credentials provider. The application will then use temporary credentials from the role of the respective Kinesis Data Analytics application to read events from the specified data stream. This avoids baking static credentials into the application. In this context, it is also reasonable to increase the time between two read operations from the data stream. When you increase the default of 200 milliseconds to 1 second, the latency increases slightly, but it facilitates multiple consumers reading from the same data stream. See the following code:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

This config is passed to the FlinkKinesisConsumer with the stream name and a DeserializationSchema. This post uses the TripEventSchema for deserialization, which specifies how to deserialize a byte array that represents a Kinesis record into a TripEvent object. See the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

For more information, see TripEventSchema.java and TripEvent.java on GitHub. Apache Flink provides other more generic serializers that can deserialize data into strings or JSON objects.

Apache Flink is not limited to reading from Kinesis data streams. If you configure the Kinesis Data Analytics application’s VPC settings correctly, Apache Flink can also read events from Apache Kafka and MSK clusters. Specify a comma-separated list of broker and port pairs to use for the initial connection to your cluster. This config is passed to the FlinkKafkaConsumer with the topic name and a DeserializationSchema to create a source that reads from the respective topic of the Apache Kafka cluster. See the following code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

The resulting DataStream contains TripEvent objects that have been deserialized from the data ingested into the data stream and Kafka topic, respectively. You can then use the data streams in combination with a sink to persist the events into their respective destination.

Persisting data in Amazon S3 with data partitioning

When you persist streaming data to Amazon S3, you may want to partition the data. You can substantially improve query performance of analytic tools by partitioning data because partitions that cannot contribute to a query can be pruned and therefore do not need to be read. For example, the right partitioning strategy can improve Amazon Athena query performance and cost by reducing the amount of data read for a query. You should choose to partition your data by the same attributes used in your application logic and query patterns. Furthermore, it is common when processing streaming data to include the time of an event, or event time, in your partitioning strategy. This contrasts with using the ingestion time or some other service-side timestamp that does not reflect the time an event occurred as accurately as event time.

For more information about taking data partitioned by ingestion time and repartitioning it by event time with Athena, see Analyze your Amazon CloudFront access logs at scale. However, with Apache Flink you can directly partition the incoming data based on event time by using the payload of events to determine the partitioning, which avoids an additional post-processing step. This capability is called data partitioning and is not limited to partitioning by time.

You can realize data partitioning with Apache Flink’s StreamingFileSink and BucketAssigner. For more information, see Streaming File Sink on the Apache Flink website.

When given a specific event, the BucketAssigner determines the corresponding partition prefix in the form of a string. See the following code:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}

The sink takes an argument for the S3 bucket as a destination path and a function that converts the TripEvent Java objects into a string. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

You can further customize the size of the objects you write to Amazon S3 and the frequency of object creation with a rolling policy. You can configure your policy to aggregate more events into fewer, larger objects at the cost of increased latency, or vice versa. Avoiding many small objects on Amazon S3 can be a desirable trade-off for the increased latency, because a high number of objects can negatively impact the query performance of consumers reading the data from Amazon S3. For more information, see DefaultRollingPolicy on the Apache Flink website.
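As an illustration, the following sketch configures a custom rolling policy on the sink shown earlier. The thresholds are assumptions chosen for the example, not recommendations from this post:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(
    DefaultRollingPolicy
      .create()
      .withMaxPartSize(128 * 1024 * 1024)                    // roll part files at roughly 128 MB
      .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // or at the latest after 15 minutes
      .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))  // or after 5 minutes without new events
      .build()
  )
  .build();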

The number of output files that arrive in your S3 bucket per your rolling policy also depends on the parallelism of the StreamingFileSink and how you distribute events between Flink application operators. In the previous example, the Flink internal DataStream is partitioned by pickup location ID with the keyBy operator. The location ID is also used in the BucketAssigner as part of the prefix for objects that are written to Amazon S3. Therefore, the same node aggregates and persists all events with the same prefix, which results in particularly large objects on Amazon S3.

Apache Flink uses multipart uploads under the hood when writing to Amazon S3 with the StreamingFileSink. In case of failures, Apache Flink may not be able to clean up incomplete multipart uploads. To avoid unnecessary storage fees, set up the automatic cleanup of incomplete multipart uploads by configuring appropriate lifecycle rules on the S3 bucket. For more information, see Important Considerations for S3 on the Apache Flink website and Example 8: Lifecycle Configuration to Abort Multipart Uploads.

Converting output to Apache Parquet

In addition to partitioning data before delivery to Amazon S3, you may want to compress it with a columnar storage format. Apache Parquet is a popular columnar format, which is well supported in the AWS ecosystem. It reduces the storage footprint and can substantially increase query performance and reduce cost.

The StreamingFileSink supports Apache Parquet and other bulk-encoded formats through a built-in BulkWriter factory. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

For more information, see Bulk-encoded Formats on the Apache Flink website.

Persisting events works a bit differently when you use the Parquet conversion. When you enable Parquet conversion, you can only configure the StreamingFileSink with the OnCheckpointRollingPolicy, which commits completed part files to Amazon S3 only when a checkpoint is triggered. You need to enable Apache Flink checkpoints in your Kinesis Data Analytics application to persist data to Amazon S3. The data only becomes visible to consumers when a checkpoint is triggered, so your delivery latency depends on how often your application checkpoints.
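Kinesis Data Analytics manages checkpointing through the application configuration, where it is enabled by default with a 1-minute interval. For reference, if you run the same code in a self-managed Apache Flink environment, a minimal sketch for enabling checkpointing programmatically could look like the following; the 1-minute interval is just an illustration:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Trigger a checkpoint every 60 seconds with exactly-once checkpointing semantics
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);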

Moreover, you previously just needed to generate a string representation of the data to write to Amazon S3. In contrast, the ParquetAvroWriters expects an Apache Avro schema for events. For more information, see the GitHub repo. You can use and extend the schema on the repo if you want an example.

In general, it is highly desirable to convert data into Parquet if you want to work with and query the persisted data effectively. Although the conversion requires some additional effort, its benefits outweigh the added complexity compared to storing raw data.

Fanning out to multiple Elasticsearch indexes and custom document IDs

Amazon ES is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch clusters. A popular use case is to stream application and network log data into Amazon ES. In Elasticsearch parlance, these logs are documents: you can create a document for every event and store it in an Elasticsearch index.

The Elasticsearch sink that Apache Flink provides is flexible and extensible. You can specify an index based on the payload of each event. This is useful when the stream contains different event types and you want to store the respective documents in different Elasticsearch indexes. With this capability, you can use a single sink, and hence a single application, to write into multiple indexes. This matters because, with newer Elasticsearch versions, a single index cannot contain multiple types. See the following code:

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
   public IndexRequest createIndexRequest(TripEvent element) {
    String type = element.getType().toString();
    String tripId = Long.toString(element.getTripId());

    return Requests.indexRequest()
      .index(type)
      .type(type)
      .id(tripId)
      .source(TripEventSchema.toJson(element), XContentType.JSON);
   }
  }
);

events.addSink(sink);

You can also explicitly set the document ID when you send documents to Elasticsearch. If an event with the same ID is ingested into Elasticsearch multiple times, it is overwritten rather than creating duplicates. This enables your writes to Elasticsearch to be idempotent. In this way, you can obtain exactly-once semantics of the entire architecture, even if your data sources only provide at-least-once semantics.

The AmazonElasticsearchSink used above is an extension of the Elasticsearch sink that comes with Apache Flink. The sink adds support for signing requests with IAM credentials so you can use the strong IAM-based authentication and authorization that is available from the service. To this end, the sink picks up temporary credentials from the Kinesis Data Analytics environment in which the application is running. It uses the Signature Version 4 method to add authentication information to the request that is sent to the Elasticsearch endpoint.

Leveraging exactly-once semantics

You can obtain exactly-once semantics by combining an idempotent sink with at-least-once semantics, but that is not always feasible. For instance, if you want to replicate data from one Apache Kafka cluster to another or persist transactional CDC data from Apache Kafka to Amazon S3, you may not be able to tolerate duplicates in the destination, and neither of these sinks is idempotent.

Apache Flink natively supports exactly-once semantics. Kinesis Data Analytics implicitly enables exactly-once mode for checkpoints. To obtain end-to-end exactly-once semantics, you need to enable checkpoints for the Kinesis Data Analytics application and choose a connector that supports exactly-once semantics, such as the StreamingFileSink. For more information, see Fault Tolerance Guarantees of Data Sources and Sinks on the Apache Flink website.

There are some side effects to using exactly-once semantics. For example, end-to-end latency increases for several reasons. First, you can only commit the output when a checkpoint is triggered. This is the same latency increase that occurs when you turn on Parquet conversion. The default checkpoint interval is 1 minute, which you can decrease. However, obtaining sub-second delivery latencies is difficult with this approach.

Also, the details of end-to-end exactly-once semantics are subtle. Although the Flink application may read in an exactly-once fashion from a data stream, duplicates may already be part of the stream, so you can only obtain at-least-once semantics of the entire application. For Apache Kafka as a source and sink, different caveats apply. For more information, see Caveats on the Apache Flink website.

Be sure that you understand all the details of the entire application stack before you take a hard dependency on exactly-once semantics. In general, if your application can tolerate at-least-once semantics, it’s a good idea to use that semantic instead of relying on stronger semantics that you don’t need.

Using multiple sources and sinks

One Flink application can read data from multiple sources and persist data to multiple destinations. This is interesting for several reasons. First, you can persist the data or different subsets of the data to different destinations. For example, you can use the same application to replicate all events from your on-premises Apache Kafka cluster to an MSK cluster. At the same time, you can deliver specific, valuable events to an Elasticsearch cluster.

Second, you can use multiple sinks to increase the robustness of your application. For example, your application that applies filters and enriches streaming data can also archive the raw data stream. If something goes wrong with your more complex application logic, Amazon S3 still has the raw data, which you can use to backfill the sink.

However, there are some trade-offs. When you bundle many functionalities in a single application, you increase the blast radius of failures. If a single component of the application fails, the entire application fails and you need to recover it from the last checkpoint. This causes some downtime and increased delivery latency to all delivery destinations in the application. Also, a single large application is often harder to maintain and to change. You should strike a balance between adding functionality and creating additional Kinesis Data Analytics applications.

Operational aspects

When you run the architecture in production, you set out to execute a single Flink application continuously and indefinitely. It is crucial to implement monitoring and proper alarming to make sure that the pipeline is working as expected and that the processing can keep up with the incoming data. Ideally, the pipeline should adapt to changing throughput conditions and raise notifications when it fails to deliver data from the sources to the destinations.

Some aspects require specific attention from an operational perspective. The following section provides some ideas and further references on how you can increase the robustness of your streaming ETL pipelines.

Monitoring and scaling sources

The data stream and the MSK cluster, respectively, are the entry point to the entire architecture. They decouple the data producers from the rest of the architecture. To avoid any impact to data producers, which you often cannot control directly, you need to scale the input stream of the architecture appropriately and make sure that it can ingest messages at any time.

Kinesis Data Streams uses a throughput provisioning model based on shards. Each shard provides a certain read and write capacity. From the number of provisioned shards, you can derive the maximum throughput of the stream in terms of ingested and emitted events and data volume per second. For more information, see Kinesis Data Streams Quotas.
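As a rough, hypothetical illustration (the numbers are not from this post), you can estimate the required shard count from the per-shard write limits of 1 MB per second and 1,000 records per second:

// Hypothetical peak ingest rates used only for this back-of-the-envelope calculation
long expectedBytesPerSecond = 18_000_000L;   // ~18 MB/s
long expectedRecordsPerSecond = 45_000L;     // 45,000 events/s

long shardsForBytes = (long) Math.ceil(expectedBytesPerSecond / 1_000_000.0);   // 18 shards
long shardsForRecords = (long) Math.ceil(expectedRecordsPerSecond / 1_000.0);   // 45 shards

// The stream must satisfy both dimensions, so the larger value wins: 45 shards
long requiredShards = Math.max(shardsForBytes, shardsForRecords);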

Kinesis Data Streams exposes metrics through CloudWatch that report on these characteristics and indicate whether the stream is over- or under-provisioned. You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively, or you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. Similar metrics exist for data egress, which you should also monitor. For more information, see Monitoring the Amazon Kinesis Data Streams with Amazon CloudWatch.

The following graph shows some of these metrics for the data stream of the example architecture. On average the Kinesis data stream receives 2.8 million events and 1.1 GB of data every minute.

You can even automate the scaling of your Kinesis data streams. For more information, see Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount.

Apache Kafka and Amazon MSK use a node-based provisioning model. Amazon MSK also exposes metrics through CloudWatch, including metrics that indicate how much data and how many events are ingested into the cluster. For more information, see Amazon MSK Metrics for Monitoring with CloudWatch.

You can also enable open monitoring with Prometheus for MSK clusters. It is a bit harder to know the total capacity of the cluster, and you often need benchmarking to know when you should scale. For more information about important metrics to monitor, see Monitoring Kafka on the Confluent website.

Monitoring and scaling the Kinesis Data Analytics application

The Flink application is the central core of the architecture. Kinesis Data Analytics executes it in a managed environment, and you want to make sure that it continuously reads data from the sources and persists data in the data sinks without falling behind or getting stuck.

When the application falls behind, it is often an indicator that it is not scaled appropriately. Two important metrics to track the progress of the application are millisBehindLatest (when the application is reading from a Kinesis data stream) and records-lag-max (when it is reading from Apache Kafka and Amazon MSK). These metrics not only indicate that data is read from the sources, but they also tell you whether data is read fast enough. If the values of these metrics are continuously growing, the application is continuously falling behind, which may indicate that you need to scale up the Kinesis Data Analytics application. For more information, see Kinesis Data Streams Connector Metrics and Application Metrics.

The following graph shows the metrics for the example application in this post. During checkpointing, the maximum millisBehindLatest metric occasionally spikes up to 7 seconds. However, because the reported average of the metric is less than 1 second and the application immediately catches up to the tip of the stream again, it is not a concern for this architecture.

Although the lag of the application is one of the most important metrics to monitor, there are other relevant metrics that Apache Flink and Kinesis Data Analytics expose. For more information, see Monitoring Apache Flink Applications 101 on the Apache Flink website.

Monitoring sinks

To verify that sinks are receiving data and, depending on the sink type, do not run out of storage, you need to monitor sinks closely.

You can enable detailed metrics for your S3 buckets that track the number of requests and data uploaded into the bucket with 1-minute granularity. For more information, see Monitoring Metrics with Amazon CloudWatch. The following graph shows these metrics for the S3 bucket of the example architecture:

When the architecture persists data into a Kinesis data stream or a Kafka topic, it acts as a producer, so the same recommendations as for monitoring and scaling sources apply. When it delivers data to an Amazon Elasticsearch Service domain, see Amazon Elasticsearch Service Best Practices for more information about operating and monitoring the service in production environments.

Handling errors

“Failures are a given and everything eventually fails over time”, so you should expect the application to fail at some point. For example, an underlying node of the infrastructure that Kinesis Data Analytics manages might fail, or intermittent timeouts on the network can prevent the application from reading from sources or writing to sinks. When this happens, Kinesis Data Analytics restarts the application and resumes processing by recovering from the latest checkpoint. Because the raw events have been persisted in a data stream or Kafka topic, the application can reread the events that have been persisted in the stream between the last checkpoint and when it recovered and continue standard processing.

These kinds of failures are rare and the application can gracefully recover without sacrificing processing semantics, including exactly-once semantics. However, other failure modes need additional attention and mitigation.

When an exception is thrown anywhere in the application code, for example, in the component that contains the logic for parsing events, the entire application crashes. As before, the application eventually recovers, but if the exception is from a bug in your code that a specific event always hits, it results in an infinite loop. After recovering from the failure, the application rereads the event, because it was not processed successfully before, and crashes again. The process starts again and repeats indefinitely, which effectively blocks the application from making any progress.

Therefore, you want to catch and handle exceptions in the application code to avoid crashing the application. If there is a persistent problem that you cannot resolve programmatically, you can use side outputs to redirect the problematic raw events to a secondary data stream, which you can persist to a dead letter queue or an S3 bucket for later inspection. For more information, see Side Outputs on the Apache Flink website.
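The following is a minimal sketch of this pattern. It assumes the raw events arrive as strings and are parsed in a ProcessFunction; the fromJson helper and the deadLetterSink are hypothetical names used only for illustration:

final OutputTag<String> parseErrors = new OutputTag<String>("parse-errors") {};

SingleOutputStreamOperator<TripEvent> parsed = rawEvents.process(
  new ProcessFunction<String, TripEvent>() {
    @Override
    public void processElement(String raw, Context ctx, Collector<TripEvent> out) {
      try {
        out.collect(TripEventSchema.fromJson(raw));  // hypothetical parse helper
      } catch (Exception e) {
        // Redirect the problematic raw event instead of letting the exception crash the job
        ctx.output(parseErrors, raw);
      }
    }
  }
);

// Persist the problematic events, for example to an S3 dead letter prefix, for later inspection
parsed.getSideOutput(parseErrors).addSink(deadLetterSink);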

When the application is stuck and cannot make any progress, it is at least visible in the metrics for application lag. If your streaming ETL pipeline filters or enriches events, failures may be much more subtle, and you may only notice them long after they have been ingested. For instance, due to a bug in the application, you may accidentally drop important events or corrupt their payload in unintended ways. Kinesis data streams stores events for up to 7 days and, though technically possible, Apache Kafka is often not configured to store events indefinitely either. If you don’t identify the corruption quickly enough, you risk losing information when the retention of the raw events expires.

To protect against this scenario, you can persist the raw events to Amazon S3 before you apply any additional transformations or processing to them. You can keep the raw events and reprocess or replay them into the stream if you need to. To integrate the functionality into the application, add a second sink that just writes to Amazon S3. Alternatively, use a separate application that only reads and persists the raw events from the stream, at the cost of running and paying for an additional application.

When to choose what

AWS provides many services that work with streaming data and can perform streaming ETL. Amazon Kinesis Data Firehose can ingest, process, and persist streaming data into a range of supported destinations. There is a significant overlap of the functionality between Kinesis Data Firehose and the solution in this post, but there are different reasons to use one or the other.

As a rule of thumb, use Kinesis Data Firehose whenever it fits your requirements. The service is built with simplicity and ease of use in mind. To use Kinesis Data Firehose, you just need to configure the service. You can use Kinesis Data Firehose for streaming ETL use cases with no code, no servers, and no ongoing administration. Moreover, Kinesis Data Firehose comes with many built-in capabilities, and its pricing model allows you to only pay for the data processed and delivered. If you don’t ingest data into Kinesis Data Firehose, you pay nothing for the service.

In contrast, the solution in this post requires you to create, build, and deploy a Flink application. Moreover, you need to think about monitoring and how to obtain a robust architecture that is not only tolerant against infrastructure failures but also resilient against application failures and bugs. However, this added complexity unlocks many advanced capabilities, which your use case may require. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

What’s next?

This post discussed how to build a streaming ETL pipeline with Apache Flink and Kinesis Data Analytics. It focused on how to build an extensible solution that addresses some advanced use cases for streaming ingest while maintaining low operational overhead. The solution allows you to quickly enrich, transform, and load your streaming data into your data lake, data store, or another analytical tool without the need for an additional ETL step. The post also explored ways to extend the application with monitoring and error handling.

You should now have a good understanding of how to build streaming ETL pipelines on AWS. You can start capitalizing on your time-sensitive events by using a streaming ETL pipeline that makes valuable information quickly accessible to consumers. You can tailor the format and shape of this information to your use case without adding the substantial latency of traditional batch-based ETL processes.

 


About the Author

Steffen Hausmann is a Specialist Solutions Architect for Analytics at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences. You can follow his ruthless attempts on Twitter (@sthmmm).

 

 

Optimizing downstream data processing with Amazon Kinesis Data Firehose and Amazon EMR running Apache Spark

Post Syndicated from Srikanth Kodali original https://aws.amazon.com/blogs/big-data/optimizing-downstream-data-processing-with-amazon-kinesis-data-firehose-and-amazon-emr-running-apache-spark/

For most organizations, working with ever-increasing volumes of data and incorporating new data sources can be a challenge.  Often, AWS customers have messages coming from various connected devices and sensors that must be efficiently ingested and processed before further analysis.  Amazon S3 is a natural landing spot for data of all types.  However, the way data is stored in Amazon S3 can make a significant difference in the efficiency and cost of downstream data processing.  Specifically, Apache Spark can be over-burdened with file operations if it is processing a large number of small files versus fewer larger files.  Each of these files has its own overhead of a few milliseconds for opening, reading metadata information, and closing. This overhead of file operations on these large numbers of files results in slow processing. This blog post shows how to use Amazon Kinesis Data Firehose to merge many small messages into larger messages for delivery to Amazon S3.  This results in faster processing with Amazon EMR running Spark.

Like Amazon Kinesis Data Streams, Kinesis Data Firehose accepts a maximum incoming message size of 1 MB.  If a single message is greater than 1 MB, it can be compressed before placing it on the stream.  However, at large volumes, a message or file size of 1 MB or less is usually too small.  Although there is no right answer for file size, 1 MB for many datasets would just yield too many files and file operations.

This post also shows how to use Apache Spark to read the compressed files in Amazon S3, which do not have a proper file name extension, and store the results back in Amazon S3 in Parquet format.

Solution overview

The steps we follow in this blog post are:

  1. Create a virtual private cloud (VPC) and an Amazon S3 bucket.
  2. Provision a Kinesis data stream, and an AWS Lambda function to process the messages from the Kinesis data stream.
  3. Provision Kinesis Data Firehose to deliver messages to Amazon S3 sent from the Lambda function in step 2. This step also provisions an Amazon EMR cluster to process the data in Amazon S3.
  4. Generate test data with custom code running on an Amazon EC2 instance.
  5. Run a sample Spark program from the Amazon EMR cluster’s master instance to read the files from Amazon S3, convert them into parquet format and write back to an Amazon S3 destination.

The following diagram explains how the services work together:

The AWS Lambda function in the diagram reads the messages, appends additional data to them, and compresses them with gzip before sending them to Amazon Kinesis Data Firehose. The reason for this is that most customers need some enrichment of the data before it arrives in Amazon S3.

Amazon Kinesis Data Firehose can buffer incoming messages into larger records before delivering them to your Amazon S3 bucket. It does so according to two conditions, buffer size (up to 128 MB) and buffer interval (up to 900 seconds). Record delivery is triggered once either of these conditions has been satisfied.
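The walkthrough below provisions the delivery stream with AWS CloudFormation, but for reference, a hedged sketch of setting these buffering hints with the AWS SDK for Java might look like the following; the stream name, bucket ARN, and role ARN are placeholders:

AmazonKinesisFirehose firehose = AmazonKinesisFirehoseClientBuilder.defaultClient();

CreateDeliveryStreamRequest request = new CreateDeliveryStreamRequest()
    .withDeliveryStreamName("AWSBlogs-LambdaToFireHose")
    .withExtendedS3DestinationConfiguration(new ExtendedS3DestinationConfiguration()
        .withBucketARN("arn:aws:s3:::<S3_BUCKET_NAME>")
        .withRoleARN("<Kinesis Data Firehose IAM role ARN>")
        .withCompressionFormat(CompressionFormat.UNCOMPRESSED)
        .withBufferingHints(new BufferingHints()
            .withSizeInMBs(128)              // deliver after 128 MB is buffered...
            .withIntervalInSeconds(900)));   // ...or after 900 seconds, whichever comes first

firehose.createDeliveryStream(request);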

An Apache Spark job reads the messages from Amazon S3, and stores them in parquet format. With parquet, data is stored in a columnar format that provides more efficient scanning and enables ad hoc querying or further processing by services like Amazon Athena.

Considerations

The maximum size of a record sent to Kinesis Data Firehose is 1,000 KB. If your message size is greater than this value, compressing the message before it is sent to Kinesis Data Firehose is the best approach. Kinesis Data Firehose also offers compression of messages after they are written to the delivery stream. Unfortunately, this does not overcome the message size limitation, because that compression happens after the message is written. When Kinesis Data Firehose delivers a previously compressed message to Amazon S3, it is written as an object without a file extension. For example, if a message is compressed with gzip before it is written to Kinesis Data Firehose, it is delivered to Amazon S3 without the .gz extension. This is problematic if you are using Apache Spark for downstream processing because a “.gz” extension is required.

We will see how to overcome this issue by reading the files using the Amazon S3 API operations later in this blog.

Prerequisites and assumptions

To follow the steps outlined in this blog post, you need the following:

  • An AWS account that provides access to AWS services.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
  • The templates and code are intended to work in the US East (N. Virginia) Region only.

Additionally, be aware of the following:

  • We configure all services in the same VPC to simplify networking considerations.
  • Important: The AWS CloudFormation templates and the sample code that we provide use hardcoded user names and passwords and open security groups. These are for testing purposes only. They aren’t intended for production use without any modifications.

Implementing the solution

You can use this downloadable template for single-click deployment. This template is launched in the US East (N. Virginia) Region by default. Do not change to a different Region. The template is designed to work only in the US East (N. Virginia) Region. To launch directly through the console, choose the Launch Stack button.

This template takes the following parameters. Some of the parameters have default values that you can’t edit, because these predefined names are hardcoded in the code. For the other parameters, you must provide values. The following list provides additional details.

  • StackName: Provide the stack name.
  • ClientIP: The IP address range of the client that is allowed to connect to the cluster using SSH.
  • FirehoseDeliveryStreamName: The name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFireHose”.
  • InstanceType: The EC2 instance type.
  • KeyName: The name of an existing EC2 key pair to enable SSH access.
  • KinesisStreamName: The name of the Amazon Kinesis data stream. The default value is set to “AWS-Blog-BaseKinesisStream”.
  • Region: The AWS Region. By default it is us-east-1 (US East (N. Virginia)). Do not change this, because the scripts are developed to work in this Region only.
  • EMRClusterName: A name for the EMR cluster.
  • S3BucketName: The name of the bucket that is created in your account. Provide a unique name for this bucket. It is used for storing the messages and the output from the Spark code.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check boxes for I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND. Then choose Create.

If you use this one-step solution, you can skip to Step 7: Generate test dataset and load into Kinesis Data Streams.

To create each component individually, use the following steps.

1. Use the AWS CloudFormation template to configure Amazon VPC and create an Amazon S3 bucket

In this step, we set up a VPC, public subnet, internet gateway, route table, and a security group. The security group has two inbound access rules. The first inbound rule allows access to TCP port 22 (SSH) from the provided client IP CIDR range, and the second inbound rule allows access to any TCP port from any host within the same security group. We use this VPC and subnet for all other services that are created in the next steps. In addition to these resources, we also create a standard Amazon S3 bucket with a provided bucket name to store the incoming data and processed data. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following list provides additional details.

  • StackName: Provide the stack name.
  • S3BucketName: Provide a unique Amazon S3 bucket name. This bucket is created in your account.
  • ClientIp: Provide a CIDR IP address range that is added to the inbound rule of the security group. You can get your current IP address from the “checkip.amazon.com” web URL.

After you specify the template details, choose Next. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

  • StackName: <Stack name>
  • VPCID: Vpc-xxxxxxx
  • SubnetID: subnet-xxxxxxxx
  • SecurityGroup: sg-xxxxxxxxxx
  • S3BucketDomain: <S3_BUCKET_NAME>.s3.amazonaws.com
  • S3BucketARN: arn:aws:s3:::<S3_BUCKET_NAME>

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

2.  Use the AWS CloudFormation template to create necessary IAM Roles

In this step, we set up two AWS IAM roles. One of the IAM roles will be used by an AWS Lambda function to allow access to Amazon S3 service, Amazon Kinesis Data Firehose, Amazon CloudWatch Logs, and Amazon EC2 instances.  The second IAM role is used by the Amazon Kinesis Data Firehose service to access Amazon S3 service. You can use this downloadable CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

  • StackName: Provide the stack name.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the check box for I acknowledge that AWS CloudFormation might create IAM resources with custom names. Choose Create.

When the stack launch is complete, it should return outputs similar to the following.

  • LambdaRoleArn: arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-lamdarole
  • FirehoseRoleArn: arn:aws:iam::<ACCOUNT_NUMBER>:role/small-files-firehoserole

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

3. Use an AWS CloudFormation template to configure the Amazon Kinesis Data Firehose data stream

In this step, we set up Amazon Kinesis Data Firehose with Amazon S3 as the destination for the incoming messages. We select the Uncompressed option for the compression format, and configure buffering options with a size of 128 MB and an interval of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

  • StackName: Provide the stack name.
  • FirehoseDeliveryStreamName: Provide the name of the Amazon Kinesis Data Firehose delivery stream. The default value is set to “AWSBlogs-LambdaToFirehose”.
  • Role: Provide the Kinesis Data Firehose IAM role ARN that was created as part of step 2.
  • S3BucketARN: Select the S3BucketARN. You can get this from the step 1 AWS CloudFormation output.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

4. Use an AWS CloudFormation template to create a Kinesis data stream and a Lambda function

In this step, we set up a Kinesis data stream and an AWS Lambda function. We can use the AWS Lambda function to process incoming messages in a Kinesis data stream. An event source mapping is also created as part of this template. This adds a trigger to the AWS Lambda function for the Kinesis data stream source. For more information about creating event source mapping, see Creating an Event Source Mapping. This Kinesis data stream is created with 10 shards, and the Lambda function is created with a Java 8 runtime. We allocate 1,920 MB of memory and a timeout of 300 seconds. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides details.

  • StackName: Provide the stack name.
  • KinesisStreamName: Provide the name of the Amazon Kinesis data stream. The default value is set to “AWS-Blog-BaseKinesisStream”.
  • Role: Provide the IAM role created for the Lambda function as part of the second AWS CloudFormation template. Get the value from the output of the second AWS CloudFormation template.
  • S3Bucket: Provide the existing Amazon S3 bucket name that was created using the first AWS CloudFormation template. Do not use the domain name; provide the bucket name only.
  • Region: Select the AWS Region. By default it is us-east-1 (US East (N. Virginia)).

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

5. Use an AWS CloudFormation template to configure the Amazon EMR cluster

In this step, we set up an Amazon EMR 5.16.0 cluster with “Spark”, “Ganglia” and “Hive” applications. We create this cluster with one master and two core nodes, and use an r4.xlarge instance type. The template uses an AWS Glue metastore for the Amazon EMR hive metastore. This Amazon EMR cluster is used to process the messages in Amazon S3 bucket that are created by the Amazon Kinesis Data Firehose data stream. You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

  • EMRClusterName: Provide a name for the EMR cluster.
  • ClusterSecurityGroup: Select the security group ID that was created as part of the first AWS CloudFormation template.
  • ClusterSubnetID: Select the subnet ID that was created as part of the first AWS CloudFormation template.
  • AllowedCIDR: Provide the IP address range of the client that is allowed to connect to the cluster.
  • KeyName: Provide the name of an existing EC2 key pair to access the Amazon EMR cluster.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, choose Create.

When the stack launch is complete, it should return outputs similar to the following.

  • EMRClusterMaster: ssh [email protected] -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

6. Use an AWS CloudFormation template to create an Amazon EC2 Instance to generate test data

In this step, we set up an Amazon EC2 instance and install open-jdk version 1.8. The AWS CloudFormation script that creates this EC2 instance runs two additional steps. First, it downloads and installs open-jdk version 1.8. Second, it downloads a Java program jar file onto the EC2 instance’s ec2-user home directory. We use this Java program to generate test data messages with an approximate size of 900 KB. We then send them to the Kinesis data stream that was created as part of the previous steps. The Java jar file name is: “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar”.

You can use this downloadable AWS CloudFormation template to set up the previous components. To launch directly through the console, choose Launch Stack.

This template takes the following parameters. The following table provides additional details.

  • EC2SecurityGroup: Select the security group ID that was created from the first AWS CloudFormation template.
  • EC2Subnet: Select the subnet that was created from the first AWS CloudFormation template.
  • InstanceType: Select the provided instance type. By default, it selects the r4.4xlarge instance.
  • KeyName: Name of an existing EC2 key pair to enable SSH access to the EC2 instance.

After you specify the template details, choose Next. On the options page, choose Next again. On the Review page, select the I acknowledge that AWS CloudFormation might create IAM resources with custom names check box, and then choose Create.

When the stack launch is complete, it should return outputs similar to the following.

  • EC2Instance: ssh ec2-user@<Public-IP> -i <KEY_PAIR_NAME>.pem

Make a note of the output, because you use this information in the next step. You can view the stack outputs on the AWS Management Console or by using the following AWS CLI command:

$ aws cloudformation describe-stacks --stack-name <stack_name> --region us-east-1 --query 'Stacks[0].Outputs'

7. Generate the test dataset and load into Kinesis Data Streams

After all of the previous AWS CloudFormation stacks are created successfully, log in to the EC2 instance that was created as part of step 6. Use the ssh command as shown in the CloudFormation stack output. The template copies the “sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar” file to the instance, which we use to generate the test data and send it to Amazon Kinesis Data Streams. You can find the code corresponding to this sample Kinesis producer in this Git repository.

Make sure your EC2 instance’s security group allows ssh port 22 (Inbound) from your IP address. If not, update your security group inbound access.

Run the following commands to generate some test data.

$ cd

$ ls -ltra sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar
-rwxr-xr-x 1 ec2-user ec2-user 27536802 Oct 29 21:19 sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar

$ java -Xms1024m -Xmx25600m -XX:+UseG1GC -cp sample-kinesis-producer-1.0-SNAPSHOT-jar-with-dependencies.jar com.optimize.downstream.entry.Main 10000

This Java program uses the PutRecords API operation, which allows many records to be sent in a single HTTP request. For more information, see this AWS blog post. After you run the Java program, you see output similar to the following, which shows that messages are being sent to the Kinesis data stream.

“Starting producer and consumer.....
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 0
Producer Thread # 9 is going to sleep mode for 500 ms.
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 1
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 2
Inserting a message into blocking queue before sending to Kinesis Firehose and Message number is : 3
::
::
Record sent to Kinesis Stream. Record size is ::: 5042850 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042726 KB
Sending a record to Kinesis Stream with 5 messages grouped together.
Record sent to Kinesis Stream. Record size is ::: 5042729 KB”

When running the sample Kinesis producer jar, note that the number of messages is 10,000. This program generates the test data messages; it is not a replacement for a load testing tool, and it was created only to demonstrate the use case presented in this post.

After all of the messages are generated and sent to Amazon Kinesis Data Streams, the program exits gracefully.
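For context, the core PutRecords call inside such a producer might look similar to the following sketch; the payloads are placeholders, and the complete producer is in the Git repository mentioned above:

AmazonKinesis kinesis = AmazonKinesisClientBuilder.defaultClient();

List<PutRecordsRequestEntry> entries = new ArrayList<>();
for (int i = 0; i < 5; i++) {
    entries.add(new PutRecordsRequestEntry()
        .withPartitionKey(UUID.randomUUID().toString())
        .withData(ByteBuffer.wrap(("{\"message\":" + i + "}").getBytes(StandardCharsets.UTF_8))));
}

// A single HTTP request carries all five records
PutRecordsRequest request = new PutRecordsRequest()
    .withStreamName("AWS-Blog-BaseKinesisStream")
    .withRecords(entries);

PutRecordsResult result = kinesis.putRecords(request);
System.out.println("Failed record count: " + result.getFailedRecordCount());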

The sample JSON input message format is shown as follows:

   "processedDate":"2018/10/30 19:05:19",
   "currentDate":"2018/10/30 19:05:07",
   "hashDeviceId":"0c2745e4-c2d6-4d43-8339-9c2401e80e92",
   "deviceId":"94581b5f-a117-484a-8e3c-4fcc2dbd53b7",
   "accelerometerSensorList":[  
      {  
         "accelerometer_Y":8,
         "gravitySensor_X":5,
         "accelerometer_X":9,
         "gravitySensor_Z":4,
         "accelerometer_Z":1,
         "gravitySensor_Y":5,
         "linearAccelerationSensor_Z":3,
         "linearAccelerationSensor_Y":9,
         "linearAccelerationSensor_X":9
      },
      {  
         "accelerometer_Y":1,
         "gravitySensor_X":3,
         "accelerometer_X":5,
         "gravitySensor_Z":5,
         "accelerometer_Z":7,
         "gravitySensor_Y":9,
         "linearAccelerationSensor_Z":6,
         "linearAccelerationSensor_Y":5,
         "linearAccelerationSensor_X":3
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "tempSensorList":[  
      {  
         "kelvin":585.4928040286752,
         "celsius":43.329574923775425,
         "fahrenheit":50.13864584530086
      },
      {  
         "kelvin":349.95625855125814,
         "celsius":95.68423052685313,
         "fahrenheit":7.854854574219985
      },
 {
   …
 },
 {
   …
 },
 :
 :
 
   ],
   "illuminancesSensorList":[  
      {  
         "illuminance":44.65135784368194
      },
      {  
         "illuminance":98.15404017082403
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ],
   "gpsSensorList":[  
      {  
         "altitude":4.38273213294682,
         "heading":7.416314616289915,
         "latitude":5.759723677991661,
         "longitude":1.4732885894731842
      },
      {  
         "altitude":9.816473807569487,
         "heading":5.118919157684835,
         "latitude":3.581361614110458,
         "longitude":1.3699272610616127
      },
 {
   …
 },
 {
   …
 },
 :
 :
   ]
}

 

Log in to the Kinesis Data Streams console, then choose the Kinesis data stream that was created as part of step 4. Choose the Monitor tab to see the graphs. Run the data generation utility for at least 15 minutes to generate enough data.

8. Processing Kinesis Data Streams messages using AWS Lambda

As part of the previously described setup, we also use an AWS Lambda function (name: LambdaForProcessingKinesisRecords) to process the messages from the Kinesis data stream. This Lambda function reads each message and appends “additional data” to it. This demonstrates how an incoming message from the Kinesis data stream can be read and enriched with additional information so that the message size grows beyond 1 MB. Several customers have a use case like this to enrich incoming messages by adding additional information. After the AWS Lambda function appends additional data to incoming messages, it sends them to Amazon Kinesis Data Firehose. Because Kinesis Data Firehose accepts only messages that are less than 1 MB, we must compress the messages before sending them to it. In the Lambda function, we compress each message using gzip before sending it to Kinesis Data Firehose. In addition to compressing each message, we also append a newline character (“\n”) to each compressed message to separate the messages.

We set the buffer size to 128 MB and the buffer interval to 900 seconds when creating the Kinesis Data Firehose delivery stream. This helps merge the incoming compressed messages into larger objects that are delivered to the provided Amazon S3 bucket.

The AWS Lambda function appends the following content to the original message in Kinesis Data Streams after reading it.

"testAdditonalDataList": [
  {
    "dimesnion_X": 9,
    "dimesnion_Y": 2,
    "dimesnion_Z": 2
  },
  {
    "dimesnion_X": 3,
    "dimesnion_Y": 10,
    "dimesnion_Z": 5
  },
  {
    …
  },
  {
    …
  },
  :
  :
]

If we do not compress the message before sending it to Kinesis Data Firehose, an error message appears in Amazon CloudWatch Logs.

Here is the code snippet where we are compressing the message in the AWS Lambda function. The complete code can be found in this Git repository.

private String sendToFireHose(String mergedJsonString)
{
    PutRecordResult res = null;
    try {
        //To Firehose -
        System.out.println("MESSAGE SIZE BEFORE COMPRESSION IS : " + mergedJsonString.toString().getBytes(charset).length);
        System.out.println("MESSAGE SIZE AFTER GZIP COMPRESSION IS : " + compressMessage(mergedJsonString.toString().getBytes(charset)).length);
        PutRecordRequest req = new PutRecordRequest()
                .withDeliveryStreamName(firehoseStreamName);

        // Without compression - Send to Firehose
        //Record record = new Record().withData(ByteBuffer.wrap((mergedJsonString.toString() + "\r\n").getBytes()));

        // With compression - send to Firehose
        Record record = new Record().withData(ByteBuffer.wrap(compressMessage((mergedJsonString.toString() + "\r\n").getBytes())));
        req.setRecord(record);
        res = kinesisFirehoseClient.putRecord(req);
    }
    catch (IOException ie) {
        ie.printStackTrace();
    }
    return res.getRecordId();
}
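The compressMessage helper referenced in this snippet is part of the same repository. A minimal version that gzips the message bytes in memory could look like the following sketch:

// Possible implementation of the compressMessage helper; the actual version is in the Git repository
private byte[] compressMessage(byte[] message) throws IOException {
    ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
    try (GZIPOutputStream gzipStream = new GZIPOutputStream(byteStream)) {
        gzipStream.write(message);
    }
    return byteStream.toByteArray();
}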

You can check the provided bucket to see if the messages are flowing into the bucket. The Amazon S3 bucket should show something similar to the following example:

You see that the files generated by Kinesis Data Firehose do not have any extension. By default, Kinesis Data Firehose does not add an extension to the files that it generates in the Amazon S3 bucket unless you select a compression option. In our use case, because the size of the uncompressed input message is greater than 1 MB, we compress it before sending it to Kinesis Data Firehose. Because the message is already compressed, we do not select any compression option in Kinesis Data Firehose; doing so would double-compress the message, and the downstream Spark application could not process it.

9. Reading and converting the data into parquet format using Apache Spark program with Amazon EMR

As noted previously, Kinesis Data Firehose by default does not add a file extension to the files that it writes to the Amazon S3 bucket. This creates a problem when reading the files using Apache Spark. By default, Apache Spark checks for a valid file name extension if the file is compressed. In this case, for gzip compression, it looks for <filename>.gz to read the file successfully.

To overcome this issue, we can use Amazon S3 API operations, particularly the AmazonS3Client class, to list all the Amazon S3 keys, and then use Spark’s parallelize method to read the contents of the files. After reading the file content, we can uncompress it using the GZIPInputStream class. You can find the code snippet below. The complete code can be found in the Git repository.

val allLinesRDD = spark.sparkContext.parallelize(s3ObjectKeys).flatMap { key =>
  Source.fromInputStream(
    new GZIPInputStream(s3Client.getObject(bucketName, key).getObjectContent: InputStream)
  ).getLines
}

val finalDF = spark.read.json(allLinesRDD).toDF()

Once the Amazon EMR cluster is created successfully, log in to the Amazon EMR master node using the following command. You can get the ssh login command from the “EMRClusterMaster” output parameter of the AWS CloudFormation stack in step 5.

  • ssh [email protected] -i <KEYPAIR_NAME>.pem
  • Make sure that port 22 is open in the security group so that you can connect to the Amazon EMR master node.

Run the Spark program using the following Spark submit command.

spark-submit --class com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet --master yarn --deploy-mode client s3://aws-bigdata-blog/artifacts/aws-blog-optimize-downstream-data-processing/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar <S3_BUCKET_NAME> fromfirehose/<YEAR>/ output-emr-parquet/

Change the S3_BUCKET_NAME and YEAR values from the previous Spark command.

  1. --class: com.optimize.downstream.process.ProcessFilesFromS3AndConvertToParquet
  2. --master: yarn
  3. --deploy-mode: client
  4. Application JAR: s3://aws-bigdata-blog/artifacts/aws-blog-avoid-small-files/appjars/spark-process-1.0-SNAPSHOT-jar-with-dependencies.jar
  5. S3_BUCKET_NAME: The Amazon S3 bucket name that was created as part of the AWS CloudFormation template. The source files are created in this bucket.
  6. <INPUT S3 LOCATION>: “fromfirehose/<YYYY>/”. The input files are created in this Amazon S3 key location under the bucket that was created. “YYYY” represents the current year. For example, “fromfirehose/2018/”.
  7. <OUTPUT S3 LOCATION>: Provide an output directory name that will be created under the above provided Amazon S3 bucket. For example: “output-emr-parquet/”.

 

When the program finishes running, you can check the Amazon S3 output location to see the files that are written in parquet format.

Cleaning up after the migration

After completing and testing this solution, clean up the resources by stopping your tasks and deleting the AWS CloudFormation stacks. The stack deletion fails if you have any files in the created Amazon S3 bucket, so make sure that you empty the Amazon S3 bucket that was created before deleting the AWS CloudFormation stacks.

Conclusion

In this post, we described the process of avoiding small file creation in Amazon S3 by sending the incoming messages to Amazon Kinesis Data Firehose. We also went through the process of reading and storing the data in parquet format using Apache Spark with an Amazon EMR cluster.

 


About the Author

Photo of Srikanth KodaliSrikanth Kodali is a Sr. IOT Data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

Amazon Kinesis Data Firehose custom prefixes for Amazon S3 objects

Post Syndicated from Rajeev Chakrabarti original https://aws.amazon.com/blogs/big-data/amazon-kinesis-data-firehose-custom-prefixes-for-amazon-s3-objects/

In February 2019, Amazon Web Services (AWS) announced a new feature in Amazon Kinesis Data Firehose called Custom Prefixes for Amazon S3 Objects. It lets customers specify a custom expression for the Amazon S3 prefix where data records are delivered. Previously, Kinesis Data Firehose allowed only specifying a literal prefix. This prefix was then combined with a static date-formatted prefix to create the output folder in a fixed format. Customers asked for flexibility, so AWS listened and delivered.

Kinesis Data Firehose is most commonly used to consume event data from streaming sources, such as applications or IoT devices.  The data then is typically stored in a data lake, so it can be processed and eventually queried.  When storing data on Amazon S3, it is a best practice to partition or group related data and store it together in the same folder.  This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thus improving performance and reducing cost.

A common way to group data is by date.  Kinesis Data Firehose automatically groups data and stores it into the appropriate folders on Amazon S3 based on the date.  However, the naming of folders in Amazon S3 is not compatible with Apache Hive naming conventions. This makes data more difficult to catalog using AWS Glue crawlers and analyze using big data tools.

This post discusses a new capability that lets us customize how Kinesis Data Firehose names the output folders in Amazon S3. It covers how custom prefixes work, the intended use cases, and includes step-by-step instructions to try the feature in your own account.

The need for custom prefixes for Amazon S3 objects

Previously, Kinesis Data Firehose created a static Universal Coordinated Time (UTC) based folder structure in the format YYYY/MM/DD/HH. It then appended it to the provided prefix before writing objects to Amazon S3. For example, if you provided a prefix “mydatalake/”, the generated folder hierarchy would be “mydatalake/2019/02/09/13”.  However, to be compatible with Hive naming conventions, the folder structure is expected to follow the format “/partitionkey=partitionvalue”.  Using this naming convention, data can be easily cataloged with AWS Glue crawlers, resulting in proper partition names.

Other methods for managing partitions also become possible, such as running MSCK REPAIR TABLE in Amazon Athena or Apache Hive on Amazon EMR, which can add all partitions through a single statement. Furthermore, you can use other date-based partitioning patterns like “/dt=2019-02-09-13/” instead of expanding the date out into folders. This is helpful in reducing the total number of partitions that need to be maintained as the table grows over time, and it also simplifies range queries. Providing the ability to specify custom prefixes obviates the need for an additional ETL step to put the data in the right folder structure, improving the time to insight.

How custom prefixes for Amazon S3 objects works

This new capability does not let you use any date or timestamp value from your event data, nor can you use any other arbitrary value in the event. Kinesis Data Firehose uses an internal timestamp field called ApproximateArrivalTimestamp. Each data record includes an ApproximateArrivalTimestamp (in UTC) that is set when a stream successfully receives and stores the record. This is commonly referred to as a server-side timestamp. Kinesis Data Firehose buffers incoming records according to the configured buffering hints and delivers them into Amazon S3 objects for the Amazon S3 destination. The resulting objects in Amazon S3 may contain multiple records, each with a different ApproximateArrivalTimestamp. When evaluating timestamps, Kinesis Data Firehose uses the ApproximateArrivalTimestamp of the oldest record that’s contained in the Amazon S3 object being written.

Kinesis Data Firehose also provides the ability to deliver records to a different error output location when there is a delivery, AWS Lambda transformation or format conversion failure. Previously, the error output location could not be configured and was determined by the type of delivery failure. With this release, the error output location (ErrorOutputPrefix) can also be configured. One benefit of this new capability is that you can separate failed records into date partitioned folders for easy reprocessing.

So how do you specify the custom Prefix and the ErrorOutputPrefix? You use an expression of the form !{namespace:value}, where the namespace can be either firehose or timestamp. The value can be either “random-string” or “error-output-type” for the firehose namespace, or a date pattern in the Java DateTimeFormatter format for the timestamp namespace. You can combine the two namespaces in a single expression, although !{firehose:error-output-type} can be used only in the ErrorOutputPrefix. For more information and examples, see Custom Prefixes for Amazon S3 Objects.

Writing streaming data into Amazon S3 with Kinesis Data Firehose

This walkthrough describes how streaming data can be written into Amazon S3 with Kinesis Data Firehose using a Hive compatible folder structure.  It then shows how AWS Glue crawlers can infer the schema and extract the proper partition names that we designated in Kinesis Data Firehose, and catalog them in AWS Glue Data Catalog.  Finally, we run sample queries to show that partitions are indeed being recognized.

To demonstrate this, we use Python code to generate sample data.  We also use a Lambda transform on Kinesis Data Firehose to forcibly create failures. This demonstrates how data can be saved to the error output location. The code that you need for this walkthrough is included in the GitHub repository.

For this walkthrough, this is the architecture that we are building:

Step 1: Create an Amazon S3 bucket

Create an S3 bucket to be used by Kinesis Data Firehose to deliver event records. We use the AWS Command Line Interface (AWS CLI) to create the Amazon S3 bucket in the US East (N. Virginia) Region. Remember to substitute the bucket name in the example for your own.

aws s3 mb s3://kdfs3customprefixesexample --region us-east-1

Step 2: Lambda Transform (optional)

The incoming events have an ApproximateArrivalTimestamp field in the event payload.  This is sufficient to create a proper folder structure on Amazon S3.  However, when querying the data it may be beneficial to expose this timestamp value as a top level column for easy filtering and validation.  To accomplish this, we create a Lambda function that adds the ApproximateArrivalTimestamp as a top level field in the data payload. The data payload is what Kinesis Data Firehose writes as an object in Amazon S3. Additionally, the Lambda code also artificially generates some processing errors that are delivered to the “ErrorOutputPrefix” location specified for the delivery destination to illustrate the use of expressions in the “ErrorOutputPrefix.”

Create an IAM role for the Lambda transform function

First, create a role for the Lambda function called KDFLambdaBasicRole. The TrustPolicyForLambda.json file is included in the GitHub repository.

$ aws iam create-role --role-name KDFLambdaBasicRole --assume-role-policy-document file://TrustPolicyForLambda.json

After the role is created, attach the managed Lambda basic execution policy to it.

$ aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole --role-name KDFLambdaBasicRole

Lambda function

To create the Lambda function, start with the Python Kinesis Data Firehose blueprint “General Firehose Processing” and then modify it. For more information about the structure of the records and what must be returned, see Amazon Kinesis Data Firehose Data Transformation.
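The function in the GitHub repository contains the full logic; the following is only a minimal sketch of the two behaviors described above, so treat it as an illustration rather than the exact repository code. It promotes the record’s server-side timestamp to a top-level field (which later surfaces as the approxarrtimestamputcfh column) and deliberately fails about 10 percent of records so that they land under the ErrorOutputPrefix.

```python
import base64
import json
import random

def lambda_handler(event, context):
    """Kinesis Data Firehose transformation handler (sketch, not the repository code)."""
    output = []
    for record in event['records']:
        # Deliberately fail ~10% of records to exercise the ErrorOutputPrefix.
        if random.random() < 0.1:
            output.append({'recordId': record['recordId'],
                           'result': 'ProcessingFailed',
                           'data': record['data']})
            continue

        payload = json.loads(base64.b64decode(record['data']))
        # Promote the server-side timestamp (epoch milliseconds) to a top-level field.
        payload['approxArrTimestampUTCFH'] = record['approximateArrivalTimestamp']

        data = base64.b64encode((json.dumps(payload) + '\n').encode('utf-8')).decode('utf-8')
        output.append({'recordId': record['recordId'],
                       'result': 'Ok',
                       'data': data})
    return {'records': output}
```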

Zip up the Python file, and then create the Lambda function using the AWS CLI. The CreateLambdaFunctionS3CustomPrefixes.json file is included in the GitHub repository.

aws lambda create-function --zip-file "fileb://lambda_function.zip" --cli-input-json file://CreateLambdaFunctionS3CustomPrefixes.json

Step 3: Delivery stream

Next, create the Kinesis Data Firehose delivery stream. The createdeliverystream.json file is included in the GitHub repository.

 aws firehose create-delivery-stream --cli-input-json file://createdeliverystream.json

In this configuration file, we defined a Prefix and an ErrorOutputPrefix under the “ExtendedS3DestinationConfiguration” element. We defined the same for the “S3BackupConfiguration” element. Note that when the “ProcessingConfiguration” element is set to “Disabled”, the ErrorOutputPrefix parameter of the “ExtendedS3DestinationConfiguration” element exists only for consistency. It otherwise has no significance.

We’ve chosen a prefix that will result in a folder structure compatible with Hive-style partitioning. This is the prefix we used:

“fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/”

Kinesis Data Firehose first creates a base folder called “fhbase” directly under the Amazon S3 bucket. Second, it evaluates the expressions !{timestamp:YYYY}, !{timestamp:MM}, !{timestamp:dd}, and !{timestamp:HH} to the year, month, day, and hour using the Java DateTimeFormatter format. For example, an ApproximateArrivalTimestamp of 1549728781000 in UNIX epoch milliseconds, which is 2019-02-09T16:13:01.000Z in UTC, evaluates to “year=2019”, “month=02”, “day=09”, and “hour=16”.  Therefore, the Amazon S3 location to which the data records are delivered evaluates to “fhbase/year=2019/month=02/day=09/hour=16/”.
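If you want to sanity-check how a given ApproximateArrivalTimestamp maps to a folder path, a few lines of Python reproduce the evaluation (the value is in milliseconds and is always interpreted in UTC):

```python
from datetime import datetime, timezone

ts_millis = 1549728781000  # ApproximateArrivalTimestamp of the oldest record in the object
dt = datetime.fromtimestamp(ts_millis / 1000, tz=timezone.utc)
print(dt.strftime('fhbase/year=%Y/month=%m/day=%d/hour=%H/'))
# fhbase/year=2019/month=02/day=09/hour=16/
```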

Similarly, the ErrorOutputPrefix “fherroroutputbase/!{firehose:random-string}/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/” results in a base folder called “fherroroutputbase” directly under the S3 bucket. The expression !{firehose:random-string} evaluates to an 11-character random string like “ztWxkdg3Thg”.  If you use it more than once in the same expression, every instance evaluates to a new random string. The expression !{firehose:error-output-type} evaluates to one of the following:

  1. “processing-failed” for Lambda transformation delivery failures
  2. “elasticsearch-failed” for Amazon Elasticsearch Service destination delivery failures
  3. “splunk-failed” for Splunk destination delivery failures
  4. “format-conversion-failed” for data format conversion failures

So, the location for an Amazon S3 object containing the delivery failed records for a Lambda transformation could evaluate to: fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/09/.
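The createdeliverystream.json file in the repository holds the full configuration, including the Lambda processing and backup settings. As a rough boto3 equivalent of the CLI call above, the prefix pieces fit together like this (a minimal sketch; the ARNs are placeholders and several optional settings are omitted):

```python
import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='KDFS3customPrefixesExample',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::<account-id>:role/firehose_delivery_role',
        'BucketARN': 'arn:aws:s3:::kdfs3customprefixesexample',
        'Prefix': ('fhbase/year=!{timestamp:YYYY}/month=!{timestamp:MM}/'
                   'day=!{timestamp:dd}/hour=!{timestamp:HH}/'),
        'ErrorOutputPrefix': ('fherroroutputbase/!{firehose:random-string}/'
                              '!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/'),
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 240},
        'CompressionFormat': 'UNCOMPRESSED',
    }
)
```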

You can run aws firehose describe-delivery-stream --delivery-stream-name KDFS3customPrefixesExample to describe the delivery stream created.

Next, enable encryption-at-rest for the delivery stream:

aws firehose start-delivery-stream-encryption --delivery-stream-name KDFS3customPrefixesExample

Alternatively, create the delivery stream using the AWS Console

  1. Choose the source. For this example, I use Direct PUT.
  2. Choose whether you would like to transform the incoming records with a Lambda transformation. I chose Enabled, and chose the name of the Lambda function that I had created earlier.
  3. Choose the destination. I chose the Amazon S3 destination.
  4. Choose the Amazon S3 bucket. I chose the Amazon S3 bucket that I had created earlier in this exercise.
  5. Specify the Amazon S3 Prefix and the Amazon S3 error prefix. These correspond to the “Prefix” and “ErrorOutputPrefix” explained earlier in the context of the AWS CLI input JSON.
  6. Choose whether you would like to back up the raw (before transformation) records to another Amazon S3 location. I chose Enabled and specified the same bucket (you could choose a different bucket). I also specified a different prefix from the transformed records – the base folder is different, but the folder structure below it is the same. This makes it more efficient to crawl this location using an AWS Glue crawler or to create external tables in Athena or Redshift Spectrum pointing to this location.
  7. Specify the buffering hints for the Amazon S3 destination. I chose 1 MB and 240 seconds.
  8. Choose the S3 compression and encryption settings. I chose no compression for the transformed records’ location. I chose to encrypt the Amazon S3 location at rest by using the service-managed AWS KMS customer master key (CMK).
  9. Choose whether you want to enable error logging in CloudWatch. I chose Enabled.
  10. Specify the IAM role that you want Kinesis Data Firehose to assume to access resources on your behalf. Choose Create new or choose to display a new screen, choose Create a new IAM role, name the role, and then choose Allow.
  11. Choose Create Delivery Stream.

The delivery stream is now created and active. You can send events to it.

Test with sample data

I used Python code to generate sample data. The structure of the generated data is as follows:

{'sector': 'HEALTHCARE', 'price': 194.07, 'ticker_symbol': 'UFG', u'EventTime': '2019-02-12T07:10:52.649000Z', 'change': 20.56}
{'sector': 'HEALTHCARE', 'price': 124.01, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:53.745000Z', 'change': 3.32}
{'sector': 'MANUFACTURING', 'price': 26.95, 'ticker_symbol': 'QXZ', u'EventTime': '2019-02-12T07:10:54.864000Z', 'change': 24.53}

Sample code to generate data and push it into Kinesis Data Firehose is included in the GitHub repository.
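The generator script in the repository may differ, but a minimal sketch of the idea looks like the following: build records shaped like the samples above and push them to the delivery stream with the PutRecord API.

```python
import datetime
import json
import random

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

def make_record():
    # Shape mirrors the sample records shown above
    return {
        'ticker_symbol': random.choice(['UFG', 'QXZ', 'VFC']),
        'sector': random.choice(['HEALTHCARE', 'MANUFACTURING', 'RETAIL']),
        'price': round(random.uniform(10, 200), 2),
        'change': round(random.uniform(-25, 25), 2),
        'EventTime': datetime.datetime.utcnow().isoformat() + 'Z',
    }

for _ in range(1000):
    firehose.put_record(
        DeliveryStreamName='KDFS3customPrefixesExample',
        Record={'Data': json.dumps(make_record()) + '\n'}
    )
```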

After you start sending events to the Kinesis Data Firehose delivery stream, objects should start appearing under the specified prefixes in Amazon S3.

I wanted to illustrate Lambda invoke errors and the appearance of files in the ErrorOutputPrefix location for Lambda transform errors. Therefore, I did not give permissions to the “firehose_delivery_role” to invoke my Lambda function. The following file showed up in the location specified by the ErrorOutputPrefix.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/FxvO2Tf9MQP/processing-failed/2019/02/12/

2019-02-12 16:57:24     260166 KDFS3customPrefixesExample-1-2019-02-12-16-53-20-5262db81-0f3a-48bf-8fc6-2249124923ff

Here is a snippet of the contents of the error file that I previously mentioned.

{"attemptsMade":4,"arrivalTimestamp":1549990400391,"errorCode":"Lambda.InvokeAccessDenied","errorMessage":"Access was denied. Ensure that the access policy allows access to the Lambda function.","attemptEndingTimestamp":1549990478018,"rawData":"eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}
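The rawData field holds the original record, base64-encoded, so you can recover the exact event that failed:

```python
import base64
import json

raw = "eyJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE4Ny45NCwgInRpY2tlcl9zeW1ib2wiOiAiVUZHIiwgIkV2ZW50VGltZSI6ICIyMDE5LTAyLTEyVDE2OjUzOjE5Ljk5MzAwMFoiLCAiY2hhbmdlIjogOS4yNn0="
print(json.loads(base64.b64decode(raw)))
# {'sector': 'HEALTHCARE', 'price': 187.94, 'ticker_symbol': 'UFG',
#  'EventTime': '2019-02-12T16:53:19.993000Z', 'change': 9.26}
```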

After I gave the “firehose_delivery_role” the appropriate permissions, the data objects showed up in the “Prefix” location specified for the Amazon S3 destination.

aws s3 ls s3://kdfs3customprefixesexample/fhbase/year=2019/month=02/day=12/hour=17/

2019-02-12 17:17:26    1392167 KDFS3customPrefixesExample-1-2019-02-12-17-14-51-fc63e8f6-7421-491d-8417-c5002fca1722

2019-02-12 17:18:39    1391946 KDFS3customPrefixesExample-1-2019-02-12-17-16-43-e080a18a-3e1e-45ad-8f1a-98c7887f5430

Also, because the Lambda code in my Lambda transform set the ProcessingFailed status for 10 percent of the records, those records showed up in the ErrorOutputPrefix location for Lambda transform errors.

aws s3 ls s3://kdfs3customprefixesexample/fherroroutputbase/ztWxkdg3Thg/processing-failed/2019/02/12/

2019-02-12 17:25:54     180092 KDFS3customPrefixesExample-1-2019-02-12-17-21-53-3bbfe7c0-f505-47d0-b880-797ce9035f73

Here is a snippet of the content of the error file:

{"attemptsMade":1,"arrivalTimestamp":1549992113419,"errorCode":"Lambda.ProcessingFailedStatus","errorMessage":"ProcessingFailed status set for record","attemptEndingTimestamp":1549992138424,"rawData":"eyJ0aWNrZXJfc3ltYm9sIjogIlFYWiIsICJzZWN0b3IiOiAiSEVBTFRIQ0FSRSIsICJwcmljZSI6IDE3LjUyLCAiY2hhbmdlIjogMTcuNTUsICJFdmVudFRpbWUiOiAiMjAxOS0wMi0xMlQxNzoyMTo1My4zOTY2NDdaIn0=","lambdaArn":"arn:aws:lambda:us-east-1:<account-id>:function:KDFS3CustomPrefixesTransform:$LATEST"}

You’re now ready to create an AWS Glue crawler. For more information about using the AWS Glue Data Catalog, see Populating the AWS Glue Data Catalog.

  1. In the AWS Glue console, go to Crawlers, and choose Add crawler.
  2. Add information about your crawler, then choose Next.
  3. In Include path, specify the Amazon S3 bucket name that you entered under the Amazon S3 destination. Also include the static prefix used when you created the Kinesis Data Firehose delivery stream. Do not include the custom prefix expression.
  4. Choose Next.
  5. Choose Next, No, Next.
  6. Specify the IAM role that AWS Glue should use. I chose to create a new IAM role. Choose Next.
  7. Specify a schedule to run the crawler. I chose Run on demand. Choose Next.
  8. Specify where the crawler adds the crawled and discovered tables. I chose the default database. Choose Next.
  9. Choose Finish.
  10. The crawler has been created and is ready to be run. Choose Run crawler.
  11. In the AWS Glue console, go to Tables. You can see that a table has been created with the name of the base folder. Choose fhbase.

The crawler has discovered and populated the table and its properties.

You can see the discovered schema. The crawler has identified and created the partitions based on the folder structure specified by the prefix expression.

Open the Amazon Athena console, and select the default database from the drop-down menu. Write the following query in the New query1 window, then choose Run query.

SELECT * FROM "default"."fhbase"
where year = '2019' and day = '12' and hour = '17'
order by approxarrtimestamputcfh desc

Notice that Amazon Athena recognizes the fhbase table as a partitioned table. The query can take advantage of the partitions in the query to filter the results.

Conclusion

As this post illustrates, Custom Prefixes for Amazon S3 objects gives you a great deal of flexibility to customize the folder structure where Kinesis Data Firehose delivers data records and failure records in Amazon S3. Having control over the folder structure and naming in Amazon S3 simplifies data discovery, cataloging, and access. As a result, it helps you get to insights more quickly and better manage the cost of your queries.

 


About the Author

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

 

 

 

 

Trimming AWS WAF logs with Amazon Kinesis Firehose transformations

Post Syndicated from Tino Tran original https://aws.amazon.com/blogs/security/trimming-aws-waf-logs-with-amazon-kinesis-firehose-transformations/

In an earlier post, Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight, published on March 28, 2019, the authors showed you how to stream WAF logs with Amazon Kinesis Firehose for visualization using QuickSight. This approach used no filtering of the logs so that you could visualize the full data set. However, you are often only interested in seeing specific events. Or you might be looking to minimize log size to save storage costs. In this post, I show you how to apply rules in Amazon Kinesis Firehose to trim down logs. You can then apply the same visualizations you used in the previous solution.

AWS WAF is a web application firewall that supports full logging of all the web requests it inspects. For each request, AWS WAF logs the raw HTTP/S headers along with information on which AWS WAF rules were triggered. Having complete logs is useful for compliance, auditing, forensics, and troubleshooting custom and Managed Rules for AWS WAF. However, for some use cases, you might not want to log all of the requests inspected by AWS WAF. For example, to reduce the volume of logs, you might only want to log the requests blocked by AWS WAF, or you might want to remove certain HTTP header or query string parameter values from your logs. In many cases, unblocked requests are already stored in your CloudFront access logs or web server logs, so keeping them in AWS WAF logs only duplicates data. Logging blocked traffic, on the other hand, helps you identify bad actors and root-cause false positives.

In this post, I’ll show you how to create an Amazon Kinesis Data Firehose stream to filter out unneeded records, so that you only retain log records for requests that were blocked by AWS WAF. From here, the logs can be stored in Amazon S3 or directed to SIEM (Security information and event management) and log analysis tools.

To simplify things, I’ll provide you with a CloudFormation template that will create the resources highlighted in the diagram below:
 

Figure 1: Solution architecture

  1. A Kinesis Data Firehose delivery stream that receives log records from AWS WAF.
  2. An IAM role for the Kinesis Data Firehose delivery stream, with permissions needed to invoke Lambda and write to S3.
  3. A Lambda function used to filter out WAF records matching the default action before the records are written to S3.
  4. An IAM role for the Lambda function, with the permissions needed to create CloudWatch logs (for troubleshooting).
  5. An S3 bucket where the WAF logs will be stored.

Prerequisites and assumptions

  • In this post, I assume that the AWS WAF default action is configured to allow requests that don’t explicitly match a blocking WAF rule. So I’ll show you how to omit any records matching the WAF default action.
  • You need to already have an AWS WAF WebACL created. In this example, you’ll use a WebACL generated from the AWS WAF OWASP Top 10 template. For more information on deploying AWS WAF to a CloudFront or ALB resource, see the Getting Started page.

Step 1: Create a Kinesis Data Firehose delivery stream for AWS WAF logs

In this step, you’ll use the following CloudFormation template to create a Kinesis Data Firehose delivery stream that writes logs to an S3 bucket. The template also creates a Lambda function that omits AWS WAF records matching the default action.

Here’s how to launch the template:

  1. Open CloudFormation in the AWS console.
  2. For WAF deployments on Amazon CloudFront, select region US-EAST-1. Otherwise, create the stack in the same region in which your AWS WAF Web ACL is deployed.
  3. Select the Create Stack button.
  4. In the CloudFormation wizard, select Specify an Amazon S3 template URL and copy and paste the following URL into the text box, then select Next:
    https://s3.amazonaws.com/awsiammedia/public/sample/TrimAWSWAFLogs/KinesisWAFDeliveryStream.yml
  5. On the options page, leave the default values and select Next.
  6. Specify the following and then select Next:
    1. Stack name: (for example, kinesis-waf-logging). Make sure to note your stack name, as you’ll need to provide it later in the walkthrough.
    2. Buffer size: This value specifies the size in MB to which Kinesis Data Firehose buffers incoming records before processing.
    3. Buffer interval: This value specifies the interval in seconds for which Kinesis Data Firehose buffers incoming records before processing.

    Note: Kinesis Data Firehose triggers data delivery based on whichever buffer condition is satisfied first. This CloudFormation template sets the default buffer size to 3 MB and the buffer interval to 900 seconds, matching the maximum transformation buffer size and interval. To learn more about Kinesis Data Firehose buffer conditions, read this documentation.

     

    Figure 2: Specify the stack name, buffer size, and buffer interval

  7. Select the check box for I acknowledge that AWS CloudFormation might create IAM resources and choose Create.
  8. Wait for the template to finish creating the resources. This will take a few minutes. On the CloudFormation dashboard, the status next to your stack should say CREATE_COMPLETE.
  9. From the AWS Management Console, open Amazon Kinesis and find the Data Firehose delivery stream on the dashboard. Note that the name of the stream will start with aws-waf-logs- and end with the name of the CloudFormation stack. This prefix is required in order to configure AWS WAF to write logs to the Kinesis stream.
  10. From the AWS Management Console, open AWS Lambda and view the Lambda function created from the CloudFormation template. The function name should start with the Stack name from the CloudFormation template. I included the function code generated from the CloudFormation template below so you can see what’s going on.

    Note: Through CloudFormation, the code is deployed without indentation. To format it for readability, I recommend using the code formatter built into Lambda under the edit tab. This code can easily be modified for custom record filtering or transformations.

    
        'use strict';
    
        exports.handler = (event, context, callback) => {
            /* Process the list of records and drop those containing Default_Action */
            const output = event.records.map((record) => {
                const entry = Buffer.from(record.data, 'base64').toString('utf8');
                if (!entry.match(/Default_Action/g)){
                    return {
                        recordId: record.recordId,
                        result: 'Ok',
                        data: record.data,
                    };
                } else {
                    return {
                        recordId: record.recordId,
                        result: 'Dropped',
                        data: record.data,
                    };
                }
            });
        
            console.log(`Processing completed.  Successful records ${output.length}.`);
            callback(null, { records: output });
        };
        

You now have a Kinesis Data Firehose stream that AWS WAF can use for logging records.

Cost Considerations

This template sets the Kinesis transformation buffer size to 3MB and buffer interval to 900 seconds (the maximum values) in order to reduce the number of Lambda invocations used to process records. On average, an AWS WAF record is approximately 1-1.5KB. With a buffer size of 3MB, Kinesis will use 1 Lambda invocation per 2000-3000 records. Visit the AWS Lambda website to learn more about pricing.

Step 2: Configure AWS WAF Logging

Now that you have an active Amazon Kinesis Firehose delivery stream, you can configure your AWS WAF WebACL to turn on logging.

  1. From the AWS Management Console, open WAF & Shield.
  2. Select the WebACL for which you would like to enable logging.
  3. Select the Logging tab.
  4. Select the Enable Logging button.
  5. Next to Amazon Kinesis Data Firehose, select the stream that was created from the CloudFormation template in Step 1 (for example, aws-waf-logs-kinesis-waf-stream) and select Create.

Congratulations! Your AWS WAF WebACL is now configured to send records of requests inspected by AWS WAF to Kinesis Data Firehose. From there, records that match the default action will be dropped, and the remaining records will be stored in S3 in JSON format.
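If you would rather script this step, the same association can be made through the WAF API. This is a minimal sketch using the boto3 waf client for a CloudFront-scoped (global) web ACL; the ARNs are placeholders, and regional (ALB) web ACLs would use the waf-regional client instead.

```python
import boto3

# Global (CloudFront) WAF classic; use 'waf-regional' for ALB-scoped web ACLs.
waf = boto3.client('waf', region_name='us-east-1')

waf.put_logging_configuration(
    LoggingConfiguration={
        'ResourceArn': 'arn:aws:waf::<account-id>:webacl/<web-acl-id>',
        'LogDestinationConfigs': [
            'arn:aws:firehose:us-east-1:<account-id>:deliverystream/aws-waf-logs-kinesis-waf-stream'
        ]
    }
)
```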

Below is a sample of the logs generated from this example. Notice that there are only blocked records in the logs.
 

Figure 3: Sample logs

Conclusion

In this blog, I’ve provided you with a CloudFormation template to generate a Kinesis Data Firehose stream that can be used to log requests blocked by AWS WAF, omitting requests matching the default action. By omitting the default action, I have reduced the number of log records that must be reviewed to identify bad actors, tune new WAF rules, and/or root cause false positives. For unblocked traffic, consider using CloudFront’s access logs with Amazon Athena or CloudWatch Logs Insights to query and analyze the data. To learn more about AWS WAF logs, read our developer guide for AWS WAF.

If you have feedback about this blog post, please submit it in the Comments section below. If you have issues with AWS WAF, start a thread on the AWS WAF forum or contact AWS Support.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Tino Tran

Tino is a Senior Edge Specialized Solutions Architect based out of Florida. His main focus is to help companies deliver online content in a secure, reliable, and fast way using AWS Edge Services. He is an experienced technologist with a background in software engineering, content delivery networks, and security.

Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight

Post Syndicated from Umesh Ramesh original https://aws.amazon.com/blogs/security/enabling-serverless-security-analytics-using-aws-waf-full-logs/

Traditionally, analyzing data logs required you to extract, transform, and load your data before using a number of data warehouse and business intelligence tools to derive business intelligence from that data—on top of maintaining the servers that ran behind these tools.

This blog post will show you how to analyze AWS Web Application Firewall (AWS WAF) logs and quickly build multiple dashboards, without booting up any servers. With the new AWS WAF full logs feature, you can now log all traffic inspected by AWS WAF into Amazon Simple Storage Service (Amazon S3) buckets by configuring Amazon Kinesis Data Firehose. In this walkthrough, you’ll create an Amazon Kinesis Data Firehose delivery stream to which AWS WAF full logs can be sent, and you’ll enable AWS WAF logging for a specific web ACL. Then you’ll set up an AWS Glue crawler job and an Amazon Athena table. Finally, you’ll set up Amazon QuickSight dashboards to help you visualize your web application security. You can use these same steps to build additional visualizations to draw insights from AWS WAF rules and the web traffic traversing the AWS WAF layer. Security and operations teams can monitor these dashboards directly, without needing to depend on other teams to analyze the logs.

The following architecture diagram highlights the AWS services used in the solution:

Figure 1: Architecture diagram

AWS WAF is a web application firewall that lets you monitor HTTP and HTTPS requests that are forwarded to an Amazon API Gateway API, to Amazon CloudFront or to an Application Load Balancer. AWS WAF also lets you control access to your content. Based on conditions that you specify—such as the IP addresses from which requests originate, or the values of query strings—API Gateway, CloudFront, or the Application Load Balancer responds to requests either with the requested content or with an HTTP 403 status code (Forbidden). You can also configure CloudFront to return a custom error page when a request is blocked.

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. With Kinesis Data Firehose, you don’t need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

AWS Glue can be used to run serverless queries against your Amazon S3 data lake. AWS Glue can catalog your S3 data, making it available for querying with Amazon Athena and Amazon Redshift Spectrum. With crawlers, your metadata stays in sync with the underlying data (more details about crawlers later in this post). Amazon Athena and Amazon Redshift Spectrum can directly query your Amazon S3 data lake by using the AWS Glue Data Catalog. With AWS Glue, you access and analyze data through one unified interface without loading it into multiple data silos.

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon QuickSight is a business analytics service you can use to build visualizations, perform one-off analysis, and get business insights from your data. It can automatically discover AWS data sources and also works with your data sources. Amazon QuickSight enables organizations to scale to hundreds of thousands of users and delivers responsive performance by using a robust in-memory engine called SPICE.

SPICE stands for Super-fast, Parallel, In-memory Calculation Engine. SPICE supports rich calculations to help you derive insights from your analysis without worrying about provisioning or managing infrastructure. Data in SPICE is persisted until it is explicitly deleted by the user. SPICE also automatically replicates data for high availability and enables Amazon QuickSight to scale to hundreds of thousands of users who can all simultaneously perform fast interactive analysis across a wide variety of AWS data sources.

Step one: Set up a new Amazon Kinesis Data Firehose delivery stream

  1. In the AWS Management Console, open the Amazon Kinesis Data Firehose service and choose the button to create a new stream.
    1. In the Delivery stream name field, enter a name for your new stream that starts with aws-waf-logs- as shown in the screenshot below. AWS WAF filters all streams starting with the keyword aws-waf-logs when it displays the delivery streams. Note the name of your stream since you’ll need it again later in the walkthrough.
    2. For Source, choose Direct PUT, since AWS WAF logs will be the source in this walkthrough.

      Figure 2: Select the delivery stream name and source

  2. Next, you have the option to enable AWS Lambda if you need to transform your data before transferring it to your destination. (You can learn more about data transformation in the Amazon Kinesis Data Firehose documentation.) In this walkthrough, there are no transformations that need to be performed, so for Record transformation, choose Disabled.
    Figure 3: Select “Disabled” for record transformations

    1. You’ll have the option to convert the JSON object to Apache Parquet or Apache ORC format for better query performance. In this example, you’ll be reading the AWS WAF logs in JSON format, so for Record format conversion, choose Disabled.

      Figure 4: Choose “Disabled” to not convert the JSON object

  3. On the Select destination screen, for Destination, choose Amazon S3.
    Figure 5: Choose the destination

    1. For the S3 destination, you can either enter the name of an existing S3 bucket or create a new S3 bucket. Note the name of the S3 bucket since you’ll need the bucket name in a later step in this walkthrough.
    2. For Source record S3 backup, choose Disabled, because the destination in this walkthrough is an S3 bucket.

      Figure 6: Enter the S3 bucket name, and select “Disabled” for the source record S3 backup

  4. On the next screen, leave the default conditions for Buffer size, Buffer interval, S3 compression and S3 encryption as they are. However, we recommend that you set Error logging to Enabled initially, for troubleshooting purposes.
    1. For IAM role, select Create new or choose. This opens up a new window that will prompt you to create firehose_delivery_role, as shown in the following screenshot. Choose Allow in this window to accept the role creation. This grants the Kinesis Data Firehose service access to the S3 bucket.

      Figure 7: Select “Allow” to create the IAM role “firehose_delivery_role”

  5. On the last step of configuration, review all the options you’ve chosen, and then select Create delivery stream. This will cause the delivery stream to display as “Creating” under Status. In a couple of minutes, the status will change to “Active,” as shown in the below screenshot.

    Figure 8: Review the options you selected

Step two: Enable AWS WAF logging for a specific Web ACL

  1. From the AWS Management Console, open the AWS WAF service and choose Web ACLs. Open your Web ACL resource, which can either be deployed on a CloudFront distribution or on an Application Load Balancer.
    1. Choose the Web ACL for which you want to enable logging. (In the below screenshot, we’ve selected a Web ACL in the US East Region.)
    2. On the Logging tab, choose Enable Logging.

      Figure 9: Choose “Enable Logging”

  2. The next page displays all the delivery streams that start with aws-waf-logs. Choose the Amazon Kinesis Data Firehose delivery stream that you created for AWS WAF logs at the start of this walkthrough. (In the screenshot below, our example stream name is “aws-waf-logs-us-east-1”.)
    1. You can also choose to redact certain fields that you wish to exclude from being captured in the logs. In this walkthrough, you don’t need to choose any fields to redact.
    2. Select Create.

      Figure 10: Choose your delivery stream, and select “Create”

After a couple of minutes, you’ll be able to inspect the S3 bucket that you defined in the Kinesis Data Firehose delivery stream. The log files are created in directories by year, month, day, and hour.

Step three: Set up an AWS Glue crawler job and Amazon Athena table

The purpose of a crawler within your Data Catalog is to traverse your data stores (such as S3) and extract the metadata fields of the files. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. When the crawler runs, the first classifier in your list to successfully recognize your data store is used to create a schema for your table. AWS Glue provides built-in classifiers to infer schemas from common files with formats that include JSON, CSV, and Apache Avro.

  1. In the AWS Management Console, open the AWS Glue service and choose Crawlers to set up a crawler job.
  2. Choose Add crawler to launch a wizard to set up the crawler job. For Crawler name, enter a relevant name. Then select Next.

    Figure 11: Enter “Crawler name,” and select “Next”

  3. For Choose a data store, select S3 and include the path of the S3 bucket that stores your AWS WAF logs, which you made note of in step 1.3. Then choose Next.

    Figure 12: Choose a data store

  4. When you’re given the option to add another data store, choose No.
  5. Then, choose Create an IAM role and enter a name. The role grants the AWS Glue service access to the S3 bucket that contains the log files.

    Figure 13: Choose “Create an IAM role,” and enter a name

  6. Next, set the frequency to Run on demand. You can also schedule the crawler to run periodically to make sure any changes in the file structure are reflected in your data catalog.

    Figure 14: Set the “Frequency” to “Run on demand”

  7. For output, choose the database in which the Athena table is to be created and add a prefix to identify your table name easily. Select Next.

    Figure 15: Choose the database, and enter a prefix

  8. Review all the options you’ve selected for the crawler job and complete the wizard by selecting the Finish button.
  9. Now that the crawler job parameters are set up, on the left panel of the console, choose Crawlers to select your job and then choose Run crawler. The job creates an Amazon Athena table. The duration depends on the size of the log files.

    Figure 16: Choose “Run crawler” to create an Amazon Athena table

  10. To see the Amazon Athena table created by the AWS Glue crawler job, from the AWS Management Console, open the Amazon Athena service. You can filter by your table name prefix.
      1. To view the data, choose Preview table. This displays the table data with certain fields showing data in JSON object structure.
    Figure 17: Choose “Preview table” to view the data

Step four: Create visualizations using Amazon QuickSight

  1. From the AWS Management Console, open Amazon QuickSight.
  2. In the Amazon QuickSight window, in the top left, choose New Analysis. Choose New Data set, and for the data source choose Athena. Enter an appropriate name for the data source and choose Create data source.

    Figure 18: Enter the “Data source name,” and choose “Create data source”

  3. Next, choose Use custom SQL to extract all the fields in the JSON object using the following SQL query:
    
        ```
        with d as (select
        waf.timestamp,
            waf.formatversion,
            waf.webaclid,
            waf.terminatingruleid,
            waf.terminatingruletype,
            waf.action,
            waf.httpsourcename,
            waf.httpsourceid,
            waf.HTTPREQUEST.clientip as clientip,
            waf.HTTPREQUEST.country as country,
            waf.HTTPREQUEST.httpMethod as httpMethod,
            map_agg(f.name,f.value) as kv
        from sampledb.jsonwaflogs_useast1 waf,
        UNNEST(waf.httprequest.headers) as t(f)
        group by 1,2,3,4,5,6,7,8,9,10,11)
        select d.timestamp,
            d.formatversion,
            d.webaclid,
            d.terminatingruleid,
            d.terminatingruletype,
            d.action,
            d.httpsourcename,
            d.httpsourceid,
            d.clientip,
            d.country,
            d.httpMethod,
            d.kv['Host'] as host,
            d.kv['User-Agent'] as UA,
            d.kv['Accept'] as Acc,
            d.kv['Accept-Language'] as AccL,
            d.kv['Accept-Encoding'] as AccE,
            d.kv['Upgrade-Insecure-Requests'] as UIR,
            d.kv['Cookie'] as Cookie,
            d.kv['X-IMForwards'] as XIMF,
            d.kv['Referer'] as Referer
        from d;
        ```        
        

  4. To extract individual fields, copy the previous SQL query and paste it in the New custom SQL box, then choose Edit/Preview data.
    Figure 19: Paste the SQL query in “New custom SQL query”

    1. In the Edit/Preview data view, for Data source, choose SPICE, then choose Finish.

      Figure 20: Choose “SPICE” and then “Finish”

  5. Back in the Amazon QuickSight console, under the Fields section, select the drop-down menu next to the timestamp field and change its data type to Date.

    Figure 21: In the Amazon QuickSight console, change the data type to “Date”

  6. After you see the Date column appear, enter an appropriate name for the visualizations at the top of the page, then choose Save.

    Figure 22: Enter the name for the visualizations, and choose “Save”

  7. You can now create various visualization dashboards with multiple visual types by using the drag-and-drop feature. You can drag and drop combinations of fields such as Action, Client IP, Country, Httpmethod, and User Agents. You can also add filters on Date to view dashboards for a specific timeline. Here are some sample screenshots:
    Figure 23a: Visualization dashboard samples

    Figure 23b: Visualization dashboard samples

    Figure 23c: Visualization dashboard samples

    Figure 23d: Visualization dashboard samples

Conclusion

You can enable AWS WAF logs to Amazon S3 buckets and analyze the logs while they are being streamed by configuring Amazon Kinesis Data Firehose. You can further enhance this solution by automating the streaming of data and using AWS Lambda for any data transformations based on your specific requirements. Using Amazon Athena and Amazon QuickSight makes it easy to analyze logs and build visualizations and dashboards for executive leadership teams. Using these solutions, you can go serverless and let AWS do the heavy lifting for you.

Author photo

Umesh Kumar Ramesh

Umesh is a Cloud Infrastructure Architect with Amazon Web Services. He delivers proof-of-concept projects and topical workshops, and leads implementation projects for various AWS customers. He holds a Bachelor’s degree in Computer Science & Engineering from National Institute of Technology, Jamshedpur (India). Outside of work, Umesh enjoys watching documentaries, biking, and practicing meditation.

Author photo

Muralidhar Ramarao

Muralidhar is a Data Engineer with the Amazon Payment Products Machine Learning Team. He has a Bachelor’s degree in Industrial and Production Engineering from the National Institute of Engineering, Mysore, India. Outside of work, he loves to hike. You will find him with his camera or snapping pictures with his phone, and always looking for his next travel destination.

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. More than that, it was about a mindset shift toward using cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the AWS services that the rest of this post describes, including Amazon Kinesis Data Firehose, AWS Lambda, Amazon S3, AWS Glue, Amazon Athena, and Amazon QuickSight, for our serverless data warehouse.

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, only switching to another relational database would mean that our customers would have the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. Things like the following were considerations:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. After doing this, it converges with the other process and outputs the data to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.
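A minimal sketch of that cleanup function might look like the following (the delivery stream name is hypothetical, and the real function does more than this): it converts the DynamoDB-typed attributes in the stream record into plain JSON and forwards it to Kinesis Data Firehose with boto3.

```python
import json

import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
deserializer = TypeDeserializer()
DELIVERY_STREAM = 'example-dynamodb-events'  # hypothetical name

def lambda_handler(event, context):
    for record in event['Records']:
        new_image = record.get('dynamodb', {}).get('NewImage')
        if not new_image:
            continue  # nothing to forward for REMOVE events
        # Convert DynamoDB's typed representation ({'S': 'foo'}, {'N': '1'}, ...) to plain values.
        item = {k: deserializer.deserialize(v) for k, v in new_image.items()}
        firehose.put_record(
            DeliveryStreamName=DELIVERY_STREAM,
            Record={'Data': json.dumps(item, default=str) + '\n'}
        )
```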

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.
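A minimal sketch of that mover function, with hypothetical bucket and prefix names, might look like this: it unwraps the S3 event from the SNS message and copies each new object into the data warehouse account under our chosen layout.

```python
import json
import urllib.parse

import boto3

s3 = boto3.client('s3')
DEST_BUCKET = 'example-data-warehouse-bucket'   # hypothetical
DEST_PREFIX = 'raw/events/'                     # hypothetical partition root

def lambda_handler(event, context):
    for sns_record in event['Records']:
        s3_event = json.loads(sns_record['Sns']['Message'])
        for rec in s3_event.get('Records', []):
            src_bucket = rec['s3']['bucket']['name']
            src_key = urllib.parse.unquote_plus(rec['s3']['object']['key'])
            # Keep the Firehose-generated key, re-rooted under the warehouse prefix.
            s3.copy_object(
                Bucket=DEST_BUCKET,
                Key=DEST_PREFIX + src_key,
                CopySource={'Bucket': src_bucket, 'Key': src_key},
                ACL='bucket-owner-full-control'
            )
```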

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;"

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV. This approach conflicted with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.

We also migrated complex jobs used to create reporting tables with business metrics:

  • The AWS Glue use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.
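A minimal sketch of that pattern, with assumed bucket and prefix names, looks like the following. The Spark write goes to a temporary prefix, and boto3 (already available inside the Glue job) then moves the finished files into the prefix that users query.

```python
import boto3

BUCKET = 'example-curated-bucket'        # hypothetical
TMP_PREFIX = 'tmp/daily_metrics/'        # the Glue job's Spark write lands here first
FINAL_PREFIX = 'curated/daily_metrics/'  # the location the Athena table points at

# Inside the Glue job, the Spark step would be something like:
#   df.write.mode('overwrite').parquet('s3://' + BUCKET + '/' + TMP_PREFIX)

s3 = boto3.client('s3')
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=BUCKET, Prefix=TMP_PREFIX):
    for obj in page.get('Contents', []):
        dest_key = FINAL_PREFIX + obj['Key'][len(TMP_PREFIX):]
        s3.copy_object(Bucket=BUCKET, Key=dest_key,
                       CopySource={'Bucket': BUCKET, 'Key': obj['Key']})
        s3.delete_object(Bucket=BUCKET, Key=obj['Key'])
```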

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

For additional information, see these similar AWS Big Data blog posts:


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

 

 

 

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.