All posts by Radhika Ravirala

Amazon EMR Serverless cost estimator

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/amazon-emr-serverless-cost-estimator/

Amazon EMR Serverless is a serverless option in Amazon EMR that makes it easy for data analysts and engineers to run applications using open-source big data analytics frameworks such as Apache Spark and Hive without configuring, managing, and scaling clusters or servers. You get all the features of the latest open-source frameworks with the performance-optimized runtime of Amazon EMR, and without having to plan and operate instances and clusters.

With Amazon EMR, you can run your analytics applications on dedicated EMR clusters, on existing Amazon Elastic Kubernetes Service (Amazon EKS) clusters, or using the new EMR Serverless deployment option where you don’t have to manage clusters or instances. When you build a Spark or Hive application using an Amazon EMR release, say Amazon EMR 6.8, you can run the application on EMR clusters, on EKS clusters using Amazon EMR on EKS, or using EMR Serverless without having to change the application.

To learn about the benefits of each deployment option in EMR Serverless, refer to What are some of the feature differences between EMR Serverless and Amazon EMR on EC2? in the Amazon EMR FAQ. You can also learn about the pricing for these options from the Amazon EMR pricing page. Many customers already run data analytics applications on EMR clusters, and find that the new serverless option is simpler and less expensive.

In this post, we discuss how you can use your current application metrics to estimate what it would cost to run an application that currently runs on EMR clusters with the new serverless option. This analysis helps you evaluate and adopt the deployment option that is most cost effective for the application. The Amazon EMR pricing page doesn’t show you how to estimate the cost of running your existing EMR cluster applications on EMR Serverless, so in the following sections we describe an approach that enables you to do that.

Although the example in this post discusses how you can get a cost estimate for applications running on EMR clusters, you can also use the approach if you’re running a Spark or Hive application elsewhere, and want to estimate the cost of running it on EMR Serverless. For example, if you run self-managed Spark or Hive applications on Amazon Elastic Compute Cloud (Amazon EC2) clusters, or if you run Spark jobs on AWS Glue, we show you how you can use this approach to estimate the cost of running the application on EMR Serverless.

Estimating the cost of running applications on your EMR cluster

When you run applications on Amazon EMR clusters, you’re separately charged for the following:

  (a) The Amazon EC2 price of running cluster instances (the price for the underlying servers)
  (b) The price for Amazon Elastic Block Store (Amazon EBS) volumes, if you choose to attach EBS volumes
  (c) The Amazon EMR price for the cluster instances

The total cost of running the cluster includes all three. There are a variety of Amazon EC2 pricing options you can choose from, including On-Demand Instances, 1-year and 3-year Reserved Instances, Savings Plans, and Spot Instances. The Amazon EC2 pricing option that you choose determines (a), the Amazon EC2 price. The cost of running the application on EMR clusters is the sum of (a), (b), and (c). You can compute this cost for the lifetime of the cluster (from the time the cluster is started to the time it is terminated), or for a specific period of time while the cluster is running. We recommend the former, that is, computing (a), (b), and (c) from the time the cluster is started to the time it is terminated. If you have set up tags for your Amazon EMR cluster, you can easily get a detailed cost report for your EMR cluster using AWS Cost Explorer.
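
If you have tagged your cluster, you can also retrieve this cost programmatically. The following is a minimal sketch that uses the AWS Cost Explorer API through boto3 to pull the cost attributed to a tag; the tag key, tag value, and date range are placeholders for illustration, not part of the original setup.

import boto3

# Minimal sketch: pull the cost attributed to a tagged EMR cluster with the
# Cost Explorer API, grouped by service so that the Amazon EC2 and Amazon EMR
# portions are visible separately. The tag key/value and dates are placeholders.
ce = boto3.client("ce")

response = ce.get_cost_and_usage(
    TimePeriod={"Start": "2022-09-01", "End": "2022-09-02"},
    Granularity="DAILY",
    Metrics=["UnblendedCost"],
    Filter={"Tags": {"Key": "cluster-name", "Values": ["my-emr-cluster"]}},
    GroupBy=[{"Type": "DIMENSION", "Key": "SERVICE"}],
)

for day in response["ResultsByTime"]:
    for group in day["Groups"]:
        service = group["Keys"][0]
        amount = float(group["Metrics"]["UnblendedCost"]["Amount"])
        print(f"{day['TimePeriod']['Start']} {service}: ${amount:.2f}")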

Estimating the cost of running the same applications using EMR Serverless

When you run the same applications using EMR Serverless, you pay for the amount of vCPU, memory, and storage resources consumed by your applications. There is no separate charge for EC2 instances or EBS volumes, and you pay only for the resources that your application actually uses, not for the EC2 instances provisioned. For example, when running applications on EMR clusters, if an EC2 instance in the cluster is partially utilized (say, 16 GB of memory is used out of 64 GB available on the instance, or 4 vCPUs are utilized out of 16 vCPUs available on the instance), or if the EC2 instance is idle (for example, while the instance is initializing or waiting for an application to start), you still incur Amazon EC2, Amazon EMR, and Amazon EBS charges for the full EC2 instance for the duration that the instance is active in the EMR cluster. With EMR Serverless, you pay only for the vCPU, memory, and storage resources used from the time workers start to run your Spark or Hive job until the time they stop.

To estimate the cost of running your EMR Spark or Hive application on EMR Serverless, first aggregate the total compute vCore-seconds, memory MB-seconds, and storage GB-seconds consumed by each YARN application that ran on your EMR cluster, from the time each YARN container is started to the time it is terminated. You can obtain these metrics from the YARN ResourceManager logs, accessible through the YARN timeline server or the YARN CLI tools, which report the running time, vCore-seconds, and memory MB-seconds used by each YARN application.
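
For example, the following minimal sketch aggregates these metrics across applications by calling the YARN ResourceManager REST API; the ResourceManager host name is a placeholder, and you would typically run this from the EMR primary node or another host that can reach port 8088.

import requests

# Minimal sketch: sum vCore-seconds and memory MB-seconds across YARN
# applications using the ResourceManager REST API. Replace the host with your
# EMR primary node's DNS name.
RM_APPS_URL = "http://<primary-node-dns>:8088/ws/v1/cluster/apps"

apps = requests.get(RM_APPS_URL).json()["apps"]["app"]

total_vcore_seconds = sum(app["vcoreSeconds"] for app in apps)
total_memory_mb_seconds = sum(app["memorySeconds"] for app in apps)

print(f"Total vCore-seconds: {total_vcore_seconds}")
print(f"Total memory MB-seconds: {total_memory_mb_seconds}")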

If your cluster only runs Spark applications, there is a simpler way to estimate. Instead of obtaining the vCore-seconds, memory MB-seconds, and storage GB-seconds from YARN resource manager logs, you can obtain these metrics from Spark event logs. We have provided the tool EMR Serverless Estimator, which can parse the Spark event logs for your applications and provide the aggregated metrics for your cost estimate.

After you get the usage metrics for your application, you can compute the estimated EMR Serverless cost using EMR Serverless pricing. Multiply your aggregated vCore-seconds by the EMR Serverless vCPU price per second, multiply the aggregated memory MB-seconds by the EMR Serverless memory price per second, and multiply the storage GB-seconds by the EMR Serverless storage price per second (only if the storage requirements exceed 20 GB per worker). Adding up the vCPU, memory, and storage costs gives you the estimated cost of running the same application on EMR Serverless, which you can compare with the cost of running it on your EMR cluster.
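
As a concrete sketch of that arithmetic, the following converts aggregated YARN metrics into an estimated EMR Serverless cost. The vCPU and memory prices shown are the US East (N. Virginia) rates quoted later in this post, the storage rate is an assumption, and you should substitute the current rates for your Region from the EMR Serverless pricing page.

# Minimal sketch: convert aggregated YARN metrics into an EMR Serverless cost
# estimate. Prices below are illustrative us-east-1 rates; verify them on the
# EMR Serverless pricing page before relying on the result.
VCPU_PRICE_PER_HOUR = 0.052624        # USD per vCPU-hour
MEMORY_PRICE_PER_GB_HOUR = 0.0057785  # USD per GB-hour
STORAGE_PRICE_PER_GB_HOUR = 0.000111  # USD per GB-hour, assumed rate; applies only above 20 GB per worker

def estimate_emr_serverless_cost(vcore_seconds, memory_mb_seconds, extra_storage_gb_seconds=0):
    vcpu_hours = vcore_seconds / 3600
    memory_gb_hours = memory_mb_seconds / (3600 * 1024)
    storage_gb_hours = extra_storage_gb_seconds / 3600
    return (vcpu_hours * VCPU_PRICE_PER_HOUR
            + memory_gb_hours * MEMORY_PRICE_PER_GB_HOUR
            + storage_gb_hours * STORAGE_PRICE_PER_GB_HOUR)

# Using the aggregates from the example later in this post
print(round(estimate_emr_serverless_cost(5737, 120156631), 2))  # ~0.27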

In this approach, we assume that the performance of the application is equivalent. In other words, the size (vCPU, memory) and runtime duration for each YARN container on the EMR cluster is the same as the number, size, and runtime duration of workers needed to run the application on EMR Serverless. We make this assumption because the EMR runtime for an EMR release is the same regardless of whether the application is run on an EMR cluster or on EMR Serverless.

Example

Let’s do a sample cost comparison of Amazon EMR on EC2 and EMR Serverless using a single cluster.

We ran a Spark application on an EMR cluster with five nodes (one primary, two core, and two task) and gathered YARN metrics using the YARN CLI. The following shows our aggregate resource allocation.

[Screenshot: aggregate resource allocation reported by the YARN CLI]

We computed the Amazon EMR on EC2 costs as follows:

  • Cluster instances
    • Primary: m5.2xlarge:1
    • Core: r5.2xlarge:2
    • Task: r5.2xlarge:2
  • Cluster runtime = 18 min
  • Instance on-demand cost
    • m5.2xlarge (8 vCPU, 32 GiB memory)
      • Amazon EC2: $0.384/hr
      • Amazon EMR incremental: $0.096/hr
    • r5.2xlarge (8 vCPU, 64 GiB memory)
      • Amazon EC2: $0.504/hr
      • Amazon EMR incremental: $0.126/hr

The following is the EMR on EC2 cost calculation:

  • Amazon EMR cost = (1 primary node x $0.096/hr) + (2 core nodes x $0.126/hr) + (2 task nodes x $0.126/hr) = $0.60/hr
  • Amazon EC2 cost = (1 primary node x $0.384/hr) + (2 core nodes x $0.504/hr) + (2 task nodes x $0.504/hr) = $2.40/hr
  • Amazon EMR on EC2 cluster cost = ($0.60/hr + $2.40/hr) x 8/60 hr (runtime in hours)

The total Amazon EMR on Amazon EC2 cost for this run is $0.40.

To calculate EMR Serverless cost, aggregate the vCore-seconds and memory MB-seconds for the same application you ran previously on the EMR cluster. Then multiply those numbers by the EMR Serverless vCPU and memory prices. Our calculation results are as follows:

  • Total_vcore_seconds = 5737
  • Total_Memory_mb_seconds = 120156631
  • Convert to vCPU-hours and memory GB-hours:
    • Aggregated vCPU-hours: 5737 / (60 x 60) = 1.59
    • Aggregated memory GB-hours: 120156631 / (60 x 60 x 1024) = 32.5
  • Total vCPU cost = 1.59 vCPU-hours x $0.052624 per vCPU-hour = $0.08
  • Total memory cost = 32.5 GB-hours x $0.0057785 per GB-hour = $0.19

In this example, the total EMR Serverless cost is approximately $0.27, roughly a 32% reduction compared to running the same application on the EMR cluster.

Conclusion

Amazon EMR Serverless is a recently launched serverless option in Amazon EMR that makes it easy to run open-source frameworks such as Spark and Hive without configuring, managing, and scaling clusters. Customers that already use EMR clusters want to understand how they can estimate the cost of running their EMR applications using EMR Serverless. We have presented an approach that you can use to conduct a cost analysis based on analyzing application metrics from your EMR clusters.

We hope you give this a try, and share your feedback with us!


About the authors

Radhika Ravirala is a Principal Product Manager at AWS.

Matthew Liem is a Senior Solution Architecture Manager at AWS.

Optimize performance and reduce costs for network analytics with VPC Flow Logs in Apache Parquet format

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/optimize-performance-and-reduce-costs-for-network-analytics-with-vpc-flow-logs-in-apache-parquet-format/

VPC Flow Logs help you understand network traffic patterns, identify security issues, audit usage, and diagnose network connectivity on AWS. Customers often route their VPC flow logs directly to Amazon Simple Storage Service (Amazon S3) for long-term retention. Until now, VPC flow logs were delivered to Amazon S3 as raw text files in GZIP format, and you had to use a custom format conversion application to convert these text files into Apache Parquet format to optimize the analytical processing of the log data and reduce the cost of log storage. This custom format conversion step added complexity, time to insight, and cost to VPC flow log traffic analytics.

Today, we’re excited to announce a new feature that delivers VPC flow logs in the Apache Parquet format, making it easier, faster, and more cost-efficient to analyze your VPC flow logs stored in Amazon S3. You can also deliver VPC flow logs to Amazon S3 with Hive-compatible S3 prefixes partitioned by the hour.

Apache Parquet is an open-source file format that stores data efficiently in columnar format, provides different encoding types, and supports predicate filtering. With good compression ratios and efficient encoding, VPC flow logs stored in Parquet reduce your Amazon S3 storage costs. When querying flow logs persisted in Parquet format with analytic frameworks, non-relevant data is skipped, requiring fewer reads on Amazon S3 and thereby improving query performance. To reduce query running time and cost with Amazon Athena and Amazon Redshift Spectrum, Apache Parquet is often the recommended file format.

In this post, we explore this new feature and how it can help you run performant queries on your flow logs with Athena.

Create flow logs with Parquet file format

To take advantage of this feature, simply create a new VPC flow log subscription with Amazon S3 as the destination using the AWS Management Console, AWS Command Line Interface (AWS CLI), or API. On the console, when creating a new VPC flow log subscription with Amazon S3, you can select one or more of the following options:

  • Log file format
  • Hive-compatible S3 prefixes
  • Partition logs by time

We now explore how each of these options can make processing and storage of flow logs more efficient.

Apache Parquet formatted files

By default, your logs are delivered in text format. To change to Parquet, for Log file format, select Parquet. This delivers your VPC flow logs to Amazon S3 in the Apache Parquet format.

Note the following considerations:

  • You can’t change existing flow logs to deliver logs in Parquet format. You need to create a new VPC flow log subscription with Parquet as the log file format.
  • Consider using a higher maximum aggregation interval (10 minutes) when aggregating flow packets to ensure larger Parquet files on Amazon S3.
  • Refer to Amazon CloudWatch pricing for the pricing of log delivery in Apache Parquet format for VPC flow logs.

Hive-compatible partitions

Partitioning is a technique to organize your data to improve the efficiency of your query engine. Partitions aligned with the columns that are frequently used in query filters can significantly lower your query response time. You can now specify that your flow logs be organized in Hive-compatible format. This allows you to run the MSCK REPAIR TABLE command in Athena to quickly and easily add new partitions as they get delivered into Amazon S3. Simply select Enable for Hive-compatible S3 prefix to set this up. This delivers the flow logs to Amazon S3 in the following path:

s3://my-flow-log-bucket/my-custom-flow-logs/AWSLogs/aws-account-id=123456789012/aws-service=vpcflowlogs/aws-region=us-east-1/year=2021/month=10/day=07/123456789012_vpcflowlogs_us-east-1_fl-06a0eeb1087d806aa_20211007T1930Z_d5ab7c14.log.parquet

Per-hour partitions

You can also organize your flow logs at a much more granular level by adding per-hour partitions. You should enable this feature if you constantly need to query large volumes of logs with a specific time frame as the predicate. Querying logs only during certain hours results in less data scanned, which translates to lower cost per query with engines such as Athena and Redshift Spectrum.

You can also set per-hour partitions via an API or the AWS CLI using the --destination-options parameter in create-flow-logs:

aws ec2 create-flow-logs \
--resource-type VPC \
--resource-ids vpc-001122333 \
--traffic-type ALL \
--log-destination-type s3 \
--log-destination arn:aws:s3:::my-flow-log-bucket/my-custom-flow-logs/ \
--destination-options FileFormat=parquet,HiveCompatiblePartitions=true,PerHourPartition=true

The following is a sample flow log file deposited into an hourly bucket. By default, flow logs in Parquet are compressed using gzip, which typically provides a higher compression ratio than the other compression formats supported for Parquet.

s3://my-flow-log-bucket/my-custom-flow-logs/AWSLogs/aws-account-id=123456789012/aws-service=vpcflowlogs/aws-region=us-east-1/year=2021/month=10/day=07/hour=19/123456789012_vpcflowlogs_us-east-1_fl-06a0eeb1087d806aa_20211007T1930Z_d5ab7c14.log.parquet

Query with Athena

You can use the Athena integration for VPC Flow Logs from the Amazon VPC console to automate the Athena setup and query VPC flow logs in Amazon S3. This integration has now been extended to support these new flow log delivery options to Amazon S3.

To demonstrate querying flow logs in Parquet and in plain text, let’s start from the Amazon Athena console. We begin by creating an external table pointing to the flow logs in Parquet format.

Note that this feature supports specifying flow log fields in Parquet’s native data types. This eliminates the need for you to cast your fields when querying the traffic logs.

Then run MSCK REPAIR TABLE.

Let’s run a sample query on these Parquet-based flow logs.
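
If you prefer to script these steps, the following is a minimal sketch that runs MSCK REPAIR TABLE and a sample query through the Athena API with boto3. The database, table, and query result location are placeholder names for your own setup, and the sample query is illustrative rather than taken from the original post.

import boto3

# Minimal sketch: load newly delivered Hive-compatible partitions and run a
# sample query against the Parquet flow logs table. All names are placeholders.
athena = boto3.client("athena", region_name="us-east-1")

def run_query(sql):
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "vpc_flow_logs_db"},
        ResultConfiguration={"OutputLocation": "s3://my-athena-results-bucket/"},
    )
    return execution["QueryExecutionId"]

# Pick up partitions delivered since the last load
run_query("MSCK REPAIR TABLE vpc_flow_logs_parquet")

# Top talkers for a single hour; the partition predicates (year/month/day/hour)
# limit the amount of data scanned
run_query("""
    SELECT srcaddr, dstaddr, SUM(bytes) AS total_bytes
    FROM vpc_flow_logs_parquet
    WHERE year = '2021' AND month = '10' AND day = '07' AND hour = '19'
    GROUP BY srcaddr, dstaddr
    ORDER BY total_bytes DESC
    LIMIT 10
""")

Both calls return immediately; you can poll get_query_execution with the returned ID to wait for completion and to inspect statistics such as the amount of data scanned.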

Now, let’s create a table for flow logs delivered in plain text.

We add the partitions using the ALTER TABLE statement in Athena.

Run a simple flow logs query and note the time it took to run the query.

The Athena query run time with flow logs in Parquet (1.16 seconds) is much faster than the run time with flow logs in plain text (2.51 seconds).

For benchmarks that further describe the cost savings and performance improvements from persisting data in Parquet in granular partitions, see Top 10 Performance Tuning Tips for Amazon Athena.

Summary

You can now deliver your VPC flow logs to Amazon S3 with three new options:

  • In Apache Parquet formatted files
  • With Hive-compatible S3 prefixes
  • In hourly partitioned files

These delivery options make it faster, easier, and more cost-efficient to store and run analytics on your VPC flow logs. To learn more, visit VPC Flow Logs documentation. We hope you will give this feature a try and share your experience with us. Please send feedback to the AWS forum for Amazon VPC or through your usual AWS support contacts.


About the Authors

Radhika Ravirala is a Principal Streaming Architect at Amazon Web Services, where she helps customers craft distributed streaming applications using Amazon Kinesis and Amazon MSK. In her free time, she enjoys long walks with her dog, playing board games, and reading widely.

Vaibhav Katkade is a Senior Product Manager in the Amazon VPC team. He is interested in areas of network security and cloud networking operations. Outside of work, he enjoys cooking and the outdoors.

Crafting serverless streaming ETL jobs with AWS Glue

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

Organizations across verticals have been building streaming-based extract, transform, and load (ETL) applications to more efficiently extract meaningful insights from their datasets. Although streaming ingest and stream processing frameworks have evolved over the past few years, there is now a surge in demand for building streaming pipelines that are completely serverless. Since 2017, AWS Glue has made it easy to prepare and load your data for analytics using ETL.

In this post, we dive into streaming ETL in AWS Glue, a feature that allows you to build continuous ETL applications on streaming data. Streaming ETL in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing with exactly-once semantics. This post walks you through an example of building a stream processing pipeline using AWS Glue that includes reading streaming data from Amazon Kinesis Data Streams, schema discovery, running streaming ETL, and writing out the results to a sink.

Serverless streaming ETL architecture

For this post, our use case is relevant to our current situation with the COVID-19 pandemic. Ventilators are in high demand and are increasingly used in different settings: hospitals, nursing homes, and even private residences. Ventilators generate data that must be monitored, and an increase in ventilator usage means there is a tremendous amount of streaming data that needs to be processed promptly, so patients can be attended to as quickly as possible as the need arises. In this post, we build a streaming ETL job on ventilator metrics and enhance the data with details to raise the alert level if the metrics fall outside of the normal range. After you enrich the data, you can use it to visualize on monitors.

In our streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams. We create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes.

Because businesses often augment their data lakes built on Amazon Simple Storage Service (Amazon S3) with streaming data, our first use case applies transformations on the streaming JSON data ingested via Kinesis Data Streams and loads the results in Parquet format to an Amazon S3 data lake. After the data is ingested into Amazon S3, you can query it with Amazon Athena and build visual dashboards using Amazon QuickSight.

For the second use case, we ingest the data from Kinesis Data Streams, join it with reference data in Amazon DynamoDB to calculate alert levels, and write the results to an Amazon DynamoDB sink. This approach allows you to build near real-time dashboards with alert notifications.

The following diagram illustrates this architecture.

AWS Glue streaming ETL jobs

With AWS Glue, you can now create ETL pipelines on streaming data using continuously running jobs. You can ingest streaming data from Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK). AWS Glue streaming jobs can perform aggregations on data in micro-batches and deliver the processed data to Amazon S3. You can read from the data stream and write to Amazon S3 using the AWS Glue DynamicFrame API. You can also write to arbitrary sinks using native Apache Spark Structured Streaming APIs.

The following sections walk you through building a streaming ETL job in AWS Glue.

Creating a Kinesis data stream

First, we need a streaming ingestion source to consume continuously generated streaming data. For this post, we create a Kinesis data stream with five shards, which allows us to push 5,000 records per second into the stream.

  1. On the Amazon Kinesis dashboard, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter ventilatorsstream.
  4. For Number of open shards, choose 5.

If you prefer to use the AWS Command Line Interface (AWS CLI), you can create the stream with the following code:

aws kinesis create-stream \
    --stream-name ventilatorstream \
    --shard-count 5

Generating streaming data

We can synthetically generate ventilator data in JSON format using a simple Python application (see the GitHub repo) or the Kinesis Data Generator (KDG).

Using a Python-based data generator

To generate streaming data using a Python script, you can run the following command from your laptop or Amazon Elastic Compute Cloud (Amazon EC2) instance. Make sure you have installed the faker library on your system and set up the boto3 credentials correctly before you run the script.

python3 generate_data.py --streamname glue_ventilator_stream
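
If you want a sense of what such a generator looks like without pulling the repo, the following is a minimal sketch; the field names and the eventtime format match the schema and timestamp pattern used later in this post, while the value ranges and manufacturer names are made up for illustration.

import argparse
import datetime
import json
import random
import time

import boto3
from faker import Faker

# Minimal sketch of a ventilator metrics generator; the script in the GitHub
# repo is the reference implementation. Value ranges below are illustrative.
fake = Faker()
kinesis = boto3.client("kinesis")

def ventilator_record(ventilator_id):
    return {
        "ventilatorid": ventilator_id,
        "eventtime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "serialnumber": fake.uuid4(),
        "pressurecontrol": random.randint(4, 40),
        "o2stats": random.randint(80, 100),
        "minutevolume": random.randint(2, 10),
        "manufacturer": random.choice(["Vendor A", "Vendor B", "Vendor C"]),
    }

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--streamname", required=True)
    args = parser.parse_args()

    while True:
        record = ventilator_record(random.randint(1, 25))
        kinesis.put_record(
            StreamName=args.streamname,
            Data=json.dumps(record).encode("utf-8"),
            PartitionKey=str(record["ventilatorid"]),
        )
        time.sleep(0.1)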

Using the Kinesis Data Generator

Alternatively, you can also use the Kinesis Data Generator with the ventilator template available on the GitHub repo. The following screenshot shows the template on the KDG console.

We start pushing the data after we create our AWS Glue streaming job.

Defining the schema

We need to specify a schema for our streaming data, which we can do in one of two ways:

  • Retrieve a small batch of the data (before starting your streaming job) from the streaming source, infer the schema in batch mode, and use the extracted schema for your streaming job
  • Use the AWS Glue Data Catalog to manually create a table

For this post, we use the AWS Glue Data Catalog to create a ventilator schema.

  1. On the AWS Glue console, choose Data Catalog.
  2. Choose Tables.
  3. From the Add Table drop-down menu, choose Add table manually.
  4. For the table name, enter ventilators_table.
  5. Create a database with the name ventilatordb.
  6. Choose Kinesis as the type of source.
  7. Enter the name of the stream and the Kinesis endpoint URL (https://kinesis.<aws-region>.amazonaws.com).
  8. For the classification, choose JSON.
  9. Define the schema according to the following table.
Column name Data type
ventilatorid int
eventtime string
serialnumber string
pressurecontrol int
o2stats int
minutevolume int
manufacturer string

 

  10. Choose Finish.

Creating an AWS Glue streaming job to hydrate a data lake on Amazon S3

With the streaming source and schema prepared, we’re now ready to create our AWS Glue streaming jobs. We first create a job to ingest data from the streaming source using AWS Glue DataFrame APIs.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a job name (a UTF-8 string of no more than 255 characters).
  4. For IAM role, specify a role that is used for authorization to resources used to run the job and access data stores. Because streaming jobs require connecting to sources and sinks, you need to make sure that the AWS Identity and Access Management (IAM) role has permissions to read from Kinesis Data Streams, read from and write to Amazon S3, and read from and write to Amazon DynamoDB. Refer to Managing Access Permissions for AWS Glue Resources for details.
  5. For Type, choose Spark Streaming.
  6. For Glue Version, choose Spark 2.4, Python 3.
  7. For This job runs, select A new script authored by you.

You can have AWS Glue generate the streaming ETL code for you, but for this post, we author one from scratch.

  8. For Script file name, enter GlueStreaming-S3.
  9. For S3 path where script is stored, enter your S3 path.
  10. For Job bookmark, choose Disable.

For this post, we use the checkpointing mechanism of AWS Glue to keep track of the data read instead of a job bookmark.

  11. For Monitoring options, select Job metrics and Continuous logging.
  12. For Log filtering, select Standard filter and Spark UI.
  13. For Amazon S3 prefix for Spark event logs, enter the S3 path for the event logs.
  14. For Job parameters, enter the following key-values:
    1. --output_path – The S3 path where the final aggregations are persisted
    2. --aws_region – The Region where you run the job

  15. Skip the connections part and choose Save and edit the script.

Streaming ETL to an Amazon S3 sink

We use the AWS Glue DynamicFrameReader class’s from_catalog method to read the streaming data. We specify the table name that has been associated with the data stream as the source of data (see the section Defining the schema). We add additional_options to indicate the starting position to read from in Kinesis Data Streams. TRIM_HORIZON allows us to start reading from the oldest record in the shard.

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

In the preceding code, sourceData represents a streaming DataFrame. We use the foreachBatch API to invoke a function (processBatch) that processes the data represented by this streaming DataFrame. The processBatch function receives a static DataFrame, which holds streaming data for a window size of 100s (default). It creates a DynamicFrame from the static DataFrame and writes out partitioned data to Amazon S3. See the following code:

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})

To transform the DynamicFrame to fix the data type for eventtime (from string to timestamp) and write the ventilator metrics to Amazon S3 in Parquet format, enter the following code:

def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

Putting it all together

In the Glue ETL code editor, enter the following code, then save and run the job:

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})
job.commit()

Querying with Athena

When the processed streaming data is written in Parquet format to Amazon S3, we can run queries on Athena. Run the AWS Glue crawler on the Amazon S3 location where the streaming data is written out (a scripted sketch of this step follows), then query the resulting table from the Athena console.
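
Creating and running the crawler can be scripted as well. The following is a minimal sketch using boto3; the crawler name, IAM role, and output bucket are placeholders, while the database name and the ventilator_metrics prefix match the ones used earlier in this post.

import boto3

# Minimal sketch: create and start a crawler over the streaming job's S3 output.
# The role ARN and bucket are placeholders for your environment.
glue = boto3.client("glue")

glue.create_crawler(
    Name="ventilator-metrics-crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",
    DatabaseName="ventilatordb",
    Targets={"S3Targets": [{"Path": "s3://my-output-bucket/ventilator_metrics/"}]},
)

glue.start_crawler(Name="ventilator-metrics-crawler")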

For instructions on building visual dashboards with the streaming data in Amazon S3, see Quick Start: Create an Analysis with a Single Visual Using Sample Data. The following dashboards show the distribution of metrics, averages, and alerts based on anomalies on an hourly basis, but you can create more advanced dashboards with more granular (minute-level) intervals.

Streaming ETL to a DynamoDB sink

For the second use case, we transform the streaming data as it arrives without micro-batching and persist the data to a DynamoDB table. Scripts to create DynamoDB tables are available in the GitHub repo. We use Apache Spark’s Structured Streaming API to read ventilator-generated data from the data stream, join it with reference data for normal metrics range in a DynamoDB table, compute the status based on the deviation from normal metric values, and write the processed data to a DynamoDB table. See the following code:

import sys
import datetime
import base64
import decimal
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'checkpoint_location', \
                            'dynamodb_sink_table', \
                            'dynamodb_static_table'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read parameters
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# DynamoDB config
dynamodb_sink_table = args['dynamodb_sink_table']
dynamodb_static_table = args['dynamodb_static_table']

def write_to_dynamodb(row):
    '''
    Add row to DynamoDB.
    '''
    dynamodb = boto3.resource('dynamodb', region_name=aws_region)
    start = str(row['window'].start)
    end = str(row['window'].end)
    dynamodb.Table(dynamodb_sink_table).put_item(
      Item = { 'ventilatorid': row['ventilatorid'], \
                'status': str(row['status']), \
                'start': start, \
                'end': end, \
                'avg_o2stats': decimal.Decimal(str(row['avg_o2stats'])), \
                'avg_pressurecontrol': decimal.Decimal(str(row['avg_pressurecontrol'])), \
                'avg_minutevolume': decimal.Decimal(str(row['avg_minutevolume']))})

dynamodb_dynamic_frame = glueContext.create_dynamic_frame.from_options( \
    "dynamodb", \
    connection_options={
    "dynamodb.input.tableName": dynamodb_static_table,
    "dynamodb.throughput.read.percent": "1.5"
  }
)

dynamodb_lookup_df = dynamodb_dynamic_frame.toDF().cache()

# Read from Kinesis Data Stream
streaming_data = spark.readStream \
                    .format("kinesis") \
                    .option("streamName","glue_ventilator_stream") \
                    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
                    .option("startingPosition", "TRIM_HORIZON") \
                    .load()

# Retrieve Sensor columns and do a simple projection
ventilator_fields = streaming_data \
    .select(from_json(col("data") \
    .cast("string"),glueContext.get_catalog_schema_as_spark_schema("ventilatordb","ventilators_table")) \
    .alias("ventilatordata")) \
    .select("ventilatordata.*") \
    .withColumn("event_time", to_timestamp(col('eventtime'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("ts", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Stream static join, ETL to augment with status
ventilator_joined_df = ventilator_fields.join(dynamodb_lookup_df, "ventilatorid") \
    .withColumn('status', when( \
    ((ventilator_fields.o2stats < dynamodb_lookup_df.o2_stats_min) | \
    (ventilator_fields.o2stats > dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol < dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol > dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume < dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume > dynamodb_lookup_df.minute_volume_max)), "RED") \
    .when( \
    ((ventilator_fields.o2stats >= dynamodb_lookup_df.o2_stats_min) |
    (ventilator_fields.o2stats <= dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol >= dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol <= dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume >= dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume <= dynamodb_lookup_df.minute_volume_max)), "GREEN") \
    .otherwise("ORANGE"))

ventilator_joined_df.printSchema()

# Drop the normal metric values
ventilator_transformed_df = ventilator_joined_df \
                            .drop('eventtime', 'o2_stats_min', 'o2_stats_max', \
                            'pressure_control_min', 'pressure_control_max', \
                            'minute_volume_min', 'minute_volume_max')

ventilator_transformed_df.printSchema()

ventilators_df = ventilator_transformed_df \
    .groupBy(window(col('ts'), '10 minute', '5 minute'), \
    ventilator_transformed_df.status, ventilator_transformed_df.ventilatorid) \
    .agg( \
    avg(col('o2stats')).alias('avg_o2stats'), \
    avg(col('pressurecontrol')).alias('avg_pressurecontrol'), \
    avg(col('minutevolume')).alias('avg_minutevolume') \
    )

ventilators_df.printSchema()

# Write to DynamoDB sink
ventilator_query = ventilators_df \
    .writeStream \
    .foreach(write_to_dynamodb) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

ventilator_query.awaitTermination()

job.commit()

After the above code runs, ventilator metric aggregations are persisted in the Amazon DynamoDB table. You can build custom user interface applications with the data in Amazon DynamoDB to create dashboards.

Conclusion

Streaming applications have become a core component of data lake architectures. With AWS Glue streaming, you can create serverless ETL jobs that run continuously, consuming data from streaming services like Kinesis Data Streams and Amazon MSK. You can load the results of streaming processing into an Amazon S3-based data lake, JDBC data stores, or arbitrary sinks using the Structured Streaming API.

For more information about AWS Glue streaming ETL jobs, refer to the AWS Glue documentation.

We encourage you to build a serverless streaming application using AWS Glue streaming ETL and share your experience with us. If you have any questions or suggestions, share them in the comments.


About the Author


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.