Tag Archives: AWS Glue

Architecting a Data Lake for Higher Education Student Analytics

Post Syndicated from Craig Jordan original https://aws.amazon.com/blogs/architecture/architecting-data-lake-for-higher-education-student-analytics/

One of the keys to identifying timely and impactful actions is having enough raw material to work with. However, this up-to-date information typically lives in the databases that sit behind several different applications. One of the first steps to finding data-driven insights is gathering that information into a single store that an analyst can use without interfering with those applications.

For years, reporting environments have relied on a data warehouse stored in a single, separate relational database management system (RDBMS). But now, due to the growing use of Software as a service (SaaS) applications and NoSQL database options, data may be stored outside the data center and in formats other than tables of rows and columns. It’s increasingly difficult to access the data these applications maintain, and a data warehouse may not be flexible enough to house the gathered information.

For these reasons, reporting teams are building data lakes, and those responsible for using data analytics at universities and colleges are no different. However, it can be challenging to know exactly how to start building this expanded data repository so it can be ready to use quickly and still expandable as future requirements are uncovered. Helping higher education institutions address these challenges is the topic of this post.

About Maryville University

Maryville University is a nationally recognized private institution located in St. Louis, Missouri, and was recently named the second fastest growing private university by The Chronicle of Higher Education. Even with its enrollment growth, the university is committed to a highly personalized education for each student, which requires reliable data that is readily available to multiple departments. University leaders want to offer the right help at the right time to students who may be having difficulty completing the first semester of their course of study. To get started, the data experts in the Office of Strategic Information and members of the IT Department needed to create a data environment to identify students needing assistance.

Critical data sources

Like most universities, Maryville’s student-related data centers around two significant sources: the student information system (SIS), which houses student profiles, course completion, and financial aid information; and the learning management system (LMS) in which students review course materials, complete assignments, and engage in online discussions with faculty and fellow students.

The first of these, the SIS, stores its data in an on-premises relational database, and for several years, a significant subset of its contents had been incorporated into the university’s data warehouse. The LMS, however, contains data that the team had not tried to bring into their data warehouse. Moreover, that data is managed by a SaaS application from Instructure, called “Canvas,” and is not directly accessible for traditional extract, transform, and load (ETL) processing. The team recognized they needed a new approach and began down the path of creating a data lake in AWS to support their analysis goals.

Getting started on the data lake

The first step the team took in building their data lake made use of an open source solution that Harvard’s IT department developed. The solution, comprised of AWS Lambda functions and Amazon Simple Storage Service (S3) buckets, is deployed using AWS CloudFormation. It enables any university that uses Canvas for their LMS to implement a solution that moves LMS data into an S3 data lake on a daily basis. The following diagram illustrates this portion of Maryville’s data lake architecture:


Diagram 1: The data lake for the Learning Management System data

The AWS Lambda functions invoke the LMS REST API on a daily schedule, so that Maryville’s data, which Canvas has already unloaded and compressed, is securely stored as S3 objects. AWS Glue tables are defined to provide access to these S3 objects, and Amazon Simple Notification Service (SNS) informs stakeholders of the status of the data loads.
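The Harvard solution’s internals aren’t reproduced in this post. Conceptually, each scheduled run does something along the lines of the following sketch, in which the export URL, bucket, key, and topic ARN are placeholders rather than the solution’s actual values:

import urllib.request
import boto3

s3 = boto3.client("s3")
sns = boto3.client("sns")

def handler(event, context):
    # Placeholder URL; the real solution authenticates against the Canvas Data API
    with urllib.request.urlopen("https://example-canvas-endpoint/exports/latest") as response:
        body = response.read()  # Canvas has already unloaded and compressed the data

    # Land the compressed export in the LMS data lake bucket
    s3.put_object(
        Bucket="example-lms-data-lake",
        Key="canvas/daily/export.gz",
        Body=body,
        ServerSideEncryption="AES256",
    )

    # Notify stakeholders of the load status
    sns.publish(
        TopicArn="arn:aws:sns:us-east-1:123456789012:lms-load-status",
        Message="Canvas daily export stored in S3",
    )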

Expanding the data lake

The next step was deciding how to copy the SIS data into S3. The team decided to use the AWS Database Migration Service (DMS) to create daily snapshots of more than 2,500 tables from this database. DMS uses a source endpoint for secure access to the on-premises database instance over VPN. A target endpoint determines the specific S3 bucket into which the data should be written. A migration task defines which tables to copy from the source database along with other migration options. Finally, a replication instance, a fully managed virtual machine, runs the migration task to copy the data. With this configuration in place, the data lake architecture for SIS data looks like this:


Diagram 2: Migrating data from the Student Information System
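The same DMS configuration can also be scripted. The following is a minimal boto3 sketch of the S3 target endpoint and the migration task; the identifiers, ARNs, and table-selection rules are placeholders, and the source endpoint and replication instance are assumed to already exist:

import json
import boto3

dms = boto3.client("dms")

# S3 target endpoint for the data lake (bucket name and role ARN are placeholders)
target = dms.create_endpoint(
    EndpointIdentifier="sis-s3-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "BucketName": "example-sis-raw-bucket",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-s3-access",
    },
)

# Migration task that copies every table in the SIS schema as a full-load snapshot
table_mappings = {
    "rules": [{
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "include-sis-tables",
        "object-locator": {"schema-name": "SIS", "table-name": "%"},
        "rule-action": "include",
    }]
}

dms.create_replication_task(
    ReplicationTaskIdentifier="sis-daily-snapshot",
    SourceEndpointArn="arn:aws:dms:us-east-1:123456789012:endpoint:SOURCE",
    TargetEndpointArn=target["Endpoint"]["EndpointArn"],
    ReplicationInstanceArn="arn:aws:dms:us-east-1:123456789012:rep:INSTANCE",
    MigrationType="full-load",
    TableMappings=json.dumps(table_mappings),
)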

Handling sensitive data

In building a data lake, you have several options for handling sensitive data, including:

  • Leaving it behind in the source system and excluding it from the data replication process
  • Copying it into the data lake, but taking precautions to ensure that access to it is limited to authorized staff
  • Copying it into the data lake, but applying processes to eliminate, mask, or otherwise obfuscate the data before it is made accessible to analysts and data scientists

The Maryville team decided to take the first of these approaches. Building the data lake gave them a natural opportunity to assess where this data was stored in the source system and then make changes to the source database itself to limit the number of highly sensitive data fields.

Validating the data lake

With these steps completed, the team turned to the final task, which was to validate the data lake. For this process they chose to make use of Amazon Athena, AWS Glue, and Amazon Redshift. AWS Glue provided multiple capabilities including metadata extraction, ETL, and data orchestration. Metadata extraction, completed by Glue crawlers, quickly converted the information that DMS wrote to S3 into metadata defined in the Glue data catalog. This enabled the data in S3 to be accessed using standard SQL statements interactively in Athena. Without the added cost and complexity of a database, Maryville’s data analyst was able to confirm that the data loads were completing successfully. He was also able to resolve specific issues encountered on particular tables. The SQL queries, written in Athena, could later be converted to ETL jobs in AWS Glue, where they could be triggered on a schedule to create additional data in S3. Athena and Glue enabled the ETL that was needed to transform the raw data delivered to S3 into prepared datasets necessary for existing dashboards.
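As an illustration of this kind of validation, the following sketch runs a row-count query through the Athena API with boto3; the database, table, and results location are placeholders, not Maryville’s actual names:

import time
import boto3

athena = boto3.client("athena")

# Count the rows DMS landed for one table
query = athena.start_query_execution(
    QueryString="SELECT COUNT(*) AS row_count FROM sis_students",
    QueryExecutionContext={"Database": "sis_raw"},
    ResultConfiguration={"OutputLocation": "s3://example-athena-results/validation/"},
)

# Wait for the query to finish, then fetch the result rows
query_id = query["QueryExecutionId"]
while athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(2)

rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
print(rows)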

Once curated datasets were created and stored in S3, the data was loaded into an Amazon Redshift data warehouse, which supported direct access by tools outside of AWS using ODBC/JDBC drivers. This capability enabled Maryville’s team to further validate the data by connecting existing dashboards running in Maryville’s own data center to the data in Redshift. Redshift’s stored procedure language allowed the team to port some key ETL logic so that the engineering of these datasets could follow a process similar to the approaches used in Maryville’s on-premises data warehouse environment.
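The team’s load scripts aren’t included in this post, and they used ODBC/JDBC connections. As one alternative approach, a curated Parquet dataset in S3 could be loaded with a COPY statement issued through the Amazon Redshift Data API, roughly as follows (cluster, schema, bucket, and role names are placeholders):

import boto3

redshift_data = boto3.client("redshift-data")

# Issue a COPY from the curated S3 prefix into a Redshift table
redshift_data.execute_statement(
    ClusterIdentifier="example-cluster",
    Database="analytics",
    DbUser="etl_user",
    Sql=(
        "COPY curated.student_outcomes "
        "FROM 's3://example-curated-bucket/student_outcomes/' "
        "IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-s3-read' "
        "FORMAT AS PARQUET;"
    ),
)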

Conclusion

The overall data lake/data warehouse architecture that the Maryville team constructed currently looks like this:


Diagram 3: The complete architecture

Through this approach, Maryville’s two-person team has moved key data into position for use in a variety of workloads. The data in S3 is now readily accessible for ad hoc interactive SQL workloads in Athena, ETL jobs in AWS Glue, and ultimately for machine learning workloads running in Amazon EC2, AWS Lambda, or Amazon SageMaker. In addition, the S3 storage layer is easy to expand without interrupting prior workloads. At the time of this writing, the Maryville team is beginning to use this environment for machine learning models while also adding other data sources into the S3 layer.

Acknowledgements

The solution described in this post resulted from the collaborative effort of Christine McQuie, Data Engineer, and Josh Tepen, Cloud Engineer, at Maryville University, with guidance from Travis Berkley and Craig Jordan, AWS Solutions Architects.

Crafting serverless streaming ETL jobs with AWS Glue

Post Syndicated from Radhika Ravirala original https://aws.amazon.com/blogs/big-data/crafting-serverless-streaming-etl-jobs-with-aws-glue/

Organizations across verticals have been building streaming-based extract, transform, and load (ETL) applications to more efficiently extract meaningful insights from their datasets. Although streaming ingest and stream processing frameworks have evolved over the past few years, there is now a surge in demand for building streaming pipelines that are completely serverless. Since 2017, AWS Glue has made it easy to prepare and load your data for analytics using ETL.

In this post, we dive into streaming ETL in AWS Glue, a feature that allows you to build continuous ETL applications on streaming data. Streaming ETL in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing with exactly-once semantics. This post walks you through an example of building a stream processing pipeline using AWS Glue that includes reading streaming data from Amazon Kinesis Data Streams, schema discovery, running streaming ETL, and writing out the results to a sink.

Serverless streaming ETL architecture

For this post, our use case is relevant to our current situation with the COVID-19 pandemic. Ventilators are in high demand and are increasingly used in different settings: hospitals, nursing homes, and even private residences. Ventilators generate data that must be monitored, and an increase in ventilator usage means there is a tremendous amount of streaming data that needs to be processed promptly so that patients can be attended to as quickly as possible as the need arises. In this post, we build a streaming ETL job on ventilator metrics and enrich the data with details that raise the alert level if the metrics fall outside of the normal range. After you enrich the data, you can visualize it on monitors.

In our streaming ETL architecture, a Python script generates sample ventilator metrics and publishes them as a stream into Kinesis Data Streams. We create a streaming ETL job in AWS Glue that consumes continuously generated ventilator metrics in micro-batches, applies transformations, performs aggregations, and delivers the data to a sink, so the results can be visualized or used in downstream processes.

Because businesses often augment their data lakes built on Amazon Simple Storage Service (Amazon S3) with streaming data, our first use case applies transformations on the streaming JSON data ingested via Kinesis Data Streams and loads the results in Parquet format to an Amazon S3 data lake. After ingested to Amazon S3, you can query the data with Amazon Athena and build visual dashboards using Amazon QuickSight.

For the second use case, we ingest the data from Kinesis Data Streams, join it with reference data in Amazon DynamoDB to calculate alert levels, and write the results to an Amazon DynamoDB sink. This approach allows you to build near real-time dashboards with alert notifications.

The following diagram illustrates this architecture.

AWS Glue streaming ETL jobs

With AWS Glue, you can now create ETL pipelines on streaming data using continuously running jobs. You can ingest streaming data from Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK). AWS Glue streaming jobs can perform aggregations on data in micro-batches and deliver the processed data to Amazon S3. You can read from the data stream and write to Amazon S3 using the AWS Glue DynamicFrame API. You can also write to arbitrary sinks using native Apache Spark Structured Streaming APIs.

The following sections walk you through building a streaming ETL job in AWS Glue.

Creating a Kinesis data stream

First, we need a streaming ingestion source to consume continuously generated streaming data. For this post, we create a Kinesis data stream with five shards, which allows us to push 5,000 records per second into the stream.

  1. On the Amazon Kinesis dashboard, choose Data streams.
  2. Choose Create data stream.
  3. For Data stream name, enter ventilatorstream.
  4. For Number of open shards, choose 5.

If you prefer to use the AWS Command Line Interface (AWS CLI), you can create the stream with the following code:

aws kinesis create-stream \
    --stream-name ventilatorstream \
    --shard-count 5

Generating streaming data

We can synthetically generate ventilator data in JSON format using a simple Python application (see the GitHub repo) or the Kinesis Data Generator (KDG).

Using a Python-based data generator

To generate streaming data using a Python script, you can run the following command from your laptop or Amazon Elastic Compute Cloud (Amazon EC2) instance. Make sure you have installed the faker library on your system and set up the boto3 credentials correctly before you run the script.

python3 generate_data.py --streamname glue_ventilator_stream
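The generator script lives in the GitHub repo. A minimal sketch of what such a script does might look like the following; the field ranges and the sleep interval are illustrative, not the repo’s exact values:

import json
import random
import time

import boto3
from faker import Faker

kinesis = boto3.client("kinesis")
fake = Faker()

# Continuously publish synthetic ventilator readings to the stream
while True:
    record = {
        "ventilatorid": random.randint(1, 25),
        "eventtime": time.strftime("%Y-%m-%d %H:%M:%S"),
        "serialnumber": fake.uuid4(),
        "pressurecontrol": random.randint(10, 40),
        "o2stats": random.randint(85, 100),
        "minutevolume": random.randint(5, 10),
        "manufacturer": fake.company(),
    }
    kinesis.put_record(
        StreamName="glue_ventilator_stream",
        Data=json.dumps(record),
        PartitionKey=str(record["ventilatorid"]),
    )
    time.sleep(0.1)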

Using the Kinesis Data Generator

Alternatively, you can also use the Kinesis Data Generator with the ventilator template available on the GitHub repo. The following screenshot shows the template on the KDG console.

We start pushing the data after we create our AWS Glue streaming job.

Defining the schema

We need to specify a schema for our streaming data, which we can do one of two ways:

  • Retrieve a small batch of the data (before starting your streaming job) from the streaming source, infer the schema in batch mode, and use the extracted schema for your streaming job
  • Use the AWS Glue Data Catalog to manually create a table

For this post, we use the AWS Glue Data Catalog to create a ventilator schema.

  1. On the AWS Glue console, choose Data Catalog.
  2. Choose Tables.
  3. From the Add Table drop-down menu, choose Add table manually.
  4. For the table name, enter ventilators_table.
  5. Create a database with the name ventilatordb.
  6. Choose Kinesis as the type of source.
  7. Enter the name of the stream and the Kinesis endpoint URL, https://kinesis.<aws-region>.amazonaws.com.
  8. For the classification, choose JSON.
  9. Define the schema according to the following table.
Column name       Data type
ventilatorid      int
eventtime         string
serialnumber      string
pressurecontrol   int
o2stats           int
minutevolume      int
manufacturer      string

 

  10. Choose Finish.
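If you prefer to script the catalog entry rather than use the console steps above, a table like this can also be created with boto3. The following is only a sketch; in particular, the keys under the StorageDescriptor Parameters ("typeOfData", "streamName", "endpointUrl") are our assumption of how a Kinesis-backed table is registered, so verify them against the AWS Glue documentation:

import boto3

glue = boto3.client("glue")

glue.create_database(DatabaseInput={"Name": "ventilatordb"})

# NOTE: the StorageDescriptor parameter keys below are assumptions; check the Glue docs
glue.create_table(
    DatabaseName="ventilatordb",
    TableInput={
        "Name": "ventilators_table",
        "Parameters": {"classification": "json"},
        "StorageDescriptor": {
            "Columns": [
                {"Name": "ventilatorid", "Type": "int"},
                {"Name": "eventtime", "Type": "string"},
                {"Name": "serialnumber", "Type": "string"},
                {"Name": "pressurecontrol", "Type": "int"},
                {"Name": "o2stats", "Type": "int"},
                {"Name": "minutevolume", "Type": "int"},
                {"Name": "manufacturer", "Type": "string"},
            ],
            "Location": "ventilatorstream",
            "Parameters": {
                "typeOfData": "kinesis",
                "streamName": "ventilatorstream",
                "endpointUrl": "https://kinesis.us-east-1.amazonaws.com",
            },
        },
    },
)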

Creating an AWS Glue streaming job to hydrate a data lake on Amazon S3

With the streaming source and schema prepared, we’re now ready to create our AWS Glue streaming jobs. We first create a job to ingest data from the streaming source using AWS Glue DataFrame APIs.

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a job name (a UTF-8 string of no more than 255 characters).
  4. For IAM role, specify a role that is used for authorization to the resources used to run the job and access data stores. Because streaming jobs must connect to sources and sinks, make sure that the AWS Identity and Access Management (IAM) role has permissions to read from Kinesis Data Streams, write to Amazon S3, and read from and write to Amazon DynamoDB. For details, see Managing Access Permissions for AWS Glue Resources.
  5. For Type, choose Spark Streaming.
  6. For Glue Version, choose Spark 2.4, Python 3.
  7. For This job runs, select A new script authored by you.

You can have AWS Glue generate the streaming ETL code for you, but for this post, we author one from scratch.

  8. For Script file name, enter GlueStreaming-S3.
  9. For S3 path where script is stored, enter your S3 path.
  10. For Job bookmark, choose Disable.

For this post, we use the checkpointing mechanism of AWS Glue to keep track of the data read instead of a job bookmark.

  11. For Monitoring options, select Job metrics and Continuous logging.
  12. For Log filtering, select Standard filter and Spark UI.
  13. For Amazon S3 prefix for Spark event logs, enter the S3 path for the event logs.
  14. For Job parameters, enter the following key-value pairs:
    1. --output_path – The S3 path where the final aggregations are persisted
    2. --aws_region – The Region where you run the job

  15. Skip the connections part and choose Save and edit the script.

Streaming ETL to an Amazon S3 sink

We use the AWS Glue DynamicFrameReader class’s from_catalog method to read the streaming data. We specify the table name that has been associated with the data stream as the source of data (see the section Defining the schema). We add additional_options to indicate the starting position to read from in Kinesis Data Streams. TRIM_HORIZON allows us to start reading from the oldest record in the shard.

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

In the preceding code, sourceData represents a streaming DataFrame. We use the foreachBatch API to invoke a function (processBatch) that processes the data represented by this streaming DataFrame. The processBatch function receives a static DataFrame, which holds streaming data for a window size of 100s (default). It creates a DynamicFrame from the static DataFrame and writes out partitioned data to Amazon S3. See the following code:

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})

To transform the DynamicFrame to fix the data type for eventtime (from string to timestamp) and write the ventilator metrics to Amazon S3 in Parquet format, enter the following code:

def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

Putting it all together

In the Glue ETL code editor, enter the following code, then save and run the job:

import sys
import datetime
import boto3
import base64
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'output_path'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# S3 sink locations
aws_region = args['aws_region']
output_path = args['output_path']

s3_target = output_path + "ventilator_metrics"
checkpoint_location = output_path + "cp/"
temp_path = output_path + "temp/"


def processBatch(data_frame, batchId):
    now = datetime.datetime.now()
    year = now.year
    month = now.month
    day = now.day
    hour = now.hour
    minute = now.minute
    if (data_frame.count() > 0):
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        apply_mapping = ApplyMapping.apply(frame = dynamic_frame, mappings = [ \
            ("ventilatorid", "long", "ventilatorid", "long"), \
            ("eventtime", "string", "eventtime", "timestamp"), \
            ("serialnumber", "string", "serialnumber", "string"), \
            ("pressurecontrol", "long", "pressurecontrol", "long"), \
            ("o2stats", "long", "o2stats", "long"), \
            ("minutevolume", "long", "minutevolume", "long"), \
            ("manufacturer", "string", "manufacturer", "string")],\
            transformation_ctx = "apply_mapping")

        dynamic_frame.printSchema()

        # Write to S3 Sink
        s3path = s3_target + "/ingest_year=" + "{:0>4}".format(str(year)) + "/ingest_month=" + "{:0>2}".format(str(month)) + "/ingest_day=" + "{:0>2}".format(str(day)) + "/ingest_hour=" + "{:0>2}".format(str(hour)) + "/"
        s3sink = glueContext.write_dynamic_frame.from_options(frame = apply_mapping, connection_type = "s3", connection_options = {"path": s3path}, format = "parquet", transformation_ctx = "s3sink")

# Read from Kinesis Data Stream
sourceData = glueContext.create_data_frame.from_catalog( \
    database = "ventilatordb", \
    table_name = "ventilatortable", \
    transformation_ctx = "datasource0", \
    additional_options = {"startingPosition": "TRIM_HORIZON", "inferSchema": "true"})

sourceData.printSchema()

glueContext.forEachBatch(frame = sourceData, batch_function = processBatch, options = {"windowSize": "100 seconds", "checkpointLocation": checkpoint_location})
job.commit()

Querying with Athena

When the processed streaming data is written in Parquet format to Amazon S3, we can run queries on Athena. Run the AWS Glue crawler on the Amazon S3 location where the streaming data is written out. The following screenshot shows our query results.
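If you’d rather script the crawler than create it on the console, a sketch like the following works; the crawler name, role, and S3 path are placeholders:

import boto3

glue = boto3.client("glue")

# Crawl the processed streaming output so it becomes queryable in Athena
glue.create_crawler(
    Name="ventilator-metrics-crawler",
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",
    DatabaseName="ventilatordb",
    Targets={"S3Targets": [{"Path": "s3://example-output-bucket/ventilator_metrics/"}]},
)
glue.start_crawler(Name="ventilator-metrics-crawler")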

For instructions on building visual dashboards with the streaming data in Amazon S3, see Quick Start: Create an Analysis with a Single Visual Using Sample Data. The following dashboards show distribution of metrics, averages, and alerts based on anomalies on an hourly basis, but you can create more advanced dashboards with more granular (minute-level) intervals.

Streaming ETL to a DynamoDB sink

For the second use case, we transform the streaming data as it arrives without micro-batching and persist the data to a DynamoDB table. Scripts to create DynamoDB tables are available in the GitHub repo. We use Apache Spark’s Structured Streaming API to read ventilator-generated data from the data stream, join it with reference data for normal metrics range in a DynamoDB table, compute the status based on the deviation from normal metric values, and write the processed data to a DynamoDB table. See the following code:

import sys
import datetime
import base64
import decimal
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue import DynamicFrame

args = getResolvedOptions(sys.argv, \
                            ['JOB_NAME', \
                            'aws_region', \
                            'checkpoint_location', \
                            'dynamodb_sink_table', \
                            'dynamodb_static_table'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read parameters
checkpoint_location = args['checkpoint_location']
aws_region = args['aws_region']

# DynamoDB config
dynamodb_sink_table = args['dynamodb_sink_table']
dynamodb_static_table = args['dynamodb_static_table']

def write_to_dynamodb(row):
    '''
    Add row to DynamoDB.
    '''
    dynamodb = boto3.resource('dynamodb', region_name=aws_region)
    start = str(row['window'].start)
    end = str(row['window'].end)
    dynamodb.Table(dynamodb_sink_table).put_item(
      Item = { 'ventilatorid': row['ventilatorid'], \
                'status': str(row['status']), \
                'start': start, \
                'end': end, \
                'avg_o2stats': decimal.Decimal(str(row['avg_o2stats'])), \
                'avg_pressurecontrol': decimal.Decimal(str(row['avg_pressurecontrol'])), \
                'avg_minutevolume': decimal.Decimal(str(row['avg_minutevolume']))})

dynamodb_dynamic_frame = glueContext.create_dynamic_frame.from_options( \
    "dynamodb", \
    connection_options={
    "dynamodb.input.tableName": dynamodb_static_table,
    "dynamodb.throughput.read.percent": "1.5"
  }
)

dynamodb_lookup_df = dynamodb_dynamic_frame.toDF().cache()

# Read from Kinesis Data Stream
streaming_data = spark.readStream \
                    .format("kinesis") \
                    .option("streamName","glue_ventilator_stream") \
                    .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") \
                    .option("startingPosition", "TRIM_HORIZON") \
                    .load()

# Retrieve Sensor columns and do a simple projection
ventilator_fields = streaming_data \
    .select(from_json(col("data") \
    .cast("string"),glueContext.get_catalog_schema_as_spark_schema("ventilatordb","ventilators_table")) \
    .alias("ventilatordata")) \
    .select("ventilatordata.*") \
    .withColumn("event_time", to_timestamp(col('eventtime'), "yyyy-MM-dd HH:mm:ss")) \
    .withColumn("ts", to_timestamp(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Stream static join, ETL to augment with status
ventilator_joined_df = ventilator_fields.join(dynamodb_lookup_df, "ventilatorid") \
    .withColumn('status', when( \
    ((ventilator_fields.o2stats < dynamodb_lookup_df.o2_stats_min) | \
    (ventilator_fields.o2stats > dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol < dynamodb_lookup_df.pressure_control_min) | \
    (ventilator_fields.pressurecontrol > dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume < dynamodb_lookup_df.minute_volume_min) | \
    (ventilator_fields.minutevolume > dynamodb_lookup_df.minute_volume_max)), "RED") \
    .when( \
    ((ventilator_fields.o2stats >= dynamodb_lookup_df.o2_stats_min) & \
    (ventilator_fields.o2stats <= dynamodb_lookup_df.o2_stats_max)) & \
    ((ventilator_fields.pressurecontrol >= dynamodb_lookup_df.pressure_control_min) & \
    (ventilator_fields.pressurecontrol <= dynamodb_lookup_df.pressure_control_max)) & \
    ((ventilator_fields.minutevolume >= dynamodb_lookup_df.minute_volume_min) & \
    (ventilator_fields.minutevolume <= dynamodb_lookup_df.minute_volume_max)), "GREEN") \
    .otherwise("ORANGE"))

ventilator_joined_df.printSchema()

# Drop the normal metric values
ventilator_transformed_df = ventilator_joined_df \
                            .drop('eventtime', 'o2_stats_min', 'o2_stats_max', \
                            'pressure_control_min', 'pressure_control_max', \
                            'minute_volume_min', 'minute_volume_max')

ventilator_transformed_df.printSchema()

ventilators_df = ventilator_transformed_df \
    .groupBy(window(col('ts'), '10 minute', '5 minute'), \
    ventilator_transformed_df.status, ventilator_transformed_df.ventilatorid) \
    .agg( \
    avg(col('o2stats')).alias('avg_o2stats'), \
    avg(col('pressurecontrol')).alias('avg_pressurecontrol'), \
    avg(col('minutevolume')).alias('avg_minutevolume') \
    )

ventilators_df.printSchema()

# Write to DynamoDB sink
ventilator_query = ventilators_df \
    .writeStream \
    .foreach(write_to_dynamodb) \
    .outputMode("update") \
    .option("checkpointLocation", checkpoint_location) \
    .start()

ventilator_query.awaitTermination()

job.commit()

After the preceding code runs, the ventilator metric aggregations are persisted in the Amazon DynamoDB table. You can build custom user interface applications with the data in Amazon DynamoDB to create dashboards.
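For example, a dashboard backend could read the aggregated windows back with boto3 along these lines; the table name is a placeholder, and we assume ventilatorid is the table’s partition key:

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("ventilator_metrics_sink")  # placeholder sink table name

# Fetch the aggregated windows for one ventilator (assumes ventilatorid is the partition key)
response = table.query(KeyConditionExpression=Key("ventilatorid").eq(5))
for item in response["Items"]:
    print(item["start"], item["end"], item["status"], item["avg_o2stats"])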

Conclusion

Streaming applications have become a core component of data lake architectures. With AWS Glue streaming, you can create serverless ETL jobs that run continuously, consuming data from streaming services like Kinesis Data Streams and Amazon MSK. You can load the results of streaming processing into an Amazon S3-based data lake, JDBC data stores, or arbitrary sinks using the Structured Streaming API.

For more information about streaming AWS Glue ETL jobs, see the AWS Glue documentation.

We encourage you to build a serverless streaming application using AWS Glue streaming ETL and share your experience with us. If you have any questions or suggestions, share them in the comments.


About the Author


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Event-driven refresh of SPICE datasets in Amazon QuickSight

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/event-driven-refresh-of-spice-datasets-in-amazon-quicksight/

Businesses are increasingly harnessing data to improve their business outcomes. To enable this transformation to a data-driven business, customers are bringing together data from structured and unstructured sources into a data lake. Then they use business intelligence (BI) tools, such as Amazon QuickSight, to unlock insights from this data.

To provide fast access to datasets, QuickSight provides a fully managed calculation engine called SPICE—the Super-fast, Parallel, In-Memory Calculation Engine. At the time of writing, SPICE enables you to cache up to 250 million rows or 500 GB of data per dataset.

To extract value from the data quickly, you need access to new data as soon as it’s available. In this post, we describe how to achieve this by refreshing SPICE datasets as part of your extract, transform, and load (ETL) pipelines.

Solution architecture

In this post, you automate the refresh of SPICE datasets by implementing the following architecture.

This architecture consists of two parts: an example ETL job and a decoupled event-driven process to refresh SPICE.

For the ETL job, you use Amazon Simple Storage Service (Amazon S3) as your primary data store. Data lands in an S3 bucket, which we refer to as the raw zone. An Amazon S3 event notification configured on this bucket invokes an AWS Lambda function, which starts an AWS Glue ETL job. This job processes the raw data and outputs the processed data into another S3 bucket, which we refer to as the processed zone.

This sample ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, such as more granular partitioning, compression, or enrichment of the data. The AWS Glue Data Catalog stores the metadata, and the QuickSight datasets are created using Amazon Athena data sources.
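The job script created by the CloudFormation template isn’t shown here. At its core, a CSV-to-Parquet conversion in AWS Glue looks roughly like the following sketch (the table and bucket names are placeholders, not the stack’s actual resource names):

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# Read the raw CSV table from the Data Catalog and write it back out as Parquet
raw = glueContext.create_dynamic_frame.from_catalog(
    database="taxigluedatabase", table_name="raw_taxi_data")

glueContext.write_dynamic_frame.from_options(
    frame=raw,
    connection_type="s3",
    connection_options={"path": "s3://example-processed-bucket/processed_taxi_data/"},
    format="parquet")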

To trigger the SPICE dataset refresh, after the ETL job finishes, an Amazon EventBridge rule triggers a Lambda function that initiates the refresh.

In summary, this pipeline transforms your data and updates QuickSight SPICE datasets upon completion.

Deploying the automated data pipeline using AWS CloudFormation

Before deploying the AWS CloudFormation template, make sure you have signed up for QuickSight in one of the 11 supported Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (Oregon)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Asia Pacific (Tokyo)
  • EU (Frankfurt)
  • EU (Ireland)
  • EU (London)

This post works with both Standard and Enterprise editions of QuickSight. Enterprise Edition provides richer features and higher limits compared to Standard Edition.

  1. After you sign up for QuickSight, you can use CloudFormation templates to create all the necessary resources by choosing Launch stack:
  2. Enter a stack name; for example, SpiceRefreshBlog.
  3. Acknowledge the AWS Identity and Access Management (IAM) resource creation.
  4. Choose Create stack.

The CloudFormation template creates the following resources in your AWS account:

  • Three S3 buckets to store the following:
    • AWS Glue ETL job script
    • Raw data
    • Processed data
  • Three Lambda functions to do the following:
    • Create the ETL job
    • Initiate the ETL job upon upload of new data in the raw zone
    • Initiate the SPICE dataset refresh when the ETL job is complete
  • An AWS Glue database
  • Two AWS Glue tables to store the following:
    • Raw data
    • Processed data
  • An ETL job to convert the raw data from CSV into Apache Parquet format
  • Four IAM roles: One each for the Lambda functions and one for the ETL job
  • An EventBridge rule that triggers on an AWS Glue job state change event with a state of Succeeded and invokes a Lambda function that performs the SPICE dataset refresh

Importing the dataset

For this post, you use the publicly available NYC Taxi & Limousine Commission (TLC) Trip Record Data dataset. You upload monthly data in CSV format to the raw zone S3 bucket.

This data is available in Amazon S3 through Open Data on AWS, a program designed to let you spend more time on data analysis rather than data acquisition.

You start by copying the For Hire Vehicle (FHV) data for March 2020. Because the data is already available in Amazon S3 through Open Data, run the following command to copy the data into the raw zone. Make sure you replace <raw bucket name> with the name of the raw bucket created by the CloudFormation template:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-03.csv" s3://<raw bucket name>

After you copy the data into the raw zone, the Amazon S3 event trigger invokes the Lambda function that triggers the ETL job. You can see the job status on the AWS Glue console by choosing Jobs in the navigation pane. The process takes about 2 minutes.

When the job is complete, check that you can see the Parquet files in the processed zone S3 bucket.

Creating a QuickSight analysis of the data

To visualize the taxi data, we create a QuickSight analysis.

First, you need to give QuickSight the necessary permissions to access the processed zone S3 bucket. For instructions, see I Can’t Connect to Amazon S3.

Then complete the following steps to create an analysis of the taxi data:

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.
  3. Choose Athena and provide a name for the data source (such as Athena).
  4. Choose Create data source.
  5. For Database, choose the name of the taxi AWS Glue database (starting with taxigluedatabase).
  6. For Tables, select processed_taxi_data as the table to visualize.
  7. Choose Select.
  8. Ensure Import to SPICE for quicker analytics is selected and choose Visualize.

After the data is imported into SPICE, you can create visuals to display the data. For example, the following screenshot shows a key performance indicator (KPI) of the number of taxi journeys aggregated at the month level and the number of journeys over time.

We use this dashboard to visualize the dataset again after we refresh SPICE with more data.

Automating the SPICE refresh

To refresh the SPICE dataset when the ETL job is complete, the CloudFormation template we deployed created an EventBridge rule that triggers a Lambda function each time an AWS Glue ETL job successfully completes. The following screenshot shows the code for the event pattern.
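That screenshot isn’t reproduced here; the rule matches successful AWS Glue job runs, which you could express with boto3 roughly as follows (the rule and target names are ours, not the stack’s, and the function ARN is a placeholder):

import json
import boto3

events = boto3.client("events")

# Match any Glue job run that finishes in the SUCCEEDED state
events.put_rule(
    Name="glue-job-succeeded",
    EventPattern=json.dumps({
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"state": ["SUCCEEDED"]},
    }),
)

# Send matching events to the SPICE refresh Lambda function
events.put_targets(
    Rule="glue-job-succeeded",
    Targets=[{"Id": "spice-refresh", "Arn": "arn:aws:lambda:us-east-1:123456789012:function:SpiceRefresh"}],
)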

We need to configure the Lambda function with the ETL job name and the ID of the SPICE dataset we created in QuickSight.

  1. Locate the ETL job name on the AWS Glue console, named TaxiTransformationGlueJob-<unique id>.
  2. To find the SPICE dataset ID, run the following command using the AWS Command Line Interface (AWS CLI):
    aws quicksight list-data-sets --aws-account-id <your AWS account id> 

    The following screenshot shows the output with the dataset ID.

  3. On the Lambda console, open the Lambda function named SpiceRefreshBlog-QuicksightUpdateLambda-<unique id>.
  4. Update line 9 of the code to replace ReplaceWithGlueJobName with the AWS Glue job name and ReplaceWithYourDatasetID with the dataset ID.

Once a Glue job succeeds, this Lambda function is triggered. The EventBridge event that triggers the Lambda contains the name of the job. You can access this from the event as follows, as seen on line 25 of the function:

succeededJob = event['detail']['jobName']

The Lambda function looks up the job name in the data_set_map dictionary. If the dictionary contains the job name, the dataset ID is accessed and the function calls the QuickSight Create Ingestion API to refresh the SPICE datasets.

You can extend the data_set_map dictionary to include additional job names and associated SPICE dataset IDs to be refreshed. If using this approach at scale, you might choose to move this configuration information to an Amazon DynamoDB table.
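Putting those pieces together, the core of such a function looks roughly like the following sketch; the job name and dataset ID in data_set_map are hypothetical, and the deployed function’s code may differ:

import time
import boto3

# Hypothetical mapping of Glue job names to SPICE dataset IDs
data_set_map = {
    "TaxiTransformationGlueJob-example": "11111111-2222-3333-4444-555555555555",
}

quicksight = boto3.client("quicksight")

def handler(event, context):
    succeeded_job = event["detail"]["jobName"]
    dataset_id = data_set_map.get(succeeded_job)
    if dataset_id:
        # Kick off a SPICE refresh; the ingestion ID just needs to be unique
        quicksight.create_ingestion(
            AwsAccountId=event["account"],
            DataSetId=dataset_id,
            IngestionId=str(int(time.time())),
        )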

  5. Save the Lambda function by choosing Deploy.

Testing the automated refresh

Now that you have configured the Lambda function, we can test the ETL end-to-end process and make the next month’s data available for analysis.

To add the FHV data for April, run the following AWS CLI command:

aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_2020-04.csv" s3://<raw bucket name>

As before, this upload to the raw zone triggers the Lambda function that starts the ETL job. You can see the progress of the job on the AWS Glue console.

When the job is complete, navigate to QuickSight and open the taxi analysis (or, if you still have it open, refresh the window).

You can now see that both months’ data is available for analysis. This step might take 1–2 minutes to load.

To see the status of each SPICE refresh, navigate back to the dataset on the QuickSight console and choose View History.

The following screenshot shows the status of previous refreshes and the number of rows that have been ingested into SPICE.

Now that you have tested the end-to-end process, you can try copying more FHV data to the raw zone and see the data within your QuickSight analysis.

Cleaning up

To clean up the resources you created by following along with this post, complete the following steps:

  1. Delete the QuickSight analysis you created.
  2. Delete the QuickSight dataset that you created.
  3. Delete the QuickSight data source:
    1. Choose New dataset.
    2. Select the data source and choose Delete data source.
  4. On the Amazon S3 console, delete the contents of the raw and processed S3 buckets.
  5. On the AWS CloudFormation console, select the stack SpiceRefreshBlog and choose Delete.

Conclusion

Using an event-based architecture to automate the refresh of your SPICE datasets makes sure that your business analysts are always viewing the latest available data. This reduction in time to analysis can help your business unlock insights quicker without having to wait for a manual or scheduled process. Additionally, by only refreshing SPICE when new data is available, the underlying data storage resources are used efficiently, so you only pay for what you need!

Get started with QuickSight today!


About the Authors

Rob Craig is a Senior Solutions Architect with AWS. He supports customers in the UK with their cloud journey, providing them with architectural advice and guidance to help them achieve their business outcomes.


Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Making ETL easier with AWS Glue Studio

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/making-etl-easier-with-aws-glue-studio/

AWS Glue Studio is an easy-to-use graphical interface that speeds up the process of authoring, running, and monitoring extract, transform, and load (ETL) jobs in AWS Glue. The visual interface allows those who don’t know Apache Spark to design jobs without coding experience and accelerates the process for those who do.

AWS Glue Studio was designed to help you create ETL jobs easily. After you design a job in the graphical interface, it generates Apache Spark code for you, abstracting users from the challenges of coding. When the job is ready, you can run it and monitor the job status using the integrated UI.

AWS Glue Studio supports different types of data sources, both structured and semi-structured, and offers data processing in real time and batch. You can extract data from sources like Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), Amazon Kinesis, and Apache Kafka. It also offers Amazon S3 and tables defined in the AWS Glue Data Catalog as destinations.

This post shows you how to create an ETL job to extract, filter, join, and aggregate data easily using AWS Glue Studio.

About this blog post
Time to read: 15 minutes
Time to complete: 45 minutes
Cost to complete (estimated): Amazon S3: $0.023; AWS Glue: $0.036; AWS Identity & Access Management: $0; Total: $0.059
Learning level: Intermediate (200)
Services used: AWS Glue, Amazon S3, AWS Identity and Access Management

Overview of solution

To demonstrate how to create an ETL job using AWS Glue Studio, we use the Toronto parking tickets dataset, specifically the data about parking tickets issued in the city of Toronto in 2018, and the trials dataset, which contains all the information about the trials related to those parking tickets. The goal is to filter, join, and aggregate the two datasets to get the number of parking tickets handled per court in the city of Toronto during that year.

Prerequisites

For this walkthrough, you should have an AWS account. For this post, you launch the required AWS resources using AWS CloudFormation in the us-east-1 Region. If you haven’t signed up for AWS, complete the following tasks:

  1. Create an account.
  2. Create an AWS Identity and Access Management (IAM) user. For instructions, see Create an IAM User.

Important: If the AWS account you use to follow this guide uses AWS Lake Formation to manage permissions on the Glue data catalog, make sure that you log in as a user that is both a Data lake administrator and a Database creator, as described in the documentation.

Launching your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your stack in us-east-1:
  2. Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names option.
  3. Choose Create stack.

Launching this stack creates AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next steps:

  • AWSGlueStudioRole – IAM role to run AWS Glue jobs
  • AWSGlueStudioS3Bucket – Name of the S3 bucket to store blog-related files
  • AWSGlueStudioTicketsYYZDB – AWS Glue Data Catalog database
  • AWSGlueStudioTableTickets – Data Catalog table to use as a source
  • AWSGlueStudioTableTrials – Data Catalog table to use as a source
  • AWSGlueStudioParkingTicketCount – Data Catalog table to use as the destination

Creating a job

A job is the AWS Glue component that allows the implementation of business logic to transform data as part of the ETL process. For more information, see Adding Jobs in AWS Glue.

To create an AWS Glue job using AWS Glue Studio, complete the following steps:

  1. On the AWS Management Console, choose Services.
  2. Under Analytics, choose AWS Glue.
  3. In the navigation pane, choose AWS Glue Studio.
  4. On the AWS Glue Studio home page, choose Create and manage jobs.

AWS Glue Studio supports different sources, including Amazon S3, Amazon RDS, Amazon Kinesis, and Apache Kafka. For this post, you use two AWS Glue tables as data sources and one S3 bucket as the destination.

  1. In the Create Job section, select Blank graph.
  2. Choose Create.

This takes you to the Visual Canvas to create an AWS Glue job.

  1. Change the Job name from Untitled Job to YYZ-Tickets-Job.

You now have an AWS Glue job ready to filter, join, and aggregate data from two different sources.

Adding sources

For this post, you use two AWS Glue tables as data sources: Tickets and Trials, which the CloudFormation template created. The data is located in an external S3 bucket in Parquet format. To add these tables as sources, complete the following steps:

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Tickets.
  3. For Node type, choose S3.
  4. On the Data Source properties -S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose tickets.
  6. For Partition predicate (optional), leave blank.

Before adding the second data source to the ETL job, be sure that the node you just created isn’t selected.

  1. Choose the (+) icon.
  2. On the Node properties tab, for Name, enter Trials.
  3. For Node type, choose S3.
  4. On the Data Source properties -S3 tab, for Database, choose yyz-tickets.
  5. For Table, choose trials.
  6. For Partition predicate (optional), leave blank.

You now have two AWS Glue tables as the data sources for the AWS Glue job.
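Behind the scenes, AWS Glue Studio generates Apache Spark code for these nodes. A rough, simplified equivalent of the two source nodes (a sketch, not the exact generated script) is:

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate())

# The two Data Catalog tables created by the CloudFormation template
tickets = glueContext.create_dynamic_frame.from_catalog(
    database="yyz-tickets", table_name="tickets", transformation_ctx="Tickets")
trials = glueContext.create_dynamic_frame.from_catalog(
    database="yyz-tickets", table_name="trials", transformation_ctx="Trials")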

Adding transforms

A transform is the AWS Glue Studio component where the data is modified. You can use the built-in transforms that are part of this service or your own custom code. To add transforms, complete the following steps:

  1. Choose the Tickets node.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Ticket_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, choose Tickets.
  6. On the Transform tab, change the ticket_number data type from decimal to int.
  7. Drop the following columns:
    • Location1
    • Location2
    • Location3
    • Location4
    • Province

Now you add a second ApplyMapping transform to modify the Trials data source.

  1. Choose the Trials data source node.
  2. Choose the (+) icon.
  3. On the Node properties tab, for Name, enter Trial_Mapping.
  4. For Node type, choose ApplyMapping.
  5. For Node parents, leave at default value (Trials).
  6. On the Transform tab, change the parking_ticket_number data type from long to int.

Now that you have set the right data types and removed some of the columns, it’s time to join the data sources using the Join transform.

  1. Choose the Ticket_Mapping transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Join_Ticket_Trial.
  4. For Node type, choose Join.
  5. For Node parents, choose Ticket_Mapping and Trial_Mapping.
  6. On the Transform tab, for Join type, choose Inner join.
  7. For Join conditions, choose Add condition.
  8. For Ticket_Mapping, choose ticket_number.
  9. For Trial_Mapping, choose parking_ticket_number.

Now the two data sources are joined by the ticket_number and parking_ticket_number columns.

Performing data aggregation

In this step, you do some data aggregation to see the number of tickets handled per court in Toronto.

AWS Glue Studio offers the option of adding custom code for use cases that need a more complex transformation. For this post, we use PySpark code to do the data transformation. It contains Spark SQL code and a combination of DynamicFrames and DataFrames.

  1. Choose the Join_Ticket_Trial transform.
  2. Choose the (+) icon.

  3. On the Node properties tab, for Name, enter Aggregate_Tickets.
  4. For Node type, choose Custom transform.
  5. For Node parents, leave Join_Ticket_Trial selected.
  6. On the Transform tab, for Code block, change the function name from MyTransform to Aggregate_Tickets.
  7. Enter the following code:
    selected = dfc.select(list(dfc.keys())[0]).toDF()
    selected.createOrReplaceTempView("ticketcount")
    totals = spark.sql("select court_location as location, infraction_description as infraction, count(infraction_code) as total  FROM ticketcount group by infraction_description, infraction_code, court_location order by court_location asc")
    results = DynamicFrame.fromDF(totals, glueContext, "results")
    return DynamicFrameCollection({"results": results}, glueContext)
    

After adding the custom transformation to the AWS Glue job, you want to store the result of the aggregation in the S3 bucket. To do this, you need a Select from collection transform to read the output from the Aggregate_Tickets node and send it to the destination.

  1. Choose the New node node.
  2. Leave the Transform tab with the default values.
  3. On the Node Properties tab, change the name of the transform to Select_Aggregated_Data.
  4. Leave everything else with the default values.
  5. Choose the Select_Aggregated_Data node.
  6. Choose the (+) icon.

  7. On the Node properties tab, for Name, enter Ticket_Count_Dest.
  8. For Node type, choose S3 in the Data target section.
  9. For Node parents, choose Select_Aggregated_Data.
  10. On the Data Target Properties-S3 tab, for Format, choose Parquet.
  11. For Compression Type, choose GZIP.
  12. For S3 Target Location, enter s3://glue-studio-blog-{Your Account ID as a 12-digit number}/parking_tickets_count/.

The job should look like the following screenshot.

You now have three transforms to do data mapping, filtering, and aggregation.

Configuring the job

When the logic behind the job is complete, you must set the parameters for the job run. In this section, you configure the job by selecting components such as the IAM role and the AWS Glue version you use to run the job.

  1. On the Job details tab, for Description, enter Glue Studio blog post job.
  2. For IAM Role, choose AWSGlueStudioRole (which the CloudFormation template created).
  3. For Job Bookmark, choose Disable.
  4. For Number of retries, optionally enter 1.
  5. Choose Save.
  6. When the job is saved, choose Run.

Monitoring the job

AWS Glue Studio offers a job monitoring dashboard that provides comprehensive information about your jobs. You can get job statistics and see detailed info about the job and the job status when running.

  1. In the AWS Glue Studio navigation panel, choose Monitoring.
  2. Choose the entry with the job name YYZ-Tickets-Job.
  3. To get more details about the job run, choose View run details.
  4. Wait until Run Status changes to Succeeded.

You can verify that the job ran successfully on the Amazon Athena console.

  1. On the Athena console, choose the yyz-tickets database.
  2. Choose the icon next to the parking_tickets_count table (which the CloudFormation template created).

For more information about creating AWS Glue tables, see Defining Tables in the AWS Glue Data Catalog.

  1. Choose Preview table.

As you can see in the following screenshot, the information that the job generated is available, and you can query the number of ticket types per court issued in the city of Toronto in 2018.

Cleaning up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, and AWS Glue job.

Conclusion

In this post, you learned how to use AWS Glue Studio to create an ETL job. You can use AWS Glue Studio to speed up the ETL job creation process and allow different personas to transform data without any previous coding experience. For more information about AWS Glue Studio, see the AWS Glue Studio documentation and What’s New with AWS.


About the author

Leonardo Gómez is a Senior Analytics Specialist Solution Architect at AWS. Based in Toronto, Canada, he works with customers across Canada to design and build big data solutions.


Building an AWS Glue ETL pipeline locally without an AWS account

Post Syndicated from Adnan Alvee original https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

If you’re new to AWS Glue and looking to understand its transformation capabilities without incurring an added expense, or if you’re simply wondering if AWS Glue ETL is the right tool for your use case and want a holistic view of AWS Glue ETL functions, then please continue reading. In this post, we walk you through several AWS Glue ETL functions with supporting examples, using a local PySpark shell in a containerized environment with no AWS artifact dependency. If you’re already familiar with AWS Glue and Apache Spark, you can use this solution as a quick cheat sheet for AWS Glue PySpark validations.

You don’t need an AWS account to follow along with this walkthrough. We use small example datasets for our use case and go through the transformations of several AWS Glue ETL PySpark functions: ApplyMapping, Filter, SplitRows, SelectFields, Join, DropFields, Relationalize, SelectFromCollection, RenameField, Unbox, Unnest, DropNullFields, SplitFields, Spigot and Write Dynamic Frame.

This post provides an introduction of the transformation capabilities of AWS Glue and provides insights towards possible uses of the supported functions. The goal is to get up and running with AWS Glue ETL functions in the shortest possible time, at no cost and without any AWS environment dependency.

Prerequisites

To follow along, you should have the following resources:

  • Basic programming experience
  • Basic Python and Spark knowledge (not required but good to have)
  • A desktop or workstation with Docker installed and running

If you prefer to set up the environment locally outside of a Docker container, you can follow the instructions provided in the GitHub repo, which hosts libraries used in AWS Glue. These libraries extend Apache Spark with additional data types and operations for ETL workflows.

Setting up resources

For this post, we use the amazon/aws-glue-libs:glue_libs_1.0.0_image_01 image from Docker Hub. This image has only been tested for the AWS Glue 1.0 Spark shell (PySpark). The image also supports Jupyter and Zeppelin notebooks and a CLI interpreter. For the purpose of this post, we use the CLI interpreter. For more information about the container, see Developing AWS Glue ETL jobs locally using a container.

To pull the relevant image from the Docker repository, enter the following command in a terminal prompt:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

To test on the command prompt, enter the following code:

docker run -itd --name glue_without_notebook amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker exec -it glue_without_notebook bash
/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

To test on Jupyter notebooks, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter \
amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
/home/jupyter/jupyter_start.sh

Browse to localhost:8888 in a browser to open Jupyter notebooks.

Importing GlueContext

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
glueContext = GlueContext(SparkContext.getOrCreate())

Dataset 1

We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],\
               ['1006', '547', 'NO', '1418901256', '75034'],\
               ['1007', '823', 'YES', '1418901300', '75023'],\
               ['1008', '912', 'NO', '1418901400', '82091'],\
               ['1009', '321', 'YES', '1418902000', '90093']\
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

The following .show() command allows us to view the DataFrame in the shell:

df_orders.show()

# Output
+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

DynamicFrame

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf") 

Now that we have our Dynamic Frame, we can start working with the datasets with AWS Glue transform functions.

ApplyMapping

The columns in our data might be in different formats, and you may want to change their respective names. ApplyMapping is the best option for changing the names and types of all the columns collectively. For our dataset, we change some of the columns from String to Long to save storage space later. We also shorten the column zipcode to zip. See the following code:

# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()

# Output
root
|-- order_id: long
|-- customer_id: long
|-- essential_item: string
|-- timestamp: long
|-- zip: long

Filter

We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

# Output 
+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+

Map

Map allows us to apply a transformation to each record of a Dynamic Frame. For our case, we want to target a certain zip code for next day air shipping. We implement a simple “next_day_air” function and pass it to the Dynamic Frame:

# Input 

# This function takes in a DynamicFrame record and checks if zip code
# 75034 is present in it. If present, it adds another column
# "next_day_air" with the value True

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()

# Output
+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        TRUE|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+

Dataset 2

To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the spark.read.json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

# Output
+-----------------------------------------------------+-----+
|customers                                            |zip  |
+-----------------------------------------------------+-----+
|[[108 Park Street, TX, 623], [763 Marsh Ln, TX, 231]]|75091|
|[[771 Peek Pkwy, GA, 201]]                           |82091|
|[[66 P Street, NY, 343]]                             |75023|
|[[708 Fed Ln, CA, 932], [807 Deccan Dr, CA, 102]]    |90093|
+-----------------------------------------------------+-----+
# Input
df_json.printSchema()

# Output
root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

SelectFields

To join with the order list, we don’t need all the columns, so we use the SelectFields function to shortlist the columns we need. In our use case, we only need the zip column, but we can add more columns because the paths argument accepts a list:

# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

# Output
+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

Join

The Join function is straightforward and manages duplicate columns. We had two columns named zip from both datasets. AWS Glue added a period (.) in one of the duplicate column names to avoid errors:

# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()

# Output
+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[108 Park Street...|75091|75091|
|[[66 P Street, NY...|75023|75023|
|[[708 Fed Ln, CA,...|90093|90093|
+--------------------+-----+-----+

DropFields

Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

# Output
+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[66 P Street, NY...|75023|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

Relationalize

The Relationalize function can flatten nested structures and create multiple DynamicFrames. Our customers column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

# Input
dyf_relationize.keys()

# Output
dict_keys(['root', 'root_customers'])

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

SelectFromCollection

The SelectFromCollection function allows us to retrieve the specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

# Output
+---------+-----+
|customers|  zip|
+---------+-----+
|        1|75091|
|        2|75023|
|        3|90093|
+---------+-----+

To retrieve the second DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

# Output
+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  2|    0|      66 P Street, NY|             343|
|  3|    0|       708 Fed Ln, CA|             932|
|  3|    1|    807 Deccan Dr, CA|             102|
|  1|    0|  108 Park Street, TX|             623|
|  1|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+

RenameField

The second DynamicFrame we retrieved from the previous operation has column names that contain a period (.) and are very lengthy. We can change that using the RenameField function, and then drop the join columns we no longer need with DropFields:

# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

# Output
+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

ResolveChoice

ResolveChoice can gracefully handle column type ambiguities. Here we use it to cast the cust_id column to String. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

# Output
root
|-- address: string
|-- cust_id: string

Dataset 3

We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

# Output
root
|-- warehouse_loc: string
|-- data: string

Unbox

We use Unbox to parse the JSON that is stored as a String in the data column. Compare the preceding printSchema() output with the following code:

# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()
# Output
root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: int
|    |-- pineapple: int
|    |-- mango: int
|    |-- pears: null

# Input 
dyf_unbox.toDF().show()

# Output
+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+

Unnest

Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

# Output 
root
|-- warehouse_loc: string
|-- data.strawberry: int
|-- data.pineapple: int
|-- data.mango: int
|-- data.pears: null

dyf_unnest.toDF().show()

# Output
+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+

DropNullFields

The DropNullFields function makes it easy to drop columns with all null values. Our warehouse data indicates that the warehouses are out of pears, so the pears column contains only null values and can be dropped. We apply the DropNullFields function on the DynamicFrame, which automatically identifies the columns with null values and drops them:

# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

SplitFields

SplitFields allows us to split a DynamicFrame into two. The function takes the field names that go into the first DynamicFrame, followed by the names of the two resulting DynamicFrames:

# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

# Output
+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

For the second Dynamic Frame, see the following code:

# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

# Output
+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+

SplitRows

SplitRows allows us to split the rows of a DynamicFrame into two DynamicFrames based on a comparison over a column’s values. Here we split on whether data.pineapple is between 100 and 200:

# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first Dynamic Frame, see the following code:

# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

For the second Dynamic Frame, see the following code:

# Input
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+

Spigot

Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

Write Dynamic Frame

The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally (we use a connection_type of S3 with a POSIX path argument in connection_options, which allows writing to local storage):

# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')
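
If you prefer a columnar output, the same call can write Parquet and partition the data by a column. The following sketch (not run as part of this walkthrough, with an illustrative local output path) writes the earlier orders DynamicFrame partitioned by zip, using the partitionKeys connection option described in the AWS Glue documentation:

# Input (sketch)
glueContext.write_dynamic_frame.from_options(
    frame = dyf_applyMapping,
    connection_type = 's3',
    connection_options = {'path': '/home/glue/GlueLocalOutput/orders_parquet/', 'partitionKeys': ['zip']},
    format = 'parquet')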

Conclusion

This post discussed the PySpark ETL capabilities of AWS Glue. Further testing with an AWS Glue development endpoint, or by directly creating jobs in AWS Glue, is a good next step to take the learning forward. For more information, see General Information about Programming AWS Glue ETL Scripts.


About the Authors

Adnan Alvee is a Big Data Architect for AWS ProServe Remote Consulting Services. He helps build solutions for customers leveraging their data and AWS services. Outside of AWS, he enjoys playing badminton and drinking chai.

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data analytics community.

Developing AWS Glue ETL jobs locally using a container

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In the fourth post of the series, we discussed optimizing memory management. In this post, we focus on writing ETL scripts for AWS Glue jobs locally. AWS Glue is built on top of Apache Spark and therefore uses all the strengths of open-source technologies. AWS Glue comes with many improvements on top of Apache Spark and has its own ETL libraries that can fast-track the development process and reduce boilerplate code.

The AWS Glue team released the AWS Glue binaries so that you can set up an environment on your desktop to test your code. We have used these libraries to create an image with all the right dependencies packaged together. The image has AWS Glue 1.0, Apache Spark, OpenJDK, Maven, Python3, the AWS Command Line Interface (AWS CLI), and boto3. We have also bundled Jupyter and Zeppelin notebook servers in the image so you don’t have to configure an IDE and can start developing AWS Glue code right away.

The AWS Glue team will release new images for various AWS Glue updates. The tags of the new images will follow this convention: glue_libs_<glue-version>_image_<image-version>. For example, glue_libs_1.0.0_image_01. In this name, 1.0 is the AWS Glue major version, .0 is the patch version, and 01 is the image version. The patch version will be incremented for updates to the AWS Glue libraries of a major release. The image version will be incremented for the release of a new image of a major AWS Glue release. Both of these increments will be reset with every major AWS Glue release. So, the first image released for AWS Glue 2.0 will be glue_libs_2.0.0_image_01.

We recommend pulling the highest image version for an AWS Glue major version to get the latest updates.

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. The machine running Docker hosts the AWS Glue container. Also make sure that you have at least 7 GB of disk space for the image on the host running Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Solution overview

In this post, we use amazon/aws-glue-libs:glue_libs_1.0.0_image_01 from Docker Hub. This image has only been tested for an AWS Glue 1.0 Spark shell (both for PySpark and Scala). It hasn’t been tested for an AWS Glue 1.0 Python shell.

We organize this post into the following three sections. You only have to complete one of the three sections (not all three) depending on your requirement:

  • Setting up the container to use Jupyter or Zeppelin notebooks
  • Setting up the Docker image with PyCharm Professional
  • Running against the CLI interpreter

This post uses the following two terms frequently:

  • Client – The system from which you access the notebook. You open a web browser on this system and enter the notebook URL.
  • Host – The system that hosts the Docker daemon. The container runs on this system.

Sometimes, your client and host can be the same system.

Setting up the container to use Jupyter or Zeppelin notebooks

Setting up the container to run PySpark code in a notebook includes three high-level steps:

  1. Pulling the image from Docker Hub.
  2. Running the container.
  3. Opening the notebook.

Pulling the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Open cmd on Windows or terminal on Mac and run the following command:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

Running the container

We pulled the image from Docker Hub in the previous step. We now run a container using this image.

The general format of the run command is:

docker run -itd -p <port_on_host>:<port_on_container_either_8888_or_8080> -p 4040:4040 <credential_setup_to_access_AWS_resources> --name <container_name> amazon/aws-glue-libs:glue_libs_1.0.0_image_01 <command_to_start_notebook_server>

The code includes the following information:

  • <port_on_host> – The local port of your host that is mapped to the port of the container. For our use case, the container port is either 8888 (for a Jupyter notebook) or 8080 (for a Zeppelin notebook). To keep things simple, we use the same port number as the notebook server ports on the container in the following examples.
  • <port_on_container_either_8888_or_8080> – The port of the notebook server on the container. The default port of Jupyter is 8888; the default port of Zeppelin is 8080.
  • 4040:4040 – This is required for SparkUI. 4040 is the default port for SparkUI. For more information, see Web Interfaces.
  • <credential_setup_to_access_AWS_resources> – In this section, we go with the typical case of mounting the host’s directory, containing the credentials. We assume that your host has the credentials configured using aws configure. The flow chart in the Appendix section explains various ways to set the credentials if the assumption doesn’t hold for your environment.
  • <container_name> – The name of the container. You can use any text here.

  • amazon/aws-glue-libs:glue_libs_1.0.0_image_01 – The name of the image that we pulled in the previous step.
  • <command_to_start_notebook_server> – We run /home/zeppelin/bin/zeppelin.sh for a Zeppelin notebook and /home/jupyter/jupyter_start.sh for a Jupyter notebook. If you want to run your code against the CLI interpreter, you don’t need a notebook server and can leave this argument blank.

The following example code starts a Jupyter notebook and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The following example code starts a Jupyter notebook and passes read-write credentials from a Windows host:

docker run -itd -p 8888:8888 -p 4040:4040 -v %UserProfile%\.aws:/root/.aws:rw --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

To run a Zeppelin notebook, replace 8888:8888 with 8080:8080, glue_jupyter with glue_zeppelin, and /home/jupyter/jupyter_start.sh with /home/zeppelin/bin/zeppelin.sh. For example, the following command starts a Zeppelin notebook server and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8080:8080 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_zeppelin amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/zeppelin/bin/zeppelin.sh

You can now run the following command to make sure that the container is running:

docker ps

The Jupyter notebook is configured to allow connections from all IP addresses without authentication, and the Zeppelin notebook is configured to use anonymous access. This configuration makes sure that you can start working on your local machine with just two commands (docker pull and docker run). If your scenario mandates a different configuration, run the container without running the notebook startup script (/home/jupyter/jupyter_start.sh or /home/zeppelin/bin/zeppelin.sh). This starts the container but not the notebook server. You can then run the bash shell on the container using the following command, edit the required notebook configurations, and start the notebook server:

docker exec -it <container_name> bash

For example,

docker exec -it glue_jupyter bash

The following example code is the docker run command without the notebook server startup:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01

If you’re running the container on an Amazon Elastic Compute Cloud (Amazon EC2) instance, you have to set up your inbound rules in the security group to allow communication on the ports used by the notebook server. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

Opening the notebook

If your client and host are the same machine, enter the following URL for Jupyter: http://localhost:8888.

You can write PySpark code in the notebook as shown here. You can also use SQL magic (%%sql) to directly write SQL against the tables in the AWS Glue Data Catalog. If your catalog table is on top of JSON data, you have to place json-serde.jar in the /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars directory of the container and restart the kernel in your Jupyter notebook. You can place the jar in this directory by first running the bash shell on the container using the following command:

docker exec -it <container_name> bash
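
Alternatively, you can copy the jar from your host directly into the container’s jars directory with docker cp. The following one-liner is a sketch that assumes the jar is named json-serde.jar, sits in your current directory, and the container is named glue_jupyter:

docker cp json-serde.jar glue_jupyter:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars/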

If you have a local directory that holds your notebooks, you can mount it to /home/jupyter/jupyter_default_dir using the -v option. These notebooks are available to you when you open the Jupyter notebook URL. For example, see the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro -v C:\Users\admin\Documents\notebooks:/home/jupyter/jupyter_default_dir --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The URL for Zeppelin is http://localhost:8080.

For Zeppelin notebooks, include %spark.pyspark on the top to run PySpark code.

If your host is Amazon EC2 and your client is your laptop, replace localhost in the preceding URLs with your host’s public IP.

Depending on your network or if you’re on a VPN, you might have to set an SSH tunnel. The general format of the tunnel is the following code:

ssh -i <absolute_path_to_your_private_key_for_EC2> -v -N -L <port_on_client>:<ip_of_the_container>:<port_8888_or_8080> <user_name>@<public_ip_address_of_ec2_host>

Your security group controlling the EC2 instance should allow inbound on port 22 from the client. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

You can get the <ip_of_the_container> under the IPAddress field when you run docker inspect <container_name>. For example: docker inspect glue_jupyter.

If you set up the tunnel, the URL to access the notebook is: http://localhost:<port_on_client>.

Use 8888 or 8080 for <port_8888_or_8080>, depending on if you’re running a Jupyter or Zeppelin notebook.
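
For example, the following tunnel (all values are placeholders for your key path, container IP, user name, and public IP) forwards local port 8888 to a Jupyter notebook running in the container:

ssh -i ~/keys/my-ec2-key.pem -v -N -L 8888:172.17.0.2:8888 <user_name>@203.0.113.10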

You can now use the following sample code to test your notebook:

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
inputDF.toDF().show()

Although awsglue-datasets is a public bucket, you at least need the following permissions, attached to the AWS Identity and Access Management (IAM) user used for your container, to view the data:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadOnly",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::awsglue-datasets/*"
        }
    ]
}

You can also see the databases in your AWS Glue Data Catalog using the following code:

spark.sql("show databases").show()

You need AWS Glue permissions to run the preceding command. The following are the minimum permissions required to run the code. Replace <account_number> with your account number and <region> with your Region:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account_number>:database/*",
                "arn:aws:glue:<region>:<account_number>:catalog"
            ]
        }
    ]
}

Similarly, you can query the AWS Glue Data Catalog tables too. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.
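
For example, the following minimal sketch reads a catalog table into a DynamicFrame; the database and table names are placeholders, and the permissions discussed earlier apply:

dyf_catalog = glueContext.create_dynamic_frame.from_catalog(
    database = "<your_database>",
    table_name = "<your_table>")
dyf_catalog.toDF().show()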

You can stop here if you want to develop AWS Glue code locally using only notebooks.

Setting up the Docker image with PyCharm Professional

This section talks about setting up PyCharm Professional to use the image. For this post, we use Windows. There may be a few differences when using PyCharm on a Mac.

  1. Open cmd (or terminal for Mac) and pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01 using the following command:
    docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

    If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

  2. Choose the Docker icon (right-click) and choose Settings (this step isn’t required for Mac or Linux).
  3. In the General section, select Expose daemon on tcp://localhost:2375 without TLS (this step isn’t required for Mac or Linux). Note the warning listed under the checkbox. This step is based on PyCharm documentation.
  4. Choose Apply & Restart (this step isn’t required for Mac or Linux).
  5. Choose the Docker icon (right-click) and choose Restart… if the Docker doesn’t restart automatically (this step isn’t required for Mac or Linux).
  6. Open PyCharm and create a Pure Python project (if you don’t have one).
  7. Under File, choose Settings… (for Mac, under PyCharm, choose Preferences).
  8. Under Settings, choose Project Interpreter. In the following screenshot, GlueProject is the name of my project. Your project name might be different.
  9. Choose Show All… from the drop-down menu.
  10. Choose the + icon.

  11. Choose Docker.
  12. Choose New.
  13. For Name, enter a name (for example, Docker-Glue).
  14. Keep other settings at their default.
  15. If running on Windows, for Connect to Docker daemon with, select TCP socket and enter the Engine API URL.
    For this post, we enter tcp://localhost:2375 because Docker and PyCharm are on the same Windows machine.
    If running on a Mac, select Docker for Mac. No API URL is required.
  16. Make sure you see the message Connection successful.

For Windows, if you don’t see this message, Docker may not have restarted after you changed the settings in Step 4. Restart the Docker and repeat these steps again. For more information about connection settings, see PyCharm documentation.

The following screenshots show steps 13-16 in Windows and Mac.

  17. Choose OK.

You should now see the image listed in the drop-down menu.

  18. Choose the image that you pulled from Docker Hub (amazon/aws-glue-libs:glue_libs_1.0.0_image_01).
  19. Choose OK.

You now see the interpreter listed.

  20. Choose OK.

This lists all the packages in the image.

  21. Choose OK.

Steps 22-27 help you get AWS Glue-related code completion suggestions from PyCharm.

  22. Download the following file: https://s3.amazonaws.com/aws-glue-jes-prod-us-east-1-assets/etl-1.0/python/PyGlue.zip.
  23. Under File, choose Settings (for Mac, under PyCharm, choose Preferences).
  24. Under Project: <Project name>, choose Project Structure.
  25. Choose Add Content Root.
  26. Choose the newly downloaded PyGlue.zip file.
  27. In the Settings window, choose OK.
  28. Choose the project (right-click) and choose New, Python File.
  29. Enter a name for the Python file and press Enter.
  30. Enter the following code in the file and save it. For more information about the minimum permissions required to run this code, see this section.
    from pyspark import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate()) 
    inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
    inputDF.toDF().show()
    

  31. Choose Add Configuration.
  32. Choose the + icon.
  33. Under Add New Configuration, choose Python.
  34. For Name, enter a name.
  35. For Environment variables, enter the following:
    PYTHONPATH=/home/aws-glue-libs/awsglue.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python

  36. For Script path, select the newly created script in Step 29.
  37. For Python interpreter, choose the newly created interpreter.
  38. Choose Docker Container Settings.
  39. Under Volume bindings, choose the + icon.
  40. For Host path, add the absolute path of the .aws folder that holds the credentials and the config files.
  41. For Container path, add /root/.aws.
  42. Choose OK.
  43. For Run/Debug Configurations, choose OK.
  44. Run the code by choosing the green button on the top right.

You can also see the databases in your AWS Glue Data Catalog using the following code. For more information about the minimum permissions required to run this code, see this section.

spark.sql("show databases").show()

Similarly, you can also query the catalog tables. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

PyCharm gives code completion suggestions for AWS Glue (see the following screenshot). This is possible because of the steps you completed earlier.

Running against the CLI interpreter

You can always run the bash shell on the container and run your PySpark code directly against the CLI interpreter in the container.

  1. Complete the Pulling the image from Docker Hub and Running the container steps in the section Setting up the container to use Jupyter or Zeppelin notebooks.
  2. Run the bash shell on the container by entering the following code. Replace <container_name> with the name (--name argument) you used earlier.
    docker exec -it <container_name> bash

  3. Run one of the following commands:
    1. For PySpark, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

    2. For Scala, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/spark-shell

Conclusion

In this post, we learned about a three-step process to get started on AWS Glue and Jupyter or Zeppelin notebook. Although notebooks are a great way to get started and a great asset to data scientists and data wranglers, data engineers generally have a source control repository, an IDE, and a well-defined CI/CD process. Because PyCharm is a widely used IDE for PySpark development, we showed how to use the image with PyCharm Professional. You can develop your code locally in your IDE and test it locally using the container, and your CI/CD process can run as it does with any other IDE and source control tool in your organization. Although we showed integration with PyCharm, you can similarly integrate the container with any IDE that you use to complete your CI/CD story with AWS Glue.


Appendix

The following section discusses various ways to set the credentials to access AWS resources (such as Amazon Simple Storage Service (Amazon S3), AWS Step Functions, and more) from the container.

You need to provide your AWS credentials to connect to an AWS service from the container. The AWS SDKs and CLIs use provider chains to look for AWS credentials in several different places, including system or user environment variables and in local AWS configuration files. For more information about how to set up credentials, see https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html. To generate the credentials using the AWS Management Console, see Managing Access Keys (Console). For instructions on generating credentials with the AWS CLI, see create-access-key. For more information about generating credentials with an API, see CreateAccessKey.

The following flow chart shows the various ways to set up AWS credentials for the container. Most of these mechanisms don’t work with PyCharm because we use the image there and not the container. You can use the container as an SSH interpreter in PyCharm and then use one of the credential setting mechanisms listed here. However, that discussion is out of the scope of this post.

Note that the numbers, in brackets, match the code snippets that follow the chart.

(1) To find more info about the syntax of setting up the tunnel, see this.

(2) To set credentials using the docker cp command to copy credentials from the Windows host to the container, enter the following code (this example code uses the container name glue_jupyter):

docker cp %UserProfile%\.aws\.  glue_jupyter:/root/.aws

(3) To mount the host’s .aws directory on the container with rw option, see this.

(4) To mount the host’s .aws directory on the container with ro option, see this.

(5) To set the credentials in a file, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --env-file /datalab_pocs/glue_local/env_variables.txt --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

/datalab_pocs/glue_local/env_variables.txt is the absolute path of the file holding the environment variables. The file should have the following variables:

  • AWS_ACCESS_KEY_ID=<Access_id>
  • AWS_SECRET_ACCESS_KEY=<Access_key>
  • AWS_REGION=<Region>

For more information about Regions, see Regions, Availability Zones, and Local Zones.

(6) To set the credentials in the docker run command, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -e AWS_ACCESS_KEY_ID=<ID> -e AWS_SECRET_ACCESS_KEY=<Key> -e AWS_REGION=<Region>  --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

(7) To set credentials using aws configure on the container, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
docker exec -it glue_jupyter bash
aws configure


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, data warehouse, and data lake projects in the US and Australia.


How Aruba Networks built a cost analysis solution using AWS Glue, Amazon Redshift, and Amazon QuickSight

Post Syndicated from Siddharth Thacker original https://aws.amazon.com/blogs/big-data/how-aruba-networks-built-a-cost-analysis-solution-using-aws-glue-amazon-redshift-and-amazon-quicksight/

This is a guest post co-written by Siddharth Thacker and Swatishree Sahu from Aruba Networks.

Aruba Networks is a Silicon Valley company based in Santa Clara that was founded in 2002 by Keerti Melkote and Pankaj Manglik. Aruba is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba Networks provides a cloud-based platform called Aruba Central for network management and AIOps. The Aruba cloud platform supports thousands of workloads for the customer-facing production environment and also hosts a separate development platform for Aruba engineering.

The motivation to build the solution presented in this post was to understand the unit economics of the AWS resources used by multiple product lines across different organization pillars. Aruba wanted a faster, effective, and reliable way to analyze cost and usage data and visualize that into a dashboard. This solution has helped Aruba in multiple ways, including:

  • Visibility into costs – Multiple Aruba teams can now analyze the cost of their application via data surfaced with this solution
  • Cost optimization – The solution helps teams identify new cost-optimization opportunities by making them aware of the higher-cost resources with low utilization so they can optimize accordingly
  • Cost management – The Cloud DevOps organization, the group who built this solution, can effectively plan at the application level and have a direct positive impact on gross margins
  • Cost savings – With daily cost data available, engineers can see the monetary impact of right-sizing compute and other AWS resources almost immediately
  • Big picture as well as granular – Users can visualize cost data from the top down and track cost at a business level and a specific resource level

Overview of the solution

This post describes how Aruba Networks automated the solution, from generating the AWS Cost & Usage Report (AWS CUR) to its final visualization on Amazon QuickSight. In this solution, they start by configuring the CUR on their primary payer account, which publishes the billing reports to an Amazon Simple Storage Service (Amazon S3) bucket. Then they use an AWS Glue crawler to define and catalog the CUR data. As the new CUR data is delivered daily, the data catalog is updated, and the data is loaded into an Amazon Redshift database using Amazon Redshift Spectrum and SQL. The reporting and visualization layer is built using QuickSight. Finally, the entire pipeline is automated by using AWS Data Pipeline.

The following diagram illustrates this architecture.

Aruba prefers the AWS CUR Report to AWS Cost Explorer because AWS Cost Explorer provides usage information at a high level, and not enough granularity for detailed operations, such as data transfer cost. AWS CUR provides the most detailed information available about your AWS costs and usage at an hourly granularity. This allows the Aruba team to drill down the costs by the hour or day, product or product resource, or custom tags, enabling them to achieve their goals.

Aruba implemented the solution with the following steps:

  1. Set up the CUR delivery to a primary S3 bucket from the billing dashboard.
  2. Use Amazon S3 replication to copy the primary payer S3 bucket to the analytics bucket. Having a separate analytics account helps prevent direct access to the primary account.
  3. Create and schedule the crawler to crawl the CUR data. This is required to make the metadata available in the Data Catalog and update it quickly when new data arrives.
  4. Create respective Amazon Redshift schema and tables.
  5. Orchestrate an ETL flow to load data to Amazon Redshift using Data Pipeline.
  6. Create and publish dashboards using QuickSight for executives and stakeholders.

Insights generated

The Aruba DevOps team built various reports that provide the cost classifications on AWS services, weekly cost by applications, cost by product, infrastructure, resource type, and much more using the detailed CUR data as shown by the following screenshot.

For example, using the following screenshot, Aruba can conveniently figure out that compute cost is the biggest contributor compared to other costs. To reduce the cost, they can consider using various cost-optimization methods like buying reserved instances, savings plans, or Spot Instances wherever applicable.

Similarly, the following screenshot highlights the cost doubled compared to the first week of April. This helps Aruba to identify anomalies quickly and make informed decisions.

Setting up the CUR delivery

For instructions on setting up a CUR, see Creating Cost and Usage Reports.

To reduce complexity in the workflow, Aruba chose to create resources in the same region with hourly granularity, mainly to see metrics more frequently.

To lower the storage costs for data files and maximize the effectiveness of querying data with serverless technologies like Amazon Athena, Amazon Redshift Spectrum, and Amazon S3 data lake, save the CUR in Parquet format. The following screenshot shows the configuration for delivery options.
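
Although Aruba configured the report through the billing console, the same delivery options can also be defined programmatically. The following boto3 sketch (with placeholder report name, bucket, prefix, and Region values) is one way to create an hourly Parquet CUR with resource IDs; it is an illustration rather than the exact configuration Aruba used:

import boto3

# The Cost and Usage Reports API is served from us-east-1
cur = boto3.client('cur', region_name='us-east-1')

cur.put_report_definition(
    ReportDefinition={
        'ReportName': 'hourly-cost-and-usage',        # placeholder report name
        'TimeUnit': 'HOURLY',
        'Format': 'Parquet',
        'Compression': 'Parquet',
        'AdditionalSchemaElements': ['RESOURCES'],     # include resource IDs for drill-down
        'S3Bucket': '<primary-payer-cur-bucket>',      # placeholder bucket
        'S3Prefix': 'cur',                             # placeholder prefix
        'S3Region': '<region>',
        'AdditionalArtifacts': [],
        'RefreshClosedReports': True,
        'ReportVersioning': 'OVERWRITE_REPORT'
    }
)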

The following table shows some example CUR data.

bill_payer_account_id | line_item_usage_account_id | line_item_usage_start_date | line_item_usage_end_date | line_item_product_code | line_item_usage_type | line_item_operation
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:VolumeP-IOPS.piops | CreateVolume-P-IOPS
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-APN1-AWS-In-Bytes | LoadBalancing-PublicIP-In
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataProcessing-Bytes | LoadBalancing
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataTransfer-Regional-Bytes | InterZone-In
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonS3 | USW2-Requests-Tier2 | ReadLocation
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataTransfer-Regional-Bytes | InterZone-In

Replicating the CUR data to your analytics account

For security purposes, other teams aren’t allowed to access the primary (payer) account, and therefore can’t access CUR data generated from that account. Aruba replicated the data to their analytics account and built the cost analysis solution there. Other teams can access the cost data without getting access permission for the primary account. The data is replicated across accounts by adding an Amazon S3 replication rule in the bucket. For more information, see Adding a replication rule when the destination bucket is in a different AWS account.
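
The following boto3 sketch shows one way such a cross-account replication rule could be added; the bucket names, account ID, and replication role ARN are placeholders rather than Aruba’s actual configuration, and versioning must already be enabled on both buckets:

import boto3

s3 = boto3.client('s3')

s3.put_bucket_replication(
    Bucket='<primary-payer-cur-bucket>',
    ReplicationConfiguration={
        'Role': 'arn:aws:iam::<payer_account_id>:role/<replication-role>',
        'Rules': [{
            'ID': 'replicate-cur-to-analytics',
            'Status': 'Enabled',
            'Priority': 1,
            'Filter': {'Prefix': ''},                       # replicate the whole bucket
            'DeleteMarkerReplication': {'Status': 'Disabled'},
            'Destination': {
                'Bucket': 'arn:aws:s3:::<analytics-cur-bucket>',
                'Account': '<analytics_account_id>',
                # hand object ownership to the analytics account
                'AccessControlTranslation': {'Owner': 'Destination'}
            }
        }]
    }
)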

Cataloging the data with a crawler and scheduling it to run daily

Because AWS delivers all daily reports for a report date range in a report-prefix/report-name/yyyymmdd-yyyymmdd folder, Aruba uses AWS Glue crawlers to crawl through the data and update the catalog.

AWS Glue is a fully managed ETL service that makes it easy to prepare and load the data for analytics. Once AWS Glue is pointed to the data stored on AWS, it discovers the data and stores the associated metadata (such as the table definition and schema) in the Data Catalog. After the data is cataloged, it is immediately searchable, queryable, and available for ETL. For more information, see Populating the AWS Glue Data Catalog.

The following screenshot shows the crawler created on the Amazon S3 location of the CUR data.
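
If you prefer to create and schedule the crawler programmatically instead of through the console, a boto3 sketch similar to the following could be used; the crawler name, IAM role, and daily schedule are placeholders, and the S3 path matches the location used in the table definition that follows:

import boto3

glue = boto3.client('glue')

glue.create_crawler(
    Name='cur-parquet-crawler',                                  # placeholder crawler name
    Role='arn:aws:iam::<account_number>:role/<glue-crawler-role>',
    DatabaseName='aruba_curr_db',
    Targets={'S3Targets': [{'Path': 's3://curS3bucket/Parquet/'}]},
    Schedule='cron(0 2 * * ? *)'                                 # run daily at 02:00 UTC
)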

The following code is an example table definition populated by the crawler:

CREATE EXTERNAL TABLE `cur_parquet`(
  `identity_line_item_id` string, 
  `identity_time_interval` string, 
  `bill_invoice_id` string, 
………
………
  `resource_tags_user_infra_role` string)

PARTITIONED BY ( 
  `year` string, 
  `month` string )

ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://curS3bucket/Parquet/'

Transforming and loading using Amazon Redshift

Next, for the analytics service, Aruba chose Amazon Redshift over Athena. Aruba needs to integrate the cost data with other tables already present in Amazon Redshift, so using the same service makes it easy to integrate with their existing data. To filter and transform the data at the same time and simplify the multi-step ETL, Aruba chose Amazon Redshift Spectrum, which helps to efficiently query and load CUR data from Amazon S3. For more information, see Getting started with Amazon Redshift Spectrum.

Use the following query to create an external schema and map it to the AWS Glue database created earlier in the Data Catalog:

--Choose a schema name of your choice, cur_redshift_external_schema name is just an example--
 create external schema cur_redshift_spectrum_external_schema from data catalog database 
 'aruba_curr_db' iam_role 'arn:aws:iam::xxxxxxxxxxxxx:role/redshiftclusterrole' 
 create external database if not exists;

The table created in the Data Catalog appears under the Amazon Redshift Spectrum schema. The schema, table, and records created can be verified with the following SQL code:

SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE>; 

--Query the right partition, year=2020 and month=2 is used an example
SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE> 
WHERE  year=2020 
AND    month=2;

Next, transform and load the data into the Amazon Redshift table. Aruba started by creating an Amazon Redshift table to contain the data. The following SQL code can be used to create the production table with the desired columns:

CREATE TABLE redshift_schema.redshift_table 
  ( 
     usage_start_date TIMESTAMP, 
     usage_end_date   TIMESTAMP, 
     service_region   VARCHAR (256), 
     service_az       VARCHAR (256), 
     aws_resource_id  VARCHAR (256), 
     usage_amount     FLOAT (17), 
     charge_currency  VARCHAR (256), 
     aws_product_name VARCHAR (256), 
     instance_family  VARCHAR (256), 
     instance_type    VARCHAR (256), 
     unblended_cost   FLOAT (17), 
     usage_cost       FLOAT (17)
  ); 

CUR is dynamic in nature, which means that some columns may appear or disappear with each update. When creating the table, we take static columns only. For more information, see Line item details.

Next, insert and update the data from Amazon S3 into the Amazon Redshift table. Each CUR update is cumulative, which means that each version of the CUR includes all the line items and information from the previous version.

The reports generated throughout the month are estimated and subject to change during the rest of the month. AWS finalizes the report at the end of each month. Finalized reports have the calculations for the blended and unblended costs, and cover all the usage for the month. For this use case, Aruba reloads the last 45 days of data to make sure the finalized cost is captured. The following sample query can be used to load the updated data:

-- Insert statement to load the last 45 days of CUR data
 INSERT INTO redshift_schema.redshift_table
            (usage_start_date, 
             usage_end_date, 
             service_region, 
             service_az, 
             aws_resource_id, 
             usage_amount, 
             charge_currency, 
             aws_product_name, 
             instance_family, 
             instance_type, 
             unblended_cost,
             Usage_Cost ) 
 SELECT line_item_usage_start_date, 
       line_item_usage_end_date, 
       line_item_operation, 
       line_item_availability_zone, 
       line_item_resource_id, 
       line_item_usage_amount, 
       line_item_currency_code, 
       product_product_name, 
       product_instance_family, 
       product_instance_type, 
       line_item_unblended_cost,
       case when line_item_type='Usage' then line_item_unblended_cost
            else 0
            end as usage_cost 
 FROM   cur_redshift_spectrum_external_schema.cur_parquet
 WHERE  line_item_usage_start_date >= date_add('day', -45, getdate()) 
       AND line_item_usage_start_date < date_add('day', 1, getdate()); 
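
Because each CUR delivery restates the most recent usage, deleting the same 45-day window before running the INSERT above prevents duplicate rows. The following statement is a sketch of that step rather than a query taken from Aruba’s pipeline:

-- Remove the last 45 days before re-inserting them from the external table
DELETE FROM redshift_schema.redshift_table
WHERE  usage_start_date >= date_add('day', -45, getdate());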

Using Data Pipeline to orchestrate the ETL workflow

To automate this ETL workflow, Aruba chose Data Pipeline. Data Pipeline helps to reliably process and move data between different AWS compute and storage services, as well as on-premises data sources. With Data Pipeline, Aruba can regularly access their data where it’s stored, transform and process it at scale, and efficiently transfer the results to AWS services such as Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, and Amazon EMR. Although the detailed steps of setting up this pipeline are out of scope for this blog, there is a sample workflow definition JSON file, which can be imported after making the necessary changes.

Data Pipeline workflow

The following screenshot shows the multi-step ETL workflow using Data Pipeline. Data Pipeline is used to run the INSERT query daily, which inserts and updates the latest CUR data into our Amazon Redshift table from the external table.

To copy data to Amazon Redshift, RedshiftDataNode and RedshiftCopyActivity can be used and then scheduled to run periodically.

Sharing metrics and creating visuals with QuickSight

To share the cost and usage with other teams, Aruba chose QuickSight using Amazon Redshift as the data source. QuickSight is a native AWS service that seamlessly integrates with other AWS services such as Amazon Redshift, Athena, Amazon S3, and many other data sources.

As a fully managed service, QuickSight lets Aruba easily create and publish interactive dashboards that include ML Insights. In addition to building powerful visualizations, QuickSight provides data preparation tools that make it easy to filter and transform the data into the exact dataset needed. As a cloud-native service, dashboards can be accessed from any device and embedded into applications and portals, allowing other teams to monitor their resource usage easily. For more information about creating a dataset, see Creating a Dataset from a Database. QuickSight visuals can then be created from this dataset.

The following screenshot shows a visual comparison of device cost and count to help find the cost per device. This visual helped Aruba quickly identify the cost per device increase in April and take necessary actions.

Similarly, the following visualization helped Aruba identify an increase in data transfer cost and helped them decide to invest in rearchitecting their application.

The following visualization classifies the spend per resource.

Conclusion

In this post, we discussed how Aruba Networks was able to successfully achieve the following:

  • Generate the CUR and use AWS Glue to define, catalog, and update the metadata
  • Use Amazon Redshift Spectrum to transform and load the data to Amazon Redshift tables
  • Query, visualize, and share the data stored using QuickSight
  • Automate and orchestrate the entire solution using Data Pipeline

Aruba uses this solution to automatically generate a daily cost report and share it with their stakeholders, including executives and the cloud operations team.

 


About the Authors

Siddharth Thacker works in Business & Finance Strategy in the Cloud Software division at Aruba Networks. Siddharth has a Master's in Finance, with experience in industries such as banking, investment management, and cloud software, and focuses on business analytics, margin improvement, and strategic partnerships at Aruba. In his spare time, he likes exploring the outdoors and participating in team sports.

Swatishree Sahu is a Technical Data Analyst at Aruba Networks. She lived and worked in India for 7 years as an SME for SOA-based integration tools before coming to the US to pursue her master's in Business Analytics from UT Dallas. Breaking down and analyzing data is her passion. She is a Star Wars geek, and in her free time, she loves gardening, painting, and traveling.

Ritesh Chaman is a Technical Account Manager at Amazon Web Services. With 10 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, and Big Data systems. In his spare time, he loves cooking (spicy Indian food), watching sci-fi movies, and playing sports.

Kunal Ghosh is a Solutions Architect at AWS. His passion is to build efficient and effective solutions on the cloud, especially involving Analytics, AI, Data Science, and Machine Learning. Besides family time, he likes reading and watching movies, and is a foodie.

Optimize Python ETL by extending Pandas with AWS Data Wrangler

Post Syndicated from Satoshi Kuramitsu original https://aws.amazon.com/blogs/big-data/optimize-python-etl-by-extending-pandas-with-aws-data-wrangler/

Developing extract, transform, and load (ETL) data pipelines is one of the most time-consuming steps to keep data lakes, data warehouses, and databases up to date and ready to provide business insights. You can categorize these pipelines into distributed and non-distributed, and the choice of one or the other depends on the amount of data you need to process.

Apache Spark is widely used to build distributed pipelines, whereas Pandas is preferred for lightweight, non-distributed pipelines. With the second use case in mind, the AWS Professional Services team created AWS Data Wrangler, aiming to fill the integration gap between Pandas and several AWS services, such as Amazon Simple Storage Service (Amazon S3), Amazon Redshift, AWS Glue, Amazon Athena, Amazon Aurora, Amazon QuickSight, and Amazon CloudWatch Logs Insights.

AWS Data Wrangler is an open-source Python library that enables you to focus on the transformation step of ETL by using familiar Pandas transformation commands and relying on abstracted functions to handle the extraction and load steps.

You can use AWS Data Wrangler in different environments on AWS and on premises (for more information, see Install). This post focuses on data preparation for a data science project on Jupyter. By the end of this walkthrough, you will be able to set up AWS Data Wrangler on your Amazon SageMaker notebook.

Use case overview

In the following walkthrough, you use data stored in the NOAA public S3 bucket. For more information, see NOAA Global Historical Climatology Network Daily. The objective is to convert 10 CSV files (approximately 240 MB total) to a partitioned Parquet dataset, store its related metadata into the AWS Glue Data Catalog, and query the data using Athena to create a data analysis.

Configuring Amazon S3

Your first step is to create an S3 bucket to store the Parquet dataset.

  1. On the Amazon S3 console, choose Create bucket.
  2. For Bucket name, enter a name for your bucket.
  3. Choose Create.

Creating a new database in the Data Catalog

The Data Catalog is an Apache Hive-compatible managed metadata storage that lets you store, annotate, and share metadata on AWS.

For this use case, you use it to store the metadata associated with your Parquet dataset. The Data Catalog is integrated with many analytics services, including Athena, Amazon Redshift Spectrum, and Amazon EMR (Apache Spark, Apache Hive, and Presto).

  1. On the AWS Glue console, choose Databases.
  2. Choose Add database.
  3. For Database name, enter awswrangler_test.
  4. Choose Create.
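If you prefer to create the database from code, AWS Data Wrangler (which you install in a later step) exposes the same Data Catalog operation; a minimal sketch, assuming the library is already available in your notebook:

import awswrangler as wr

# Equivalent to the console steps above
wr.catalog.create_database(name="awswrangler_test")

# Optional check: list the databases registered in the Data Catalog
print(wr.catalog.databases())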

Launching an Amazon SageMaker notebook

An Amazon SageMaker notebook is a managed instance running the Jupyter Notebook app. For this use case, you use it to write and run your code.

  1. On the Amazon SageMaker console, choose Notebook instances.
  2. Choose Create notebook instance.
  3. For Notebook instance name, enter a name.
  4. For IAM role, choose an existing AWS Identity and Access Management (IAM) role or create a role that allows you to run Amazon SageMaker and grants access to Amazon S3, Athena, and AWS Glue for the related resources.
  5. Wait for the notebook status to show as InService.
  6. Choose Open Jupyter from the notebook instance you created.

Exploring the data

This section walks you through several notebook paragraphs to expose how to install and use AWS Data Wrangler.

  1. On Jupyter console, under New, choose conda_python3.
  2. To install AWS Data Wrangler, enter the following code:
    !pip install awswrangler

  3. To avoid dependency conflicts, restart the notebook kernel by choosing kernel -> Restart.
  4. Import the library given the usual alias wr:
    import awswrangler as wr

  5. List all files in the NOAA public bucket from the decade of 1880:
    wr.s3.list_objects("s3://noaa-ghcn-pds/csv/188")

The following screenshot shows the output.

  6. Load the whole decade (10 files) into a Pandas DataFrame using the Amazon S3 prefix s3://noaa-ghcn-pds/csv/188:
    col_names = ["id", "dt", "element", "value", "m_flag", "q_flag", "s_flag", "obs_time"]
    
    df = wr.s3.read_csv(
        path="s3://noaa-ghcn-pds/csv/188",
        names=col_names,
        parse_dates=["dt", "obs_time"]  # Hint to parse these columns as date instead of strings
    )

    The following screenshot shows the output.

  7. Create a new column extracting the year from the dt column (the new column is useful for creating partitions in the Parquet dataset):
    df["year"] = df["dt"].dt.year

The following screenshot shows the output.

  8. Store the Pandas DataFrame in the S3 bucket you created at the beginning of this post (replace the [BUCKET] placeholder in the code with your bucket name):
    wr.s3.to_parquet(
        df=df,
        path="s3://[BUCKET]/noaa/",
        dataset=True,
        database="awswrangler_test",
        table="noaa",
        partition_cols=["year"]
    );

The preceding code creates the table noaa in the awswrangler_test database in the Data Catalog.

  9. After processing this, you can confirm that the Parquet files exist in Amazon S3 and that the table noaa is in the AWS Glue Data Catalog. See the following code:
    wr.s3.list_objects("s3://[BUCKET]/noaa/")
    wr.catalog.table(database="awswrangler_test", table="noaa")

The following screenshot shows the output.

  10. Run a SQL query from Athena that filters only the US maximum temperature measurements of the last 3 years (1887–1889) and receive the result as a Pandas DataFrame:
    sql = """
    SELECT
        dt,
        (value / 10.0) AS temperature  -- Converting tenths of degrees C to regular degrees C
    FROM noaa
    WHERE year BETWEEN 1887 AND 1889  -- Only last 3 years (PARTITION filter)
    AND substr(id, 1, 2)='US'  -- Only U.S. stations
    AND element='TMAX'  -- Only Maximum temperature elements
    AND q_flag is NULL  -- Only HIGH quality measurement
    """
    
    df = wr.athena.read_sql_query(sql, database="awswrangler_test")

The following screenshot shows the output.

The following two queries illustrate how you can visualize the data.

  11. To plot the average maximum temperature measured in the tracked station, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().plot();

The following screenshot shows the output.

  12. To plot a moving average of the previous metric with a 30-day window, enter the following code:
    %matplotlib inline
    df.groupby("dt").mean().rolling(window=30, center=True).mean().plot();

The following screenshot shows the output.

Cleaning up

To avoid incurring future charges, delete the resources from the following services:

  1. AWS Glue database
    • On the AWS Glue console, choose the database you created.
    • From the Actions drop-down menu, choose Delete database.
    • Choose Delete.
  2. Amazon SageMaker notebook
    • On the Amazon SageMaker console, choose the notebook instance you created.
    • From the Actions drop-down menu, choose Stop.
    • When the status shows as Stopped, from the Actions drop-down menu, choose Delete.
    • Choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles.
    • Choose the role you attached to Amazon SageMaker.
    • Choose Delete role.
    • Choose Yes.

Conclusion

Installing AWS Data Wrangler is a breeze. With a single command, you can connect ETL tasks to multiple data sources and different data services. The library is a work in progress, with new features and enhancements added regularly. For more tutorials, see the GitHub repo.

 


About the Authors

Satoshi Kuramitsu is a Solutions Architect in AWS. His favorite AWS services are AWS Glue, Amazon Kinesis, and Amazon S3.

Igor Tavares is a Data & Machine Learning Engineer in the AWS Professional Services team and the original creator of AWS Data Wrangler.

Stream Twitter data into Amazon Redshift using Amazon MSK and AWS Glue streaming ETL

Post Syndicated from Jobin George original https://aws.amazon.com/blogs/big-data/stream-twitter-data-into-amazon-redshift-using-amazon-msk-and-aws-glue-streaming-etl/

Real-time analytics provide a point-in-time view for a variety of use cases. At the heart of any real-time solution is streaming data processing, especially when dynamic new content is being continually regenerated. Organizations might start using streaming data for simple analytics from logs or basic arithmetic dashboards, but eventually develop applications to perform more sophisticated forms of analysis, including machine learning, and extract deeper insights.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In addition to capabilities such as a Data Catalog, automated schema discovery, automated code generation, and deduplication of data, AWS Glue is serverless, and you don’t have to provision resources while paying for what you use. AWS Glue recently released serverless streaming ETL, which makes it easy to set up continuous ingestion pipelines that stream data from various sources using Amazon Kinesis and Apache Kafka and load data to data lakes, data warehouses, and other data stores while cleaning and enriching the data as needed.

This post demonstrates how customers, system integrator (SI) partners, and developers can use the serverless streaming ETL capabilities of AWS Glue with Amazon Managed Streaming for Kafka (Amazon MSK) to stream data to a data warehouse such as Amazon Redshift. We also show you how to view Twitter streaming data on Amazon QuickSight via Amazon Redshift.

Background

Before AWS Glue streaming ETL, you had to stitch multiple components together. For example, to stream real-time data from a social media feed, you needed to use either Amazon MSK or Kinesis to load data, using a combination of AWS Lambda and Amazon Simple Storage Service (Amazon S3) with multiple staging buckets. With AWS Glue streaming ETL, you can now simplify your pipeline with reduced touchpoints that better allow you to focus on business outcomes rather than pipeline management.

For more information about how AWS Glue streaming ETL integrates with Amazon Kinesis, see New – Serverless Streaming ETL with AWS Glue.

The following architecture shows an end-to-end implementation of a streaming solution using Amazon MSK, AWS Glue streaming ETL, Amazon Redshift, and QuickSight.

To illustrate how to set up this architecture, we’ll walk you through the following steps:

  1. Deploying an AWS CloudFormation template to launch a three-node MSK cluster and a Kafka client with an instance of Apache NiFi running on it
  2. Creating a Kafka topic and accessing the Apache NiFi UI
  3. Configuring data streams from Twitter to Kafka using Apache NiFi
  4. Creating an Amazon Redshift cluster and a table to persist streaming data
  5. Creating Amazon MSK and Amazon Redshift tables on AWS Glue Data Catalog tables
  6. Authoring an AWS Glue streaming ETL job to load data to Amazon Redshift
  7. Visualizing data from Amazon Redshift in QuickSight
  8. Cleaning up your resources

By default, the CloudFormation template launches the cluster on kafka.m5.large nodes and the client instance on m5.2xlarge, but you may choose to configure it with the instance type appropriate for your use case.

Prerequisites

Make sure to complete the following steps as prerequisites:

Launching your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your stack in us-east-1:

  2. On the Quick create stack page, for Stack Name, enter Twitter-MSK-Glue-Redshift-Blog.
  3. For KeyName, choose KeyPair.
  4. For SSH location, enter your IP to log in to the Kafka client instance.

To find your IP, use checkip.amazonaws.com.

  5. Choose Create stack.

The stack creation can take up to 15 minutes to complete.

  6. When the stack creation is complete, on the Stack Outputs tab, record the values of the following:
    1. KafkaNiFiEC2Instance
    2. MSKSecurityGroupID
    3. PrivateSubnetOne
    4. RedshiftEndpoint

Creating a Kafka topic and accessing the NiFi UI

With the .pem key, you can now SSH to the NiFi node and set up local port forwarding to access the web interface on your local computer. We use Apache NiFi to easily configure and pull data from Twitter and publish it to Amazon MSK without writing code. You may use any tool to poll data from Twitter and publish to Amazon MSK.

  1. To access the NiFi UI from your local system, use the following command to set up an SSH tunnel into the NiFi instance (replace KafkaNiFiEC2Instance with the information from the AWS CloudFormation output) running on port 8888:
ssh -i ~/Downloads/KeyPair.pem -L 8888:localhost:8888 ec2-user@<KafkaNiFiEC2Instance>

This command allows you to access NiFi via a browser running on your local system. Leave that terminal open to remain connected.

  2. To create a Kafka topic, enter the following code in the terminal (replace ZookeeperConnectString with your Amazon MSK cluster ZooKeeper URL):
/opt/kafka/bin/kafka-topics.sh --create --zookeeper <ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic CovidTweets 

For instructions on finding your ZooKeeper URL, see Getting the Apache ZooKeeper Connection String for an Amazon MSK Cluster.
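If you prefer not to look it up in the console, a minimal boto3 sketch retrieves the same value; the cluster ARN below is a placeholder for your own:

import boto3

kafka = boto3.client('kafka')

# Replace with your MSK cluster ARN (shown on the Amazon MSK console)
cluster_arn = 'arn:aws:kafka:us-east-1:111122223333:cluster/your-cluster/your-cluster-id'

cluster = kafka.describe_cluster(ClusterArn=cluster_arn)
print(cluster['ClusterInfo']['ZookeeperConnectString'])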

The following screenshot shows the resulting output.

  3. While the terminal is connected, launch your browser and use the following URL to access the NiFi UI: http://localhost:8888/nifi/.

You should be able to view the NiFi cluster UI (see the following screenshot). You can see four processors already added to the canvas.

NiFi supports user authentication via client certificates, username and password, Apache Knox, or OpenId Connect. For this post, we keep it open without security configuration, but make sure you have robust security in place for your NiFi instances when used for your own use cases. For more information, see Security Configuration.

Configuring data streams from Twitter to Amazon MSK using Apache NiFi

The following steps take you through connecting to Twitter and pulling data related to Twitter handles without coding.

  1. Choose the GetTwitter processor.
  2. On the Properties tab, enter the following Twitter credentials:
    1. API key
    2. API secret key
    3. Access token
    4. Access token secret
  3. Choose Apply.

For security purposes, the credentials aren’t displayed after you enter them. Additionally, in these configurations, you filter Tweets based on the term COVID19. You can add a comma-separated list of terms if you want to customize these values.

  4. Choose the PublishKafka processor.
  5. On the Properties tab, for Kafka Brokers, enter your comma-separated SSL Amazon MSK bootstrap URL (running on port 9094).

For instructions on finding the broker URL, see Getting the Bootstrap Brokers for an Amazon MSK cluster.
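Alternatively, a minimal boto3 sketch for retrieving the SSL bootstrap broker string (the cluster ARN is a placeholder):

import boto3

kafka = boto3.client('kafka')

brokers = kafka.get_bootstrap_brokers(
    ClusterArn='arn:aws:kafka:us-east-1:111122223333:cluster/your-cluster/your-cluster-id'  # placeholder
)
print(brokers['BootstrapBrokerStringTls'])  # comma-separated host:9094 list for TLS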

The Topic Name value is already set to CovidTweets, but you can change the topic name if you prefer a different name.

  6. Choose Apply.

You’re now ready to start the flow and stream data from the Twitter API into the MSK cluster. However, before you start streaming Twitter data, create the Data Catalog tables and author the AWS Glue streaming job.

Creating an Amazon Redshift cluster and target table

As part of the AWS CloudFormation deployment, you create a single-node Amazon Redshift cluster. To create the target table for storing relevant fields extracted from Tweets, connect to the cluster and complete the following steps:

  1. On the Amazon Redshift console, connect to the query editor.
  2. Enter the following credentials:
    1. Cluster – Choose the cluster with endpoint noted earlier
    2. Database name – streaming-data
    3. Database user – awsuser
    4. Database password – Str0ngPas$wd
  3. On the query editor page, enter the following DDL command to create a table named msk_tweets:
create table msk_tweets(created_at VARCHAR(max),id_str VARCHAR(100),text VARCHAR(max), source VARCHAR(max),user_location VARCHAR(1000),hashtags1 VARCHAR(1000),hashtags2 VARCHAR(1000), lang VARCHAR(max))

You need to modify these tables and fields per the use case.

Creating the Amazon MSK and Amazon Redshift Data Catalog tables in AWS Glue

This section walks you through creating your connections to Amazon MSK and Amazon Redshift, crawling Amazon Redshift, and creating Data Catalog tables to use as the target for the AWS Glue streaming ETL job.

Creating the Amazon MSK connection

To create your Amazon MSK connection, complete the following steps:

  1. On the AWS Glue console, choose Catalog.
  2. Choose Connection.
  3. Choose Add connection.
  4. On the Set up your connection’s properties page, for Connection name, enter MSK_Connection.
  5. For Connection type, choose Kafka.
  6. For Kafka bootstrap server URLs, enter your Amazon MSK SSL bootstrap URL running on port 9094.

For instructions on finding your broker URL, see Getting the Bootstrap Brokers for an Amazon MSK Cluster.

  7. Choose Next.

  8. On the Set up access to your data store page, for VPC, choose the VPC containing the name MSK-GLUE-VPC.
  9. For Subnet, choose the subnet containing the name MMPrivateSubnetOne.
  10. For Security groups, select the group with the prefix MSK-Glue-Redshift-MSKSecurityGroup.
  11. Choose Next.

  12. Review the information and choose Finish.

Creating the Amazon Redshift connection

You’re now ready to create the Amazon Redshift connection.

  1. On the AWS Glue Data Catalog Connection page, choose Add connection.
  2. For Connection name, enter Redshift_Connection.
  3. For Connection type, choose Amazon Redshift.
  4. Choose Next.
  5. On the next page, choose the cluster you created as part of the CloudFormation stack.
  6. Enter the following information:
    1. Database name – streaming-data
    2. Username – awsuser
    3. Password – Str0ngPas$wd
  7. Choose Next.
  8. Review the details and choose Finish.

You can test the connection when creation is complete.

Crawling the database

You can now create an AWS Glue Data Catalog for your Amazon Redshift table by crawling the database using the connection you just created.

  1. On the AWS Glue Data Catalog Crawlers page, choose Add crawlers.
  2. For Crawler name, enter Redshift_Crawler.
  3. Choose Next.
  4. Choose Data stores.
  5. Choose Next.
  6. On the Add a data store page, for Choose a data store, choose JDBC.
  7. For Connection, choose Redshift_Connection.
  8. For Include path, enter streaming-data.
  9. Choose Next.

  10. On the page asking if you want to add additional data stores, choose No.
  11. Choose Next.
  12. On the Choose IAM role page, choose Glue_Service_role (you created this as part of the CloudFormation stack).
  13. Choose Next.
  14. For Frequency, choose Run on demand.
  15. Choose Next.
  16. On the next page, select an AWS Glue database and choose Next.

If you don’t have a database, choose Add database and create one.

  17. Review the information and choose Finish.

When you’re prompted to run the crawler, choose the prompt. It may take a few minutes for the crawler to finish, after which you can verify in the Data Catalog table section that you have a table called streaming_data_public_msk_tweets with Amazon Redshift classification.

Creating your table

You can now create a table for the Amazon MSK topic.

  1. On the Catalog page, choose Tables.
  2. Choose Add tables.
  3. Choose Add tables manually.
  4. For Table name, enter msk_covidtweets.
  5. Choose the database you created earlier.
  6. Choose Next.
  7. On the Add a data store page, for Select the type of source, select Kafka.
  8. For Topic name, enter CovidTweets.
  9. For Connection, enter MSK_Connection.
  10. Choose Next.

  11. On the next page, for Classification, choose JSON.
  12. Choose Next.
  13. On the Define schema page, choose Add column.
  14. Add the following columns, with Data type as string:
    1. id_str
    2. created_at
    3. source
    4. text
    5. location
    6. hashtags[0].text
    7. hashtags[1].text
    8. lang

  15. Choose Next.
  16. Review and choose Finish.

Authoring an AWS Glue streaming ETL job

In the following steps, you author a streaming ETL job.

  1. On the AWS Glue console, choose Jobs.
  2. Choose Add job.
  3. For Name, enter MSK-Glue-Redshift.
  4. For IAM role, choose Glue_Service_role.
  5. For Type, choose Spark Streaming.
  6. For This job runs, select A proposed script generated by AWS Glue.
  7. Leave other fields at their default.
  8. Choose Next.

  9. On the Choose a data source page, select msk_covidtweets.

  10. Choose Next.
  11. On the Choose a data target page, select streaming_data_public_msk_tweets.

  12. Choose Next.
  13. On the page for mapping source columns to target columns, verify that the columns are mapped correctly.
  14. Choose Save job and edit script.
  15. On the next page, verify that on the last line of the script, windowSize is set to 5 seconds (see the sketch after this list).
  16. Choose Save.
  17. Choose Run job.
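For reference, the tail of the generated streaming script typically looks similar to the following simplified sketch (not the exact generated code; it assumes the glueContext and imports created earlier in the script, and the Data Catalog database name and checkpoint location are placeholders). The windowSize option controls how often micro-batches from the Kafka topic are processed:

# Simplified sketch of the tail of a generated AWS Glue streaming script
from awsglue.dynamicframe import DynamicFrame

def process_batch(data_frame, batch_id):
    if data_frame.count() > 0:
        dynamic_frame = DynamicFrame.fromDF(data_frame, glueContext, "from_data_frame")
        # The generated script applies the source-to-target mapping here and writes the
        # batch to the Amazon Redshift table through the Glue connection

data_frame_source = glueContext.create_data_frame.from_catalog(
    database="your_glue_database",        # placeholder
    table_name="msk_covidtweets",
    transformation_ctx="datasource0",
)

glueContext.forEachBatch(
    frame=data_frame_source,
    batch_function=process_batch,
    options={
        "windowSize": "5 seconds",
        "checkpointLocation": "s3://your-temp-bucket/checkpoint/",  # placeholder
    },
)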

The AWS Glue streaming ETL job may take a few minutes to start running, after which the streaming from Amazon MSK starts.

While you’re waiting, you can start the NiFi Twitter flow to publish messages to Amazon MSK.

Starting the NiFi flow to stream Twitter data

To start your NiFi flow to stream data from Twitter and publish it to Amazon MSK, complete the following steps:

  1. Navigate back to the NiFi UI running on http://localhost:8888/nifi/.
  2. Right-click the canvas and choose Start.

After the NiFi flow starts, you can see that the Twitter data is flowing from GetTwitter and is pushed to the MSK cluster using the PublishKafka processor. When the publish is successful, the data is pending in the success queue and is truncated after 60 seconds.

After the flow begins, data is published to Amazon MSK. The AWS Glue streaming ETL job loads the data into the Amazon Redshift table msk_tweets. You may notice data queuing up in the success connection of Publish_to_MSK, indicating that the data was successfully published to Amazon MSK.

Visualizing Twitter data from Amazon Redshift using QuickSight

This section reviews the steps to visualize the data from the Twitter feed.

  1. Create a new analysis in QuickSight.
  2. Create a new dataset with Amazon Redshift as the source.
  3. Choose msk_tweets as the Amazon Redshift table.
  4. Choose the Custom SQL option.
  5. Enter the following query:
select cast(created_at as timestamp) as create_timestamp, extract(minute from cast(created_at as timestamp)) as extracted_minutes,id_str,text,split_part(split_part(source,'>',2),'<',1) as formatted_source,user_location,hashtags1,hashtags2,lang from msk_tweets;
  6. Choose the Directly query your data option to query real-time data directly from the database.
  7. Choose Visualize.

You can select various visual types, such as stacked area line charts, pie charts, word clouds, and bar charts, in QuickSight to build the following dashboard.

For instructions on building a QuickSight dashboard, see Tutorial: Create a Dashboard. For more information about improving dashboard performance, see Speed up your ELT and BI queries with Amazon Redshift materialized views.

Cleaning up

To clean up your resources, delete the AWS Glue database, tables, crawlers, and job, and service role.

Additionally, be sure to clean up all other AWS resources that you created using AWS CloudFormation. You can delete these resources on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI) by deleting the stack named Twitter-MSK-Glue-Redshift-Blog.

Conclusion

In this post, we demonstrated a use case for building a serverless and cost-effective ETL pipeline for streaming, which allows you to focus on the outcomes of your analytics. The CloudFormation template gives you an easy way to set up the process, which you can further modify to meet your specific use case needs. You can also modify your serverless AWS Glue ETL code with transformations and mappings to ensure that only valid data gets loaded to your data store. With this solution, you can use AWS Glue streaming as a mechanism to solve your streaming ETL use cases.

Please let us know if you have comments about this post!

 


About the Authors

Jobin George is a Sr. Partner Solutions Architect at AWS. He has more than a decade of experience with designing and implementing large scale Big Data and Analytics solutions. He provides technical guidance, design advice and thought leadership to some of the key AWS customers and Big Data partners.

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.

Dilip Rajan is a Partner Solutions Architect at AWS. His role is to help partners and customers design and build solutions at scale on AWS. Before AWS, he helped Amazon Fulfillment Operations migrate their Oracle Data Warehouse to Redshift while designing the next generation big data analytics platform using AWS technologies.

AWS Glue version 2.0 featuring 10x faster job start times and 1-minute minimum billing duration

Post Syndicated from Harunobu Kameda original https://aws.amazon.com/blogs/aws/aws-glue-version-2-0-featuring-10x-faster-job-start-times-and-1-minute-minimum-billing-duration/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy for customers to prepare and load their data for analytics. Glue is “serverless” – you don’t need to provision or manage any resources and you only pay for resources when Glue is actively running.

AWS Glue version 2.0 is now generally available and features Spark ETL jobs that start 10x faster. This reduction in startup latencies reduces overall job completion times, supports customers with micro-batching and time-sensitive workloads, and increases business productivity by enabling interactive script development and data exploration.

With Glue version 2.0, job startup delay is more predictable and has less overhead. In addition, AWS Glue version 2.0 Spark jobs are billed in 1-second increments with a 10x lower minimum billing duration, reduced from a 10-minute minimum to a 1-minute minimum. As a result, customers can now run micro-batch, deadline-sensitive, and interactive workloads more cost-effectively, quickly load data lakes, data warehouses, and databases for real-time analytics, and run SLA-driven data pipelines more reliably. Glue version 2.0 also provides a new capability to install Python modules from a wheel file or from a repository.
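As an illustration of that last capability, the following hedged boto3 sketch creates a Glue 2.0 job that installs extra Python modules at start time through the --additional-python-modules job parameter; the job name, role, script location, worker settings, and module list are placeholders:

import boto3

glue = boto3.client('glue')

glue.create_job(
    Name='my-glue-2-job',                                   # placeholder
    Role='MyGlueServiceRole',                               # placeholder IAM role
    GlueVersion='2.0',
    WorkerType='G.1X',                                      # assumed worker settings
    NumberOfWorkers=2,
    Command={
        'Name': 'glueetl',
        'ScriptLocation': 's3://my-bucket/scripts/job.py',  # placeholder
        'PythonVersion': '3',
    },
    DefaultArguments={
        # Install extra Python modules (from PyPI or a wheel file on Amazon S3) at job start
        '--additional-python-modules': 'pyarrow==2,s3://my-bucket/wheels/mymodule-0.1-py3-none-any.whl',
    },
)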

How it works

Let’s see how it works on the AWS Management Console. Benefiting from this new feature is easy—you can create new Glue Spark ETL jobs or move your existing Glue Spark ETL jobs to Glue version 2.0 as shown below.

I created a simple Glue job to copy a .csv file across different Amazon S3 buckets.

Glue version 1.0

Glue version 2.0

You can see that the startup time for Glue version 2.0 is 10x faster.

Available Today

This feature is now available in US East (N. Virginia and Ohio), US West (N. California and Oregon), Europe (Frankfurt, Ireland, London, Paris, and Stockholm), Asia Pacific (Hong Kong, Mumbai, Seoul, Singapore, Sydney, and Tokyo), Canada (Central), Middle East (Bahrain), and South America (São Paulo). Please check out our latest documentation and pricing pages for more details.

– Kame

ICYMI: Serverless Q2 2020

Post Syndicated from Moheeb Zara original https://aws.amazon.com/blogs/compute/icymi-serverless-q2-2020/

Welcome to the 10th edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all of the most recent product launches, feature enhancements, blog posts, webinars, Twitch live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

AWS Lambda

AWS Lambda functions can now mount an Amazon Elastic File System (EFS). EFS is a scalable and elastic NFS file system storing data within and across multiple Availability Zones (AZ) for high availability and durability. In this way, you can use a familiar file system interface to store and share data across all concurrent execution environments of one, or more, Lambda functions. EFS supports full file system access semantics, such as strong consistency and file locking.

Using different EFS access points, each Lambda function can access different paths in a file system, or use different file system permissions. You can share the same EFS file system with Amazon EC2 instances, containerized applications using Amazon ECS and AWS Fargate, and on-premises servers.

Learn how to create an Amazon EFS-mounted Lambda function using the AWS Serverless Application Model in Sessions With SAM Episode 10.

With our recent launch of .NET Core 3.1 AWS Lambda runtime, we’ve also released version 2.0.0 of the PowerShell module AWSLambdaPSCore. The new version now supports PowerShell 7.

Amazon EventBridge

At AWS re:Invent 2019, we introduced a preview of Amazon EventBridge schema registry and discovery. This is a way to store the structure of the events (the schema) in a central location. It can simplify using events in your code by generating the code to process them for Java, Python, and TypeScript. In April, we announced general availability of EventBridge Schema Registry.

We also added support for resource policies. Resource policies allow sharing of a schema registry across different AWS accounts and organizations. In this way, developers on different teams can search for and use any schema that another team has added to the shared registry.

Ben Smith, AWS Serverless Developer Advocate, published a guide on how to capture user events and monitor user behavior using the Amazon EventBridge partner integration with Auth0. This enables better insight into your application to help deliver a more customized experience for your users.

AWS Step Functions

In May, we launched a new AWS Step Functions service integration with AWS CodeBuild. CodeBuild is a fully managed continuous integration service that compiles source code, runs tests, and produces packages that are ready for deployment. Now, during the execution of a state machine, you can start or stop a build, get build report summaries, and delete past build executions records.

With the new AWS CodePipeline support to invoke Step Functions you can customize your delivery pipeline with choices, external validations, or parallel tasks. Each of those tasks can now call CodeBuild to create a custom build following specific requirements. Learn how to build a continuous integration workflow with Step Functions and AWS CodeBuild.

Rob Sutter, AWS Serverless Developer Advocate, has published a video series on Step Functions. We’ve compiled a playlist on YouTube to help you on your serverless journey.

AWS Amplify

The AWS Amplify Framework announced in April that they have rearchitected the Amplify UI component library to enable JavaScript developers to easily add authentication scenarios to their web apps. The authentication components include numerous improvements over previous versions. These include the ability to automatically sign in users after sign-up confirmation, better customization, and improved accessibility.

Amplify also announced the availability of Amplify Framework iOS and Amplify Framework Android libraries and tools. These help mobile application developers to easily build secure and scalable cloud-powered applications. Previously, mobile developers relied on a combination of tools and SDKS along with the Amplify CLI to create and manage a backend.

These new native libraries are oriented around use-cases, such as authentication, data storage and access, machine learning predictions etc. They provide a declarative interface that enables you to programmatically apply best practices with abstractions.

A mono-repository (monorepo) is a repository that contains more than one logical project, each in its own folder. Monorepo support is now available for the AWS Amplify Console, allowing developers to connect Amplify Console to a sub-folder in their mono-repository. Learn how to set up continuous deployment and hosting on a monorepo with the Amplify Console.

Amazon Keyspaces (for Apache Cassandra)

Amazon Managed Apache Cassandra Service (MCS) is now generally available under the new name: Amazon Keyspaces (for Apache Cassandra). Amazon Keyspaces is built on Apache Cassandra and can be used as a fully managed serverless database. Your applications can read and write data from Amazon Keyspaces using your existing Cassandra Query Language (CQL) code, with little or no changes. Danilo Poccia explains how to use Amazon Keyspaces with API Gateway and Lambda in this launch post.

AWS Glue

In April we extended AWS Glue jobs, based on Apache Spark, to run continuously and consume data from streaming platforms such as Amazon Kinesis Data Streams and Apache Kafka (including the fully-managed Amazon MSK). Learn how to manage a serverless extract, transform, load (ETL) pipeline with Glue in this guide by Danilo Poccia.

Serverless posts

Our team is always working to build and write content to help our customers better understand all our serverless offerings. Here is a list of the latest published to the AWS Compute Blog this quarter.

Introducing the new serverless LAMP stack

Ben Smith, AWS Serverless Developer Advocate, introduces the Serverless LAMP stack. He explains how to use serverless technologies with PHP. Learn about the available tools, frameworks and strategies to build serverless applications, and why now is the right time to start.

 

Building a location-based, scalable, serverless web app

James Beswick, AWS Serverless Developer Advocate, walks through building a location-based, scalable, serverless web app. Ask Around Me is an example project that allows users to ask questions within a geofence to create an engaging community driven experience.

Building well-architected serverless applications

Julian Wood, AWS Serverless Developer Advocate, published two blog series on building well-architected serverless applications. Learn how to better understand application health and lifecycle management.

Device hacking with serverless

Go beyond the browser with these creative and physical projects. Moheeb Zara, AWS Serverless Developer Advocate, published several serverless powered device hacks, all using off the shelf parts.


Tech Talks and events

We hold AWS Online Tech Talks covering serverless topics throughout the year. You can find these in the serverless section of the AWS Online Tech Talks page. We also regularly join in on podcasts, and record short videos you can find to learn in quick bite-sized chunks.

Here are the highlights from Q2.

Innovator Island Workshop

Learn how to build a complete serverless web application for a popular theme park called Innovator Island. James Beswick created a video series to walk you through this popular workshop at your own pace.

Serverless First Function

In May, we held a new virtual event series, the Serverless-First Function, to help you and your organization get the most out of the cloud. The first event, on May 21, included sessions from Amazon CTO, Dr. Werner Vogels, and VP of Serverless at AWS, David Richardson. The second event, May 28, was packed with sessions with our AWS Serverless Developer Advocate team. Catch up on the AWS Twitch channel.

Live streams

The AWS Serverless Developer Advocate team hosts several weekly livestreams on the AWS Twitch channel covering a wide range of topics. You can catch up on all our past content, including workshops, on the AWS Serverless YouTube channel.

Eric Johnson hosts “Sessions with SAM” every Thursday at 10AM PST. Each week, Eric shows how to use SAM to solve different serverless challenges. He explains how to use SAM templates to build powerful serverless applications. Catch up on the last few episodes.

James Beswick, AWS Serverless Developer Advocate, has compiled a round-up of all his content from Q2. He has plenty of videos ranging from beginner to advanced topics.

AWS Serverless Heroes

We’re pleased to welcome Kyuhyun Byun and Serkan Özal to the growing list of AWS Serverless Heroes. The AWS Hero program is a selection of worldwide experts that have been recognized for their positive impact within the community. They share helpful knowledge and organize events and user groups. They’re also contributors to numerous open-source projects in and around serverless technologies.

Still looking for more?

The Serverless landing page has much more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more getting started tutorials.

Follow the AWS Serverless team on our new LinkedIn page, where we share all the latest news and events. You can also follow all of us on Twitter to see the latest news, follow conversations, and interact with the team.

Chris Munns: @chrismunns
Eric Johnson: @edjgeek
James Beswick: @jbesw
Moheeb Zara: @virgilvox
Ben Smith: @benjamin_l_s
Rob Sutter: @rts_rob
Julian Wood: @julian_wood

How Wind Mobility built a serverless data architecture

Post Syndicated from Pablo Giner original https://aws.amazon.com/blogs/big-data/how-wind-mobility-built-a-serverless-data-architecture/

Guest post by Pablo Giner, Head of BI, Wind Mobility.

Over the past few years, urban micro-mobility has become a trending topic. With pollution indexes hitting historic highs, cities and companies worldwide have been introducing regulations and working on a wide spectrum of solutions to alleviate the situation.

We at Wind Mobility strive to make commuters’ life more sustainable and convenient by bringing short distance urban transportation to cities worldwide.

At Wind Mobility, we scale our services at the same pace as our users demand them, and we do it in an economically and environmentally viable way. We optimize our fleet distribution to avoid overcrowding cities with more scooters than those that are actually going to be used, and we position them just meters away from where our users need them and at the time of the day when they want them.

How do we do that? By optimizing our operations to their fullest. To do so, we need to be very well informed about our users’ behavior under varying conditions and understand our fleet’s potential.

Scalability and flexibility for rapid growth

We knew that before we could solve this challenge, we needed to collect data from many different sources, such as user interactions with our application, user demand, IoT signals from our scooters, and operational metrics. To analyze the numerous datasets collected and extract actionable insights, we needed to build a data lake. While the high-level goal was clear, the scope was less so. We were working hard to scale our operation as we continued to launch new markets. The rapid growth and expansion made it very difficult to predict the volume of data we would need to consume. We were also launching new microservices to support our growth, which resulted in more data sources to ingest. We needed an architecture that allowed us to be agile and quickly adapt to meet our growth. It became clear that a serverless architecture was best positioned to meet those needs, so we started to design our 100% serverless infrastructure.

The first challenge was ingesting and storing data from our scooters in the field, events from our mobile app, operational metrics, and partner APIs. We use AWS Lambda to capture changes in our operational databases and mobile app and push the events to Amazon Kinesis Data Streams, which allows us to take action in real time. We also use Amazon Kinesis Data Firehose to write the data to Amazon Simple Storage Service (Amazon S3), which we use for analytics.
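As a rough illustration of that ingestion pattern (not Wind Mobility's actual code), a Lambda handler forwarding change events to a Kinesis data stream could look like the following sketch; the stream name and event shape are assumptions:

import json
import boto3

kinesis = boto3.client('kinesis')
STREAM_NAME = 'wind-operational-events'  # hypothetical stream name


def handler(event, context):
    # Forward each incoming change event to Kinesis Data Streams for real-time processing;
    # Kinesis Data Firehose delivers the same data to Amazon S3 for analytics
    records = event.get('records', [])           # assumed event shape
    for record in records:
        kinesis.put_record(
            StreamName=STREAM_NAME,
            Data=json.dumps(record).encode('utf-8'),
            PartitionKey=str(record.get('scooter_id', 'unknown')),
        )
    return {'forwarded': len(records)}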

After the data was in Amazon S3 and adequately partitioned per its most common use cases (we partition by date, region, and business line, depending on the data source), we had to find a way to query this data for both data profiling (understanding structure, content, and interrelationships) and ad hoc analysis. For that we chose AWS Glue crawlers to catalog our data and Amazon Athena to read from the AWS Glue Data Catalog and run queries. However, ad hoc analysis and data profiling are relatively sporadic tasks in our team, because most of the data processing computing hours are actually dedicated to transforming the multiple data sources into our data warehouse, consolidating the raw data, modeling it, adding new attributes, and picking the data elements, which constitute 95% of our analytics and predictive needs.

This is where all the heavy lifting takes place. We parse through millions of scooter and user events generated daily (over 300 events per second) to extract actionable insight. We selected AWS Glue to perform this task. Our primary ETL job reads the newly added raw event data from Amazon S3, processes it using Apache Spark, and writes the results to our Amazon Redshift data warehouse. AWS Glue plays a critical role in our ability to scale on demand. After careful evaluation and testing, we concluded that AWS Glue ETL jobs meet all our needs and free us from procuring and managing infrastructure.
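The shape of such a job, reduced to a minimal sketch (not Wind Mobility's actual code; the Data Catalog names, Glue connection, and Amazon Redshift table are placeholders):

import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])
glueContext = GlueContext(SparkContext.getOrCreate())

# Read newly added raw event data from Amazon S3 via the Data Catalog
raw_events = glueContext.create_dynamic_frame.from_catalog(
    database="raw_events_db",                  # placeholder
    table_name="scooter_events",               # placeholder
    transformation_ctx="raw_events",
)

# Spark transformations (cleaning, modeling, adding attributes) would go here

# Write the results to the Amazon Redshift data warehouse through a Glue connection
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=raw_events,
    catalog_connection="redshift-connection",  # placeholder Glue connection name
    connection_options={"dbtable": "analytics.events", "database": "dw"},  # placeholders
    redshift_tmp_dir=args["TempDir"],
)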

Architecture overview

The following diagram represents our current data architecture, showing two serverless data collection, processing, and reporting pipelines:

  • Operational databases from Amazon Relational Database Service (Amazon RDS) and MongoDB
  • IoT and application events, followed by Athena for data profiling and Amazon Redshift for reporting

Our data is curated and transformed multiple times a day using an automated pipeline running on AWS Glue. The team can now focus on analyzing the data and building machine learning (ML) applications.

We chose Amazon QuickSight as our business intelligence tool to help us visualize and better understand our operational KPIs. Additionally, we use Amazon Elastic Container Registry (Amazon ECR) to store our Docker images containing our custom ML algorithms and Amazon Elastic Container Service (Amazon ECS) where we train, evaluate, and host our ML models. We schedule our models to be trained and evaluated multiple times a day. Taking as input curated data about demand, conversion, and flow of scooters, we run the models to help us optimize fleet utilization for a particular city at any given time.

The following diagram represents how data from the data lake is incorporated into our ML training, testing, and serving system. First, our developers work in the application code and commit their changes, which are built into new Docker images by our CI/CD pipeline and stored in the Amazon ECR registry. These images are pushed into Amazon ECS and tested in DEV and UAT environments before moving to PROD (where they are triggered by the Amazon ECS task scheduler). During their execution, the Amazon ECS tasks (some train the demand and usage forecasting models, some produce the daily and hourly predictions, and others optimize the fleet distribution to satisfy the forecast) read their configuration and pull data from Amazon S3 (which has been previously produced by scheduled AWS Glue jobs), finally storing their results back into Amazon S3. Executions of these pipelines are tracked via MLFlow (in a dedicated Amazon Elastic Compute Cloud (Amazon EC2) server) and the final result indicating the fleet operations required is fit into a Kepler map, which is then consumed by the operators on the field.

Conclusion

We at Wind Mobility place data at the forefront of our operations. For that, we need our data infrastructure to be as flexible as the industry and the context we operate in, which is why we chose serverless. Over the course of a year, we have built a data lake, a data warehouse, a BI suite, and a variety of (production) data science applications. All of that with a very small team.

Also, within the last 12 months, we have scaled up several of our data pipelines by a factor of 10, without slowing our momentum or redesigning any part of our architecture. When it came to double our fleet in 1 week and increase the frequency at which we capture data from scooters by a factor of 10, our serverless data architecture scaled with no issues. This allowed us to focus on adding value by simplifying our operation, reacting to changes quickly, and delighting our users.

We have measured our success in multiple dimensions:

  • Speed – Serverless is faster to deploy and expand; we believe we have reduced our time to market for the entire infrastructure by a factor of 2
  • Visibility – We have 360 degree visibility of our operations worldwide, accessible by our city managers, finance team, and management board
  • Optimized fleet deployment – We know, at any minute of the day, the number of scooters that our customers need over the next few hours, which reduces unsatisfied demand by more than 50%

If you face a similar challenge, our advice is clear: go fully serverless and use the spectrum of solutions available from AWS.

Follow us and discover more about Wind Mobility on Facebook, Instagram and LinkedIn.

 


About the Author

Pablo Giner is Head of BI at Wind Mobility. Pablo’s background is in wheels (motorcycle racing > vehicle engineering > collision insurance > eScooters sharing…) and for the last few years he has specialized in forming and developing data teams. At Wind Mobility, he leads the data function (data engineering + analytics + data science), and the project he is most proud of is what they call smart fleet rebalancing, an AI backed solution to reposition their fleet in real-time. “In God we trust. All others must bring data.” – W. Edward Deming

Process data with varying data ingestion frequencies using AWS Glue job bookmarks

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/process-data-with-varying-data-ingestion-frequencies-using-aws-glue-job-bookmarks/

We often have data processing requirements in which we need to merge multiple datasets with varying data ingestion frequencies. Some of these datasets are ingested one time in full, received infrequently, and always used in their entirety, whereas other datasets are incremental, received at certain intervals, and joined with the full datasets to generate output. To address this requirement, this post demonstrates how to build an extract, transform, and load (ETL) pipeline using AWS Glue.

Using AWS Glue

AWS Glue provides a serverless environment to extract, transform, and load a large number of datasets from several sources for analytics purposes. It has a feature called job bookmarks to process incremental data when rerunning a job on a scheduled interval. A job bookmark is composed of the states for various job elements, such as sources, transformations, and targets. This is done by persisting state information from a job run that helps AWS Glue prevent reprocessing old data.

For this use case, we use an AWS Glue job with job bookmarks enabled to process files received in varying frequencies (a full dataset signifying files that are received one time, and incremental datasets signifying files that are received in certain regular intervals). These files are merged together. In addition to enabling job bookmarks, we also use an optional parameter transformation_ctx (transformation context) in an AWS Glue PySpark dynamic frame. This acts as a unique identifier for the ETL operator instance to identify state information within a job bookmark for a given operator. AWS Glue uses transformation_ctx to index the key to the bookmark state.

You can capture and maintain state information for incremental datasets and avoid reprocessing by using transformation context. Transformation context is omitted for the full dataset file, which results in the job run state information not getting captured for the full dataset and allowing it to participate in the next processing event in its entirety. Even though the job bookmark flag is enabled at the AWS Glue job level, because transformation context is omitted for the full dataset, every time the job runs, the entire data from the full dataset is used as part of the job. In contrast, only the newly arrived datasets are processed for the incremental datasets.

Solution overview

To demonstrate the job bookmark utility of AWS Glue, we use TLC Trip Record Data datasets. We use NYC yellow taxi trip data monthly files as the incremental dataset, and NYC taxi zone lookup as the full dataset. The monthly yellow taxi trip data has a field named PULocationID (where a customer was picked up), which is joined with the LocationID field from the NYC taxi zone lookup file to create an output dataset that contains Borough, Zone, and service_zone from the NYC taxi zone lookup dataset and all the fields (except the PULocationID field) from the monthly NYC taxi trip data file.

The following diagram depicts a high-level architecture of the process.

The diagram includes the following components:

  • Two raw Amazon S3 bucket locations store the incoming CSV source data: the NYC taxi monthly files (incremental dataset) and the NYC taxi zone lookup file (full dataset).
  • A bookmark-enabled AWS Glue job joins the monthly trip data file with the taxi zone lookup file to generate output Parquet files and creates the NYC taxi trip table in the AWS Glue Data Catalog and the Amazon Redshift database.
  • A curated S3 bucket stores the processed monthly NYC taxi Parquet files.

Creating the AWS CloudFormation stack

You use the following AWS CloudFormation template to create the required resources in your preferred AWS account and Region:

Additionally, make sure you have an Amazon EC2 key pair created in the account and Region you’re launching the stack from.

To provide the stack parameters, complete the following steps:

  1. For Stack name, enter BigDataBlog-GlueBookmark-Stack.

  2. For RedshiftClusterIdentifier, enter bigdatablogrscluster.
  3. For NodeType, choose large.
  4. For NumberOfNodes, choose 2.
  5. For DatabaseName, enter bigdatablogdev.

  6. For MasterUserName, enter bigdatabloguser.
  7. For MasterUserPassword, enter a password for the master user account.
  8. For Maintenancewindow, enter sun:05:00-sun:05:30.
  9. For EC2InstanceType, choose micro.
  10. For SubscriptionEmail, enter your preferred email.
  11. For MyIPAddressCidr, enter your IP address.

You can find your IP Address by browsing https://www.whatismyip.com/ and looking up the value for My Public IPv4 is:. Add /32 at the end to make it CIDR-compatible and most restrictive.

  12. For DestinationPrefixListId, enter your prefix list ID.

To find your ID, set AWS credentials by entering aws configure in the command prompt. Run aws ec2 describe-prefix-lists to get the PrefixListId where PrefixListName is com.amazonaws.<<AWS region>>.s3 from the output.

  13. For NewS3BucketName, enter the name of your S3 bucket.

  14. For gluedatabase, enter bigdatabloggluedb.
  15. For EC2KeyName, enter the name of your key pair.

For instructions on creating a stack, see Creating a Stack on the AWS CloudFormation Console.

Make sure the stack is complete before moving to the next steps.

Creating the AWS Glue job

To create your AWS Glue job, complete the following steps:

  1. Download NYC yellow monthly trip data for October 2019 and November 2019 and save them under the s3://<<Your S3 Bucket>>/tripdata/ prefix.
  2. Download the NYC Taxi Zone lookup table and save it under the s3://<<Your S3 Bucket>>/tripdata-lookup/ prefix.
  3. Use the following PySpark script and change the piece of the code enclosed inside <<…>>.

You can find the values for the following keys on the Outputs tab for the CloudFormation stack:

    • S3Bucket
    • Snstopic

You can find the values for the following keys on the Parameters tab for the CloudFormation stack:

    • EC2KeyName
    • MyIPAddressCidr
    • NewS3BucketName
    • SubscriptionEmail

  4. When the AWS Glue script is ready, upload it to the S3 bucket under the s3://<<Your S3 Bucket>>/glue-script/ prefix.

You refer to this when you create the AWS Glue job.

  5. On the AWS Glue console, under ETL, choose Jobs.
  6. Choose Create job.
  7. For Name, enter a name for the job. For more information about AWS Glue job naming, see Jobs.
  8. For IAM role, choose the role the CloudFormation template created. Use the value for the key Glueaccessrole from the stack outputs.
  9. For Type, choose Spark.
  10. For Glue version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  11. For This job runs, choose An existing script that you provide.
  12. For S3 path where the script is stored, choose the script file that you saved earlier under the s3://<<Your S3 Bucket>>/glue-script/ prefix.
  13. In the Advanced properties section, for Job bookmark, choose Enable.
  14. For Catalog options, select Use Glue Data Catalog as the Hive metastore.
  15. For Connections, enter the value of the key GlueConnection from the stack outputs.
  16. Choose Save job and edit script.

Creating an Amazon Redshift database schema

Before you run the AWS Glue job, you need to connect to the Amazon Redshift cluster and create an Amazon Redshift database schema named Glue_bookmark_redshift_schema. To connect to the cluster, use one of the JDBC client-based SQL tools, such as SQL Workbench/J. For instructions, see How can I access a private Amazon Redshift cluster from my local machine?

To access the cluster, you use the Amazon Redshift master user bigdatabloguser (the value for MasterUserName on the Parameters tab of the CloudFormation stack) and the password you provided when creating the stack.

Running AWS Glue job

The AWS Glue job takes only one argument: the name of the file being processed. Pass the file name, such as yellow_tripdata_2019-10.csv, when processing that file. This enables you to track the records that belong to a specific file so that it’s easier to evaluate the result of multiple job runs using different files.
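Inside the PySpark script, that argument can be read with getResolvedOptions; a minimal sketch, where the argument name file_name is illustrative and should match whatever your script expects:

import sys
from awsglue.utils import getResolvedOptions

# 'file_name' is an illustrative argument name; pass it as --file_name yellow_tripdata_2019-10.csv
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'file_name'])
input_file = args['file_name']
print(f"Processing {input_file}")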

When the Glue job run is successful, you can see the output Parquet files under the /tripdata-joined-output/ prefix inside the S3 bucket you created by running the CloudFormation template. You can also use Amazon Athena to query the data from the table created in the Data Catalog. For more information, see Running SQL Queries Using Amazon Athena.

Query the Amazon Redshift database table named redshift_bookmark_table and review the output.

Explaining the solution

A bookmark-enabled AWS Glue job (in PySpark) reads the monthly NYC yellow taxi trip file, joins it with the NYC taxi zone lookup file, produces files in Parquet format, and saves them to an Amazon S3 location.

A Data Catalog table is created that refers to the Parquet files’ location in Amazon S3. The resulting dataset is also loaded into an Amazon Redshift table using the AWS Glue PySpark job.

The AWS Glue job bookmark transformation context is used when the AWS Glue dynamic frame is created by reading a monthly NYC taxi file. The transformation context is deliberately omitted when reading and creating the dynamic frame for the taxi zone lookup file, because the entire lookup file is required for processing each monthly trip file. As a result, each monthly trip file is processed exactly one time, while the taxi zone lookup file can be reused as many times as required; because the lookup file has no transformation context, no bookmark state is recorded for it.

When a new monthly NYC trip data file arrives and the AWS Glue job runs, it processes only the newly arrived file and ignores any previously processed monthly files. Similarly, when the Data Catalog table data is copied into Amazon Redshift, only the data from the newly processed underlying Parquet files is copied and appended to the Amazon Redshift table. For this step, the transformation context is enabled so the job bookmark applies when the AWS Glue dynamic frame is created by reading the Data Catalog table.

The following PySpark code uses the transformation context to create an AWS Glue dynamic frame while reading the monthly incremental file:

taxidata = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [InputDir]},
    format="csv",
    format_options={"withHeader": True, "separator": ",", "quoteChar": '"', "escaper": '"'},
    transformation_ctx="taxidata")

However, the following code omits transformation context when creating the AWS Glue dynamic frame for the lookup file:

lookupdata = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": [InputLookupDir]},
    format="csv",
    format_options={"withHeader": True, "separator": ",", "quoteChar": '"', "escaper": '"'})

Additionally, the following code uses the transformation context while reading the Data Catalog table, which is loaded into an Amazon Redshift table:

datasource0 = glueContext.create_dynamic_frame.from_catalog(
    database=Glue_catalog_database,
    table_name=Glue_table_name,
    transformation_ctx="datasource0")
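
For context, job bookmarks only take effect when the job object itself is initialized and committed; the excerpts above omit that boilerplate, so the following is a minimal sketch of how it typically fits around them:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
job = Job(glueContext)
job.init(args["JOB_NAME"], args)  # bookmark state is tracked per job name

# ... create the dynamic frames shown above, join them, and write the outputs,
#     passing a transformation_ctx wherever incremental processing is needed ...

job.commit()  # persists the bookmark so already-processed files are skipped on the next run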

You can see in the screenshot below that the 2019 October yellow taxi trip data file has arrived for processing (the incremental dataset).

To process each month’s data, you need the taxi zone lookup (full dataset).

The following screenshot shows the output of the AWS Glue job after processing the 2019 October trip data, saved in Parquet format.

The following two screenshots show the Amazon Redshift table: the first displays the record count for the October 2019 taxi data, and the second confirms that only the October 2019 taxi data file has been processed so far.

The following screenshot shows that the November 2019 NYC taxi data file has arrived for processing.

The following screenshot shows the output of the AWS Glue job after processing the November 2019 trip data, saved in Parquet format. Because the job bookmark and transformation context were enabled, the job processed only the November data and did not reprocess the October data.

The following screenshot shows that the Amazon Redshift table now has both October and November data and shows the total record count.

The following screenshot shows individual record count for each month.

Querying with Athena

You can also review the dataset in Athena, which uses the same Glue Data Catalog. The following screenshot of an Athena query shows the Data Catalog table has both October and November data, with the total record count.

The following screenshot of an Athena query shows the individual record count for each month.

The following screenshot shows the location information, including borough, zone, and service zone, which is available in the taxi zone lookup and is joined with the October taxi trip data.

The following screenshot shows the output for the same query on the November data.

Cleaning up

When you’re done using this solution, you should delete the CloudFormation stack to avoid incurring any further charges.

Conclusion

This post describes how you can merge datasets received at different frequencies as part of your ETL pipeline processing using AWS Glue job bookmarks. The use case demonstrated how to use job bookmarks and the transformation context to build an ETL pipeline that processes several incremental datasets.

 


About the Authors

Dipankar is a Senior Data Architect with AWS Professional Services, helping customers build analytics platform and solutions. He has a keen interest in distributed computing. Dipankar enjoys spending time playing chess and watching old Hollywood movies.

Ashok Padmanabhan is a big data consultant with AWS Professional Services, helping customers build big data and analytics platform and solutions. When not building and designing data lakes, Ashok enjoys spending time at beaches near his home in Florida.

Moovit embraces data lake architecture by extending their Amazon Redshift cluster to analyze billions of data points every day

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/moovit-embraces-data-lake-architecture-by-extending-their-amazon-redshift-cluster-to-analyze-billions-of-data-points-every-day/

Amazon Redshift is a fast, fully managed, cloud-native data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence tools.

Moovit is a leading Mobility as a Service (MaaS) solutions provider and maker of the top urban mobility app. Guiding over 800 million users in more than 3,200 cities across 103 countries to get around town effectively and conveniently, Moovit has experienced exponential growth of their service in the last few years. The company amasses up to 6 billion anonymous data points a day to add to the world’s largest repository of transit and urban mobility data, aided by Moovit’s network of more than 685,000 local editors that help map and maintain local transit information in cities that would otherwise be unserved.

Like Moovit, many companies today are using Amazon Redshift to analyze data and perform various transformations on the data. However, as data continues to grow and become even more important, companies are looking for more ways to extract valuable insights from the data, such as big data analytics, numerous machine learning (ML) applications, and a range of tools to drive new use cases and business processes. Companies are looking to access all their data, all the time, by all users and get fast answers. The best solution for all those requirements is for companies to build a data lake, which is a centralized repository that allows you to store all your structured, semi-structured, and unstructured data at any scale.

With a data lake built on Amazon Simple Storage Service (Amazon S3), you can easily run big data analytics using services such as Amazon EMR and AWS Glue. You can also query structured data (such as CSV, Avro, and Parquet) and semi-structured data (such as JSON and XML) by using Amazon Athena and Amazon Redshift Spectrum. You can also use a data lake with ML services such as Amazon SageMaker to gain insights.

Moovit uses an Amazon Redshift cluster to allow different company teams to analyze vast amounts of data. They wanted a way to extend the collected data into the data lake and allow additional analytical teams to access more data to explore new ideas and business cases.

Additionally, Moovit was looking to manage their storage costs and evolve to a model that allowed cooler data to be maintained at the lowest cost in S3, and maintain the hottest data in Redshift for the most efficient query performance. The proposed solution implemented a hot/cold storage pattern using Amazon Redshift Spectrum and reduced the local disk utilization on the Amazon Redshift cluster to make sure costs are maintained. Moovit is currently evaluating the new RA3 node with managed storage as an additional level of flexibility that will allow them to easily scale the amount of hot/cold storage without limit.

In this post we demonstrate how Moovit, with the support of AWS, implemented a lake house architecture by employing the following best practices:

  • Unloading data into Amazon Simple Storage Service (Amazon S3)
  • Instituting a hot/cold pattern using Amazon Redshift Spectrum
  • Using AWS Glue to crawl and catalog the data
  • Querying data using Athena

Solution overview

The following diagram illustrates the solution architecture.

The solution includes the following steps:

  1. Unload data from Amazon Redshift to Amazon S3
  2. Create an AWS Glue Data Catalog using an AWS Glue crawler
  3. Query the data lake in Amazon Athena
  4. Query Amazon Redshift and the data lake with Amazon Redshift Spectrum

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  1. An AWS account.
  2. An Amazon Redshift cluster.
  3. The following AWS services and access: Amazon Redshift, Amazon S3, AWS Glue, and Athena.
  4. The appropriate AWS Identity and Access Management (IAM) permissions for Amazon Redshift Spectrum and AWS Glue to access Amazon S3 buckets. For more information, see IAM policies for Amazon Redshift Spectrum and Setting up IAM Permissions for AWS Glue.

Walkthrough

To demonstrate the process Moovit used during their data architecture, we use the industry-standard TPC-H dataset provided publicly by the TPC organization.

The Orders table has the following columns:

Column | Type
O_ORDERKEY | int4
O_CUSTKEY | int4
O_ORDERSTATUS | varchar
O_TOTALPRICE | numeric
O_ORDERDATE | date
O_ORDERPRIORITY | varchar
O_CLERK | varchar
O_SHIPPRIORITY | int4
O_COMMENT | varchar
SKIP | varchar

Unloading data from Amazon Redshift to Amazon S3

Amazon Redshift allows you to unload your data using a data lake export to an Apache Parquet file format. Parquet is an efficient open columnar storage format for analytics. Parquet format is up to twice as fast to unload and consumes up to six times less storage in Amazon S3, compared with text formats.

To unload cold or historical data from Amazon Redshift to Amazon S3, you need to run an UNLOAD statement similar to the following code (substitute your IAM role ARN):

UNLOAD ('select o_orderkey, o_custkey, o_orderstatus, o_totalprice, o_orderdate, o_orderpriority, o_clerk, o_shippriority, o_comment, skip
FROM tpc.orders
ORDER BY o_orderkey, o_orderdate') 
TO 's3://tpc-bucket/orders/' 
CREDENTIALS 'aws_iam_role=arn:aws:iam::<account_number>:role/<Role>'
FORMAT AS parquet allowoverwrite PARTITION BY (o_orderdate);

It is important to define a partition key or column that minimizes Amazon S3 scans as much as possible based on the query patterns intended. The query pattern is often by date ranges; for this use case, use the o_orderdate field as the partition key.

Another important recommendation when unloading is to keep file sizes between 128 MB and 512 MB. By default, the UNLOAD command splits the results into one or more files per node slice (a virtual worker in the Amazon Redshift cluster), which lets you take advantage of the Amazon Redshift MPP architecture. However, this can cause the files created by each slice to be small. In Moovit's use case, the default UNLOAD using PARALLEL ON yielded dozens of small files (a few MBs each). For Moovit, PARALLEL OFF yielded the best results because it aggregated all the slices' work into the leader node and wrote it out as a single stream, controlling the file size with the MAXFILESIZE option.
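
As a sketch of what that looks like in practice, the following submits a PARALLEL OFF variant of the earlier UNLOAD through the Amazon Redshift Data API with boto3; the cluster identifier, database, user, and the 256 MB file size are placeholders, not Moovit's actual settings.

import boto3

redshift_data = boto3.client("redshift-data")

# Single output stream, capped at roughly 256 MB per file (placeholder value).
unload_sql = """
UNLOAD ('SELECT * FROM tpc.orders ORDER BY o_orderkey, o_orderdate')
TO 's3://tpc-bucket/orders/'
IAM_ROLE 'arn:aws:iam::<account_number>:role/<Role>'
FORMAT AS PARQUET
ALLOWOVERWRITE
PARALLEL OFF
MAXFILESIZE 256 MB;
"""

redshift_data.execute_statement(
    ClusterIdentifier="my-redshift-cluster",  # placeholder
    Database="dev",                           # placeholder
    DbUser="awsuser",                         # placeholder
    Sql=unload_sql,
)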

Another performance enhancement applied in this use case was the use of Parquet’s min and max statistics. Parquet files have min_value and max_value column statistics for each row group that allow Amazon Redshift Spectrum to prune (skip) row groups that are out of scope for a query (range-restricted scan). To use row group pruning, you should sort the data by frequently-used columns. Min/max pruning helps scan less data from Amazon S3, which results in improved performance and reduced cost.

After unloading the data to your data lake, you can view your Parquet file’s content in Amazon S3 (assuming it’s under 128 MB). From the Actions drop-down menu, choose Select from.

You’re now ready to populate your Data Catalog using an AWS Glue crawler.

Creating a Data Catalog with an AWS Glue crawler

To query your data lake using Athena, you must catalog the data. The Data Catalog is an index of the location, schema, and runtime metrics of the data.

An AWS Glue crawler accesses your data store, extracts metadata (such as field types), and creates a table schema in the Data Catalog. For instructions, see Working with Crawlers on the AWS Glue Console.
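
If you prefer to script the crawler instead of using the console, a minimal boto3 sketch might look like the following; the crawler name and IAM role are placeholders, while the target path and database follow this walkthrough.

import boto3

glue = boto3.client("glue")

glue.create_crawler(
    Name="tpc-orders-crawler",                                    # placeholder name
    Role="arn:aws:iam::<account_number>:role/MyGlueCrawlerRole",  # placeholder role
    DatabaseName="datalake",
    Targets={"S3Targets": [{"Path": "s3://tpc-bucket/orders/"}]},
)
glue.start_crawler(Name="tpc-orders-crawler")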

Querying the data lake in Athena

After you create the crawler, you can view the schema and tables in AWS Glue and Athena, and can immediately start querying the data in Athena. The following screenshot shows the table in the Athena Query Editor.
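
You can also submit the same kind of query programmatically. The following boto3 sketch assumes the crawler registered the table as orders in the datalake database and uses a placeholder S3 location for query results.

import boto3

athena = boto3.client("athena")

response = athena.start_query_execution(
    QueryString="SELECT count(*) FROM orders",   # table name as created by the crawler (assumed)
    QueryExecutionContext={"Database": "datalake"},
    ResultConfiguration={"OutputLocation": "s3://tpc-bucket/athena-results/"},  # placeholder
)
print(response["QueryExecutionId"])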

Querying Amazon Redshift and the data lake using a unified view with Amazon Redshift Spectrum

Amazon Redshift Spectrum is a feature of Amazon Redshift that allows multiple Redshift clusters to query the same data in the data lake. It enables the lake house architecture and allows data warehouse queries to reference data in the data lake as they would any other table. Amazon Redshift clusters transparently use the Amazon Redshift Spectrum feature when a SQL query references an external table stored in Amazon S3. Amazon Redshift Spectrum can run multiple large queries in parallel against external tables, scanning, filtering, and aggregating data in Amazon S3 and returning only the resulting rows to the Amazon Redshift cluster.

Following best practices, Moovit decided to persist all their data in their Amazon S3 data lake and only store hot data in Amazon Redshift. They could query both hot and cold datasets in a single query with Amazon Redshift Spectrum.

The first step is creating an external schema in Amazon Redshift that maps a database in the Data Catalog. See the following code:

CREATE EXTERNAL SCHEMA spectrum 
FROM data catalog 
DATABASE 'datalake' 
iam_role 'arn:aws:iam::<account_number>:role/mySpectrumRole'
CREATE EXTERNAL DATABASE IF NOT EXISTS;

After the crawler creates the external table, you can start querying in Amazon Redshift using the mapped schema that you created earlier. See the following code:

SELECT * FROM spectrum.orders;

Lastly, create a late binding view that unions the hot and cold data:

CREATE OR REPLACE VIEW lake_house_joint_view AS (
SELECT * FROM public.orders WHERE o_orderdate >= dateadd('day', -90, date_trunc('day', getdate()))
UNION ALL
SELECT * FROM spectrum.orders WHERE o_orderdate < dateadd('day', -90, date_trunc('day', getdate()))
) WITH NO SCHEMA BINDING;

Summary

In this post, we showed how Moovit unloaded data from Amazon Redshift to a data lake. By doing that, they exposed the data to many additional groups within the organization and democratized the data. These benefits of data democratization are substantial because various teams within Moovit can access the data, analyze it with various tools, and come up with new insights.

As an additional benefit, Moovit reduced their Amazon Redshift utilized storage, which allowed them to maintain cluster size and avoid additional spending by keeping all historical data within the data lake and only hot data in the Amazon Redshift cluster. Keeping only hot data on the Amazon Redshift cluster prevents Moovit from deleting data frequently, which saves IT resources, time, and effort.

If you are looking to extend your data warehouse to a data lake and leverage various tools for big data analytics and machine learning (ML) applications, we invite you to try out this walkthrough.

 


About the Authors

Yonatan Dolan is a Business Development Manager at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value.

Alon Gendler is a Startup Solutions Architect at Amazon Web Services. He works with AWS customers to help them architect secure, resilient, scalable and high performance applications in the cloud.

Vincent Gromakowski is a Specialist Solutions Architect for Amazon Web Services.

Build an end to end, automated inventory forecasting capability with AWS Lake Formation and Amazon Forecast

Post Syndicated from Syed Jaffry original https://aws.amazon.com/blogs/big-data/build-an-end-to-end-automated-inventory-forecasting-capability-with-aws-lake-formation-and-amazon-forecast/

Amazon Forecast is a fully managed service that uses machine learning (ML) to generate highly accurate forecasts without requiring any prior ML experience. Forecast is applicable in a wide variety of use cases, including estimating product demand, inventory planning, and workforce planning.

With Forecast, there are no servers to provision or ML models to build manually. Additionally, you only pay for what you use, and there is no minimum fee or upfront commitment. To use Forecast, you only need to provide historical data for what you want to forecast, and any additional related data that may influence your forecasts. The latter may include both time-varying data, such as price, events, and weather, and categorical data, such as color, genre, or region. The service automatically trains and deploys ML models based on your data and provides you with a custom API to retrieve forecasts.

This post demonstrates how you can automate the data extraction, transformation, and use of Forecast for the use case of a retailer that requires recurring replenishment of inventory. You achieve this by using AWS Lake Formation to build a secure data lake and ingest data into it, orchestrate the data transformation using an AWS Glue workflow, and visualize the forecast results in Amazon QuickSight.

Use case background

Retailers have a recurring need to replenish inventory. For example, consider a clothing retailer that typically sells through its e-commerce and physical store channels. You need to maintain optimum levels of inventory on hand to meet demand while minimizing warehouse costs. Some of the common questions you need to answer for effective inventory management are:

  • What is the optimal quantity of inventory to reorder from my supplier for my next sales cycle?
  • What should the composition of product SKUs be in the purchase order to the supplier?
  • How do I most effectively determine the right mix and quantity of products to stock at individual retail store locations?

You can use Forecast to answer these questions. You extract data from the source systems, apply transformations to make the data ready for use in Forecast, and use Forecast to load, train, and forecast.

The following diagram shows the end-to-end system architecture of the proposed solution using Lake Formation, AWS Glue, and Amazon QuickSight.

You use Lake Formation to manage governance and access control on the data lake. Additionally, you use the following resources:

  1. Lake Formation blueprint to ingest sales data into a data lake
  2. AWS Lambda and Amazon S3 event notification to trigger an AWS Glue workflow
  3. AWS Glue workflow to trigger the execution of the data transform AWS Glue job
  4. AWS Glue workflow to orchestrate the three steps within Forecast (load, train, forecast)
  5. Forecast to export the forecast results into the data lake
  6. AWS Glue to trigger a crawler on the exported forecast results
  7. Amazon Athena and Amazon QuickSight to visualize the exported forecast results

Setting up the required IAM policies

Before you get started, you need to set up the required IAM policies. Complete the following steps:

  1. Sign in to the IAM console as a user with the AdministratorAccess AWS managed policy.
  2. Create an IAM user named report_builder to use when building your Amazon QuickSight analysis report and dashboard for visualization.
  3. Grant the AmazonAthenaFullAccess policy to the user. In the following steps, you create an IAM role for the AWS Glue jobs, crawler, and workflow to assume during their execution.
  4. On the IAM console, choose Roles.
  5. Choose Create role.
  6. On the Create role page, choose AWS service, and then choose Glue.
  7. Choose Next: Permissions.
  8. From the list of available policies, search for the AWSGlueServiceRole policy and select it.
  9. Name the role GLUE_WORKFLOW_ROLE.
  10. Choose Create role.
  11. On the Roles page, search for and choose GLUE_WORKFLOW_ROLE.
  12. On the Trust relationships tab, choose Edit trust relationship.
  13. Add the following assume role policy statement for Forecast:
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "forecast.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }

  14. Choose Update Trust Policy.
  15. On the role Summary page, on the Permissions tab, choose Add inline policy.
  16. Add the following inline policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "lakeformation:GetDataAccess",
                    "lakeformation:GrantPermissions"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::forecast-blog-processed/*",
                    "arn:aws:s3:::forecast-blog-published/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
      "forecast:CreateDataset",
      "forecast:CreateDatasetGroup",
      "forecast:CreateDatasetImportJob",
      "forecast:CreateForecast",
      "forecast:CreateForecastExportJob",
      "forecast:CreatePredictor",
      "forecast:DescribeDataset",
      "forecast:DescribeDatasetGroup",
      "forecast:DescribeDatasetImportJob",
      "forecast:DescribeForecast",
      "forecast:DescribeForecastExportJob",
      "forecast:DescribePredictor",
      "forecast:ListDatasetGroups",
      "forecast:ListDatasetImportJobs",
      "forecast:ListDatasets",
      "forecast:ListForecastExportJobs",
      "forecast:ListForecasts",
      "forecast:ListPredictors",
      "forecast:UpdateDatasetGroup"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": [
                    "arn:aws:iam::*:role/AWSGlueServiceRole*",
                    "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE"
                ],
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": [
                            "glue.amazonaws.com"
                        ]
                    }
                }
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole",
                    "iam:GetRole"
                ],
                "Resource": "arn:aws:iam::[your aws account id]:role/GLUE_WORKFLOW_ROLE",
                "Condition": {
                    "StringEquals": {
                        "iam:PassedToService": "forecast.amazonaws.com"
                    }
                }
            }
        ]
    }

Setting up your data lake storage

Next, you need a data lake to use in this automation. You create S3 buckets for data storage and apply appropriate security and governance. If you already have a data lake on Amazon S3, you can continue to use those S3 buckets with Lake Formation.

On the Amazon S3 console, create the following buckets:

  • forecast-blog-landing (for raw data ingest)
  • forecast-blog-processed (for the transformed data)
  • forecast-blog-published (for end consumers to access results)

This post includes walkthrough instructions for the landing folder, which you can repeat for the other two folders.

Enabling centralized access control on your data lake

You use Lake Formation’s centralized access control to enable access to the underlying S3 buckets for users and roles. With this model, you don’t need to create any additional IAM access policies or S3 bucket policies for your users and roles.

  1. Sign in to the AWS Lake Formation console as a data lake administrator IAM user. For instructions on setting up a data lake administrator user, see Create a Data Lake Administrator. To manage access control from Lake Formation, you register the three S3 buckets you created earlier as data lake locations with Lake Formation.
  1. On the Lake Formation dashboard, choose Register Location.
  2. For Amazon S3 path, enter your S3 bucket location (s3://forecast-blog-landing).
  3. Choose Register location.
  4. Repeat these steps for the processed and published buckets.

Setting up the Lake Formation data catalog for your data lake

You create three databases in the Lake Formation data catalog, one for each S3 bucket you created earlier. All transformations done via AWS Glue operate on the databases in this catalog.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database.
  3. In the Database details section, for Name, enter the name for the database (forecast-blog-landing-db).
  4. For Location, enter the location to the corresponding S3 bucket.
  5. For Description, add an optional description.
  6. Deselect Use only IAM access control for new tables in this database.
  7. Choose Create database.
  8. Repeat the above steps for the processed and published buckets.
  9. On the Databases page, select the database you just created.
  10. From the Actions drop-down menu, choose Grant.
  11. For IAM users and roles, choose the GLUE_WORKFLOW_ROLE you created earlier.
  12. For Database permissions, select Create table.
  13. Choose Grant. At this stage, you have set up your data lake with Lake Formation. The following diagram illustrates your resources so far.
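
If you want to script the grants from steps 10-13 instead of clicking through the console, a boto3 sketch might look like the following; the processed and published database names are assumed to follow the same naming pattern as the landing database.

import boto3

lakeformation = boto3.client("lakeformation")

databases = [
    "forecast-blog-landing-db",
    "forecast-blog-processed-db",   # assumed name
    "forecast-blog-published-db",
]
for db_name in databases:
    lakeformation.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::<account-id>:role/GLUE_WORKFLOW_ROLE"},
        Resource={"Database": {"Name": db_name}},
        Permissions=["CREATE_TABLE"],
    )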

You’re now ready to ingest sales data into your data lake.

Ingesting data

This post uses an example MySQL table called sales, which contains 2 years of sales history of a single product. The schema of the table is as follows:

Column Name | Column Type
InvoiceNo | int
StockCode | int
Description | varchar(200)
Quantity | int
InvoiceDate | varchar(50)
UnitPrice | float
CustomerID | int
StoreLocation | varchar(100)
CustomerName | varchar(200)
  1. To begin the ingestion, create the source database. The following CloudFormation template creates a free-tier RDS MySQL instance with a database named sales_schema within a new VPC, preloaded with the required sample data for this post. The template deploys in the us-west-2 Region.

  1. Create a new Glue connection for the source database.
  2. On the Lake Formation console, choose Blueprints.
  3. Choose Use a blueprint.
  4. Select Incremental database as the blueprint type for a regular ingest of sales data from the source relational database.
  5. Follow the prompts to complete the incremental blueprint setup.
  6. For Database connection, choose forecast-blog-db.
  7. For Source data path, enter sales_schema/sales.
  8. For Target database, choose forecast-blog-landing-db.
  9. For Target storage location, enter s3://forecast-blog-landing.
  10. For Data format, choose Parquet.
  11. For Workflow name, enter forecast-blog-wf.
  12. For IAM role, choose GLUE_WORKFLOW_ROLE.
  13. For Table prefix, enter blog.

The target location must be the landing S3 bucket that you created earlier, and table prefix must be set to blog. This is the raw data that subsequent AWS Glue jobs transform and process. For information about using an incremental database blueprint, see Importing Data Using Workflows.

Start the blueprint after you create it.

Orchestrating data transformation and forecast generation

When you have the raw sales data ingested into the data lake landing bucket, you run a custom AWS Glue workflow to orchestrate the automation of the data transformation, the Amazon Forecast load/train/forecast steps, and making the forecasts available for business dashboards. Complete the following steps:

  1. Create an AWS Glue crawler to crawl the published S3 bucket that you created earlier.
    1. Use the GLUE_WORKFLOW_ROLE that you created earlier.
  2. Create the following AWS Glue jobs to use in the forecasting automation.
    1. Create the data transformation job as a Spark job, and create the remaining jobs as Python shell (Python 3) jobs.
    2. For IAM role, use GLUE_WORKFLOW_ROLE.
  3. On the Connections page, choose Save job and edit script without selecting any connections. The job scripts are available on GitHub.

Next, you create a new AWS Glue workflow to orchestrate the entire automation. The workflow lets you build and orchestrate a sequence of AWS Glue jobs and crawlers via triggers to complete a complex process.

  1. On the AWS Glue console, choose Workflows.
  2. Choose Add workflow.
  3. For Workflow name, enter AmazonForecastWorkflow.
  4. For Description, add an optional description.
  5. For Default run properties, enter the keys and values in the following table.
Key | Value
landingDB | forecast-blog-landing-db
landingDBTable | blog_sales_schema_sales
processedBucket | forecast-blog-processed
publishedBucket | forecast-blog-published
  1. Choose Add workflow.
    After you create the workflow, you add triggers, jobs, and a crawler in your workflow.
  1. Choose the workflow you created earlier.
  2. Choose Add trigger.
  3. For name, enter StartWorkflow.
  4. For Trigger type, select On demand.
  5. Choose Add.
  6. On the Jobs tab, choose the job you created earlier.
  7. Choose Add.
  8. Choose Add node.
  9. Create a new trigger to watch the end of the transform job and start the data import job.
  10. For Name, enter StartDataImport.
  11. For Trigger type, select Event.
  12. For Trigger logic, leave as Start after ANY watched event.
  13. Choose Add.
  1. Choose Add node to the left of the trigger and choose the data transformation job you created earlier.
  2. Choose Add node to the right of the trigger and choose the import data job you created earlier.
  1. Repeat these steps to create the following triggers:
Trigger | Glue job to complete | Glue job to start
CheckImportTrigger | Importing data into Forecast | Checking the load data job status
StartPredictorTrigger | Checking the load data job status | Training the Forecast predictor
CheckPredictorTrigger | Training the Forecast predictor | Checking the train predictor job status
GenerateForecastTrigger | Checking the train predictor job status | Generating forecast
CheckForecastTrigger | Generating forecast | Checking the forecast job status
ExportForecastTrigger | Checking the forecast job status | Exporting forecast to data lake published bucket
CheckExportTrigger | Exporting forecast to data lake published bucket | Checking the export forecast job status
StartCrawlerTrigger | Checking the export forecast job status | Crawler you created to crawl the published S3 bucket
  1. From the Actions drop-down menu, choose Run.

This starts the end-to-end forecasting process.
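
If you later want to start the same workflow on a schedule or from another process rather than from the console, a minimal boto3 sketch is:

import boto3

glue = boto3.client("glue")

# Equivalent to choosing Run on the console for the workflow created above.
run_id = glue.start_workflow_run(Name="AmazonForecastWorkflow")["RunId"]
print(run_id)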

Visualizing your forecasts

To provide your users with a dashboard that refreshes regularly with new forecasts, set up an Amazon QuickSight report, a dashboard, and a connection to the data lake via Athena. You can use Amazon QuickSight with the Athena data source to access the forecast data and build visualizations.

First, you need to grant access to the forecast data to QuickSight. Identify the Amazon QuickSight service role in your account. Amazon QuickSight assumes the service role (aws-quicksight-service-role-v0) to interact with other AWS services. The service role is automatically created when you start using Amazon QuickSight.

  1. As the data lake administrator user, on the Lake Formation console, locate the table created by the AWS Glue workflow under your published database (forecast-blog-published-db). This is the table that has the exported forecast data to visualize.
  2. Grant Select access on this table to the Amazon QuickSight service role using the Grant action within Lake Formation. For more information, see Granting Data Catalog Permissions. You now create a dashboard to visualize the forecast data in Amazon QuickSight.
  3. On the Amazon QuickSight console, choose Manage data.
  4. Create a data source for Athena.
  5. For Data source name, enter forecast-blog-published-db.
  6. Choose Create data source.
  7. For Database, choose forecast-blog-published-db.
  8. Select the table that your crawler created for the exported forecast (forecast_blog_published).
  9. Choose Select.
  10. Choose Visualize.
  11. Create a new analysis on the dataset.
  12. Publish a visualization dashboard. The following screenshot is a QuickSight dashboard displaying your exported forecast. The dashboard was created from a line chart analysis with the p10, p50, and p90 columns on the y-axis and the date column on the x-axis.

Forecast generates probabilistic forecasts so you can generate forecasts at different percentiles depending on your specific use case (for example, if under-stocking or over-stocking is critical to the business). The preceding graph represents the upper and lower bands of forecasted inventory values for the product in your sample data and a selected location (applied as a filter) for a 10-day period. Using this, you can decide on the optimum levels of stock to hold or order for that week.

The p10 is the lower bound, meaning that there's only a 10% chance that the actual value falls below this line. The p90 is the upper bound, meaning that there's a 90% chance that the actual value falls below this line. As your training data becomes more comprehensive, the p10 and p90 converge. You can also generate forecasts at custom quantiles of your choosing.

For conservative planning, choose a value closer to the p90, which means you’re willing to purchase more inventory than what you actually sell. For aggressive planning, choose a value closer to the p10, which means that you’re willing to accept the risk of running out of inventory.
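If the default p10/p50/p90 quantiles don't match your planning posture, you could choose the quantiles when the forecast is created. The following boto3 sketch uses placeholder names and an illustrative set of quantiles.

import boto3

forecast = boto3.client("forecast")

forecast.create_forecast(
    ForecastName="inventory_forecast_custom_quantiles",                                      # placeholder
    PredictorArn="arn:aws:forecast:us-west-2:<account-id>:predictor/inventory_predictor",    # placeholder
    ForecastTypes=["0.30", "0.50", "0.75"],  # custom quantiles instead of the default p10/p50/p90
)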

Conclusion

In this post, you learned how to build an automated inventory forecasting capability for your business on AWS using AI through Forecast and Lake Formation. You learned how to set up a data lake on AWS with the required security governance using Lake Formation. You also learned how to automate the end-to-end process of ingesting sales data into your data lake and automating the data transformation; loading, training, and generating forecasts with Forecast; and making the forecasts accessible to your end-users via Amazon QuickSight visualizations.

 


About the Author

Syed Jaffry is a solutions architect with Amazon Web Services. He works with Financial Services customers to help them deploy secure, resilient, scalable and high performance applications in the cloud.

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure how to get started and what the impact of certain design decisions will be. To provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses in 2017. AWS is now happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post introduces its purpose, the topics it covers, common scenarios, and the services it includes.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come on line, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

Optimize memory management in AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/

AWS Glue provides a serverless environment to prepare and process datasets for analytics using the power of Apache Spark. In the third post of the series, we discussed how AWS Glue can automatically generate code to perform common data transformations. We also looked at how you can use AWS Glue Workflows to build data pipelines that enable you to easily ingest, transform and load data for analytics.

Apache Spark provides several knobs to control how memory is managed for different workloads. However, this is not an exact science, and applications may still run into a variety of out of memory (OOM) exceptions because of inefficient transformation logic, unoptimized data partitioning, or other quirks in the underlying Spark engine. In this post of the series, we go deeper into the inner workings of a Glue Spark ETL job and discuss how to combine AWS Glue capabilities with Spark best practices to scale jobs so they efficiently handle both the variety and the volume of your data.

Scaling the Apache Spark driver

The Apache Spark driver is responsible for analyzing the job, and for coordinating and distributing work to tasks so the job completes as efficiently as possible. In the majority of ETL jobs, the driver is typically involved in listing table partitions and the data files in Amazon S3 before it computes file splits and work for individual tasks. The driver then coordinates the tasks running the transformations that process each file split. It also needs to track the progress each task is making and collect the results at the end. The Spark driver can become a bottleneck when a job needs to process a large number of files and partitions. AWS Glue offers five different mechanisms to efficiently manage memory on the Spark driver when dealing with a large number of files.

  1. Push down predicates: Glue jobs allow the use of push down predicates to prune the unnecessary partitions from the table before the underlying data is read. This is useful when you have a large number of partitions in a table and you only want to process a subset of them in your Glue ETL job. Pruning catalog partitions reduces both the memory footprint of the driver and the time required to list the files in the pruned partitions. Push down predicates are applied first to ignore unnecessary partitions before the job bookmark and other exclusions further filter the list of files to be read from each partition. Below is an example of how to use push down predicates to process only events logged on weekends.
    partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
    
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate)

  2. Glue S3 Lister: AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame. The Glue S3 Lister can be enabled by setting the DynamicFrame's additional_options parameter useS3ListImplementation to True. The Glue S3 Lister offers an advantage over the default S3 list implementation by strictly iterating over the final list of filtered files to be read.
    datasource = glue_context.create_dynamic_frame.from_catalog(
        database = "githubarchive_month", 
        table_name = "data", 
        push_down_predicate = partitionPredicate,
        additional_options = {"useS3ListImplementation":True}
    )  

  3. Grouping: AWS Glue allows you to consolidate multiple files per Spark task using the file grouping feature. Grouping files together reduces the memory footprint on the Spark driver and simplifies file split orchestration. Without grouping, a Spark application must process each file using a different Spark task. Each task must then send a mapStatus object containing the location information to the Spark driver. In our testing using the AWS Glue standard worker type, we found that Spark applications processing more than roughly 650,000 files often cause the Spark driver to crash with an out of memory exception, as shown by the following error message:
    # java.lang.OutOfMemoryError: Java heap space
    # -XX:OnOutOfMemoryError="kill -9 %p"
    # Executing /bin/sh -c "kill -9 12039"...

    • groupFiles allows you to group files within a Hive-style S3 partition (inPartition) and across S3 partitions (acrossPartition). groupSize is an optional field that allows you to configure the amount of data to be read from each file and processed by individual Spark tasks.
      dyf = glueContext.create_dynamic_frame_from_options("s3",
          {'paths': ["s3://input-s3-path/"],
          'recurse':True,
          'groupFiles': 'inPartition',
          'groupSize': '1048576'}, 
          format="json")

  4. Exclusions for S3 Paths: To further aid in filtering out files that are not required by the job, AWS Glue introduced a mechanism for users to provide a glob expression for S3 paths to be excluded. This speeds job processing while reducing the memory footprint on the Spark driver. The following code snippet shows how to exclude all objects ending with _metadata in the selected S3 path.
    dyf = glueContext.create_dynamic_frame_from_options("s3",
        {'paths': ["s3://input-s3-path/"],
        'exclusions': "[\"input-s3-path/**_metadata\"]"}, 
        format="json")

    Exclusions for S3 Storage Classes: AWS Glue offers the ability to exclude objects based on their underlying S3 storage class. As data moves through its lifecycle, hot data becomes cold and is automatically moved to lower-cost storage based on the configured S3 lifecycle policy, so it's important to make sure ETL jobs process the correct data. This is particularly useful when working with large datasets that span multiple S3 storage classes using the Apache Parquet file format, where Spark tries to read the schema from the file footers in these storage classes. Amazon S3 offers several storage classes, including STANDARD, INTELLIGENT_TIERING, STANDARD_IA, ONEZONE_IA, GLACIER, DEEP_ARCHIVE, and REDUCED_REDUNDANCY. When reading data using DynamicFrames, you can specify a list of S3 storage classes you want to exclude. This feature leverages the optimized AWS Glue S3 Lister. The following example shows how to exclude files stored in the GLACIER and DEEP_ARCHIVE storage classes.

    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        redshift_tmp_dir = "",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"]
        }
    )

    GLACIER and DEEP_ARCHIVE storage classes only allow listing files and require an asynchronous S3 restore process to read the actual data. The following is the exception you will see when trying to access Glacier and Deep Archive storage classes from your Glue ETL job:

    java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
    The operation is not valid for the object's storage class (Service: Amazon S3; Status Code: 403; 
    Error Code: InvalidObjectState; Request ID: ), S3 Extended Request ID: (1)

  5. Optimize Spark queries: Inefficient queries or transformations can have a significant impact on Apache Spark driver memory utilization. Common examples include:
    • collect is a Spark action that collects the results from workers and returns them to the driver. In some cases, the results may be very large, overwhelming the driver. It is recommended to be careful when using collect, because it can frequently cause Spark driver OOM exceptions as shown below:
      An error occurred while calling 
      z:org.apache.spark.api.python.PythonRDD.collectAndServe.
      Job aborted due to stage failure:
      Total size of serialized results of tasks is bigger than spark.driver.maxResultSize

    • Shared Variables: Apache Spark offers two different ways to share variables between Spark driver and executors: broadcast variables and accumulators. Broadcast variables are useful to provide a read-only copy of data or fact tables shared across Spark workers to improve map-side joins. Accumulators are useful to provide a writeable copy to implement distributed counters across Spark executors. Both should be used carefully and destroyed when no longer needed as they can frequently result in Spark driver OOM exceptions.
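
The following PySpark sketch illustrates both recommendations with made-up names: bound what is returned to the driver, and clean up shared variables when they are no longer needed.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

df = spark.read.parquet("s3://input-s3-path/")  # placeholder path

# Prefer a bounded preview over collect(), which pulls the full result to the driver.
preview_rows = df.limit(100).collect()

# Broadcast a small read-only lookup once instead of shipping it with every task,
# and use an accumulator as a distributed counter written by executors.
lookup_bc = sc.broadcast({"A": "Store A", "B": "Store B"})
row_counter = sc.accumulator(0)

# ... use lookup_bc.value and row_counter inside transformations ...

lookup_bc.destroy()  # release driver and executor memory when no longer needed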

Scaling Apache Spark executors

Apache Spark executors process data in parallel. However, unoptimized reads from JDBC sources, unbalanced shuffles, buffering of rows with PySpark UDFs, exceeding off-heap memory on each Spark worker, and skew in partition sizes can all result in Spark executor OOM exceptions. Below are some best practices with AWS Glue and Apache Spark for avoiding the conditions that result in OOM exceptions.

  1. JDBC Optimizations: Apache Spark uses JDBC drivers to fetch data from JDBC sources such as MySQL, PostgreSQL, and Oracle.
    • Fetchsize: By default, the Spark JDBC drivers configure the fetch size to zero. This means that the JDBC driver on the Spark executor tries to fetch all the rows from the database in one network round trip and cache them in memory, even though the Spark transformation only streams through the rows one at a time. This may result in the Spark executor running out of memory with the following exception:
      WARN YarnAllocator: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN YarnSchedulerBackend$YarnSchedulerEndpoint: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      ERROR YarnClusterScheduler: Lost executor 4 on ip-10-1-2-96.ec2.internal: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.
      WARN TaskSetManager: Lost task 0.3 in stage 0.0 (TID 3, ip-10-1-2-96.ec2.internal, executor 4): ExecutorLostFailure (executor 4 exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

      In Spark, you can avoid this scenario by explicitly setting the fetch size parameter to a non-zero value. With AWS Glue, DynamicFrames automatically use a fetch size of 1,000 rows, which bounds the size of cached rows in the JDBC driver and also amortizes the overhead of network round-trip latencies between the Spark executor and the database instance. The example below shows how to read from a JDBC source using Glue dynamic frames.

      val (url, database, tableName) = {
       ("jdbc_url", "db_name", "table_name")
       } 
      val source = glueContext.getSource(format, sourceJson)
      val dyf = source.getDynamicFrame

  • Spark’s Read Partitioning: Apache Spark by default uses only one executor to open up a JDBC connection with the database and read the entire table into a Spark dataframe. This can result in an unbalanced distribution of data processed across different executors. As a result, it is usually recommended to use a partitionColumn, lowerBound, upperBound, and numPartitions to enable reading in parallel from different executors. This allows for more balanced partitioning if there exists a column that has a uniform value distribution. However, Apache Spark restricts the partitionColumn to be one of numeric, date, or timestamp data types. For example:
    val df = spark.read.jdbc(url=jdbcUrl, 
        table="employees", partitionColumn="emp_no", 
        lowerBound=1L, upperBound=100000L, numPartitions=100, 
        fetchsize=1000, connectionProperties=connectionProperties)

  • Glue’s Read Partitioning: AWS Glue enables partitioning JDBC tables based on columns with generic types, such as string. This enables you to read from JDBC sources using non-overlapping parallel SQL queries executed against logical partitions of your table from different Spark executors. You can control partitioning by setting a hashfield or hashexpression. You can also control the number of parallel reads that are used to access your data by specifying hashpartitions. For best results, this column should have an even distribution of values to spread the data between partitions. For example, if your data is evenly distributed by month, you can use the month column to read each month of data in parallel. Based on the database instance type, you may like to tune the number of parallel connections by adjusting the hashpartitions. For example:
    glueContext.create_dynamic_frame.from_catalog(
        database = "my_database",
        table_name = "my_table_name",
        transformation_ctx = "my_transformation_context",
        additional_options = {
            'hashfield': 'month',
            'hashpartitions': '5'
        }
    )

  • Bulk Inserts: AWS Glue offers parallel inserts for speeding up bulk loads into JDBC targets. The following example uses a bulk size of two, which allows two inserts to happen in parallel. This is helpful for improving the performance of writes into databases such as Amazon Aurora.
    val optionsMap = Map(
      "user" -> user,
      "password" -> pwd,
      "url" -> postgresEndpoint,
      "dbtable" -> table,
      "bulkSize" -> "2")
    val options = JsonOptions(optionsMap)
    val jdbcWrapper = JDBCWrapper(glueContext, options)
    glueContext.getSink("postgresql", options).writeDynamicFrame(dyf)

  2. Join Optimizations: One common reason for Apache Spark applications running out of memory is the use of unoptimized joins across two or more tables. This is typically a result of data skew in the distribution of join columns or an inefficient choice of join transforms. Additionally, the ordering of transforms and filters in the user script may limit the Spark query planner's ability to optimize. There are three popular approaches to optimizing joins on AWS Glue.
    • Filter tables before Join: You should pre-filter your tables as much as possible before joining. This helps to minimize the data shuffled between the executors over the network. You can use AWS Glue push down predicates for filtering based on partition columns, AWS Glue exclusions for filtering based on file names, AWS Glue storage class exclusions for filtering based on S3 storage classes, and use columnar storage formats such as Parquet and ORC that support discarding row groups based on column statistics such as min/max of column values.
    • Broadcast Small Tables: Joining tables can result in large amounts of data being shuffled or moved over the network between executors running on different workers. Because of this, Spark may run out of memory and spill the data to physical disk on the worker. This behavior can be observed in the following log message:
      INFO [UnsafeExternalSorter] — Thread 168 spilling sort data of 3.1 GB to disk (0 time so far)

      In cases where one of the tables in the join is small, a few tens of MBs, we can indicate that Spark should handle it differently, reducing the overhead of shuffling data. We do this by hinting to Apache Spark that the smaller table should be broadcast instead of partitioned and shuffled across the network. The Spark parameter spark.sql.autoBroadcastJoinThreshold configures the maximum size, in bytes, for a table that will be broadcast to all worker nodes when performing a join. Apache Spark automatically broadcasts a table when it is smaller than 10 MB. You can also explicitly tell Spark which table you want to broadcast, as shown in the following example:

      val employeesDF = employeesRDD.toDF
      val departmentsDF = departmentsRDD.toDF
      
      // materializing the department data
      val tmpDepartments = broadcast(departmentsDF.as("departments"))
      
      val joinedDF = employeesDF.join(broadcast(tmpDepartments), 
         $"depId" === $"id",  // join by employees.depID == departments.id 
         "inner")
      
      // Show the explain plan and confirm the table is marked for broadcast
      joinedDF.explain()
      
      == Physical Plan ==
      *BroadcastHashJoin [depId#14L], [id#18L], Inner, BuildRight
      :- *Range (0, 100, step=1, splits=8)
      +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false]))
         +- *Range (0, 100, step=1, splits=8)
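
      Returning to the first approach above (pre-filtering), the following is a minimal push-down predicate sketch in PySpark; the database, table, and partition columns (year, month) are placeholder assumptions:

      # Only S3 partitions matching the predicate are listed and read,
      # so less data is shuffled during the subsequent join.
      filtered_dyf = glueContext.create_dynamic_frame.from_catalog(
          database = "my_database",
          table_name = "my_partitioned_table",
          transformation_ctx = "filtered_read",
          push_down_predicate = "(year == '2019' and month == '03')"
      )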

  2. PySpark User Defined Functions (UDFs): Using PySpark UDFs can turn out to be costly for executor memory. This is because data must be serialized and deserialized when it is exchanged between the Spark executor JVM and the Python interpreter. The Python interpreter needs to process the serialized data in the Spark executor’s off-heap memory. For datasets with large or nested records, or when using complex UDFs, this processing can consume large amounts of off-heap memory and lead to OOM exceptions resulting from exceeding the yarn memoryOverhead. Here is what the error message looks like:
    ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
    Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
    Consider boosting spark.yarn.executor.memoryOverhead

    Similarly, data serialization can be slow and often leads to longer job execution times. To avoid such OOM exceptions, it is a best practice to write the UDFs in Scala or Java instead of Python; they can be imported by providing the S3 path of the dependent JARs in the Glue job configuration. Another optimization to avoid buffering large records in off-heap memory with PySpark UDFs is to move selects and filters upstream to earlier execution stages of your AWS Glue script, as shown in the sketch below.
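
    The following is a minimal PySpark sketch of that last point, pruning columns and rows before a (hypothetical) UDF runs so that less data crosses the JVM/Python boundary; the DynamicFrame dyf and the id and name columns are placeholder assumptions:

    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    # Hypothetical UDF used only for illustration
    normalize = udf(lambda s: s.strip().lower() if s else None, StringType())

    df = dyf.toDF()
    # Select and filter upstream, before the UDF touches the data
    slim = df.select("id", "name").where(col("name").isNotNull())
    result = slim.withColumn("name_normalized", normalize(col("name")))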

  3. Incremental processing: Processing large datasets in S3 can result in costly network shuffles, spilling data from memory to disk, and OOM exceptions. To avoid these scenarios, it is a best practice to incrementally process large datasets using AWS Glue job bookmarks, push-down predicates, and exclusions. Concurrent job runs can process separate S3 partitions and also minimize the possibility of OOMs caused by large Spark partitions or unbalanced shuffles resulting from data skew. Vertical scaling with higher-memory instances can also mitigate the chances of OOM exceptions caused by insufficient off-heap memory or by Apache Spark applications that cannot be readily optimized. A minimal job bookmark sketch follows this list.

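The following is a minimal sketch of an incremental read driven by job bookmarks; the database and table names are placeholders, and the job must run with the --job-bookmark-option job-bookmark-enable parameter:

import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# transformation_ctx ties this read to the bookmark state, so only
# data added since the last successful run is processed.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database = "my_database",              # placeholder
    table_name = "my_partitioned_table",   # placeholder
    transformation_ctx = "incremental_read"
)

# ... transform and write dyf ...

job.commit()  # advances the bookmark for the next run
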
You can also use Glue’s G.1X and G.2X worker types, which provide more memory and disk space, to vertically scale Glue jobs that need to store large intermediate shuffle output. Vertical scaling for Glue jobs is discussed in the first post of this series.

Conclusion

In this post, we discussed a number of techniques to enable efficient memory management for Apache Spark applications when reading data from Amazon S3 and compatible databases using a JDBC connector. We described how Glue ETL jobs can use the partitioning information available from the AWS Glue Data Catalog to prune large datasets, manage a large number of small files, and use JDBC optimizations for partitioned reads and batch record fetches from databases. You can use some or all of these techniques to help ensure your ETL jobs perform well.

In the next post, we will describe how you can develop Apache Spark applications and ETL scripts locally on your laptop with the Glue Spark Runtime, which contains these optimizations. You can build against the Glue Spark Runtime available from Maven or use a Docker container for cross-platform support. You can develop using Jupyter or Zeppelin notebooks, or your favorite IDE such as PyCharm, and then deploy those Spark applications on AWS Glue’s serverless Spark platform.

 


About the Author

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on the cloud. He also enjoys watching movies and reading about the latest technology.

 

 

Build an automatic data profiling and reporting solution with Amazon EMR, AWS Glue, and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-an-automatic-data-profiling-and-reporting-solution-with-amazon-emr-aws-glue-and-amazon-quicksight/

In typical analytics pipelines, one of the first tasks you perform after importing data into your data lake is data profiling and high-level data quality analysis to check the content of the datasets. In this way, you can enrich the basic metadata that contains information such as table and column names and their types.

The results of data profiling help you determine whether the datasets contain the expected information and how to use them downstream in your analytics pipeline. Moreover, you can use these results as one of the inputs to an optional data semantics analysis stage.

The great quantity and variety of data in modern data lakes make unstructured, manual data profiling and data semantics analysis impractical and time-consuming. This post shows how to implement a process for the automatic creation of a data profiling repository, as an extension of AWS Glue Data Catalog metadata, and a reporting system that can help you in your analytics pipeline design process by providing a reliable tool for further analysis.

This post describes in detail the application Data Profiler for AWS Glue Data Catalog and provides step-by-step instructions of an example implementation.

Overview and architecture

The following diagram illustrates the architecture of this solution.

Data Profiler for AWS Glue Data Catalog is an Apache Spark Scala application that profiles all the tables defined in a database in the Data Catalog using the profiling capabilities of the Amazon Deequ library and saves the results in the Data Catalog and an Amazon S3 bucket in a partitioned Parquet format. You can use other analytics services such as Amazon Athena and Amazon QuickSight to query and visualize the data.

For more information about the Amazon Deequ data library, see Test data quality at scale with Deequ or the source code on the GitHub repo.

Metadata can be defined as data about data. Metadata for a table contains information like the table name and other attributes, column names and types, and the physical location of the files that contain the data. The Data Catalog is the metadata repository in AWS, and you can use it with other AWS services like Athena, Amazon EMR, and Amazon Redshift.

After you create or update the metadata for tables in a database (for example, adding new data to the table), either with an AWS Glue crawler or manually, you can run the application to profile each table. The results are stored as new versions of the tables’ metadata in the Data Catalog, which you can view interactively via the AWS Lake Formation console or query programmatically via the AWS CLI for AWS Glue.

For more information about the Data Profiler, see the GitHub repo.

The Deequ library does not support tables with nested data (such as JSON). If you want to run the application on a table with nested data, this must be un-nested/flattened or relationalized before profiling. For more information about useful transforms for this task, see AWS Glue Scala DynamicFrame APIs or AWS Glue PySpark Transforms Reference.

The following table shows the profiling metrics the application computes for column data types Text and Numeric. The computation of some profiling metrics for Text columns can be costly and is disabled by default. You can enable it by setting the compExp input parameter to true (see next section).

Metric | Description | Data Type
ApproxCountDistinct | Approximate number of distinct values, computed with HyperLogLogPlusPlus sketches. | Text / Numeric
Completeness | Fraction of non-null values in a column. | Text / Numeric
Distinctness | Fraction of distinct values of a column over the number of all values of a column. Distinct values occur at least one time. For example, [a, a, b] contains two distinct values a and b, so distinctness is 2/3. | Text / Numeric
MaxLength | Maximum length of the column. | Text
MinLength | Minimum length of the column. | Text
CountDistinct | Exact number of distinct values. | Text
Entropy | Entropy is a measure of the level of information contained in an event (value in a column) when considering all possible events (values in a column). It is measured in nats (natural units of information). Entropy is estimated using observed value counts as the negative sum of (value_count/total_count) * log(value_count/total_count). For example, [a, b, b, c, c] has three distinct values with counts [1, 2, 2]. Entropy is then (-1/5*log(1/5)-2/5*log(2/5)-2/5*log(2/5)) = 1.055. | Text
Histogram | The summary of values in a column of a table. Groups the given column’s values and calculates the number of rows with that specific value and the fraction of this value. | Text
UniqueValueRatio | Fraction of unique values over the number of all distinct values of a column. Unique values occur exactly one time; distinct values occur at least one time. Example: [a, a, b] contains one unique value b, and two distinct values a and b, so the unique value ratio is 1/2. | Text
Uniqueness | Fraction of unique values over the number of all values of a column. Unique values occur exactly one time. Example: [a, a, b] contains one unique value b, so uniqueness is 1/3. | Text
ApproxQuantiles | Approximate quantiles of a distribution. | Numeric
Maximum | Maximum value of the column. | Numeric
Mean | Mean value of the column. | Numeric
Minimum | Minimum value of the column. | Numeric
StandardDeviation | Standard deviation value of the column. | Numeric
Sum | Sum of the column. | Numeric

Application description

You can run the application via spark-submit on a transient or permanent EMR cluster (see the “Creating an EMR cluster” section in this post for minimum version specification) with Spark installed and configured with the Data Catalog settings option Use for Spark table metadata enabled.

The following example code executes the application:

 $ spark-submit \
  --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
  --master yarn \
  --deploy-mode cluster \
  --name data-profiler-for-aws-glue-data-catalog \
  /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
  --dbName nyctlcdb \
  --region eu-west-1 \
  --compExp true \
  --statsPrefix DQP \
  --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
  --profileUnsupportedTypes true \
  --noOfBins 30 \
  --quantiles 10

The following table summarizes the input parameters that the application accepts.

NameTypeRequiredDefaultDescription
--dbName (-d)StringYesN/AData Catalog database name. The database must be defined in the Catalog owned by the same account where the application is executed.
--region (-r)StringYesN/AAWS Region endpoint where the Data Catalog database is defined, for example us-west-1 or us-east-1. For more information, see Regional Endpoints.
--compExp (-c)BooleanNofalseIf true, the application also executes “expensive” profiling analyzers on Text columns. These are CountDistinct, Entropy, Histogram, UniqueValueRatio, and Uniqueness. If false, only the following default analyzers are executed: ApproxCountDistinct, Completeness, Distinctness, MaxLength, MinLength. All analyzers for Numeric columns are always executed.
--statsPrefix (-p)StringNoDQPString prepended to the statistics names in the Data Catalog. The application also adds two underscores (__). This is useful to identify metrics calculated by the application.
--s3BucketPrefix (-s)StringNoblankFormat must be s3Buckename/prefix. If specified, the application writes Parquet files with metrics in the prefixes db_name=…/table_name=….
--profileUnsupportedTypes (-u)BooleanNofalseBy default, the Amazon Deequ library only supports Text and Numeric columns. If this parameter is set to true, the application also profiles columns of type Boolean and Date.
--noOfBins (-b)IntegerNo10When --compExp (-c) is true, sets the number of maximum values to create for the Histogram analyzer for String columns.
--quantiles (-q)IntegerNo10Sets the number of quantiles to calculate for the ApproxQuantiles analyzer for numeric columns.

Setting up your environment

The following walkthrough demonstrates how to create and populate a Data Catalog database with three tables, which simulates a process with monthly updates. For this post, you simulate three monthly runs: February 2, 2019, March 2, 2019, and April 2, 2019.

After table creation and after each monthly table update, you run the application to generate and update the profiling information for each table in the Data Catalog and Amazon S3 repository. The post also provides examples of how to query the data using the AWS CLI and Athena, and build a simple Amazon QuickSight dashboard.

This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS. In particular, you use the tables yellow_tripdata, fhv_tripdata, and green_tripdata.

The following steps explain how to set up the environment.

Creating an EMR cluster

The first step is to create an EMR cluster. Connect to the cluster master node and execute the code via spark-submit. Make sure that the cluster version is at least 5.28.0, that Hadoop and Spark are installed, and that the cluster uses the Data Catalog as table metadata for Spark.

The master node should also be accessible via SSH. For instructions, see Connect to the Master Node Using SSH.
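
If you prefer to create the cluster programmatically, the following is a minimal boto3 sketch rather than a complete production setup; the cluster name, key pair, instance sizes, and Region are placeholder assumptions. The spark-hive-site configuration is what makes Spark use the Data Catalog as its table metastore.

import boto3

emr = boto3.client("emr", region_name="eu-west-1")  # Region is an assumption

response = emr.run_job_flow(
    Name="data-profiler-cluster",                    # placeholder name
    ReleaseLabel="emr-5.28.0",
    Applications=[{"Name": "Hadoop"}, {"Name": "Spark"}],
    Configurations=[{
        # Use the AWS Glue Data Catalog as the Spark table metastore
        "Classification": "spark-hive-site",
        "Properties": {
            "hive.metastore.client.factory.class":
                "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }],
    Instances={
        "MasterInstanceType": "m5.xlarge",           # instance sizes are assumptions
        "SlaveInstanceType": "m5.xlarge",
        "InstanceCount": 3,
        "KeepJobFlowAliveWhenNoSteps": True,
        "Ec2KeyName": "my-key-pair"                  # placeholder; required for SSH access
    },
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
    VisibleToAllUsers=True,
)
print(response["JobFlowId"])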

Downloading the application

You can download the source code from the application GitHub repo and build it as an uber jar with all dependencies using the Scala Build Tool (sbt) with the following commands (adjust the memory values according to your needs):

$ export SBT_OPTS="-Xms1G -Xmx3G -Xss2M -XX:MaxMetaspaceSize=3G" && sbt assembly

By default, the .jar file is created in the following path, relative to the project root directory:

./target/scala-2.11/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar

When the .jar file is available, copy it to the master node of the EMR cluster. One way to copy the file to the master node is to upload it to Amazon S3 from the client where the file was created and then download it from Amazon S3 to the master node.

For this post, copy the file in the /home/hadoop directory of the master node. The full path is /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar.

Setting up S3 buckets and copying initial data

You use an S3 bucket to store the data that you profile. For this post, the bucket name is

s3://aws-big-data-blog-samples/

You need to create a bucket with a unique name in your account and store the data there. When the bucket is created and available, use the AWS CLI to copy the first set of files for January 2019 (therefore simulating the February 2, 2019, run) from the s3://nyc-tlc/ bucket. See the following code:

$ DEST_BUCKET=aws-big-data-blog-samples
$ MONTH=2019-01
 
$ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
 
$ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

After you copy the data to your destination bucket, create a second bucket to store the files with the profiling metrics created by the application. This step is optional because you can write the metrics to a prefix in an existing bucket. See the following code:

$ aws s3 mb s3://deequ-profiler/

You are now ready to create the database and tables metadata in the Data Catalog.

Creating metadata in the Data Catalog

The first step is to create a database in AWS Glue. You can create the database using the AWS CLI. See the following code:

$ aws glue create-database \
    --database-input '{"Name": "nyctlcdb"}'

Alternatively, on the AWS Glue console, choose Databases, Add database.

After you create the database, create a new AWS Glue Crawler to infer the schema of the data in the files you copied in the previous step. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter nyc-tlc-db-raw.
  4. Choose Next.
  5. For Choose a data store, choose S3.
  6. For Crawl data in, choose Specified path in my account.
  7. For Include path, enter the S3 bucket and prefix where you copied the data earlier.
  8. Choose Next.
  9. In the Choose an IAM role section, select Choose an existing IAM role.
  10. Choose an IAM role that provides access to the S3 bucket and allows writing to the Data Catalog, or create a new one while creating this crawler.
  11. Choose Next.
  12. Choose the database you created earlier to store the tables’ metadata.
  13. Review the crawler properties and choose Finish.
    You can run the crawler when it’s ready. It creates three new tables in the database. The following screenshot shows the update you receive that the crawler is complete.
  14. You can now use the Lake Formation console to check that the tables are correct. See the following screenshot of the Tables page. If you select one of the tables, the table version is now 0. See the following screenshot. You can also perform the same check using the AWS CLI. See the following code:
    $ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId' 

    [
        "0"
    ]

  15. Check the parameters in the table metadata to verify which values the crawler generated. See the following code:
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters' 

    {
        "CrawlerSchemaDeserializerVersion": "1.0",
        "CrawlerSchemaSerializerVersion": "1.0",
        "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
        "areColumnsQuoted": "false",
        "averageRecordSize": "144",
        "classification": "csv",
        "columnsOrdered": "true",
        "compressionType": "none",
        "delimiter": ",",
        "objectCount": "1",
        "recordCount": "4771445",
        "sizeKey": "687088084",
        "skip.header.line.count": "1",
        "typeOfData": "file"
    }

  16. Check the metadata attributes for three columns in the same table. This post chooses the following columns because they have different data types, though any other column is suitable for this check. In the following code, the only attributes currently available for the columns are “Name” and “Type:”
    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'

    [
        {
            "Name": "fare_amount",
            "Type": "double"
        }
    ]

    $ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'

    [
        {
            "Name": "passenger_count",
            "Type": "bigint"
        }
    ]

You can display the same information via the Lake Formation console. See the following screenshot.

You are now ready to execute the application.

First application execution

Connect to the EMR cluster master node via SSH and run the application with the following code (change the input parameters as needed, especially the value for the s3BucketPrefix parameter):

$ spark-submit \
    --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
    --master yarn \
    --deploy-mode cluster \
    --name data-profiler-for-aws-glue-data-catalog \
    /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
    --dbName nyctlcdb \
    --region eu-west-1 \
    --compExp true \
    --statsPrefix DQP \
    --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
    --profileUnsupportedTypes true \
    --noOfBins 30 \
    --quantiles 10

Profiling information in the metadata in the Data Catalog

When the application is complete, you can recheck the metadata via the Lake Formation console for the tables and verify that a new table version was created. See the following screenshot.

You can verify the same information via the AWS CLI. See the following code:

$ aws glue get-table-versions \
    --database-name nyctlcdb \
    --table-name trip_data_yellow \
    --query 'TableVersions[*].VersionId'
[
    "1",
    "0"
]

Check the metadata for the table and verify that the profiling information the application generated was successfully stored. In the following code, the parameter “DQP__Size” was generated, which contains the number of records in the table as calculated by the Deequ library:

$ aws glue get-table \ 
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.Parameters'
{
    "CrawlerSchemaDeserializerVersion": "1.0-",
    "CrawlerSchemaSerializerVersion": "1.0",
    "DQP__Size": "7667793.0",
    "UPDATED_BY_CRAWLER": "nyc-tlc-db-raw",
    "areColumnsQuoted": "false",
    "averageRecordSize": "144",
    "classification": "csv",
    "columnsOrdered": "true",
    "compressionType": "none",
    "delimiter": ",",
    "objectCount": "1",
    "recordCount": "4771445",
    "sizeKey": "687088084",
    "skip.header.line.count": "1",
    "typeOfData": "file"
}

Similarly, you can verify that the metadata for the columns you checked previously contains the profiling information the application generated. This is stored in the “Parameters” object for each column. Each new attribute starts with the string “DQP” as specified in the statsPrefix input parameter. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 
[
    {
        "Name": "store_and_fwd_flag",
        "Type": "string",
        "Parameters": {
            "DQP__ApproxCountDistinct": "3.0",
            "DQP__Completeness": "1.0",
            "DQP__CountDistinct": "3.0",
            "DQP__Distinctness": "3.912468685578758E-7",
            "DQP__Entropy": "0.03100483390393341",
            "DQP__Histogram.abs.N": "7630142.0",
            "DQP__Histogram.abs.Y": "37650.0",
            "DQP__Histogram.abs.store_and_fwd_flag": "1.0",
            "DQP__Histogram.bins": "3.0",
            "DQP__Histogram.ratio.N": "0.9950897213839758",
            "DQP__Histogram.ratio.Y": "0.004910148200401341",
            "DQP__Histogram.ratio.store_and_fwd_flag": "1.3041562285262527E-7",
            "DQP__MaxLength": "18.0",
            "DQP__MinLength": "1.0",
            "DQP__UniqueValueRatio": "0.3333333333333333",
            "DQP__Uniqueness": "1.3041562285262527E-7"
        }
    }
]
$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`fare_amount`]'
[
    {
        "Name": "fare_amount",
        "Type": "double",
        "Parameters": {
            "DQP__ApproxCountDistinct": "6125.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "8.187492802687814E-4",
            "DQP__Maximum": "623259.86",
            "DQP__Mean": "12.40940884025023",
            "DQP__Minimum": "-362.0",
            "DQP__StandardDeviation": "262.0720412055651",
            "DQP__Sum": "9.515276582999998E7",
            "DQP__name-0.1": "5.0",
            "DQP__name-0.2": "6.0",
            "DQP__name-0.3": "7.0",
            "DQP__name-0.4": "8.0",
            "DQP__name-0.5": "9.0",
            "DQP__name-0.6": "10.5",
            "DQP__name-0.7": "12.5",
            "DQP__name-0.8": "15.5",
            "DQP__name-0.9": "23.5",
            "DQP__name-1.0": "623259.86"
        }
    }
]

The parameters named “DQP__name-x.x” are the results of the ApproxQuantiles Deequ analyzer for numeric columns; the number of quantiles is set via the --quantiles (-q) input parameter of the application. See the following code:

$ aws glue get-table \
    --database-name nyctlcdb \
    --name trip_data_yellow \
    --query 'Table.StorageDescriptor.Columns[?Name==`passenger_count`]'
[
    {
        "Name": "passenger_count",
        "Type": "bigint",
        "Parameters": {
            "DQP__ApproxCountDistinct": "10.0",
            "DQP__Completeness": "0.9999998695843771",
            "DQP__Distinctness": "1.3041562285262527E-6",
            "DQP__Maximum": "9.0",
            "DQP__Mean": "1.5670782410373156",
            "DQP__Minimum": "0.0",
            "DQP__StandardDeviation": "1.2244305354114957",
            "DQP__Sum": "1.201603E7",
            "DQP__name-0.1": "1.0",
            "DQP__name-0.2": "1.0",
            "DQP__name-0.3": "1.0",
            "DQP__name-0.4": "1.0",
            "DQP__name-0.5": "1.0",
            "DQP__name-0.6": "1.0",
            "DQP__name-0.7": "1.0",
            "DQP__name-0.8": "2.0",
            "DQP__name-0.9": "3.0",
            "DQP__name-1.0": "9.0"
        }
    }
]

Profiling information in Amazon S3

You can now also verify that the profiling information was saved in Parquet format in the S3 bucket you specified in the s3BucketPrefix application input parameter. The following screenshot shows the buckets via the Amazon S3 console.

The data is stored using prefixes that are compatible with Apache Hive partitions. This is useful to optimize performance and costs when you use analytics services like Athena. The partitions are defined on db_name and table_name. The following screenshot shows the details of table_name=trip_data_yellow.

Each execution of the application generates one Parquet file appending data to the metrics table for each physical table.

Second execution after monthly table updates

To run the application after monthly table updates, complete the following steps:

  1. Copy the new files for February 2019 to simulate the March 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-02
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. The following screenshot shows that the three tables were updated successfully.
  3. Check that the crawler created a third version of the table. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "2",
        "1",
        "0"
    ]

  4. Rerun the application to generate the new profiling metadata, entering the same code as before. To keep the information clean, before storing new profiling information in the metadata, the application removes all custom attributes starting with the string specified in the statsPrefix parameter. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

    Following a successful execution, a new version of the table was created. See the following code:

    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "3",
        "2",
        "1",
        "0"
    ]

  5. Check the value of the DQP__Size attribute; its value has changed. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "1.4687169E7"
    }

  6. Check one of the columns you saw earlier to see the updated profiling properties values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]'

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "2.042599223853147E-7",
                "DQP__Entropy": "0.0317381414905775",
                "DQP__Histogram.abs.N": "1.4613018E7",
                "DQP__Histogram.abs.Y": "74149.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "2.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9949513074984022",
                "DQP__Histogram.ratio.Y": "0.005048556328316233",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.361732815902098E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

Third execution after monthly tables updates

To run the application a third time, complete the following steps:

  1. Copy the new files for March 2019 to simulate the April 2 monthly update of the system. See the following code:
    $ DEST_BUCKET=aws-big-data-blog-samples
    $ MONTH=2019-03
     
    $ aws s3 cp "s3://nyc-tlc/trip data/yellow_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-yellow/yellow_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/fhv_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-fhv/fhv_tripdata_${MONTH}.csv"
     
    $ aws s3 cp "s3://nyc-tlc/trip data/green_tripdata_${MONTH}.csv" "s3://${DEST_BUCKET}/data/raw/nyc-tlc/trip-data-green/green_tripdata_${MONTH}.csv"

  2. Run the nyc-tlc-db-raw crawler to update the table metadata to include the new files. You now have five versions of the table metadata. See the following code:
    $ aws glue get-table-versions \
        --database-name nyctlcdb \
        --table-name trip_data_yellow \
        --query 'TableVersions[*].VersionId'

    [
        "4",
        "3",
        "2",
        "1",
        "0"
    ]

  3. Rerun the application to update the profiling information. See the following code:
    $ spark-submit \
        --class awsdataprofiler.DataProfilerForAWSGlueDataCatalog \
        --master yarn \
        --deploy-mode cluster \
        --name data-profiler-for-aws-glue-data-catalog \
        /home/hadoop/data-profiler-for-aws-glue-data-catalog-assembly-1.0.jar \
        --dbName nyctlcdb \
        --region eu-west-1 \
        --compExp true \
        --statsPrefix DQP \
        --s3BucketPrefix deequ-profiler/deequ-profiler-metrics \
        --profileUnsupportedTypes true \
        --noOfBins 30 \
        --quantiles 10

  4. Check the DQP__Size parameter to see its new updated value. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.Parameters.{DQP__Size:DQP__Size}'

    {
        "DQP__Size": "2.2519715E7"
    }

  5. Check one of the columns you saw earlier to see the updated profiling properties values. See the following code:
    $ aws glue get-table \
        --database-name nyctlcdb \
        --name trip_data_yellow \
        --query 'Table.StorageDescriptor.Columns[?Name==`store_and_fwd_flag`]' 

    [
        {
            "Name": "store_and_fwd_flag",
            "Type": "string",
            "Parameters": {
                "DQP__ApproxCountDistinct": "3.0",
                "DQP__Completeness": "1.0",
                "DQP__CountDistinct": "3.0",
                "DQP__Distinctness": "1.3321660598280218E-7",
                "DQP__Entropy": "0.030948463301702846",
                "DQP__Histogram.abs.N": "2.2409376E7",
                "DQP__Histogram.abs.Y": "110336.0",
                "DQP__Histogram.abs.store_and_fwd_flag": "3.0",
                "DQP__Histogram.bins": "3.0",
                "DQP__Histogram.ratio.N": "0.9951003376374878",
                "DQP__Histogram.ratio.Y": "0.004899529145906154",
                "DQP__Histogram.ratio.store_and_fwd_flag": "1.3321660598280218E-7",
                "DQP__MaxLength": "18.0",
                "DQP__MinLength": "1.0",
                "DQP__UniqueValueRatio": "0.0",
                "DQP__Uniqueness": "0.0"
            }
        }
    ]

You can view and manage the same values via the Lake Formation console. See the following screenshot of the Edit column section.

Data profiling reporting with Athena and Amazon QuickSight

As demonstrated earlier, the application can save profiling information in Parquet format to an S3 bucket and prefix, partitioned by db_name and table_name. See the following code:

$ aws s3 ls s3://deequ-profiler/deequ-profiler-metrics/ --recursive
2020-01-28 09:30:12          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/_SUCCESS
2020-01-28 09:17:15       6506 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-760dafb1-fc37-4700-a506-a9dc71b7a745-c000.snappy.parquet
2020-01-28 09:01:19       6498 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-78dd2c4a-83c2-44c4-aa71-30e7a9fb0089-c000.snappy.parquet
2020-01-28 09:30:11       6505 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_fhv/part-00000-cff4f2de-64b4-4338-a0f6-a50ed34a378f-c000.snappy.parquet
2020-01-28 09:30:08          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/_SUCCESS
2020-01-28 09:01:15       6355 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-0d5969c9-70a7-4cd4-ac64-8f16e35e23b5-c000.snappy.parquet
2020-01-28 09:17:11       6353 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-12a7b0b0-6a2a-45d5-a241-645148af41d7-c000.snappy.parquet
2020-01-28 09:30:08       6415 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_green/part-00000-adecccd6-a884-403f-aa80-c574647a10f9-c000.snappy.parquet
2020-01-28 09:29:56          0 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/_SUCCESS
2020-01-28 09:16:59       6408 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-2e5e3280-29db-41b9-be67-a68ef8cb9777-c000.snappy.parquet
2020-01-28 09:01:02       6424 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-c4972037-7d3c-4279-8b77-361741133816-c000.snappy.parquet
2020-01-28 09:29:55       6398 deequ-profiler-metrics/db_name=nyctlcdb/table_name=trip_data_yellow/part-00000-f2d6076e-7019-4b03-97ba-a6aab8a677b8-c000.snappy.parquet

The application generates one Parquet file per execution.

Preparing metadata for profiler metrics data

To prepare the metadata for profiler metrics data, complete the following steps:

  1. On the Lake Formation console, create a new database with the name deequprofilerdb to contain the metadata.
  2. On the AWS Glue console, create a new crawler with the name deequ-profiler-metrics to infer the schema of the profiling information stored in Amazon S3.

The following screenshot shows the properties of the new crawler.

After you run the crawler, one table with the name deequ_profiler_metrics was created in the database. The table has the following columns.

Name | Data Type | Partition | Description
instance | string | | Column name the statistic in column “name” refers to. Set to “*” if entity is “Dataset”.
entity | string | | Entity the statistic refers to. Valid values are “Column” and “Dataset”.
name | string | | Metric name, derived from the Deequ analyzer used for the calculation.
value | double | | Value of the metric.
type | string | | Data type of the column if entity is “Column”, blank otherwise.
db_name_embed | string | | Database name, same values as in partition “db_name”.
table_name_embed | string | | Table name, same values as in partition “table_name”.
profiler_run_dt | date | | Date the profiler application was run.
profiler_run_ts | timestamp | | Date/time the profiler application was run; it can also be used as an execution identifier.
db_name | string | 1 | Database name.
table_name | string | 2 | Table name.

Reporting with Athena

You can use Athena to run a query that checks the statistics for a column in the database for the execution you ran in March 2019. See the following code:

SELECT db_name, 
    profiler_run_dt,
    profiler_run_ts,
    table_name, 
    entity,
    instance,
    name,
    value
FROM "deequprofilerdb"."deequ_profiler_metrics" 
WHERE db_name = 'nyctlcdb' AND
    table_name = 'trip_data_yellow' AND 
    entity = 'Column' AND
    instance = 'extra' AND
    profiler_run_dt = date_parse('2019-03-02','%Y-%m-%d')
ORDER BY name
;

The following screenshot shows the query results.

Reporting with Amazon QuickSight

To create a dashboard in Amazon QuickSight based on the profiling metrics data the application generated, complete the following steps:

  1. Create a new QuickSight dataset called deequ_profiler_metrics with Athena as the data source.
  2. In the Choose your table section, select the profiling metrics table that you created earlier.
  3. Import the data into SPICE.

After you create the dataset, you can view it and edit its properties. For this post, leave the properties unchanged.

You are now ready to build visualizations and dashboards.

The following images in this section show a simple analysis with controls that allow for the selection of the Database, Table profiled, Entity, Column, and Profiling Metric.

Control Name | Mapped Column
Database | db_name
Table | table_name
Entity | entity
Column | instance
Profiling Metric | name

For more information about adding controls, see Create Amazon QuickSight dashboards that have impact with parameters, on-screen controls, and URL actions.

For example, you can select the Size metric of a specific table to see how many records are available in the table after each monthly load. See the following screenshot.

Similarly, you can use the same analysis to see how a specific metric changes over time for a column. The following screenshot shows that the mean of the fare_amount column changes after each monthly load.

You can select any metric calculated on any column, which makes for a very flexible profiling data reporting system.

Conclusion

This post demonstrated how to extend the metadata contained in the Data Catalog with profiling information calculated with an Apache Spark application based on the Amazon Deequ library running on an EMR cluster.

You can query the Data Catalog using the AWS CLI. You can also build a reporting system with Athena and Amazon QuickSight to query and visualize the data stored in Amazon S3.

Special thanks go to Sebastian Schelter at Amazon Search and Sven Hansen and Vincent Gromakowski at AWS for their help and support.

 


About the Author

Francesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

Simplify data pipelines with AWS Glue automatic code generation and Workflows

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/simplify-data-pipelines-with-aws-glue-automatic-code-generation-and-workflows/

In the previous post of the series, we discussed how AWS Glue job bookmarks help you to incrementally load data from Amazon S3 and relational databases. We also saw how using the AWS Glue optimized Apache Parquet writer can help improve performance and manage schema evolution.

In the third post of the series, we’ll discuss three topics. First, we’ll look at how AWS Glue can automatically generate code to help transform data in common use cases such as selecting specific columns, flattening deeply nested records, efficiently parsing nested fields, and handling column data type evolution.

Second, we’ll outline how to use AWS Glue Workflows to build and orchestrate data pipelines using different Glue components such as Crawlers, Apache Spark and Python Shell ETL jobs.

Third, we’ll see how to leverage SparkSQL in your ETL jobs to perform SQL based transformations on datasets stored in Amazon S3 and relational databases.

Automatic Code Generation & Transformations: ApplyMapping, Relationalize, Unbox, ResolveChoice

AWS Glue can automatically generate code to help perform a variety of useful data transformation tasks. These transformations provide a simple to use interface for working with complex and deeply nested datasets. For example, some relational databases or data warehouses do not natively support nested data structures. AWS Glue can automatically generate the code necessary to flatten those nested data structures before loading them into the target database saving time and enabling non-technical users to work with data.

The following is a list of the popular transformations AWS Glue provides to simplify data processing:

  1. ApplyMapping is a transformation used to perform column projection and convert between data types. In this example, we use it to unnest several fields, such as actor.id, which we map to the top-level actor.id field. We also cast the id column to a long.
    medicare_output = medicare_src.apply_mapping(
        [('id', 'string', 'id', 'string'),
        ('type', 'string', 'type', 'string'),
        ('actor.id', 'int', 'actor.id', 'int'),
        ('actor.login', 'string', 'actor.login', 'string'),
        ('actor.display_login', 'string', 'actor.display_login', 'string'),
        ('actor.gravatar_id', 'long', 'actor.gravatar_id', 'long'),
        ('actor.url', 'string', 'actor.url', 'string'),
        ('actor.avatar_url', 'string', 'actor.avatar_url', 'string')]
    )

  2. Relationalize converts a nested dataset stored in a DynamicFrame to a relational (rows and columns) format. Nested structures are unnested into top-level columns and arrays are decomposed into different tables with appropriate primary and foreign keys inserted. The result is a collection of DynamicFrames representing a set of tables that can be directly inserted into a relational database. More detail about relationalize can be found here.
    ## An example relationalizing and writing to Redshift
    dfc = history.relationalize("hist_root", redshift_temp_dir)
    ## Cycle through results and write to Redshift.
    for df_name in dfc.keys():
        df = dfc.select(df_name)
        print "Writing to Redshift table: ", df_name, " ..."
        glueContext.write_dynamic_frame.from_jdbc_conf(frame = df, 
            catalog_connection = "redshift3", 
            connection_options = {"dbtable": df_name, "database": "testdb"}, 
            redshift_tmp_dir = redshift_temp_dir)

  3. Unbox parses a string field of a certain type, such as JSON, into individual fields with their corresponding data types and stores the result in a DynamicFrame. For example, you may have a CSV file with one field that is in JSON format {“a”: 3, “b”: “foo”, “c”: 1.2}. Unbox reformats the JSON string into three distinct fields: an int, a string, and a double. The Unbox transformation is commonly used to replace costly Python User Defined Functions required to reformat data that may result in Apache Spark out of memory exceptions. The following example shows how to use Unbox:
    df_result = df_json.unbox('json', "json")

  4. ResolveChoice: AWS Glue Dynamic Frames support data where a column can have fields with different types. These columns are represented with Dynamic Frame’s choice type. For example, the Dynamic Frame schema for the medicare dataset shows up as follows:
    root
     |-- drg definition: string
     |-- provider id: choice
     |    |-- long
     |    |-- string
     |-- provider name: string
     |-- provider street address: string

    This is because the “provider id” column could either be a long or string type. The Apache Spark Dataframe considers the whole dataset and is forced to cast it to the most general type, namely string. Dynamic Frames allow you to cast the type using the ResolveChoice transform. For example, you can cast the column to long type as follows.

    medicare_res = medicare_dynamicframe.resolveChoice(specs = [('provider id','cast:long')])
    
    medicare_res.printSchema()
     
    root
     |-- drg definition: string
     |-- provider id: long
     |-- provider name: string
     |-- provider street address: string

    This transform would also insert a null where the value was a string that could not be cast. As a result, the records whose string values were cast to null can now be identified. Alternatively, the choice type can be cast to struct, which keeps values of both types, as shown in the sketch below.
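
    The following one-line sketch keeps both representations instead of casting; the column and frame names follow the example above:

    # Resolve the choice type into a struct holding both the long and string values
    medicare_struct = medicare_dynamicframe.resolveChoice(specs = [('provider id', 'make_struct')])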

Build and orchestrate data pipelines using AWS Glue Workflows

AWS Glue Workflows provide a visual tool to author data pipelines by combining Glue crawlers for schema discovery with Glue Spark and Python jobs to transform the data. Relationships can be defined and parameters passed between task nodes to enable users to build pipelines of varying complexity. Workflows can be run on a schedule or triggered programmatically. You can track the progress of each node independently or of the entire workflow, making it easier to troubleshoot your pipelines.

A typical workflow for ETL workloads is organized as follows:

  1. Glue Python command triggered manually, on a schedule, or on an external CloudWatch event. It would pre-process or list the partitions in Amazon S3 for a table under a base location. For example, a CloudTrail logs partition to process could be: s3://AWSLogs/ACCOUNTID/CloudTrail/REGION/YEAR/MONTH/DAY/HOUR/. The Python command can list all the regions and schedule crawlers to create different Glue Data Catalog tables on each region.
  2. Glue crawlers triggered next to populate new partitions in the Glue Data Catalog for each hour of data recently ingested into Amazon S3.
  3. Concurrent Glue ETL jobs triggered to separately filter and process each partition or a group of partitions. For example, CloudTrail events corresponding to the last week can be read by a Glue ETL job by passing in the partition prefix as Glue job parameters and using Glue ETL push down predicates to read only the partitions in that prefix. Partitioning and orchestrating concurrent Glue ETL jobs allows you to scale and reliably execute individual Apache Spark applications by processing only a subset of partitions in the Glue Data Catalog table. The transformed data can then be concurrently written back by all individual Glue ETL jobs to a common target table in an Amazon S3 data lake, Amazon Redshift, or other databases.

Finally, a Glue Python command can be triggered to capture the completion status of the different Glue entities, including Glue crawlers and parallel Glue ETL jobs, and to post-process or retry any failed components. A minimal sketch of starting and monitoring a workflow programmatically follows.
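
As a sketch of the programmatic path, a workflow can be started and polled with boto3; the workflow name below is a placeholder, and error handling is omitted:

import time

import boto3

glue = boto3.client("glue")

run_id = glue.start_workflow_run(Name="etl-pipeline")["RunId"]   # placeholder workflow name

while True:
    run = glue.get_workflow_run(Name="etl-pipeline", RunId=run_id)["Run"]
    if run["Status"] in ("COMPLETED", "STOPPED", "ERROR"):
        break
    time.sleep(30)                                               # poll every 30 seconds

print(run["Status"], run.get("Statistics", {}))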

Executing SQL using SparkSQL in AWS Glue

AWS Glue Data Catalog as Hive Compatible Metastore

The AWS Glue Data Catalog is a managed metadata repository compatible with the Apache Hive Metastore API. You can follow the detailed instructions here to configure your AWS Glue ETL jobs and development endpoints to use the Glue Data Catalog. You also need to add the Hive SerDes to the class path of AWS Glue Jobs to serialize/deserialize data for the corresponding formats. You can then natively run Apache Spark SQL queries against your tables stored in the Data Catalog.

The following example assumes that you have crawled the US legislators dataset available at s3://awsglue-datasets/examples/us-legislators. We’ll use the Spark shell running on AWS Glue developer endpoint to execute SparkSQL queries directly on the legislators’ tables cataloged in the AWS Glue Data Catalog.

>>> spark.sql("use legislators")
DataFrame[]
>>> spark.sql("show tables").show()
+-----------+------------------+-----------+
|   database|         tableName|isTemporary|
+-----------+------------------+-----------+
|legislators|        areas_json|      false|
|legislators|    countries_json|      false|
|legislators|       events_json|      false|
|legislators|  memberships_json|      false|
|legislators|organizations_json|      false|
|legislators|      persons_json|      false|

>>> spark.sql("select distinct organization_id from memberships_json").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

A similar approach to the above would be to use AWS Glue DynamicFrame API to read the data from S3. The DynamicFrame is then converted to a Spark DataFrame using the toDF method. Next, a temporary view can be registered for DataFrame, which can be queried using SparkSQL. The key difference between the two approaches is the use of Hive SerDes for the first approach, and native Glue/Spark readers for the second approach. The use of native Glue/Spark provides the performance and flexibility benefits such as computation of the schema at runtime, schema evolution, and job bookmarks support for Glue Dynamic Frames.

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json")
>>> memberships.toDF().createOrReplaceTempView("memberships")
>>> spark.sql("select distinct organization_id from memberships").show()
+--------------------+
|     organization_id|
+--------------------+
|d56acebe-8fdc-47b...|
|8fa6c3d2-71dc-478...|
+--------------------+

Workflows and S3 Consistency

If you have a workflow of external processes ingesting data into S3, or upstream AWS Glue jobs generating input for a table used by downstream jobs in a workflow, you can encounter the following Apache Spark errors.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 10 in stage 16.0 failed 4 times, most recent failure: Lost task 10.3 in stage 16.0 (TID 761, ip-<>.ec2.internal, executor 1): 
java.io.FileNotFoundException: No such file or directory 's3://<bucket>/fileprefix-c000.snappy.parquet'
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 
'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

These errors happen when the upstream jobs overwrite the same S3 objects that the downstream jobs are concurrently listing or reading. This can also happen due to the eventual consistency of S3, which can result in overwritten or deleted objects being updated at a later time while the downstream jobs are reading them. A common manifestation of this error occurs when you create a SparkSQL view and execute SQL queries in the downstream job. To avoid these errors, the best practice is to set up a workflow with upstream and downstream jobs scheduled at different times, and to read/write to different S3 partitions based on time.

You can also enable the S3-optimized output committer for your Glue jobs by passing in a special job parameter: "--enable-s3-parquet-optimized-committer" set to true. This committer improves application performance by avoiding list and rename operations in Amazon S3 during job and task commit phases. It also avoids issues that can occur with Amazon S3’s eventual consistency during job and task commit phases, and helps to minimize task failures.
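
As a minimal sketch of passing that parameter at run time with boto3 (the job name is a placeholder; the same argument can also be set as a default argument on the job definition):

import boto3

glue = boto3.client("glue")

glue.start_job_run(
    JobName="my-etl-job",  # placeholder
    Arguments={"--enable-s3-parquet-optimized-committer": "true"}
)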

Conclusion

In this post, we discussed how to leverage the automatic code generation process in AWS Glue ETL to simplify common data manipulation tasks such as data type conversion and flattening complex structures. We also explored using AWS Glue Workflows to build and orchestrate data pipelines of varying complexity. Lastly, we looked at how you can leverage the power of SQL, with the use of AWS Glue ETL and Glue Data Catalog, to query and transform your data.

In the final post, we will explore specific capabilities in AWS Glue and best practices to help you better manage the performance, scalability and operation of AWS Glue Apache Spark jobs.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on the cloud. He also enjoys watching movies and reading about the latest technology.

 

 

Exploring the public AWS COVID-19 data lake

Post Syndicated from Jason Berkowitz original https://aws.amazon.com/blogs/big-data/exploring-the-public-aws-covid-19-data-lake/

The AWS COVID-19 data lake—a centralized repository of up-to-date and curated datasets on or related to the spread and characteristics of the novel coronavirus (SARS-CoV-2) and its associated illness, COVID-19—is now publicly available. For more information, see A public data lake for analysis of COVID-19 data. Globally, there are several efforts underway to gather this data, and AWS is working with partners to make this crucial data freely available and keep it up-to-date.

This data is readily available for you to ask questions, blend it with your own datasets, and create new insights in your own data lake. AWS is supporting Northwestern University in performing research to develop pandemic-surveillance methods. Ariel Chandler, health informatics PhD candidate, says, “The AWS COVID-19 data lake provided me access to public data easily so I didn’t have to do the heavy lifting to get access to information that should be at everyone’s fingertips. Access to the AWS Data Exchange and these processing tools are helping to track, report, and visualize the spread of COVID-19 across the state to aid with the Illinois public health response. The data lake uses a wide range of data sources, including consumer and location data, to inform which communities are most at risk. That information is used to guide the provision of medical and social services to those who need them the most during this crisis.”

You can also produce new ways to query the information and publish those insights back into the data lake. Data may come from public websites, data purchased via data providers on AWS Data Exchange, or internal systems.

This post walks you through accessing the AWS COVID-19 data lake through the AWS Glue Data Catalog via Amazon SageMaker or Jupyter and using the open-source AWS Data Wrangler library. AWS Data Wrangler is an open-source Python package that extends the power of the Pandas library to AWS and connects DataFrames and AWS data-related services (such as Amazon Redshift, Amazon S3, AWS Glue, Amazon Athena, and Amazon EMR). For more information about what you can build by using this data lake, see the associated public Jupyter notebook on GitHub.

The data for this post is from the following sources:

This data lake is comprised of data in a publicly readable Amazon S3 bucket. For a complete selection of COVID-19 data, see Data related to COVID-19 available for Research & Development. For instructions on subscribing to data products, see AWS Data Exchange – Find, Subscribe To, and Use Data Products.

Solution overview

This walkthrough includes the following steps:

  1. Installing the AWS CLI
  2. Configuring Amazon SageMaker
  3. Exploring the data through the Data Catalog

You also explore four analyses and their visualizations:

  • County-level percent changes
  • Foot traffic to public venues
  • Impact of number of cases on hospital beds
  • Impact of population density on hospital beds

Prerequisites

This post assumes that you have configured access to the data using an AWS CloudFormation template. For instructions, see A public data lake for analysis of COVID-19 data.

You also need access to an AWS account with permissions to do the following:

  • Create a CloudFormation stack
  • Create AWS Glue resources (catalog databases and tables)
  • Launch Amazon SageMaker notebooks

Installing the AWS CLI

Your first step is to install the AWS CLI and configure it for the us-east-2 Region, where the public COVID-19 data lake resides.

If you plan to work locally in Jupyter, you should set up a virtual environment for installing Python packages. Make sure that the following Python packages are installed: plotly, pandas, numpy, and awswrangler.
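If you prefer to script that setup, a small Python sketch like the following can create the virtual environment and install the packages; it is equivalent to running python -m venv and pip install from the command line. The environment directory name is arbitrary, and the interpreter path shown assumes a POSIX layout.

```python
# A minimal sketch: create a local virtual environment and install the
# Python packages this walkthrough relies on. Assumes Python 3 with the
# standard venv module is available on your machine.
import subprocess
import venv

ENV_DIR = ".covid19-env"          # arbitrary directory name
PACKAGES = ["plotly", "pandas", "numpy", "awswrangler"]

# Create the virtual environment with pip available inside it.
venv.EnvBuilder(with_pip=True).create(ENV_DIR)

# Install the packages into the new environment (POSIX layout; on
# Windows the interpreter lives under Scripts\python.exe instead).
env_python = f"{ENV_DIR}/bin/python"
subprocess.check_call([env_python, "-m", "pip", "install", *PACKAGES])
```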

Configuring Amazon SageMaker

To configure Amazon SageMaker, complete the following steps:

  1. Create your Amazon SageMaker notebook instance in us-east-2 (the database and tables you created in the post A public data lake for analysis of COVID-19 data are in that Region).
  2. Record the IAM role you use for the notebook instance.
  3. Modify the IAM role assigned to the notebook instance to add the policies AmazonAthenaFullAccess and AWSDataExchangeSubscriberFullAccess.
  4. Create a Jupyter notebook on your new notebook instance.

Make sure the following Python packages are installed: plotly, pandas, numpy, awswrangler. For more information about installing external Python packages, see Install External Libraries and Kernels in Notebook Instances.
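Once the packages are in place, a quick sanity check in a notebook cell can confirm that everything imports and that the notebook is running with the role and Region you expect. This is only an illustrative check, not part of the official walkthrough.

```python
# Confirm the required packages import and that the notebook is running
# with the expected role and Region.
import boto3
import awswrangler as wr  # noqa: F401  (imported to confirm installation)
import pandas as pd       # noqa: F401
import numpy as np        # noqa: F401
import plotly             # noqa: F401

identity = boto3.client("sts", region_name="us-east-2").get_caller_identity()
print("Running as:", identity["Arn"])
print("Default Region:", boto3.Session().region_name)
```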

Exploring the data through the Data Catalog

When the CloudFormation stack shows the status CREATE_COMPLETE, you can view the tables the template created. You’re now ready to explore the data and its visualizations. This post provides four examples of visualizations.
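To verify that the tables are registered, a short awswrangler call can list what the stack created. The database name used here (covid-19) is an assumption; use the name you see in your own account.

```python
# List the tables the CloudFormation template registered in the
# Glue Data Catalog. The database name "covid-19" is an assumption.
import awswrangler as wr

tables = wr.catalog.tables(database="covid-19")
print(tables[["Table", "Description"]])
```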

County-level percent changes

Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins) tracks the global number of cases, recoveries, and deaths per day. Data sources include the World Health Organization (WHO), the US Centers for Disease Control and Prevention (CDC), and the National Health Commission of the People’s Republic of China (NHC). The data is collected by Johns Hopkins University and supported by the ESRI Living Atlas Team.

With this data, you can visualize the day-over-day increase in the share of a US county’s population that is infected. For example, if a county has a population of 1,000 and its infected population grows from 10 to 100 between Monday and Tuesday, its infected share increases from 1% to 10%.

The following visualizations show the increase in the percent of population infected from March 29, 2020, to March 30, 2020, in New York City and surrounding areas. The more yellow a county, the larger the increase in cases. Gray counties saw an increase of less than 0.01% from March 29 to March 30.
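To make the arithmetic concrete, here is a small self-contained pandas sketch of that percentage-point calculation. The numbers mirror the worked example above rather than the actual Enigma data, and the FIPS codes are placeholders; the accompanying notebook performs the real calculation and renders the result as a county choropleth.

```python
# A toy sketch of the calculation: percentage-point increase in the
# share of a county's population infected between two days. The tiny
# DataFrames below mirror the worked example in the text; the notebook
# reads the real data from the data lake instead.
import pandas as pd

cases = pd.DataFrame({
    "fips":      ["99001", "99001", "99002", "99002"],   # placeholder FIPS codes
    "date":      ["2020-03-29", "2020-03-30", "2020-03-29", "2020-03-30"],
    "confirmed": [10, 100, 5, 6],
})
population = pd.DataFrame({
    "fips":       ["99001", "99002"],
    "population": [1_000, 10_000],
})

# Pivot to one row per county, one column per day, then take the difference.
daily = cases.pivot(index="fips", columns="date", values="confirmed")
change = (daily["2020-03-30"] - daily["2020-03-29"]).rename("new_cases")

merged = change.reset_index().merge(population, on="fips")
merged["pct_point_increase"] = 100 * merged["new_cases"] / merged["population"]
print(merged)  # county 99001 goes from 1% to 10% infected: a 9-point jump
```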

The following visualization zooms in on New York counties. The yellow county is New York County (the borough of Manhattan), with an increase of 0.23% of its population with COVID-19 from March 29 to March 30. The blue counties to its east are Nassau County and Suffolk County, with 0.07% and 0.05% increases, respectively. The blue-green county north of New York County is Westchester County, with an increase of 0.08%.

The accompanying notebook allows you to vary the date parameter to visualize various rates of increase and zoom out of the map to visualize the entire United States.

Foot traffic to public venues

Foursquare – COVID-19 Foot Traffic Data is a daily aggregated and anonymized percentage dataset that shows how foot traffic to various venues (such as airports, gyms, and grocery stores) has changed since February 19, 2020, in different metro areas. To produce the following visualizations, you download the data from AWS Data Exchange and use Amazon SageMaker notebooks to visualize it.

The following visualization uses the foot traffic data to plot the change in foot traffic to various venues after February 19. The plot shows that public traffic to shopping malls, clothing stores, casual dining chains, and airports declined sharply after the National Emergency Declaration on March 13. On the other hand, traffic to grocery stores, warehouse stores, and drug stores increased sharply over the same period.
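A minimal plotly sketch of this kind of venue-level plot follows. The venue names and traffic values are illustrative placeholders rather than the Foursquare dataset itself, whose actual column names may differ.

```python
# A self-contained plotly sketch of a venue-level foot traffic plot.
# The numbers are placeholders; in the notebook the DataFrame is built
# from the data downloaded through AWS Data Exchange.
import pandas as pd
import plotly.express as px

traffic = pd.DataFrame({
    "date":  pd.to_datetime(["2020-02-19", "2020-03-13", "2020-03-30"] * 2),
    "venue": ["Airports"] * 3 + ["Grocery Stores"] * 3,
    "pct_of_baseline": [100, 55, 20, 100, 130, 160],
})

fig = px.line(traffic, x="date", y="pct_of_baseline", color="venue",
              title="Foot traffic relative to February 19 baseline (%)")
fig.show()
```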

With the accompanying notebook, you can make similar plots for various metro areas, including New York City, San Francisco/Oakland, Los Angeles, and Seattle, and for 19 different venues.

Impact of number of cases on hospital beds

The following visualizations use Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins) and Rearc – USA Hospital Beds – COVID-19 | Definitive Healthcare to analyze how the growing number of COVID-19 cases affects local hospitals. The hospital bed dataset contains the numbers of licensed beds, staffed beds, and ICU beds, along with the bed utilization rate, for hospitals in the United States.

The first plot shows the growth in the number of hospitalized cases over 10 days in New York County. The hospitalized cases are calculated with a 10% hospitalization rate, which assumes that 10% of all COVID-19 cases in Manhattan result in hospitalization. In the accompanying notebook, the hospitalization rate is a parameter, so you can visualize how various hospitalization rates translate into different healthcare needs. To generate this plot, you use the daily COVID-19 case information from Johns Hopkins to simulate the number of hospitalized cases and the Definitive Healthcare hospital bed data to calculate the total hospital capacity for Manhattan.
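The following is a compact sketch of that simulation: an assumed hospitalization rate applied to cumulative case counts, compared against a fixed bed capacity. All numbers are placeholders; the notebook derives cases from the Johns Hopkins data and capacity from the Definitive Healthcare data.

```python
# A sketch of the simulation: apply an assumed hospitalization rate to
# daily cumulative case counts and compare the result with a fixed bed
# capacity. All values below are placeholders.
import pandas as pd

HOSPITALIZATION_RATE = 0.10          # assume 10% of cases are hospitalized
TOTAL_BEDS = 12_000                  # placeholder county-wide bed capacity

cases = pd.Series(
    [10_000, 14_000, 19_000, 25_000, 32_000],
    index=pd.date_range("2020-03-26", periods=5, freq="D"),
    name="cumulative_cases",
)
hospitalized = (cases * HOSPITALIZATION_RATE).rename("hospitalized_cases")
utilization = (hospitalized / TOTAL_BEDS).rename("bed_utilization")

print(pd.concat([cases, hospitalized, utilization], axis=1))
```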

The second visualization is a hospital utilization plot by county for the entire United States. The more yellow a county, the more its healthcare resources are burdened, assuming a 20% COVID-19 hospitalization rate. Counties in gray have a utilization of less than 5% under that assumption.

As with the previous visualization, you can simulate various hospitalization rates in the accompanying notebook to visualize how COVID-19 burdens healthcare resources around the country. You can also change the date parameter to visualize how healthcare resource requirements change over time.

Impact of population density on hospital beds

The following visualization uses Enigma – Global Coronavirus (COVID-19) Data (Johns Hopkins), Rearc – USA Hospital Beds – COVID-19 | Definitive Healthcare, and US Census County-Based Data to compare the confirmed cases and available hospital beds per square kilometer for two different counties. The Enigma dataset provides the case data; the Rearc dataset provides hospital bed information throughout the country, aggregated at the county level. The numbers of cases and beds are normalized by land area in square kilometers using the US Census county data.
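A short pandas sketch of that normalization follows. The column names and values are placeholders; the notebook joins the Enigma, Rearc, and US Census county datasets on the county FIPS code before dividing by land area.

```python
# A sketch of the normalization: divide county-level case and bed counts
# by land area in square kilometers. Values below are placeholders.
import pandas as pd

counties = pd.DataFrame({
    "county":          ["Alameda", "San Diego"],
    "confirmed_cases": [600, 700],
    "licensed_beds":   [3_500, 6_000],
    "land_area_sq_km": [1_900, 10_900],
})

counties["cases_per_sq_km"] = counties["confirmed_cases"] / counties["land_area_sq_km"]
counties["beds_per_sq_km"] = counties["licensed_beds"] / counties["land_area_sq_km"]
print(counties)
```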

In the accompanying notebook, you can change the scope of the visualization, the number of entities, and the bed resources. The following visualization shows cases and licensed beds per square kilometer at the county level for Alameda and San Diego counties.

These examples are a few of the innumerable analyses you can run on the public data lake.

Cleaning up

You incur no additional cost for accessing the AWS COVID-19 data lake beyond the standard charges for the AWS services that you use. For example, if you use Athena, you incur the charges for running queries and for storing the query results in Amazon S3, but no charges for accessing the data lake itself. Depending on the Amazon SageMaker instance you choose, you may incur Amazon SageMaker fees. For more information, see Amazon SageMaker Pricing.

To avoid recurring charges, shut down and delete the Amazon SageMaker notebook instance, delete any S3 buckets you created, and disable auto-renewal for your AWS Data Exchange subscriptions.
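As one possible way to script part of this cleanup, the following boto3 sketch stops and deletes a notebook instance and, if you also want to remove the Data Catalog resources the template created, deletes the CloudFormation stack. The instance and stack names are placeholders for whatever you chose during setup; AWS Data Exchange subscription renewals are managed separately in the console.

```python
# A cleanup sketch with boto3. "covid19-notebook" and "CovidLakeStack"
# are placeholder names; use the names you chose when setting up.
import boto3

REGION = "us-east-2"
sagemaker = boto3.client("sagemaker", region_name=REGION)
cloudformation = boto3.client("cloudformation", region_name=REGION)

# Stop, then delete, the notebook instance (it must be stopped first).
sagemaker.stop_notebook_instance(NotebookInstanceName="covid19-notebook")
waiter = sagemaker.get_waiter("notebook_instance_stopped")
waiter.wait(NotebookInstanceName="covid19-notebook")
sagemaker.delete_notebook_instance(NotebookInstanceName="covid19-notebook")

# Optionally remove the Data Catalog resources the template created.
cloudformation.delete_stack(StackName="CovidLakeStack")
```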

Conclusion

Combining our efforts across organizations and scientific disciplines can help us win the fight against the COVID-19 pandemic. With the AWS COVID-19 data lake, you can experiment with and analyze curated data related to the virus, and share your own data and results. We believe that through an open and collaborative effort that combines data, technology, and science, we can inspire insights and foster breakthroughs necessary to contain, curtail, and ultimately cure COVID-19.

For more information about the public AWS COVID-19 data lake, visit https://aws.amazon.com/covid-19-data-lake/.

 


About the Authors

Jason Berkowitz is the Americas Data & Analytics Professional Services Practice Lead. He comes from a background in machine learning, data lake architectures, and helping customers become data-driven. He currently works within AWS Professional Services, helping customers shape their data lakes and analytics journeys on AWS.

 

 

Colby Wise is a senior data scientist and manager at Amazon Machine Learning Solutions Lab, where he helps AWS customers across different industries accelerate their AI and cloud adoption.

 

 

Ninad Kulkarni is a data scientist in the Amazon Machine Learning Solutions Lab. He helps customers adopt ML and AI by building solutions that address their business problems. Most recently, he has built predictive models for sports customers that are consumed on screen to improve fan engagement.