Tag Archives: Amazon Athena

Chasing earthquakes: How to prepare an unstructured dataset for visualization via ETL processing with Amazon Redshift

Post Syndicated from Ian Funnell original https://aws.amazon.com/blogs/big-data/chasing-earthquakes-how-to-prepare-an-unstructured-dataset-for-visualization-via-etl-processing-with-amazon-redshift/

As organizations expand analytics practices and hire data scientists and other specialized roles, big data pipelines are growing increasingly complex. Sophisticated models are being built using the troves of data being collected every second.

The bottleneck today is often not the know-how of analytical techniques. Rather, it’s the difficulty of building and maintaining ETL (extract, transform, and load) jobs using tools that might be unsuitable for the cloud.

In this post, I demonstrate a solution to this challenge. I start with a noisy semistructured dataset of seismic events, spanning several years and recorded at different locations across the globe. I set out to obtain broad insights about the nature of the rocks forming the Earth’s surface itself—the tectonic plate structure—to be visualized using the mapping capability in Amazon QuickSight.

To accomplish this, I use several AWS services (Amazon S3, Amazon Athena, Amazon Redshift, and Amazon QuickSight), orchestrated together using Matillion ETL for Amazon Redshift.

Tectonic plate structure context

An earthquake is caused by a buildup of pressure that gets suddenly released. Earthquakes tend to be more severe at destructive tectonic plate boundaries. These boundaries are formed when a denser oceanic plate collides with a lighter continental plate, or when two oceanic plates collide. Due to the difference in density, the oceanic lithosphere is pushed underneath the continental plate, forming what is called a subduction zone. (See the following diagram.) In subduction zones, earthquakes can occur at depths as great as 700 kilometers.

Photography by KDS4444 [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)], from Wikimedia Commons

For our analysis, we ultimately want to visualize the depth of an earthquake focus to locate subduction zones, and therefore find places on earth with the most severe earthquakes.

Seismographic data source

The data source is from the International Federation of Digital Seismograph Networks (FDSN). The event data is in JSON format (from the European Mediterranean Seismological Centre, or EMSC). An external process accumulates files daily into an Amazon S3 bucket, as shown following.

Each individual file contains all the seismic events for one day—usually several hundred—in an embedded array named “features,” as shown in the following example:

{
  "type": "FeatureCollection",
  "metadata": {
    "totalCount": 103
  },
  "features": [
    {
      "geometry": {
        "type": "Point",
        "coordinates": [26.76, 45.77, -140]
      },
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    },
    {
      "geometry": {
        "type": "Point",

Architecture overview

Athena reads and flattens the S3 data and makes it available for Matillion ETL to load into Amazon Redshift via JDBC. Matillion orchestrates this data movement, and it also provides a graphical framework to design and build the more complex data enrichment and aggregation steps to be performed by Amazon Redshift. Finally, the prepared data is queried by Amazon QuickSight for visualization.

Amazon Athena setup

You can use Athena to query data in S3 using standard SQL, via a serverless infrastructure that is managed entirely by AWS on your behalf. Before you can query your data, start by creating an external table. By doing this, you are defining the schema to apply to the data when it is being queried.

You can choose to use an AWS Glue crawler to assist in automatically discovering the schema and format of your source data files.

The following is the CREATE TABLE statement that you can copy and paste into the Athena console to create the schema needed to query the seismic data. Make sure that you substitute the correct S3 path to your seismic data in the LOCATION field of the statement.

CREATE EXTERNAL TABLE `sp_events`(
  `type` string COMMENT 'from deserializer', 
  `metadata` struct<totalcount:int> COMMENT 'from deserializer', 
  `features` array<struct<geometry:struct<type:string,coordinates:array<double>>,type:string,id:string,properties:struct<lastupdate:string,magtype:string,evtype:string,lon:double,auth:string,lat:double,depth:double,unid:string,mag:double,time:string,source_id:string,source_catalog:string,flynn_region:string>>> COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket/SeismicPortal'

After the table is created, you are ready to query it. Athena uses Presto, an in-memory, distributed SQL engine. It provides the ability to unnest arrays, which you use next.

Transfer to Amazon Redshift

The embedded array in every source record gets flattened out and converted to individual records during the JDBC export (download the .jar file) into Amazon Redshift. You use a Matillion ETL Database Query component to assist with the data transfer during this step, as shown in the following image.

This component simplifies ETL by automating the following steps:

  1. Runs the SQL SELECT statement (shown in the following example).
  2. Streams query results across the network from Athena and into temporary storage in S3.
  3. Performs a bulk data load into Amazon Redshift.

Athena executes the following SQL statement:

SELECT f.id,
       f.properties.time AS event_time,
       f.properties.lastupdate,
       f.properties.lon,
       f.properties.lat,
       f.properties.depth,
       f.properties.mag,
       f.properties.flynn_region
FROM "seismic"."sp_events"
CROSS JOIN UNNEST (features) AS t(f)

The CROSS JOIN UNNEST syntax flattens the embedded array, generating hundreds of individual event records per day.
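To make the flattening concrete, the following is a minimal Python sketch of what the CROSS JOIN UNNEST step produces for one daily file. It is not how Athena executes the query. The `flatten_features` function and the `SAMPLE_DAY` constant are hypothetical names introduced for illustration, and the sample is the first event from the earlier JSON example, trimmed to a single-element array.

```python
import json

# One-event sample in the shape of the daily files shown earlier
# (trimmed to a single element; totalCount adjusted to match).
SAMPLE_DAY = json.loads("""
{
  "type": "FeatureCollection",
  "metadata": {"totalCount": 1},
  "features": [
    {
      "geometry": {"type": "Point", "coordinates": [26.76, 45.77, -140]},
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    }
  ]
}
""")


def flatten_features(day):
    """Yield one flat record per element of the embedded "features" array."""
    for feature in day["features"]:
        props = feature["properties"]
        yield {
            "id": feature["id"],
            "event_time": props["time"],  # renamed, as in the SELECT list
            "lastupdate": props["lastupdate"],
            "lon": props["lon"],
            "lat": props["lat"],
            "depth": props["depth"],
            "mag": props["mag"],
            "flynn_region": props["flynn_region"],
        }


rows = list(flatten_features(SAMPLE_DAY))
print(rows[0]["id"], rows[0]["flynn_region"], rows[0]["depth"])
```

Each source document becomes as many output rows as there are elements in its array, which is why one file per day turns into hundreds of event records.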

Now that the data has been copied and flattened into individual event records (shown in the following image), it’s ready for enrichment and aggregation.

Data enrichment

Earthquakes occur along a continuous range of spatial coordinates. In order to aggregate them, as we’ll be doing very soon, it’s necessary to first group them together. A convenient method is to assign every event into a Universal Transverse Mercator (UTM) zone. These zones are six-degree bands of longitudes that convert the spherical latitude/longitude coordinates into a 2D representation. Performing this conversion provides good granularity for visualization later.
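The zone number alone follows directly from the six-degree banding described above. The following Python sketch shows only that part, under simplifying assumptions: it ignores the special-case zones around Norway and Svalbard and does not compute latitude bands, eastings, or northings. The function name `utm_zone_number` is hypothetical and is not part of the UDF used in this post.

```python
import math


def utm_zone_number(lon_deg):
    """Return the UTM longitudinal zone (1-60) for a longitude in degrees.

    Zones are six degrees wide and are numbered eastward from 180 degrees W.
    Simplification: the Norway/Svalbard exceptions are not handled.
    """
    if not -180.0 <= lon_deg <= 180.0:
        raise ValueError("longitude must be between -180 and 180 degrees")
    zone = int(math.floor((lon_deg + 180.0) / 6.0)) + 1
    # Longitude +180 would otherwise fall into a non-existent zone 61.
    return min(zone, 60)


# The sample Romanian event at longitude 26.76 falls in zone 35.
print(utm_zone_number(26.76))
```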

The calculation to convert a spherical longitude/latitude coordinate into a two-dimensional UTM coordinate is complex. It can be performed ideally using an Amazon Redshift user-defined function (UDF). I chose a UDF for the ability to invoke it, via a Matillion component, in the next step.

CREATE OR REPLACE FUNCTION f_ll_utm (Lat float, Long float)
      RETURNS VARCHAR
STABLE
AS $$
from math import pi, sin, cos, tan, sqrt

_deg2rad = pi / 180.0
_rad2deg = 180.0 / pi

_EquatorialRadius = 1
_eccentricitySquared = 2
_ellipsoid = ["WGS-84", 6378137, 0.00669438]

The UDF has to return three pieces of information:

  • UTM Zone code
  • Easting (x-axis measurement in meters)
  • Northing (ditto, for the y-axis)

A scalar UDF can return only a single value. Therefore, the three results are returned together as a single pipe-delimited string.

To bring the values out into individual fields, the UDF is first invoked using a Matillion ETL Calculator component, followed by a field splitter and a Calculator to perform data type conversion and rounding.
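The split-and-convert step can be pictured with a short Python sketch. In the post, this work is done by Matillion components, not by Python. The exact layout of the UDF's output string is not shown in the post, so the sample value `"35T|559086.4|5068834.7"`, the `UtmRef` type, and the `split_utm` function are all hypothetical.

```python
from typing import NamedTuple


class UtmRef(NamedTuple):
    zone: str      # UTM zone code
    easting: int   # x-axis measurement in meters, rounded
    northing: int  # y-axis measurement in meters, rounded


def split_utm(packed):
    """Split a pipe-delimited 'zone|easting|northing' string into typed fields."""
    zone, easting, northing = packed.split("|")
    # Convert the text fields to numbers, then round to whole meters.
    return UtmRef(zone, round(float(easting)), round(float(northing)))


# Hypothetical UDF output, for illustration only.
ref = split_utm("35T|559086.4|5068834.7")
print(ref.zone, ref.easting, ref.northing)
```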

Data aggregation

To reiterate, we’re interested in the depth of earthquake focus specifically on destructive plate boundaries. Knowing the depth helps us estimate the potential severity of earthquakes.

We need to find the average event depth within each UTM zone, in the expectation that a spatial pattern will appear that will highlight the subduction zones.

The last three steps in the Matillion transformation (shown in the following diagram) perform the necessary aggregation, add a depth quartile, and create an output table from the resulting data.

The “Aggregate to UTM ref” step gets Amazon Redshift to perform a GROUP BY operation in SQL, which approximates every event to the appropriate UTM zone. While doing this aggregation, you simultaneously do the following:

  • Count the events (which determines the size of the visual representation).
  • Find the average depth (which determines the color of the visual representation).
  • Determine the average latitude and longitude (which approximates to the center of the UTM zone, and determines the position of the visual representation).
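The three aggregates listed above can be sketched in plain Python as follows. In the post, Amazon Redshift performs the work in SQL. The field names (`utm_zone`, `depth`, `lat`, `lon`) and the sample events are assumptions made for illustration, not the actual column names of the Matillion job.

```python
from collections import defaultdict


def aggregate_by_zone(events):
    """Group events by UTM zone and compute count and per-zone averages."""
    groups = defaultdict(list)
    for event in events:
        groups[event["utm_zone"]].append(event)

    results = []
    for zone, rows in sorted(groups.items()):
        n = len(rows)
        results.append({
            "utm_zone": zone,
            "event_count": n,                               # size on the map
            "avg_depth": sum(r["depth"] for r in rows) / n,  # color on the map
            "avg_lat": sum(r["lat"] for r in rows) / n,      # position on the map
            "avg_lon": sum(r["lon"] for r in rows) / n,
        })
    return results


# Hypothetical events, for illustration only.
events = [
    {"utm_zone": "35T", "depth": 140.0, "lat": 45.0, "lon": 26.0},
    {"utm_zone": "35T", "depth": 100.0, "lat": 46.0, "lon": 27.0},
    {"utm_zone": "54S", "depth": 30.0, "lat": 36.0, "lon": 141.0},
]
for row in aggregate_by_zone(events):
    print(row)
```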

The following image shows the aggregation type for each column:

Average depth is a useful measure, but to maximize the visual impact of the final presentation, we also take the opportunity to rank the results into quartiles. This allows the zones with the deepest quartile to stand out consistently on the map.

NTILE(4) OVER (ORDER BY "avg_depth")

Amazon Redshift is great at performing this type of analytics, which is delivered inside another Matillion ETL Calculator component.
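For readers unfamiliar with NTILE, the following Python sketch mimics its general behavior: rows are ordered, then dealt into the requested number of buckets as evenly as possible, with any leftover rows going to the lowest-numbered buckets. It is an approximation of the SQL semantics for illustration, not Redshift's implementation, and the `ntile` function and sample depths are hypothetical.

```python
def ntile(values, buckets):
    """Return the 1-based tile number for each input value, in input order."""
    order = sorted(range(len(values)), key=lambda i: values[i])
    base, extra = divmod(len(order), buckets)

    tiles = [0] * len(values)
    position = 0
    for tile in range(1, buckets + 1):
        # The first `extra` tiles each receive one additional row.
        size = base + (1 if tile <= extra else 0)
        for index in order[position:position + size]:
            tiles[index] = tile
        position += size
    return tiles


# Hypothetical average depths for eight UTM zones.
avg_depths = [10, 50, 20, 140, 33, 70, 5, 90]
print(ntile(avg_depths, 4))
```

With ascending order, the deepest zones land in tile 4, which is the quartile highlighted on the map.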

The Recreate Output step materializes the dataset into an Amazon Redshift table, ready for Amazon QuickSight to visualize.

Amazon QuickSight visualization

The Amazon QuickSight “points on map” visualization is perfect for this 2D rendering. The values for the field wells come straight from the aggregated data in Amazon Redshift:

  • Geospatial — the average lat/long per UTM grid.
  • Size — the record count, in other words, the number of events in that UTM zone.
  • Color — the Depth Ntile, with the fourth quartile in pink.

The resulting map clearly highlights the global subduction zones in pink. These are the areas where earthquake foci are deepest on average.

Recap and summary

In this post, I used seismological data as an example to explore challenges around the visualization of unstructured data and to provide best practices. I suggested a way to overcome these challenges with an architecture that is also applicable for datasets from a wide array of sources, beyond geology. I then explored how to orchestrate activities of different data processing tasks between S3, Athena, and Amazon Redshift using Matillion ETL for Amazon Redshift.

If you’re trying to solve similar analytics challenges and want to see how Amazon Redshift and Matillion can help, launch a 14-day free trial of Matillion ETL for Amazon Redshift on the AWS Marketplace or schedule a demo today. If you have questions or suggestions, please comment below.


Additional Reading

If you found this post helpful, be sure to check out Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift, and Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda.

 


About the Author

Ian Funnell, a lead solution architect at Matillion, is a seasoned cloud and data warehousing specialist. Ian is dedicated to supporting customers in driving their data transformation forward and solving their deepest technical challenges. Ian has 25+ years of experience in the tech industry.

 

 

 

Using CTAS statements with Amazon Athena to reduce cost and improve performance

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/using-ctas-statements-with-amazon-athena-to-reduce-cost-and-improve-performance/

Amazon Athena is an interactive query service that makes it more efficient to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Athena recently released support for creating tables using the results of a SELECT query or CREATE TABLE AS SELECT (CTAS) statement. Analysts can use CTAS statements to create new tables from existing tables on a subset of data, or a subset of columns. They also have options to convert the data into columnar formats, such as Apache Parquet and Apache ORC, and partition it. Athena automatically adds the resultant table and partitions to the AWS Glue Data Catalog, making them immediately available for subsequent queries.

CTAS statements help reduce cost and improve performance by allowing users to run queries on smaller tables constructed from larger tables. This post covers three use cases that demonstrate the benefit of using CTAS to create a new dataset, smaller than the original one, allowing subsequent queries to run faster. Assuming our use case requires repeatedly querying the data, we can now query a smaller and more optimal dataset to get the results faster.

Using Amazon Athena CTAS

The familiar CREATE TABLE statement creates an empty table. In contrast, the CTAS statement creates a new table containing the result of a SELECT query. The new table’s metadata is automatically added to the AWS Glue Data Catalog. The data files are stored in Amazon S3 at the designated location. When creating new tables using CTAS, you can include a WITH clause to define table-specific parameters, such as file format, compression, and partition columns. For more information about the parameters you can use, see Creating a Table from Query Results (CTAS).

Before you begin: Set up CloudTrail for querying with Athena

If you don’t already use Athena to query your AWS CloudTrail data, we recommend you set this up. To do so:

  1. Open the CloudTrail console.
  2. On the left side of the console, choose Event History.
  3. At the top of the window, choose Run advanced queries in Amazon Athena.
  4. Follow the setup wizard and create your Athena table.

It takes some time for data to collect. If this is your first time, it takes about an hour to get meaningful data. This assumes that there is activity in your AWS account.

This post assumes that your CloudTrail table is named cloudtrail_logs, and that it resides in the default database.

Use case 1: Optimizing for repeated queries by reducing dataset size

As with other AWS services, Athena uses AWS CloudTrail to track its API calls. In this use case, we use CloudTrail to provide an insight into our Athena usage. CloudTrail automatically publishes data in JSON format to S3. We use a CTAS statement to create a table with only 30 days of Athena API events, to remove all of the other API events that we don’t care about.  This reduces the table size, which improves subsequent queries.

The following query uses the last 30 days of Athena events. It creates a new table called “athena_30_days” and saves the data files in Parquet format.

CREATE TABLE athena_30_days
AS
SELECT
  date(from_iso8601_timestamp(eventtime)) AS dt,
  *
FROM cloudtrail_logs
WHERE eventsource = 'athena.amazonaws.com'
AND 
  date(from_iso8601_timestamp(eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

Running this query on the original CloudTrail data takes close to 5 minutes and scans around 14 GB of data. This is because the raw data is in JSON format and not well partitioned. Running a SELECT * on the newly created table now takes 1.7 seconds and scans 1.14 MB of data.
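Because Athena charges by the amount of data scanned, the two scan sizes above translate directly into cost. The following Python sketch works through that arithmetic. The price of 5 USD per TB scanned and the 10 MB per-query minimum are assumptions that are not stated in this post; check current Athena pricing for your Region before relying on them. The function name `query_cost_usd` is hypothetical.

```python
# Assumptions (not from this post): verify against current Athena pricing.
PRICE_PER_TB_USD = 5.0
MIN_BILLED_BYTES = 10 * 1024 ** 2  # assumed 10 MB minimum per query
BYTES_PER_TB = 1024 ** 4


def query_cost_usd(scanned_bytes,
                   price_per_tb=PRICE_PER_TB_USD,
                   min_billed_bytes=MIN_BILLED_BYTES):
    """Estimate the cost of one query from the number of bytes it scans."""
    billed = max(scanned_bytes, min_billed_bytes)
    return billed / BYTES_PER_TB * price_per_tb


raw_cost = query_cost_usd(14 * 1024 ** 3)             # about 14 GB of raw JSON
reduced_cost = query_cost_usd(int(1.14 * 1024 ** 2))  # about 1.14 MB after CTAS

print(f"raw: ${raw_cost:.6f}  reduced: ${reduced_cost:.8f}")
print(f"ratio: {raw_cost / reduced_cost:.0f}x")
```

Under these assumptions, the per-query saving is small in absolute terms, but it applies to every repeated query or dashboard refresh against the reduced table.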

Now you can run multiple queries or build a dashboard on the reduced dataset.

For example, the following query aggregates the total count of each Athena API, grouping results by IAM user, date, and API event name.  This query took only 1.8 seconds to complete.

SELECT 
  dt, 
  eventname,
  count(eventname) as event_count,
  split_part(useridentity.arn, ':', 6) as user
FROM athena_30_days
GROUP BY 1,2,4
ORDER BY event_count DESC
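The `split_part(useridentity.arn, ':', 6)` expression in the query above extracts the sixth colon-delimited field of the ARN, which is the resource portion that identifies the user. The following Python sketch mimics that behavior. The `split_part` helper is a hypothetical stand-in for the SQL function, and the ARN is a made-up example.

```python
def split_part(text, delimiter, index):
    """Mimic SQL split_part: 1-based field index, None if out of range."""
    parts = text.split(delimiter)
    if 1 <= index <= len(parts):
        return parts[index - 1]
    return None


# Hypothetical IAM user ARN, for illustration only.
arn = "arn:aws:iam::123456789012:user/alice"
print(split_part(arn, ":", 6))
```

The empty field between `iam` and the account ID still counts as a field, which is why the user portion is at index 6 and not 5.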

Use case 2: Selecting a smaller number of columns

In this use case, I join the CloudTrail table with the S3 Inventory table while only selecting specific columns relevant to my analysis.  I use CTAS to generate a table from the results.

CREATE TABLE athena_s3_30_days
AS
SELECT 
  json_extract_scalar(ct.requestparameters, '$.bucketName') AS bucket,
  json_extract_scalar(ct.requestparameters, '$.key') AS key,
  ct.useridentity.username AS username,
  ct.eventname,
  cast (from_iso8601_timestamp(ct.eventtime) as timestamp) as ts,
  s3.storage_class,
  s3.size
FROM cloudtrail_logs ct
JOIN s3inventory s3 
ON json_extract_scalar(ct.requestparameters, '$.bucketName') = s3.bucket
AND json_extract_scalar(ct.requestparameters, '$.key') = s3.key
AND date(from_iso8601_timestamp(ct.eventtime)) = date(s3.last_modified_date)
WHERE ct.eventsource = 's3.amazonaws.com' 
AND ct.eventname = 'GetObject'
AND ct.useridentity.invokedby LIKE '%athena%'
AND date(from_iso8601_timestamp(ct.eventtime)) 
    BETWEEN current_date - interval '30' day AND current_date

The previous query example returns the last 30 days of S3 GetObject API events that were invoked by the Athena service.  It adds the S3 object size and storage class for each event returned from the S3 Inventory table.

We can then, for example, count the number of times each key has been accessed by Athena, ordering the results by count from large to small. This gives us an indication of the size of the files we’re scanning and how often we scan them. Knowing this helps us determine whether we should optimize by performing compaction on those keys.

SELECT
  bucket,
  size,
  key,
  count(key) AS key_count
FROM athena_s3_30_days
GROUP BY 1,2,3
ORDER BY key_count DESC

In the case of my example, it looks like this:

Use case 3: Repartitioning an existing table

The third use case where CTAS adds value is taking an existing unoptimized dataset, converting it to Apache ORC, and partitioning it to better optimize for repeated queries. We’ll take the last 100 days of CloudTrail events and partition them by date.

CREATE TABLE cloudtrail_partitioned
WITH (
  partitioned_by = ARRAY['year', 'month'],
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs

Notice that I’ve added a WITH clause following the CREATE TABLE keywords but before the AS keyword. Within the WITH clause, we can define the table properties that we want. In this particular case, we declared “year” and “month” as our partitioning columns and defined ORC as the output format. I used ORC because CloudTrail data may contain empty columns that are not allowed by the Parquet specification, but are allowed by ORC. Additionally, I defined the external S3 location to store our table. If we don’t define an external location, Athena uses the default query result S3 location.
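If you generate CTAS statements from a script, the placement of the WITH clause can be captured in a small helper. The following Python sketch only assembles the SQL text and does not submit it to Athena. The `build_ctas` and `render_value` functions are hypothetical, the bucket name is a placeholder, and the sketch performs no SQL escaping, so it should only be used with trusted inputs.

```python
def render_value(value):
    """Render a Python value as a CTAS table property literal."""
    if isinstance(value, bool):
        raise TypeError("boolean properties are not handled in this sketch")
    if isinstance(value, int):
        return str(value)
    if isinstance(value, (list, tuple)):
        return "ARRAY[" + ", ".join(f"'{item}'" for item in value) + "]"
    return f"'{value}'"


def build_ctas(table, select_sql, **properties):
    """Assemble CREATE TABLE ... WITH (...) AS SELECT ... as a string."""
    lines = [f"CREATE TABLE {table}"]
    if properties:
        body = ",\n".join(
            f"  {name} = {render_value(value)}"
            for name, value in properties.items()
        )
        # The WITH clause sits between CREATE TABLE and AS.
        lines.append(f"WITH (\n{body}\n)")
    lines.append("AS")
    lines.append(select_sql)
    return "\n".join(lines)


statement = build_ctas(
    "cloudtrail_partitioned",
    "SELECT *,\n"
    "  year(date(from_iso8601_timestamp(eventtime))) AS year,\n"
    "  month(date(from_iso8601_timestamp(eventtime))) AS month\n"
    "FROM cloudtrail_logs",
    partitioned_by=["year", "month"],
    format="orc",
    external_location="s3://your-bucket/cloudtrail_partitioned",  # placeholder
)
print(statement)
```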

The resulting S3 destination bucket looks similar to the following example:

An additional optimization supported by Athena CTAS is bucketing.  Partitioning is used to group similar types of data based on a specific column.  Bucketing is commonly used to combine data within a partition into a number of equal groups, or files.  Therefore, partitioning is best suited for low cardinality columns and bucketing is best suited for high cardinality columns.  For more information, see Bucketing vs Partitioning.
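The difference between the two techniques can be illustrated with a short Python sketch. Partitioning routes a row by the literal value of a column, while bucketing routes it by a hash of the value, modulo a fixed bucket count. The sketch uses CRC32 purely as a stand-in hash; it is an assumption for illustration and is not the hash function Athena uses. The `bucket_for` function is hypothetical.

```python
import zlib


def bucket_for(value, bucket_count):
    """Assign a value to one of bucket_count buckets using a stable hash.

    CRC32 is a stand-in here; Athena's actual hash function differs.
    """
    return zlib.crc32(value.encode("utf-8")) % bucket_count


# Example CloudTrail event names from a high-cardinality column.
event_names = ["GetObject", "StartQueryExecution", "GetQueryResults", "GetObject"]
for name in event_names:
    print(name, "-> bucket", bucket_for(name, 3))
```

However many distinct values the column has, the number of buckets stays fixed, and equal values always land in the same bucket. This is why bucketing suits high-cardinality columns.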

Let’s take the previous CTAS example and add bucketing.

CREATE TABLE cloudtrail_partitioned_bucketed
WITH (
  partitioned_by = ARRAY['year', 'month'],
  bucketed_by = ARRAY['eventname'],
  bucket_count = 3,
  format = 'orc',
  external_location = 's3://royon-demo/cloudtrail_partitioned_bucketed'
)
AS
SELECT
  *, 
  year(date(from_iso8601_timestamp(eventtime))) as year,
  month(date(from_iso8601_timestamp(eventtime))) as month
FROM cloudtrail_logs

 

And this is what it looks like in S3:

Here is an example query on both a partitioned table and a partitioned and bucketed table.  You can see that the speed is similar, but that the bucketed query scans less data.

Partitioned table:

Partitioned and bucketed table:

Conclusion

In this post, we introduced CREATE TABLE AS SELECT (CTAS) in Amazon Athena. CTAS lets you create a new table from the result of a SELECT query. The new table can be stored in Parquet, ORC, Avro, JSON, and TEXTFILE formats. Additionally, the new table can be partitioned and bucketed for improved performance. We looked at how CTAS helps with three common use cases:

  1. Reducing a large dataset into a smaller, more efficient dataset.
  2. Selecting a subset of the columns and rows to only deliver what the consumer of the data really needs.
  3. Partitioning and bucketing a dataset that is not currently optimized to improve performance and reduce the cost.

Additional Reading

If you found this post useful, be sure to check out How Realtor.com Monitors Amazon Athena Usage with AWS CloudTrail and Amazon QuickSight.

 


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.

 

 

 

Connect to Amazon Athena with federated identities using temporary credentials

Post Syndicated from Nitin Wagh original https://aws.amazon.com/blogs/big-data/connect-to-amazon-athena-with-federated-identities-using-temporary-credentials/

Many organizations have standardized on centralized user management, most commonly Microsoft Active Directory or LDAP. Access to AWS resources is no exception. Amazon Athena is a serverless query engine for data on Amazon S3 that is popular for quick and cost-effective queries of data in a data lake. To allow users or applications to access Athena, organizations are required to use an AWS access key and a secret access key, against which the appropriate policies are enforced. To maintain a consistent authorization model across the organization, they must enable authentication and authorization for Athena by using federated users.

This blog post shows the process of enabling federated user access with the AWS Security Token Service (AWS STS). This approach lets you create temporary security credentials and provides them to trusted users for running queries in Athena.

Temporary security credentials in AWS STS

Temporary security credentials ensure that access keys to protected AWS resources are properly rotated. Therefore, potential security leaks can be caught and remedied. AWS STS generates these per-use temporary access keys.

Temporary security credentials work similarly to the long-term access key credentials that your IAM users can use. However, temporary security credentials have the following differences:

  • They are intended for short-term use only. You can configure these credentials to last from a few minutes to several hours, with a maximum of 12 hours. After they expire, AWS no longer recognizes them, or allows any kind of access from API requests made with them.
  • They are not stored with the user. They are generated dynamically and provided to the user when requested. When or before they expire, the user can request new credentials, if they still have permissions to do so.
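A client that holds temporary credentials therefore has to track their expiration and request new ones in time. The following Python sketch shows one way to model that check. The `TemporaryCredentials` class and the five-minute refresh margin are hypothetical choices for illustration. The sketch does not call AWS STS.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class TemporaryCredentials:
    """Hypothetical container for a set of temporary security credentials."""
    access_key_id: str
    secret_access_key: str
    session_token: str
    expiration: datetime  # timezone-aware expiry time

    def is_expired(self, now=None, margin=timedelta(minutes=5)):
        """Return True once the credentials are within `margin` of expiring."""
        now = now or datetime.now(timezone.utc)
        return now >= self.expiration - margin


# Placeholder values; real credentials would come from AWS STS.
creds = TemporaryCredentials(
    access_key_id="XXXXXXXX",
    secret_access_key="XXXXXXXX",
    session_token="XXXXXXXX",
    expiration=datetime.now(timezone.utc) + timedelta(hours=1),
)
if creds.is_expired():
    print("request new credentials")
else:
    print("credentials still valid")
```

Refreshing slightly before the actual expiry avoids starting a long-running query with credentials that lapse partway through.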

Common scenarios for federated access

The following common scenarios describe when your organization may require federated access to Athena:

  1. Running queries in Athena while using federation. Your group is required to run queries in Athena while federating into AWS using SAML with permissions stored in Active Directory.
  2. Enabling access across accounts to Athena for users in your organization. Users in your organization with access to one AWS account need to run Athena queries in a different account.
  3. Enabling access to Athena for a data application. A data application deployed on an Amazon EC2 instance needs to run Athena queries via JDBC.

Athena as the query engine for your data lake on S3

Athena is an interactive query service that lets you analyze data directly in Amazon S3 by using standard SQL. You can access Athena by using JDBC and ODBC drivers, AWS SDK, or the Athena console.

Athena enables schema-on-read analytics to gain insights from structured or semi-structured datasets found in the data lake. A data lake is ubiquitous, scalable, and reliable storage that lets you consume all of your structured and unstructured data. Customers increasingly prefer a serverless approach to querying data in their data lake. Some benefits of using Amazon S3 for a data lake include:

  • The ability to efficiently consume and store any data, at any scale, at low cost.
  • A single destination for searching and finding the relevant data.
  • Analysis of the data in S3 through a unified set of tools.

Solution overview

The following sections describe how to enable the common scenarios introduced previously in this post. They show how to download, install, and configure SQL Workbench to run queries in Athena. Next, they show how to use AWS STS with a custom JDBC credentials provider to obtain temporary credentials for an authorized user.  These credentials are passed to Athena’s JDBC driver, which enables SQL Workbench to run authorized queries.

As a reminder, the scenarios are:

  1. Running queries in Athena while using federation.
  2. Enabling access across accounts to Athena for users in your organization.
  3. Enabling access to Athena for a data application.

Walkthrough

This walkthrough uses a sample table created to query Amazon CloudFront logs.  It demonstrates that proper access has been granted to Athena after each scenario. This walkthrough also assumes that you have a table for testing.  For information about creating a table, see Getting Started with Amazon Athena.

Prerequisites for scenarios 1 and 2

  1. Download and install SQL Workbench.
  2. Download the Athena custom credentials provider .jar file to the same computer where SQL Workbench is installed.

Note: You can also compile the .jar file by using this Athena JDBC source code.

  3. Download the Amazon Athena JDBC driver to the same computer where SQL Workbench is installed.
  4. In SQL Workbench, open File, Connect window, Manage Drivers. Choose the Athena driver and add two libraries, the Athena JDBC driver and the custom credentials provider, by specifying the location where you downloaded them.

Scenario 1: SAML Federation with Active Directory

In this scenario, we use a SAML command line tool. It performs a SAML handshake with an identity provider, and then retrieves temporary security credentials from AWS STS. We then pass the obtained security credentials through a custom credentials provider to run queries in Athena.

Before You Begin

  1. Make sure you have followed the prerequisites for scenarios 1 and 2.
  2. Set up a SAML integration with ADFS. For information, see Enabling Federation to AWS Using Windows Active Directory, ADFS, and SAML 2.0.
  3. Add the following IAM policies to the ADFS-Production role:

  4. Set up the federated AWS CLI. For more information, see How to Implement a General Solution for Federated API/CLI Access Using SAML 2.0.

Executing queries in Athena with a SAML federated user identity

  1. Run the federated AWS CLI script configured as part of the prerequisites. Log in using a valid user name and password, then choose the ADFS-Production role. As a result, your temporary access_key, secret_access_key and session_token are generated. They are stored in a “credentials” file under the [saml] profile that looks similar to the following:
[saml]
output = json
region = us-east-1
aws_access_key_id = XXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXX
aws_session_token = XXXXXXXXXXXXXXXXX
  2. To enable running queries in Athena through SQL Workbench, configure an Athena connection as follows:

  3. Choose Extended Properties, and enter the properties as follows:
"AWSCredentialsProviderClass"="com.amazonaws.athena.jdbc.CustomIAMRoleAssumptionSAMLCredentialsProvider"
"AWSCredentialsProviderArguments"="<access_key_id>, <secret_access_key>, <session token>"
"S3OutputLocation"="s3://<bucket where query results are stored>"
"LogPath"= "<local path on laptop, or pc where logs are stored>"
"LogLevel"= "<Log Level from 0 to 6>"

  4. Choose Test to verify that you can successfully connect to Athena.
  5. Run a query in SQL Workbench to verify that credentials are correctly applied.
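Copying the three values from the credentials file into the AWSCredentialsProviderArguments property by hand is error-prone, so a small script can assemble the string. The following Python sketch reads the [saml] profile with the standard library `configparser` module and joins the values in the order used in the extended properties above. The `provider_arguments` function is hypothetical, the comma-and-space separator mirrors the example above, and the credential values are placeholders.

```python
import configparser

# Placeholder content in the shape of the "credentials" file shown above.
SAMPLE_CREDENTIALS = """\
[saml]
output = json
region = us-east-1
aws_access_key_id = XXXXXXXXXXXXXXXXX
aws_secret_access_key = XXXXXXXXXXXXXXXXXXXXXX
aws_session_token = XXXXXXXXXXXXXXXXX
"""


def provider_arguments(credentials_text, profile="saml"):
    """Build '<access_key_id>, <secret_access_key>, <session token>'."""
    # Disable interpolation so that '%' characters in values are kept as-is.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(credentials_text)
    section = parser[profile]
    return ", ".join([
        section["aws_access_key_id"],
        section["aws_secret_access_key"],
        section["aws_session_token"],
    ])


print(provider_arguments(SAMPLE_CREDENTIALS))
```

Because the credentials are temporary, the string has to be regenerated and the connection updated each time the federated CLI script issues a new set.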

Scenario 2: Cross-account access

In this scenario, you allow users in one AWS account, referred to as Account A, to run Athena queries in a different account, called Account B.

To enable this configuration, use the CustomIAMRoleAssumptionCredentialsProvider custom credentials provider to retrieve the necessary credentials. By doing so, you can run Athena queries by using credentials from Account A in Account B. This is made possible by the cross-account roles, as shown in the following diagram:

Before You Begin

  1. If you haven’t already done so, follow the prerequisites for scenarios 1 and 2.

  2. Use AWS CLI to define a role named AthenaCrossAccountRole in Account B, as follows:
aws iam create-role --role-name AthenaCrossAccountRole --assume-role-policy-document file://cross-account-trust.json

Content of cross-account-trust.json file

{
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::<ACCOUNT-A-ID>:root"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  3. Attach the IAM policies, AmazonAthenaFullAccess and AmazonS3FullAccess, to the AthenaCrossAccountRole IAM role, as follows:
aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name AthenaCrossAccountRole

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess --role-name AthenaCrossAccountRole
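A trust policy file with a stray brace or comma is rejected by `aws iam create-role`, so it can help to generate the file and confirm that it parses before running the command. The following Python sketch produces the same document structure as the cross-account-trust.json file above. The `cross_account_trust_policy` function is hypothetical, and `111122223333` is a placeholder for the Account A ID.

```python
import json


def cross_account_trust_policy(account_a_id):
    """Return a trust policy that lets Account A assume the role."""
    return {
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{account_a_id}:root"},
                "Action": "sts:AssumeRole",
            }
        ]
    }


# Placeholder account ID; substitute the real ID of Account A.
policy_text = json.dumps(cross_account_trust_policy("111122223333"), indent=2)

# Round-trip through the parser to confirm the document is valid JSON.
json.loads(policy_text)
print(policy_text)
```

Writing `policy_text` to cross-account-trust.json gives the file that the create-role command expects.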

Executing queries in Athena using a cross-account role

  1. Set up a new Athena database connection in SQL Workbench, as shown in the following example:

  2. Choose Extended Properties, and enter the properties as shown in the following example. This example configures the AWS credentials of an IAM user from Account A (AccessKey, SecretKey, and Cross Account Role ARN).
"AwsCredentialsProviderClass"="com.amazonaws.custom.athena.jdbc.CustomIAMRoleAssumptionCredentialsProvider"
"AwsCredentialsProviderArguments"="<aws_key_id>,<secret_key_id>,<cross Account Role ARN>"
"S3OutputLocation"="s3://<bucket where Athena results are stored>"
"LogPath"= "<local directory path on laptop, or pc where logs are stored>"
"LogLevel"= "<Log Level from 0 to 6>"

  3. Choose Test to verify that you can successfully connect to Athena.
  4. Run a query in SQL Workbench to verify that credentials are correctly applied.

Scenario 3: Using EC2 Instance Profile Role

This scenario uses the Amazon EC2 instance profile role to retrieve temporary credentials.  First, create the EC2AthenaInstanceProfileRole IAM role via AWS CLI, as shown in the following example:

aws iam create-role --role-name EC2AthenaInstanceProfileRole --assume-role-policy-document file://policy.json

Content of policy.json file

{
  "Statement": [
      {
        "Action": "sts:AssumeRole",
        "Effect": "Allow",
        "Principal": {
           "Service": "ec2.amazonaws.com"
         }
      }
  ]
}

Attach the IAM policies, AmazonAthenaFullAccess and AmazonS3FullAccess, to the EC2AthenaInstanceProfileRole IAM role, as follows:

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonAthenaFullAccess --role-name EC2AthenaInstanceProfileRole

aws iam attach-role-policy --policy-arn arn:aws:iam::aws:policy/AmazonS3FullAccess --role-name EC2AthenaInstanceProfileRole

Before You Begin

  1. If you created the role with the AWS CLI rather than the IAM console, first create an instance profile with the same name and add the role to it, because the CLI does not create one automatically. Then launch the Amazon EC2 instance for Windows with that instance profile attached:
aws ec2 run-instances --image-id <use Amazon EC2 on Windows 2012 AMI Id for your region> --iam-instance-profile 'Arn=arn:aws:iam::<your_aws_account_id>:instance-profile/EC2AthenaInstanceProfileRole' --tag-specifications 'ResourceType=instance,Tags=[{Key=Name,Value=AthenaTest}]' --count 1 --instance-type t2.micro --key-name <your key> --security-group-ids <your_windows_security_group_id>

  2. Log in to your Windows instance using RDP.

  3. Install Java 8 and download and install SQL Workbench.

  4. Download the Athena JDBC driver.

Executing queries in Athena using an EC2 instance profile role

  1. In SQL Workbench, open File, Connect window, Manage Drivers. Specify a path to the Athena JDBC driver that was previously downloaded, as shown in the following example.

  2. The instance credentials provider, InstanceProfileCredentialsProvider, is included with the Amazon Athena JDBC driver. In SQL Workbench, set up an Athena connection as follows:

  3. Choose Extended Properties, and enter the properties as follows:
"AwsCredentialsProviderClass"="com.simba.athena.amazonaws.auth.InstanceProfileCredentialsProvider"
"S3OutputLocation"="s3://<bucket where Athena results are stored>"
"LogPath"="<local directory path on laptop, or pc where logs are stored>"
"LogLevel"="<log level from 0 to 6>"

  4. Choose Test to verify that you can successfully connect to Athena.
  5. Run a query in SQL Workbench to verify that credentials are correctly applied.

Conclusion

This post walked through three scenarios to enable trusted users to access Athena using temporary security credentials. First, we used SAML federation where user credentials were stored in Active Directory. Second, we used a custom credentials provider library to enable cross-account access. And third, we used an EC2 Instance Profile role to provide temporary credentials for users in our organization to access Athena.

These approaches ensure that access keys protecting AWS resources are not directly hardcoded in applications and can be easily revoked as needed. To achieve this, we used AWS STS to generate temporary per-use credentials.

The scenarios demonstrated in this post used SQL Workbench. However, they apply to all other uses of the JDBC driver with Amazon Athena. Similar approaches also apply when using the AWS SDK with Athena.

Happy querying!

Additional Reading

If you found this post useful, be sure to check out Top 10 Performance Tuning Tips for Amazon Athena, and Analyze and visualize your VPC network traffic using Amazon Kinesis and Amazon Athena.

About the Author

Nitin Wagh is a Solutions Architect with Amazon Web Services specializing in Big Data Analytics. He helps AWS customers develop distributed big data solutions using AWS technologies.

Store, Protect, Optimize Your Healthcare Data with AWS: Part 2

Post Syndicated from Stephen Jepsen original https://aws.amazon.com/blogs/architecture/store-protect-optimize-your-healthcare-data-with-aws-part-2/

Leveraging Analytics and Machine Learning Tools for Readmissions Prediction

This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.

In Part 1, we looked at various options to ingest and store sensitive healthcare data using AWS. The post described our shared responsibility model and provided a reference architecture that healthcare organizations could use as a foundation to build a robust platform on AWS to store and protect their sensitive data, including protected health information (PHI). In Part 2, we will dive deeper into how customers can optimize their healthcare datasets for analytics and machine learning (ML) to address clinical and operational challenges.

There are a number of factors creating pressures for healthcare organizations, both providers and payers, to adopt analytic tools to better understand their data: regulatory requirements, changing reimbursement models from volume- to value-based care, population health management for risk-bearing organizations, and movement toward personalized medicine. As organizations deploy new solutions to address these areas, the availability of large and complex datasets from electronic health records, genomics, images (for example, CAT, PET, MRI, ultrasound, X-ray), and IoT has been increasing. With these data assets growing in size, healthcare organizations want to leverage analytic and ML tools to derive new actionable insights across their departments.

One example of the use of ML in healthcare is diagnostic image analysis, including digital pathology. Pathology is extremely important in diagnosing and treating patients, but it is also extremely time-consuming and largely a manual process. While the complexity and quantity of workloads are increasing, the number of pathologists is decreasing. According to one study, the number of active pathologists could drop by 30 percent by 2030 compared to 2010 levels. (1) A cloud architecture and solution can automate part of the workflow, including sample management, analysis, storing, sharing, and comparison with previous samples to complement existing provider workflows effectively. A recent study using deep learning to analyze metastatic breast cancer tissue samples resulted in an approximately 85% reduction in human error rate. (2)

ML is also being used to assist radiologists in examining other diagnostic images such as X-rays, MRIs, and CAT scans. One of the main challenges for ML adoption is obtaining the large quantities of images and metadata needed to train the algorithms. To help address this problem, the National Institutes of Health recently released 90,000 X-ray plates tagged either with one of 14 diseases or tagged as being normal. Leading academic medical centers are using these images to build their neural networks and train their algorithms. With advanced analytics and ML, we can answer hard questions such as “What is the next best action for my patient, the expected outcome, and the cost?”

The foundations for a great analytical layer

Let’s pick up from where we left off in Part 1. We have seen how providers can ingest data into AWS from their data centers and store it securely into different services depending on the type of data. For example:

  1. All object data is stored in Amazon S3, Amazon S3 Infrequent Access, or Amazon Glacier, depending on how often it is accessed.
  2. Data from the provider’s database is either processed and stored as objects in Amazon S3 or aggregated into data marts on Amazon Redshift.
  3. Metadata for the objects on Amazon S3 is maintained in Amazon DynamoDB.
  4. Amazon Athena is used to query the objects stored on Amazon S3 directly to address ad hoc requirements.

We will now look at two best practices that are key to building a robust analytical layer using these datasets.

  1. Separate storage and compute: You should not be compelled to scale compute resources just to store more data. The scaling rules of the two layers should be separate.
  2. Use the full range of AWS big data services: Build the analytical platform from the services that fit each requirement instead of concentrating on just a few of them. Remember, one size does not fit all.

Technical overview

In this overview, we will demonstrate how we can leverage AWS big data and ML services to build a scalable analytical layer for our healthcare data. We will use a single source of data stored in Amazon S3 for performing ad hoc analysis using Amazon Athena, integrate it with a data warehouse on Amazon Redshift, build a visual dashboard for some metrics using Amazon QuickSight, and finally build an ML model to predict readmissions using Amazon SageMaker. By not moving the data around and just connecting to it using different services, we avoid building redundant copies of the same data. There are multiple advantages to this approach:

  1. We optimize our storage. Not having redundant copies reduces the amount of storage required.
  2. We keep the data secure with only authorized services having access to it. Keeping multiple copies of the data can result in higher security risk.
  3. We are able to scale the storage and compute separately as needed.
  4. It becomes easier to manage the data and monitor usage metrics centrally such as how often the data has been accessed, who has been accessing it, and what has been the growth pattern of the data over a period of time. These metrics can be difficult to aggregate if the data is duplicated multiple times.

Let’s build out this architecture using the following steps:

  1. Create a database in AWS Glue Data Catalog

We will do this using a Glue crawler. First create a JSON file that contains the parameters for the Glue crawler.

{
  "Name": "readmissions",
  "Role": "arn of the role for Glue",
  "DatabaseName": "readmissions",
  "Description": "glue data catalog for storing readmission data",
  "Targets": {
    "S3Targets": [
      {
        "Path": "s3://<bucket>/<prefix>"
      },
      {
        "Path": "s3://<bucket>/<prefix>"
      }
    ]
  }
}

As you can see, the crawler will crawl two locations in Amazon S3 and save the resulting tables in a new database called “readmissions.” Replace the role ARN and Amazon S3 locations with your corresponding details. Save this in a file create_crawler.json. Then from the AWS CLI, call the following command to create the crawler:

aws glue create-crawler --cli-input-json file://create_crawler.json

Once the crawler is created, run it by calling the following command:

aws glue start-crawler --name readmissions

Log on to the AWS Glue console, navigate to the crawlers, and wait until the crawler completes running.
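As an alternative to watching the console, you can poll the crawler state from code. The following is a minimal sketch, assuming a boto3 Glue client is passed in. The helper name wait_for_crawler and its timeout defaults are hypothetical. It relies only on the State value that get_crawler returns, which is READY when the crawler is idle.

```python
import time


def wait_for_crawler(glue_client, name, poll_seconds=15, timeout_seconds=1800):
    """Poll get_crawler until the crawler reports the READY state."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        state = glue_client.get_crawler(Name=name)["Crawler"]["State"]
        if state == "READY":
            return state
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"crawler {name!r} is still {state} after {timeout_seconds}s"
            )
        # The crawler is RUNNING or STOPPING; wait before asking again.
        time.sleep(poll_seconds)
```

With boto3 installed, a call would look like wait_for_crawler(boto3.client("glue"), "readmissions"). Call it only after start-crawler has been accepted, because a crawler that has not started yet also reports READY.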

This creates two tables, phi and nonphi, in a database named “readmissions” in the AWS Glue Data Catalog, as shown below.

  2. Query the data using Athena

The AWS Glue Data Catalog is seamlessly integrated with Amazon Athena. For details on how to enable this, see Integration with AWS Glue.

As a result of this integration, the tables created using the Glue crawler can now be queried using Amazon Athena. Amazon Athena allows you to do ad hoc analysis on the dataset. You can do exploratory analysis on the data and also determine its structure and quality. This type of upfront ad hoc analysis is invaluable for ensuring the data quality in your downstream data warehouse or your ML algorithms that will make use of this data for training models. In the next few sections, we will explore these aspects in greater detail.

To query the data using Amazon Athena, navigate to the Amazon Athena console.

NOTE: Make sure the region is the same as the region you chose in the previous step. If it’s not the same, switch the region by using the drop-down menu on the top right-hand corner of the screen.

Once you arrive in the Amazon Athena console, you should already see the tables and databases you created previously, and you should be able to see the data in the two tables by writing Amazon Athena queries. Here is a list of the top 10 rows from the table readmissions.nonphi:

Now that we are able to query the dataset, we can run some queries for exploratory analysis. Here are just a few examples:

How many patients have been discharged to home?

SELECT count(*) FROM nonphi WHERE discharge_disposition = 'Discharged to home'

What are the minimum and maximum number of procedures carried out on a patient?

SELECT min(num_procedures), max(num_procedures) FROM nonphi

How many patients were referred to this hospital by another physician?

SELECT count(*) FROM nonphi WHERE admission_source = 'Physician Referral'

What were the top 5 specialties with positive readmissions?

SELECT count(readmission_result) AS num_readmissions, medical_specialty
FROM (SELECT readmission_result, medical_specialty FROM nonphi WHERE readmission_result = 'Yes')
GROUP BY medical_specialty
ORDER BY num_readmissions DESC
LIMIT 5

Which payers were responsible for paying for treatments that involved more than 5 procedures?

SELECT DISTINCT payer_code FROM nonphi WHERE num_procedures > 5 AND payer_code != '(null)'
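To get a feel for what these exploratory queries return before running them against the full dataset, you can try them on a few rows locally. The sketch below uses SQLite from the Python standard library as a stand-in for Athena. The sample rows are invented for illustration, and only the column names come from the queries above.

```python
import sqlite3

# Hypothetical sample rows: (discharge_disposition, admission_source,
# num_procedures, readmission_result, medical_specialty, payer_code)
ROWS = [
    ("Discharged to home", "Physician Referral", 1, "Yes", "Cardiology", "MC"),
    ("Discharged to home", "Emergency Room", 6, "No", "Surgery", "HM"),
    ("Expired", "Physician Referral", 0, "Yes", "Cardiology", "(null)"),
    ("Discharged to home", "Emergency Room", 3, "Yes", "Nephrology", "MC"),
]


def load_sample():
    """Create an in-memory nonphi table holding the sample rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE nonphi (discharge_disposition TEXT, admission_source TEXT,"
        " num_procedures INTEGER, readmission_result TEXT,"
        " medical_specialty TEXT, payer_code TEXT)"
    )
    conn.executemany("INSERT INTO nonphi VALUES (?, ?, ?, ?, ?, ?)", ROWS)
    return conn


def explore(conn):
    """Run three of the exploratory queries and collect their results."""
    def run(sql):
        return conn.execute(sql).fetchall()

    return {
        "discharged_home": run(
            "SELECT count(*) FROM nonphi"
            " WHERE discharge_disposition = 'Discharged to home'"
        )[0][0],
        "procedure_range": run(
            "SELECT min(num_procedures), max(num_procedures) FROM nonphi"
        )[0],
        "top_specialties": run(
            "SELECT count(*) AS num_readmissions, medical_specialty FROM nonphi"
            " WHERE readmission_result = 'Yes'"
            " GROUP BY medical_specialty ORDER BY num_readmissions DESC LIMIT 5"
        ),
    }
```

SQLite and Athena differ in dialect details, so treat this as a way to check query logic, not as a substitute for running the queries in the Athena console.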

While this information is valuable, you typically do not want to invest much time and effort in building an ad hoc query platform, because at this stage you are not yet sure whether the data is of any value for your business-critical analytical applications. One benefit of using Amazon Athena for ad hoc analysis is that it requires little effort or time. It uses schema-on-read instead of schema-on-write, allowing you to work with various source data formats without worrying about the underlying structures. You can put the data on Amazon S3 and start querying immediately.

  3. Create an external table in Amazon Redshift Spectrum with the same data

Now that we are satisfied with the data quality and understand the structure of the data, we would like to integrate this with a data warehouse. We’ll use Amazon Redshift Spectrum to create external tables on the files in S3 and then integrate these external tables with a physical table in Amazon Redshift.

Amazon Redshift Spectrum allows you to run Amazon Redshift SQL queries against data on Amazon S3, extending the capabilities of your data warehouse beyond the physical Amazon Redshift clusters. You don’t need to do any elaborate ETL or move the data around. The data exists in one place in Amazon S3 and you interface with it using different services (Athena and Redshift Spectrum) to satisfy different requirements.

Before beginning, review the step-by-step guide for setting up Redshift Spectrum.

After you have set up Amazon Redshift Spectrum, you can begin executing the steps below:

  1. Create an external schema called “readmissions.” Amazon Redshift Spectrum integrates with the AWS Glue Data Catalog and allows you to create Redshift Spectrum tables by referring to the catalog. This feature allows you to build the external table on the same data that you analyzed with Amazon Athena in the previous step without the need for ETL. This can be achieved by the following:
create external schema readmissions
from data catalog
database 'readmissions'
iam_role '<ARN of your Redshift Spectrum role>'
region '<region where the S3 data exists>';

NOTE: Make sure you specify the appropriate role ARN and Region.

  2. Once the command executes successfully, you can confirm the schema was created by running the following:
select * from svv_external_schemas;

You should see a row similar to the one above with your corresponding region and role.

You can also see the external tables that were created by running the following command:

select * from SVV_EXTERNAL_TABLES;

  3. Let’s confirm we can see all the rows in the external tables by counting the number of rows:
select count(*) from readmissions.phi;
select count(*) from readmissions.nonphi;

You should see 101,766 rows in each table, confirming that your external tables contain all the records that you cataloged using the AWS Glue crawler and analyzed using Athena.

  4. Now that we have all the external tables created, let’s create an aggregate fact table in the physical Redshift data warehouse. We can use the CREATE TABLE AS form of the Redshift create table statement to do this:
create table readmissions_aggregate_fact as
select
readmission_result,admission_type,discharge_disposition,diabetesmed,
avg(time_in_hospital) as avg_time_in_hospital,
min(num_procedures) as min_procedures,
max(num_procedures) as max_procedures,
avg(num_procedures) as avg_num_procedures,
avg(num_medications) as avg_num_medications,
avg(number_outpatient) as avg_number_outpatient,
avg(number_emergency) as avg_number_emergency,
avg(number_inpatient) as avg_number_inpatient,
avg(number_diagnoses) as avg_number_diagnoses
from readmissions.nonphi
group by readmission_result,admission_type,discharge_disposition,diabetesmed;
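The aggregate fact table holds one row per combination of the grouping columns. The following sketch illustrates that grain on a reduced set of columns, again using SQLite from the Python standard library in place of Redshift. The sample rows and the helper name build_aggregate are hypothetical.

```python
import sqlite3


def build_aggregate(rows):
    """Build a reduced aggregate fact table and return its rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE nonphi (readmission_result TEXT, admission_type TEXT,"
        " time_in_hospital INTEGER, num_procedures INTEGER)"
    )
    conn.executemany("INSERT INTO nonphi VALUES (?, ?, ?, ?)", rows)
    # Same CREATE TABLE AS pattern as above, restricted to two grouping
    # columns and three measures to keep the example short.
    conn.execute(
        "CREATE TABLE readmissions_aggregate_fact AS"
        " SELECT readmission_result, admission_type,"
        " avg(time_in_hospital) AS avg_time_in_hospital,"
        " min(num_procedures) AS min_procedures,"
        " max(num_procedures) AS max_procedures"
        " FROM nonphi GROUP BY readmission_result, admission_type"
    )
    return conn.execute(
        "SELECT * FROM readmissions_aggregate_fact"
        " ORDER BY readmission_result, admission_type"
    ).fetchall()
```

For example, two "Yes"/"Emergency" source rows and one "No"/"Elective" row collapse into two fact rows, each carrying the average, minimum, and maximum for its group.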

Once this query executes successfully, you can see a new table created in the physical public schema of your Amazon Redshift cluster. You can confirm this by executing the following query:

select distinct(tablename) from pg_table_def where schemaname = 'public'

  4. Build a QuickSight Dashboard from the aggregate fact

We can now create dashboards to visualize the data in our readmissions aggregate fact table using Amazon QuickSight. Here are some examples of reports you can generate using Amazon QuickSight on the readmission data.

For more details on Amazon QuickSight, refer to the service documentation.

  5. Build an ML model in Amazon SageMaker to predict readmissions

As a final step, we will create an ML model to predict the attribute readmission_result, which denotes if a patient was readmitted or not, using the non-PHI dataset.

  1. Create a notebook instance in Amazon SageMaker that is used to develop our code.
  2. The code reads non-PHI data from the Amazon S3 bucket as a data frame in Python. This is achieved using the pandas.read_csv function.
  3. Use the pandas.get_dummies function to encode categorical values into numeric values for use with the model.
  4. Split the data into two, 80% for training and 20% for testing, using the numpy.random.rand function.
  5. Form train_X, train_y and test_X, test_y corresponding to training features, training labels, testing features, and testing labels respectively.
  6. Use the Amazon SageMaker linear learner algorithm to train our model. The implementation of the algorithm uses dense tensor format to optimize the training job. Use the function write_numpy_to_dense_tensor from the Amazon SageMaker library to convert the numpy array into the dense tensor format.
  7. Create the training job in Amazon SageMaker with appropriate configurations and run it.
  8. Once the training job completes, create an endpoint in Amazon SageMaker to host our model, using the linear.deploy function to deploy the endpoint.
  9. Finally, run a prediction by invoking the endpoint using the linear_predictor.predict function.
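The data preparation steps in this list (encoding, splitting, and forming features and labels) can be sketched without any external libraries. The code below is an illustrative stand-in written with the Python standard library, not the notebook's code: one_hot mimics what pandas.get_dummies does for a single column, and split_rows mimics a per-row random draw like the one numpy.random.rand provides. All function names and sample values are hypothetical.

```python
import random


def one_hot(rows, column):
    """Replace a categorical column with 0/1 indicator columns."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            new_row[f"{column}_{category}"] = 1 if row[column] == category else 0
        encoded.append(new_row)
    return encoded


def split_rows(rows, train_fraction=0.8, seed=0):
    """Assign each row to train or test with one uniform random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


def features_and_labels(rows, label):
    """Separate the label column from the feature columns (sorted by name)."""
    X = [[v for k, v in sorted(row.items()) if k != label] for row in rows]
    y = [row[label] for row in rows]
    return X, y
```

Because each row is assigned independently, the split is only approximately 80/20 on small datasets, which is also true of a random-mask split in numpy.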

You can view the complete notebook here.

Data, analytics, and ML are strategic assets to help you manage your patients, staff, equipment, and supplies more efficiently. These technologies can also help you be more proactive in treating and preventing disease. Industry luminaries share this opinion: “By leveraging big data and scientific advancements while maintaining the important doctor-patient bond, we believe we can create a health system that will go beyond curing disease after the fact to preventing disease before it strikes by focusing on health and wellness,” writes Lloyd B. Minor, MD, dean of the Stanford School of Medicine.

ML and analytics offer huge value in helping achieve the quadruple aim: improved patient satisfaction, improved population health, improved provider satisfaction, and reduced costs. Technology should never replace the clinician but instead become an extension of the clinician and allow them to be more efficient by removing some of the mundane, repetitive tasks involved in prevention, diagnostics, and treatment of patients.

(1) “The Digital Future of Pathology.” The Medical Futurist, 28 May 2018, medicalfuturist.com/digital-future-pathology.

(2) Wang, Dayong, et al. “Deep Learning for Identifying Metastatic Breast Cancer.” arXiv, 18 June 2016, arxiv.org/abs/1606.05718.

About the Author

Stephen Jepsen is a Global HCLS Practice Manager in AWS Professional Services.

Analyze and visualize your VPC network traffic using Amazon Kinesis and Amazon Athena

Post Syndicated from Allan MacInnis original https://aws.amazon.com/blogs/big-data/analyze-and-visualize-your-vpc-network-traffic-using-amazon-kinesis-and-amazon-athena/

Network log analysis is a common practice in many organizations.  By capturing and analyzing network logs, you can learn how devices on your network are communicating with each other, and the internet.  There are many reasons for performing log analysis, such as audit and compliance, system troubleshooting, or security forensics.  Within an Amazon Virtual Private Cloud (VPC), you can capture network flows with VPC Flow Logs.  You can create a flow log for a VPC, a subnet, or a network interface.  If you create a flow log for a subnet or VPC, each network interface in the VPC or subnet is monitored. Flow log data is published to a log group in Amazon CloudWatch Logs, and each network interface has a unique log stream.

CloudWatch Logs provides some great tools to get insights into this log data. However, in most cases, you want to efficiently archive the log data to S3 and query it using SQL. This provides more flexibility and control over log retention and the analysis you want to perform. You also often want near real-time insights into that log data, obtained by analyzing it automatically soon after it has been generated. And you want to visualize certain network characteristics on a dashboard so you can more clearly understand the network traffic within your VPC. So how can you accomplish efficient log archival to S3, real-time network analysis, and data visualization together? You can do so by combining several capabilities of CloudWatch, Amazon Kinesis, AWS Glue, and Amazon Athena, but setting up this solution and configuring all the services can be daunting.

In this blog post, we describe the complete solution for collecting, analyzing, and visualizing VPC flow log data.  In addition, we created a single AWS CloudFormation template that lets you efficiently deploy this solution into your own account.

Solution overview

This section describes the overall architecture and each step of this solution.

We want the ability to query the flow log data in a one-time, or ad hoc, fashion. We also want to analyze it in near real time. So our flow log data takes two paths through the solution.  For ad hoc queries, we use Amazon Athena.  By using Athena, you can use standard SQL to query data that has been written to S3.  An Athena best practice to improve query performance and reduce cost is to store data in a columnar format such as Apache Parquet.  This solution uses Kinesis Data Firehose’s record format conversion feature to convert the flow log data to Parquet before it writes the files to S3. Converting the data into a compressed, columnar format lowers your cost and improves query performance by enabling Athena to scan less data from S3 when executing your queries.

By streaming the data to Kinesis Data Firehose from CloudWatch logs, we have enabled our second path for near real-time analysis on the flow log data.  Kinesis Data Analytics is used to analyze the log data as soon as it is delivered to Kinesis Data Firehose.  The Analytics application aggregates key data from the flow logs and creates custom CloudWatch metrics that are used to drive a near real-time CloudWatch dashboard.

Let’s review each step in detail.

1.  VPC Flow Logs

The VPC Flow Logs feature captures the network flows in a VPC. In this solution, it is assumed that you want to capture all network traffic within a single VPC. By using the CloudFormation template, you can define the VPC you want to capture. Each line in the flow log contains space-delimited information about the packets traversing the network between two entities, the source and the destination. The log line contains details including the source and destination IP addresses and ports, the number of packets, and the action taken on that data, such as whether it was accepted or rejected. Here’s an example of a typical flow log:

2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK

For more information about each item in the line, see Flow Log Records.  Note that VPC flow logs buffer for about 10 minutes before they’re delivered to CloudWatch Logs.

2.  Stream to Kinesis Data Firehose

By creating a CloudWatch Logs subscription, our flow logs can automatically be streamed when they arrive in CloudWatch Logs. This solution’s subscription filter uses Kinesis Data Firehose as its destination. Kinesis Data Firehose is an effective way to load streaming data into data stores, such as Amazon S3. The CloudWatch Logs subscription filter has also been configured to parse the space-delimited log lines and create a structured JSON object for each line in the log. The attribute names in the object follow the field names defined by VPC Flow Logs. Therefore, the example log line referenced earlier streams as the following JSON record:

{
    "version": 2,
    "account-id": "123456789010",
    "interface-id": "eni-abc123de",
    "srcaddr": "172.31.16.139",
    "dstaddr": "172.31.16.21",
    "srcport": 20641,
    "dstport": 22,
    "protocol": 6,
    "packets": 20,
    "bytes": 4249,
    "start": 1418530010,
    "end": 1418530070,
    "action": "ACCEPT",
    "log-status": "OK"
}
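The mapping from a space-delimited log line to this JSON record can be sketched in a few lines of Python. In the solution itself the subscription filter performs this parsing, so the function below is only an illustration. The name parse_flow_log is hypothetical, and the handling of "-" placeholders assumes the way flow logs mark fields with no data.

```python
import json

# Field order of a version 2 flow log record, matching the JSON keys above.
FIELDS = [
    "version", "account-id", "interface-id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log-status",
]
NUMERIC = {"version", "srcport", "dstport", "protocol",
           "packets", "bytes", "start", "end"}


def parse_flow_log(line):
    """Turn one space-delimited flow log line into a dictionary."""
    values = line.split()
    if len(values) != len(FIELDS):
        raise ValueError(f"expected {len(FIELDS)} fields, got {len(values)}")
    record = {}
    for name, value in zip(FIELDS, values):
        # Keep "-" placeholders as strings; convert real numbers to int.
        if name in NUMERIC and value != "-":
            record[name] = int(value)
        else:
            record[name] = value
    return record


if __name__ == "__main__":
    sample = ("2 123456789010 eni-abc123de 172.31.16.139 172.31.16.21 "
              "20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK")
    print(json.dumps(parse_flow_log(sample), indent=4))
```

The account ID stays a string even though it contains only digits, which matches the JSON record shown above.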

CloudWatch Logs subscriptions send data to the configured destination as a gzipped collection of records. Before we can analyze the data, we must first decompress it.

3.  Decompress records with AWS Lambda

There may be situations where you want to transform or enrich streaming data before writing it to its final destination.  In this solution, we must decompress the data that is streamed from CloudWatch Logs.  With the Amazon Kinesis Data Firehose Data Transformation feature, we can decompress the data with an AWS Lambda function.  Kinesis Data Firehose manages the invocation of the function.  Inside the function, the data is decompressed and returned to Kinesis Data Firehose.  The complete source code for the Lambda function can be found here.
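To make the decompression step concrete, here is a minimal sketch of such a transformation function in Python. It is not the solution's actual source code. It assumes the usual Kinesis Data Firehose transformation contract (base64-encoded records in, records with recordId, result, and data out) and the CloudWatch Logs payload layout with messageType and logEvents. Emitting extractedFields when present is an assumption about how the subscription filter is configured.

```python
import base64
import gzip
import json


def handler(event, context):
    """Firehose transformation: gunzip CloudWatch Logs payloads to JSON lines."""
    output = []
    for record in event["records"]:
        # Firehose passes each record base64-encoded; CloudWatch Logs
        # gzips the payload underneath that encoding.
        payload = json.loads(gzip.decompress(base64.b64decode(record["data"])))

        if payload.get("messageType") != "DATA_MESSAGE":
            # Control messages carry no flow log events, so drop them.
            output.append({"recordId": record["recordId"], "result": "Dropped"})
            continue

        # Emit one JSON document per log event, newline-delimited.
        # Assumption: the subscription filter populated "extractedFields";
        # otherwise fall back to the raw message text.
        lines = "".join(
            json.dumps(e.get("extractedFields") or {"message": e["message"]}) + "\n"
            for e in payload["logEvents"]
        )
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(lines.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}
```

Each input record can contain many log events, so one incoming record may expand into several newline-delimited JSON documents in the output.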

4.  Convert data to Apache Parquet

To take advantage of the performance capabilities in Amazon Athena, we convert the streaming data to Apache Parquet before persisting it to S3.  We use the record format conversion capabilities of Kinesis Data Firehose to perform this conversion.  When converting from JSON to Parquet, Kinesis Data Firehose must know the schema.  To accomplish this, we configure a table in the Glue Data Catalog.  In this table, we map the attributes of our incoming JSON records to fields in the table.

5.  Persist data to Amazon S3

When using the data format conversion feature in Kinesis Data Firehose, the only supported destination is S3.  Kinesis Data Firehose buffers data for a period of time, or until a data size threshold is met, before it creates the Parquet files in S3.  In general, converting to Parquet results in effective file compression.  If the file size is too small, it isn’t optimal for Athena queries.  To maximize the file sizes created in S3, the solution has been configured to buffer for 15 minutes, or 128 MB.  However, you can adjust this configuration to meet your needs by using the Kinesis Data Firehose console.

6.  Query flow logs with SQL in Athena

In this solution, Athena uses the database and table created in the Glue Data Catalog to make your flow log data queryable.  There are sample queries to review later in this article.

7.  Analyze the network flows in near real-time with Kinesis Data Analytics

Following the data through the first six steps, the solution enables you to query flow log data using SQL in Athena. This is great for ad hoc queries, or querying data that was generated over a long period of time. However, to get the most out of the data, you should analyze it as soon as possible after it is generated. To accomplish this, the solution uses Kinesis Data Analytics (KDA) to analyze the flow logs and extract some immediate insights. KDA enables you to query streaming data using SQL. In this solution, the KDA application uses a Lambda function to decompress the gzipped records from Kinesis Data Firehose, and then analyzes the flow log data to create some aggregations of interest. The KDA application creates the following aggregations:

  • A count of rejected TCP packets, every 15 minutes.
  • A count of rejected packets by protocol, every 15 minutes.

These metrics are aggregated over a 15-minute window.  At the end of the window, KDA invokes a Lambda function, passing the aggregated values as input to the function.
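The windowing logic can be illustrated outside of KDA. The sketch below is a plain Python stand-in for the streaming SQL, not the application's code. It buckets records into 15-minute tumbling windows and sums rejected packets per protocol. Keying the window on each record's start field is an assumption made for the example, since a streaming application may window on arrival time instead.

```python
from collections import Counter

WINDOW_SECONDS = 15 * 60


def rejected_packets_by_window(records, window_seconds=WINDOW_SECONDS):
    """Sum rejected packets per (window start, protocol) pair."""
    totals = Counter()
    for record in records:
        if record["action"] != "REJECT":
            continue
        # Round the epoch timestamp down to the start of its window.
        window_start = record["start"] - record["start"] % window_seconds
        totals[(window_start, record["protocol"])] += record["packets"]
    return dict(totals)
```

Tumbling windows do not overlap, so every record contributes to exactly one window and the per-window totals can be published independently.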

8.  Write the aggregations as custom CloudWatch metrics

At the end of the 15-minute window, KDA invokes a Lambda function, passing the aggregated values.  The Lambda function writes these values to CloudWatch as custom metrics. This enables the solution to support alarms on those metrics using CloudWatch alarms, and it enables custom dashboards to be created from the metrics.
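A function that publishes the aggregates might look like the following sketch. It assumes a boto3 CloudWatch client is passed in and uses its put_metric_data call. The namespace, metric name, and dimension name are hypothetical placeholders, not the names the solution's template uses.

```python
def build_metric_data(aggregates, metric_name="RejectedPackets"):
    """Convert {protocol: packet_count} into CloudWatch metric datums."""
    return [
        {
            "MetricName": metric_name,
            "Dimensions": [{"Name": "Protocol", "Value": str(protocol)}],
            "Value": float(count),
            "Unit": "Count",
        }
        for protocol, count in sorted(aggregates.items())
    ]


def publish(cloudwatch_client, aggregates, namespace="VPCFlowLogs", batch_size=20):
    """Send the datums in small batches and return how many were sent."""
    metric_data = build_metric_data(aggregates)
    # put_metric_data limits the number of datums per call, so batch
    # conservatively rather than sending everything at once.
    for i in range(0, len(metric_data), batch_size):
        cloudwatch_client.put_metric_data(
            Namespace=namespace, MetricData=metric_data[i:i + batch_size]
        )
    return len(metric_data)
```

Using the protocol as a dimension keeps a single metric name while still allowing one alarm or dashboard line per protocol.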

9.  View the aggregated data in CloudWatch dashboards

CloudWatch dashboards are customizable home pages in the CloudWatch console for monitoring your resources in a single view.  You can use CloudWatch dashboards to create customized views of the metrics and alarms for your AWS resources. In this solution, we create a dashboard that monitors the custom aggregations created in our KDA application. The solution creates a sample dashboard to get you started, but you should review the metrics and create a dashboard and alarms to meet your needs.

Deploying the solution

To deploy this solution into your own account, you use the CloudFormation template to create the stack. You can deploy the solution stack into the following AWS Regions: US East (N. Virginia), US West (Oregon), and EU (Ireland).  To deploy, choose the link for the Region where you want to deploy.  The CloudFormation console for that Region opens, and the template URL is pre-populated:

Deploy the solution in:

US East (N. Virginia)

The Create Stack wizard for CloudFormation opens with the template location pre-populated. Choose Next, and you will be prompted to provide values for several template parameters.

Let’s review what each parameter represents:

  • Stack name — The name for this CloudFormation stack.  You can rename it from the default, but choose a short (up to 16 characters) name, and ensure your name uses only lower-case letters.  The value you use here will be used as a prefix in the name of many of the resources created by this stack.  By providing a short name with lower-case letters, the names for those resources will pass the resource naming rules.
  • S3BucketName — The name of the S3 bucket into which the Parquet files are delivered. This name must be globally unique.
  • VPCId — The ID of the existing VPC for which flow logs are captured.
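If you script the deployment, you can check a candidate stack name against the guidance above before creating the stack. This is a small sketch that encodes only the two rules stated for this template (at most 16 characters, lower-case letters only). The function name is hypothetical.

```python
import re

# Rules stated for this template: 1 to 16 characters, lower-case letters only.
_STACK_NAME = re.compile(r"[a-z]{1,16}")


def is_suitable_stack_name(name):
    """Return True if the name follows this template's naming guidance."""
    return _STACK_NAME.fullmatch(name) is not None
```

CloudFormation itself accepts a wider range of stack names. The stricter check exists because the name is reused as a prefix for other resources.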

Choose Next, and accept any defaults for the remainder of the CloudFormation wizard. The stack is created in a few minutes.

Analyze the flow log data

After the stack has been deployed, it may take up to 15 minutes before data can be queried in Athena, or viewed in the CloudWatch dashboard.  Let’s look at a few sample queries you can run in Athena to learn more about the network traffic within your VPC.

Navigate to the Athena console in the Region where you deployed the stack.  In the console, choose the database named “vpc_flow_logs”.  Notice that this database contains one table, named “flow_logs.”  Run the following query to see which protocol is being rejected the most within your VPC:

select protocol, sum(packets) as rejected_packets
from flow_logs
where action = 'REJECT'
group by protocol
order by rejected_packets desc

Your results should look similar to the following example:

This example shows that the value of the protocol field follows the standard defined by the Internet Assigned Numbers Authority (IANA). So in the previous example, the top two rejected protocols are TCP and ICMP.
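When you post-process query results, a small lookup table makes the protocol numbers readable. The sketch below lists only three common IANA assignments, and the helper name is hypothetical.

```python
# A few common IANA-assigned protocol numbers; extend as needed.
IANA_PROTOCOLS = {1: "ICMP", 6: "TCP", 17: "UDP"}


def protocol_name(number):
    """Return the protocol name, or a generic label if it is not listed."""
    return IANA_PROTOCOLS.get(int(number), f"protocol {number}")
```

For example, protocol_name(6) returns "TCP", and an unlisted number such as 47 falls back to the label "protocol 47".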

Here are a few additional queries to help you understand the network traffic in your VPC:

Identify the top 10 IP addresses from which packets were rejected in the past 2 weeks:

SELECT
	srcaddr,
	SUM(packets) AS rejected_packets
FROM flow_logs
WHERE action = 'REJECT'
	AND start >= current_timestamp - interval '14' day
GROUP BY srcaddr
ORDER BY rejected_packets DESC
LIMIT 10;

Identify the top 10 servers that are receiving the highest number of HTTPS requests:

SELECT
	dstaddr,
	SUM(packets) AS packet_count
FROM flow_logs
WHERE dstport = '443'
GROUP BY dstaddr
ORDER BY packet_count DESC
LIMIT 10;

Now let’s look at the analysis we’re performing in real time with Kinesis Data Analytics.  By default, the solution creates a dashboard named “VPC-Flow-Log-Analysis.”  On this dashboard, we’ve created a few default widgets.  The aggregate data being generated by KDA is plotted in a few charts, as shown in the following example:

This example shows that the Rejected Packets per Protocol chart has been created to plot only a subset of all possible protocols.  Modify this widget to show the protocols that are relevant for your environment.

Next steps

The solution outlined in this blog post provides an efficient way to get started with analyzing VPC Flow Logs.  To get the most out of this solution, consider these next steps:

  • Create partitions in the Glue table to help optimize Athena query performance. The current solution creates data in S3 partitioned by Y/M/D/H; however, these S3 prefixes are not automatically mapped to Glue partitions.  This means that Athena queries scan all Parquet files, so query performance degrades as the volume of data grows.  For more information about partitioning and Athena tuning, see Top 10 Performance Tuning Tips for Amazon Athena.
  • Apply the solution to additional VPCs, or in different Regions. If your account contains multiple VPCs, or if your infrastructure is deployed in multiple Regions, you must create the stack in those Regions.  If you have multiple VPCs within the same Region, you can create a new flow log for each additional VPC by using the VPC console.  Configure the flow log to deliver to the same destination log group that was created with the stack (the CWLogGroupName parameter value in the CloudFormation template).
  • Modify the default widgets in the CloudWatch dashboard. The stack created a CloudWatch dashboard with a few default widgets; however, you can create more to meet your needs, based on the insights you’d like to get from the flow logs in your environment.
  • Create additional queries in Athena to learn more about your network behavior.
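As a starting point for the first of these steps, the following sketch builds an Athena ALTER TABLE ADD PARTITION statement for one hourly prefix. It assumes the table has been redefined with partition columns named year, month, day, and hour, and that objects sit directly under a Y/M/D/H prefix in the bucket; those column names, the bucket name, and the helper function are illustrative assumptions, not part of the deployed stack.

```python
from datetime import datetime


def add_partition_ddl(table, bucket, hour):
    """Build an ALTER TABLE statement that maps one hourly S3 prefix to a partition."""
    y, m, d, h = hour.strftime("%Y %m %d %H").split()
    location = f"s3://{bucket}/{y}/{m}/{d}/{h}/"
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (year='{y}', month='{m}', day='{d}', hour='{h}') "
        f"LOCATION '{location}'"
    )


# Example: the partition for 13:00 on September 10, 2018.
print(add_partition_ddl("flow_logs", "my-flow-log-bucket", datetime(2018, 9, 10, 13)))
```

You could run the generated statement in the Athena console, or schedule a job that issues one statement per new hour.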

Conclusion

Using the solution provided in this blog post, you can quickly analyze the network traffic in your VPC.  It provides both a near real-time solution, and also the capabilities to query historical data.  You can get the most out of this solution by extending it with queries and visualizations of your own to meet the needs of your system.

Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift and Preprocessing Data in Amazon Kinesis Analytics with AWS Lambda.


About the Author

Allan MacInnis is a Solutions Architect at Amazon Web Services. He works with our customers to help them build streaming data solutions using Amazon Kinesis. In his spare time, he enjoys mountain biking and spending time with his family.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can store heart rate data in an Amazon S3 bucket using Kinesis Data Streams and Kinesis Data Firehose. However, storing data in a repository is not enough. You also need to catalog the metadata that describes your repository so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. After your data is cataloged in the AWS Glue Data Catalog, and partitioned and compressed for optimal performance, you can use Amazon Athena to query the data in S3 directly. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans the raw data available in an S3 bucket and creates a table in the Data Catalog. You can add a schedule to the crawler so that it runs periodically and scans new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawler in the navigation bar. Select the crawler that you created, and choose Run crawler.

The crawler process can take 20–60 seconds to initiate, depending on the Data Catalog. When the crawler finishes, it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the time stamp as value_time, the person’s ID as id, and the heart rate as colvalue. These attributes are identified and listed by the AWS Glue crawler. You can see other information such as the data format (text) and the record count (approximately 15,000 records, each 61 bytes in size).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown in the following figure:

As noted, the data is currently in a JSON format and we haven’t partitioned it. This means that Athena scans the entire dataset on every query, which increases the query cost. The best practice is to always partition data and to convert the data into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scanned while running a query. Scanning less data means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.
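The idea behind a job bookmark can be illustrated with a short conceptual sketch. This is not how AWS Glue implements bookmarks internally; it simply assumes that the state is a set of S3 object keys that earlier runs have already processed, and the function and key names are hypothetical.

```python
def run_incremental(all_keys, bookmark):
    """Process only keys not seen in earlier runs.

    Returns the keys handled in this run and the updated bookmark state.
    """
    new_keys = sorted(k for k in all_keys if k not in bookmark)
    # A real job would read and transform each new object here.
    return new_keys, bookmark | set(new_keys)


bookmark = set()
first_run, bookmark = run_incremental(["raw/a.json", "raw/b.json"], bookmark)
second_run, bookmark = run_incremental(
    ["raw/a.json", "raw/b.json", "raw/c.json"], bookmark
)
print(first_run)
print(second_run)
```

The second run handles only raw/c.json, which is the behavior you want from a scheduled job.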

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.
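As a quick worked example of the capacity figures above, the following sketch multiplies out the per-DPU resources. The helper name is made up for illustration; the constants come from the preceding paragraph.

```python
VCPUS_PER_DPU = 4        # from the text: 4 vCPUs per DPU
MEMORY_GB_PER_DPU = 16   # from the text: 16 GB of memory per DPU


def job_capacity(dpus):
    """Return the total vCPUs and memory available to a job with the given DPUs."""
    return {"vcpus": dpus * VCPUS_PER_DPU, "memory_gb": dpus * MEMORY_GB_PER_DPU}


# The default of 10 DPUs works out to 40 vCPUs and 160 GB of memory.
print(job_capacity(10))
```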

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, the AWS Glue Data Catalog and its tables are already available in the Athena console. Fetch 10 rows from the new Parquet table in Athena, as you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. This same Athena query scanned only 4.99 KB of data compared to 205 KB of data that was scanned in a raw format. Also, there was a significant improvement in query performance in terms of run time.
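To put those two numbers in perspective, this small sketch computes the scan ratio and the percentage reduction from the figures quoted above. The function name is illustrative only.

```python
def scan_savings(raw_kb, parquet_kb):
    """Return (how many times less data was scanned, percent reduction)."""
    ratio = raw_kb / parquet_kb
    reduction_pct = (1 - parquet_kb / raw_kb) * 100
    return round(ratio, 1), round(reduction_pct, 1)


# 205 KB scanned for the raw table versus 4.99 KB for the Parquet table.
print(scan_savings(205, 4.99))
```

With these inputs, the Parquet query scans roughly 41 times less data, a reduction of about 97.6 percent.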

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can create a visual dashboard easily. The following figure shows graphs displaying the occurrence of heart rate records per device.  The first graph is a horizontally stacked bar chart, which shows the percentage of heart rate occurrence per device. In the second graph, you can see the heart rate count grouped by heart rate device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in such a way to create actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue, and 10 visualizations to try in Amazon QuickSight with sample data.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related mild traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting communities. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and in most cases, are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of mild traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injuries (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement and a smartphone-based assessment tool would facilitate the process between identifying potential head injuries, assessment, and return to play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system for measuring head trauma within the sports environment, subject to the absence of trained medical personnel, is required.

Given the proliferation of smartphones, we felt that this was a practical device to investigate to provide this type of monitoring.  All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges.  For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method. We chose it because of its simplicity, widely accepted standard, and compatibility to interface with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This use case can be extended to many other use cases in myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages out the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format into a compressed Apache Parquet columnar format and partitions the transformed data for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. This device uses the AWS IoT Device SDK for Python, including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. As such, you need to add a single subscription in the Greengrass group to facilitate this message route:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that can trigger a reaction if the detected heart rate is reaching a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis.
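As a rough sketch of what such a rule contains, the following builds a topic rule payload as a plain Python dictionary. The field names follow the AWS IoT CreateTopicRule payload as we understand it; the stream name, the role ARN, and the choice of a random UUID as the partition key are placeholder assumptions that you would replace with your own values.

```python
import json


def heartrate_rule_payload(stream_name, role_arn):
    """Build a topic rule payload that forwards iot/heartrate messages to Kinesis."""
    return {
        "sql": "SELECT * FROM 'iot/heartrate'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "kinesis": {
                    "streamName": stream_name,
                    "roleArn": role_arn,
                    # Assumption: spread records across shards with a random key.
                    "partitionKey": "${newuuid()}",
                }
            }
        ],
    }


payload = heartrate_rule_payload(
    "my-heartrate-stream",                               # hypothetical stream name
    "arn:aws:iam::123456789012:role/iot-kinesis-role",   # hypothetical role ARN
)
print(json.dumps(payload, indent=2))
```

The role must allow AWS IoT to put records into the stream. You could save the printed JSON to a file and supply it when creating the rule.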

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script kinesis_client_HeartRate.sh, and copy the provided code to start writing some records into the Kinesis data stream. Be sure to create your Kinesis data stream and replace the variable <your_stream_name> in the following script.

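#!/bin/bash
# Simulate heart rate readings and write them to a Kinesis data stream.
# Requires bash: $RANDOM and $'\n' are not available in POSIX sh.
# With AWS CLI v2, add --cli-binary-format raw-in-base64-out so that
# --data is sent as raw text.
while true
do
  deviceID=$(( (RANDOM % 10) + 1 ))     # device IDs 1-10
  heartRate=$(( (RANDOM % 81) + 60 ))   # heart rates 60-140 BPM
  echo "$deviceID,$heartRate"
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' --partition-key "$deviceID" --region us-east-1
done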

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
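To estimate how many shards a larger deployment might need, you can apply the per-shard write limits quoted above. The sketch below is a back-of-the-envelope calculation only: it treats 1 MB as 1,000,000 bytes, ignores read throughput, and uses a hypothetical helper name.

```python
import math

RECORDS_PER_SHARD_PER_SEC = 1000
BYTES_PER_SHARD_PER_SEC = 1_000_000  # 1 MB, treated as 10^6 bytes (assumption)


def shards_needed(records_per_sec, avg_record_bytes):
    """Return the shard count that satisfies both per-shard write limits."""
    by_count = records_per_sec / RECORDS_PER_SHARD_PER_SEC
    by_bytes = records_per_sec * avg_record_bytes / BYTES_PER_SHARD_PER_SEC
    return max(1, math.ceil(max(by_count, by_bytes)))


# Ten devices sending one 61-byte record per second fit in a single shard.
print(shards_needed(10, 61))
# Five thousand such devices are limited by record count, not by bytes.
print(shards_needed(5000, 61))
```

For small records like these, the 1,000 records per second limit is reached long before the 1 MB per second limit.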

You can create your data stream by using the following AWS CLI command. To turn on server-side encryption, run the start-stream-encryption command against the stream after it is created.

aws kinesis create-stream --stream-name heartrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching an AWS CloudFormation template, be sure to enter your email address or mobile phone number with the appropriate endpoint protocol (“Email” or “SMS”) as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Kinesis Data Analytics to perform analytics on streaming data in real time. Kinesis Data Analytics consists of three configurable components: source, real-time analytics, and destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. In the source configuration process, if filtering or masking records is required, you can preprocess records using AWS Lambda. The data in this particular case is relatively simple, so you don’t need to preprocess the records.

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we transformed the second column to Value instead of COL_Value.

The SQL code to perform the real-time analysis of the data has to be copied to the SQL Editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    VALUEROWTIME TIMESTAMP,
    ID INTEGER,
    COLVALUE INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM ROWTIME,
                  ID,
                  AVG("Value") AS HEARTRATE
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY ID,
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
    HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code generates DESTINATION_SQL_STREAM. It inserts values into the stream only when the average heart rate received from SOURCE_SQL_STREAM_001 is greater than 120 or less than 40 in the 60-second time window.

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).
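If the windowing logic is unfamiliar, the following offline sketch mimics it in plain Python. It groups readings into fixed, non-overlapping 60-second windows per device and keeps only the windows whose average is above 120 or below 40. It assumes each event carries its own timestamp in epoch seconds, whereas the streaming application windows on ROWTIME; the function name and sample events are invented for illustration.

```python
from collections import defaultdict


def tumbling_alerts(events, window_s=60, high=120, low=40):
    """Average heart rate per (window, device) and keep out-of-range averages.

    events: iterable of (epoch_seconds, device_id, heart_rate).
    """
    buckets = defaultdict(list)
    for ts, device_id, rate in events:
        window_start = ts - (ts % window_s)  # align to the window boundary
        buckets[(window_start, device_id)].append(rate)

    alerts = []
    for (window_start, device_id), rates in sorted(buckets.items()):
        avg = sum(rates) / len(rates)
        if avg > high or avg < low:  # same condition as the HAVING clause
            alerts.append((window_start, device_id, avg))
    return alerts


events = [
    (0, 1, 130), (30, 1, 140),   # device 1, first window: average 135
    (10, 2, 80),                 # device 2, first window: average 80
    (70, 1, 90),                 # device 1, second window: average 90
    (75, 2, 30), (100, 2, 40),   # device 2, second window: average 35
]
print(tumbling_alerts(events))
```

Only two of the four window and device combinations produce output, which is why the downstream Lambda function is invoked only for anomalies.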

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the stream name selected is the DESTINATION_SQL_STREAM. You only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heartrate Registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.

The Lambda function is created to process the record passed from the Kinesis Data Analytics application.  The Node.js Lambda function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a cellular text message via the Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account limit for Amazon SNS for mobile messages is $1.00 per month. You can increase this limit through an SNS Limit Increase case as described in AWS Service Limits.

You now create a new Lambda function with a runtime of Node.js 6.10 and choose the Create a custom role option for IAM permissions.  If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.
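
The function in this post is written in Node.js. Purely for illustration, the following Python sketch shows the decoding step described above: a base64-encoded JSON payload is turned back into a dictionary so that the device ID and average heart rate can be read. The field names deviceid and hr_avg are assumptions made for this example and might differ from the names your application emits.

```python
import base64
import json


def decode_record(encoded):
    """Decode one base64-encoded JSON payload into a dictionary."""
    return json.loads(base64.b64decode(encoded).decode("utf-8"))


# Build a sample payload the same way an upstream service would encode it.
sample = base64.b64encode(
    json.dumps({"deviceid": "7", "hr_avg": 182}).encode("utf-8")
).decode("ascii")

reading = decode_record(sample)
device_id = reading["deviceid"]    # key for the registration lookup
hr_average = reading["hr_avg"]     # value that triggered the alert
```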

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.

To store the streaming data to an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage.  To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data in intervals to the destination S3 bucket. You define the intervals with an S3 buffer size, an S3 buffer interval, or both, in which case delivery is triggered by whichever threshold is reached first. The data in the Data Firehose delivery stream can be transformed, and you can back up the source records before any transformation is applied. The data can be encrypted, compressed in GZIP, ZIP, or Snappy format, or converted to a columnar format such as Apache Parquet or Apache ORC. Compression and columnar formats improve query performance and reduce the storage footprint. You should enable error logging for operational and production troubleshooting.
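
To make the buffering rule concrete, here is a small Python sketch of a writer that flushes when either a size threshold or an age threshold is reached. It is not how Kinesis Data Firehose is implemented. The class name, the injected clock, and the in-memory list standing in for S3 are all assumptions for illustration, and unlike the real service this sketch only checks the age of the buffer when a new record arrives.

```python
class BufferedWriter:
    """Collect records and flush when a size or age threshold is reached."""

    def __init__(self, max_bytes, max_seconds, clock):
        self.max_bytes = max_bytes
        self.max_seconds = max_seconds
        self.clock = clock      # callable returning the current time in seconds
        self.records = []
        self.size = 0
        self.started = None     # time the first buffered record arrived
        self.flushed = []       # stands in for objects written to S3

    def add(self, record):
        if self.started is None:
            self.started = self.clock()
        self.records.append(record)
        self.size += len(record)
        too_big = self.size >= self.max_bytes
        too_old = self.clock() - self.started >= self.max_seconds
        if too_big or too_old:
            self.flush()

    def flush(self):
        if self.records:
            self.flushed.append(b"".join(self.records))
        self.records, self.size, self.started = [], 0, None


now = [0]
writer = BufferedWriter(max_bytes=10, max_seconds=60, clock=lambda: now[0])
writer.add(b"12345")    # below both thresholds, stays buffered
writer.add(b"67890")    # buffer reaches 10 bytes, flushes on size
now[0] = 100
writer.add(b"a")        # a new buffer starts at t=100
now[0] = 161
writer.add(b"b")        # buffer is 61 seconds old, flushes on age
```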

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

Additional Reading

If you found this post useful, be sure to check out Setting Up Just-in-Time Provisioning with AWS IoT Core and Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them with architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family long to live in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

How SimilarWeb analyze hundreds of terabytes of data every month with Amazon Athena and Upsolver

Post Syndicated from Yossi Wasserman original https://aws.amazon.com/blogs/big-data/how-similarweb-analyze-hundreds-of-terabytes-of-data-every-month-with-amazon-athena-and-upsolver/

This is a guest post by Yossi Wasserman, a data collection & innovation team leader at SimilarWeb.

SimilarWeb, in their own words: SimilarWeb is the pioneer of market intelligence and the standard for understanding the digital world. SimilarWeb provides granular insights about any website or mobile app across all industries in every region. SimilarWeb is changing the way businesses make decisions by empowering marketers, analysts, sales teams, investors, executives and more with the insights they need to succeed in a digital world.

SimilarWeb is a market intelligence company that provides insights on what is happening across the digital world. Thousands of customers use these insights to make critical decisions on how to improve strategies in marketing, drive sales, make investments and more. The importance of the decision making that our solution empowers puts a huge emphasis on our capacity to effectively collect and utilize this information.

Specifically, the team I lead is charged with overseeing SimilarWeb’s mobile data collection. We currently process hundreds of TB of anonymous data every month.

The data-collection process is mission-critical for SimilarWeb, because we can’t provide our customers with insights based on flawed or incomplete data. The data collection team needs analytics to track new types of data, partner integrations, overall performance and more, effectively and as quickly as possible. The earlier my team can identify and address anomalies, the better. Any tool that supports this process gives us a significant advantage.

Technical challenges of SimilarWeb mobile data collection

Hundreds of TB of data is streamed into SimilarWeb every month from different sources. The data is complex. It contains hundreds of fields, many of which are deeply nested, in addition to some with null values. This complexity creates a technical challenge because the data must be cleaned, normalized and prepared for querying.

The first option was to use our existing on-premises Hadoop cluster, which processes all of SimilarWeb’s data in a daily batch process that takes a few hours to run. For our business-critical monitoring, a 24-hour delay is not acceptable.

We considered developing a new process using Hadoop. But that would require our team to shift focus away from daily operations to code, scale, and maintain extract, transform and load (ETL) jobs. Additionally, having to deal with different databases would further divert our team’s focus from operations. Therefore, we wanted an agile solution where every team member could create new reports, investigate discrepancies, and add automated tests.

We also had a count distinct problem, which caused a compute bottleneck. The count distinct problem is the difficulty of finding the number of distinct elements in a data stream containing repeated elements. We track the number of unique visitors for billions of possible segments; for example, by device, operating system, or country. Count distinct is a non-additive aggregation, so calculating an accurate number of unique visitors usually requires many memory-intensive compute nodes.
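
A tiny Python example shows why count distinct is non-additive. The visitor IDs below are made up. Adding the distinct counts of two hourly windows overstates the total whenever a visitor appears in both, so the correct answer requires the underlying IDs rather than the per-window counts.

```python
# Hypothetical visitor IDs observed in two hourly windows.
hour_1 = {"u1", "u2", "u3"}
hour_2 = {"u2", "u3", "u4"}

# Adding the per-hour distinct counts double-counts returning visitors.
naive_total = len(hour_1) + len(hour_2)

# The correct figure needs the union of the underlying IDs.
true_total = len(hour_1 | hour_2)
```

Here naive_total is 6 while true_total is 4. Keeping the full set of IDs for every segment is what makes exact distinct counts memory-intensive at scale.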

Why we chose Amazon Athena

We chose Amazon Athena to solve these challenges. Athena offered us:

  1. Fast queries using SQL: our team wants to use SQL to query the data, but traditional SQL databases are hard to scale to hundreds of terabytes. Athena works for us because it runs distributed SQL queries on Amazon S3 using Presto.
  2. Easy to maintain: Athena is a serverless platform, so it adds no IT operating overhead.
  3. Low cost: storing so much data in a data warehouse would be very expensive because of the number of database nodes required to store all the data. We query only a small portion of the data, so the Athena price of $0.005 per GB scanned is appealing.
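
As a rough illustration of the pricing point, the following Python sketch estimates query cost from the amount of data scanned, using the $0.005 per GB figure quoted above. The two data volumes are hypothetical and chosen only to contrast a full scan with a query over a small slice.

```python
PRICE_PER_GB_SCANNED = 0.005  # USD, the figure quoted in the list above


def query_cost(gb_scanned):
    """Estimate the cost in USD of a query that scans gb_scanned gigabytes."""
    return gb_scanned * PRICE_PER_GB_SCANNED


full_scan = query_cost(100 * 1024)   # hypothetical: scanning 100 TB
reduced_scan = query_cost(50)        # hypothetical: scanning a 50 GB slice
```

Under these assumptions the full scan costs about $512, while the 50 GB query costs about $0.25. That gap is why scanning only the data a query needs matters so much with this pricing model.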

Why we chose Upsolver

As another part of our solution, we chose Upsolver. Upsolver bridges the gap between the data lake and the analytics users who aren’t big data engineers. Its cloud data lake platform helps organizations efficiently manage a data lake. Upsolver enables a single user to control big streaming data from ingestion to management and preparation for analytics platforms like Athena, Amazon Redshift and Amazon Elasticsearch Service (Amazon ES).

We chose Upsolver for these reasons:

  1. Upsolver allows us to provide hourly statistics to Athena and S3. As a result, we can identify anomalies after 1 hour instead of 24 hours.
  2. Upsolver is easy to configure at scale by using its graphical user interface (GUI). It took us about an hour to build a pipeline from Kafka to Athena, including the number of distinct visitors for every segment we track.
  3. Upsolver’s Stream Discovery tool was very helpful when we created our tables in Athena. We saw the exact distribution of values for every field in our JSON, which helped us find the fields we wanted and make necessary corrections.
  4. Upsolver is easy to maintain. Upsolver is a serverless platform with little IT overhead.
  5. Our data remains private in our own S3 bucket. Although our data is anonymous, we didn’t want it out of our control.

Our solution

Our solution includes Athena for SQL analytics, S3 for events storage, and Upsolver for data preparation. The following diagram shows our solution end-to-end.

In the following sections, we walk through our solution step-by-step.

Step 1: Get the raw data from Kafka to S3

Our data resides on-premises in Kafka. We use Upsolver’s connector to read events from Kafka and store them on S3. We needed to do this only once.  The following example shows how we created the data source in Kafka.

Step 2: Create a reduced stream on S3

Storing hundreds of TB on S3 wasn’t necessary for the kind of analytics we needed to perform. Our full stream includes about 400 fields, although we need only 20–30 of them. We used Upsolver to create a reduced stream. This reduced stream contains some of the fields from the original stream, plus some new calculated fields we added with Upsolver. We store the output in S3.

We also knew that our Kafka topic contained events that weren’t relevant to our use case. As another part of reducing our stream, we filtered out those events using Upsolver. Although we keep the raw data for only one day, we store the reduced stream for one year. The reduced stream enables us to stay dynamic — using a one-year event source, but at a much lower cost than storing the full raw data.
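
We built the reduced stream in the Upsolver GUI, not in code. The following Python sketch only illustrates the idea behind it: drop irrelevant events, keep a handful of fields, and add a calculated field. All field names, event types, and the sdk_major calculation are hypothetical.

```python
KEEP_FIELDS = ("device", "os", "country")   # hypothetical field names
RELEVANT_TYPES = {"session_start"}          # hypothetical event types


def reduce_event(event):
    """Return a trimmed copy of event, or None if it should be dropped."""
    if event.get("type") not in RELEVANT_TYPES:
        return None
    reduced = {name: event.get(name) for name in KEEP_FIELDS}
    # Example of a calculated field derived from a nested value.
    reduced["sdk_major"] = event.get("sdk", {}).get("version", "").split(".")[0]
    return reduced


raw_events = [
    {"type": "session_start", "device": "Pixel", "os": "Android",
     "country": "IL", "sdk": {"version": "4.2.1"}, "debug": {"trace": "..."}},
    {"type": "heartbeat", "device": "Pixel"},
]
reduced_stream = [r for r in map(reduce_event, raw_events) if r is not None]
```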

Step 3: Create and manage tables in Athena

AWS Glue Data Catalog and Amazon S3 are critical when using Athena. Athena takes advantage of a concept known as external tables. This means tables’ schema definitions are managed in the AWS Glue Data Catalog and the data itself is managed in S3.

We mapped the nested JSON source files to a flat output table backed by Parquet files by using the Upsolver Create Output feature. When Upsolver is run, it creates two different types of output:

  • Flat Parquet files on S3, as shown in the following screenshot. We frequently use the aggregation option, because we track the number of distinct users for various segments on an hourly basis.

  • Four added columns for our tables, as shown in the following example. Our tables’ schemas change over time as a result of schema evolution and partition management. Because we’re using daily partitions, Upsolver added the four columns to our tables. Upsolver manages all of these changes for us directly in the AWS Glue Data Catalog.

Step 4: Perform SQL analytics in Athena

Our developers use the Athena SQL console to look for anomalies in new data. Usually a spike or a drop in the number of distinct users per tracked dimension indicates an anomaly. Examples of tracked dimensions can be SDK versions, devices, or operating systems.
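
In practice we run these checks as SQL in Athena. The Python sketch below only illustrates the rule of thumb described above, flagging any hour in which the distinct-user count for a dimension changes sharply from the previous hour. The 50 percent threshold and the sample counts are assumptions for this example.

```python
def find_anomalies(hourly_counts, threshold=0.5):
    """Return (hour, previous, current) tuples where the distinct-user count
    changed by more than threshold (as a fraction) from the previous hour."""
    anomalies = []
    hours = sorted(hourly_counts)
    for prev_hour, hour in zip(hours, hours[1:]):
        previous, current = hourly_counts[prev_hour], hourly_counts[hour]
        if previous == 0:
            continue  # no baseline to compare against
        change = abs(current - previous) / previous
        if change > threshold:
            anomalies.append((hour, previous, current))
    return anomalies


# Hypothetical distinct-user counts for one SDK version, keyed by hour.
counts = {"2019-01-01T10": 1000, "2019-01-01T11": 1040, "2019-01-01T12": 310}
flagged = find_anomalies(counts)
```

With these sample counts, only the drop from 1040 to 310 users is flagged. The 4 percent rise in the preceding hour stays under the threshold.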

We also track various fields in real time for production debugging. In some cases, we also add queries to continuous integration jobs.

Conclusion

In this post, I discussed our main challenges when dealing with large amounts of data. I provided you with a glimpse into our thought process for selecting Amazon Athena and Upsolver to build a performant, cost-effective, and efficient solution that everyone on the team can use. Overall, this new pipeline helped improve our efficiency and reduce the time from ingestion to insight from 24 hours to minutes.


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Work with partitioned data in AWS Glue.

About the Author

Yossi Wasserman is a data collection & innovation team leader at SimilarWeb. He holds a BSc in Software Engineering and has more than 10 years of experience in mobile app development.

How Pagely implemented a serverless data lake in AWS to facilitate customer support analytics

Post Syndicated from Joshua Eichorn original https://aws.amazon.com/blogs/big-data/how-pagely-implemented-a-serverless-data-lake-in-aws-to-facilitate-customer-support-analytics/

Pagely is an AWS Advanced Technology Partner providing managed WordPress hosting services. Our customers continuously push us to improve visibility into usage, billing, and service performance. To better serve these customers, the service team requires an efficient way to access the logs created by the application servers.

Historically, we relied on a shell script that gathered basic statistics on demand. When processing the logs for our largest customer, it took more than 8 hours to produce one report using an unoptimized process running on an Amazon EC2 instance—sometimes crashing due to resource limitations. Instead of putting more effort into fixing a legacy process, we decided it was time to implement a proper analytics platform.

All of our customer logs are stored in Amazon S3 as compressed JSON files. We use Amazon Athena to run SQL queries directly against these logs. This approach is great because there is no need for us to prepare the data. We simply define the table and query away. Although JSON is a supported format for Amazon Athena, it is not the most efficient format with regard to performance and cost. JSON files must be read in their entirety, even if you are only returning one or two fields from each row of data, resulting in scanning more data than is required. Additionally, the inefficiencies of processing JSON cause longer query times.

Querying the logs of our largest customers was not ideal with Athena, as we ran into the 30-minute query timeout limit. This limit can be increased, but the query was already taking longer than we wanted.

In this post, we discuss how Pagely worked with Beyondsoft, an AWS Advanced Consulting Partner, to use ConvergDB, an open-source tool developed by Beyondsoft, to build a DevOps-centric data pipeline. This pipeline uses AWS Glue to transform application logs into optimized tables that can be queried quickly and cost effectively using Amazon Athena.

Engaging Beyondsoft

We knew that we needed to do something to make our data easily accessible to our engineers with as little overhead as possible. We wanted to get the data into a more optimal file format to reduce query times. Being a lean shop, we didn’t have the bandwidth to dive into the technologies. To bridge the gap, we engaged Beyondsoft to determine the best solution to optimize and better manage our data lake.

What is ConvergDB?

ConvergDB is open-source software for the creation and management of data lakes. Users define the structure of the source and target tables and map them to concrete cloud resources. Then ConvergDB creates all of the scripts used to build and manage the necessary resources in AWS. The scripts created by ConvergDB are deployed through the use of HashiCorp Terraform, an open-source tool for managing infrastructure as code.

With ConvergDB, we can define our data lake with metadata to drive the table-creation and ETL (extract, transform, and load) processes. The schema files are used to define tables, including field-level SQL expressions that are used to transform the incoming data as it is being loaded. These expressions are used to derive calculated fields, in addition to the fields that are used for data partitioning.

After the schema is defined, the deployment file allows us to place the tables into an ETL job that is used to manage them. The deployment file specifies the ETL job schedule, along with optional fields such as the target S3 bucket and the number of AWS Glue DPUs to use at runtime.

ConvergDB is a command line binary and does not need to be installed on a server. All of the artifacts are files that can be managed using source control. The ConvergDB binary takes in all the configuration files and then outputs a Terraform configuration containing all the artifacts necessary to deploy the data lake. These artifacts can be ETL scripts, table and database definitions, and IAM policies necessary to run the jobs. They can also include Amazon SNS notification topics, and even an Amazon CloudWatch dashboard showing the volume of data processed by ConvergDB ETL jobs.

Speed bumps

No implementation goes perfectly. In the following sections, Jeremy Winters, a Beyondsoft engineer, explains the problems we ran into and how they were addressed.

Partitioning and columnar formats

Two of the key best practices for structuring data for SQL analytic queries in Amazon S3 are partitioning and columnar file structure.

Partitioning is the process of splitting data into different prefixes or folders on S3 with a naming convention that’s most suitable to efficient retrieval of data. This allows Athena to skip over data that is not relevant to the particular query that is being executed.
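
The following Python sketch illustrates the idea of partition pruning with a date-based naming convention. The dt= prefix style is a common Hive-style convention, and the table name and object keys are hypothetical. The point is that a query filtered on one day only needs to read objects under that day's prefix.

```python
from datetime import date


def partition_prefix(table, day):
    """Build a Hive-style prefix such as logs/dt=2018-03-28/."""
    return f"{table}/dt={day.isoformat()}/"


def prefixes_for_days(table, days):
    """List only the prefixes a date-filtered query needs to read."""
    return [partition_prefix(table, d) for d in days]


all_keys = [
    "logs/dt=2018-03-27/part-0.parquet",
    "logs/dt=2018-03-28/part-0.parquet",
    "logs/dt=2018-03-29/part-0.parquet",
]
wanted = prefixes_for_days("logs", [date(2018, 3, 28)])

# Objects outside the wanted prefixes are never opened.
scanned = [key for key in all_keys if key.startswith(tuple(wanted))]
```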

Apache Parquet is a columnar format popular with tools in the Hadoop ecosystem. Parquet stores the columns of the data in separate, contiguous regions in the file. Directed by metadata footers, tools like Athena read only the sections of the file that are needed to fulfill the query. This process helps eliminate a large portion of I/O and network data transfer.

Reducing I/O through partitioning and Parquet files not only increases query performance, but it dramatically reduces the cost of using Athena.

For more information about best practices for data lake optimization, see the blog post Top 10 Performance Tuning Tips for Amazon Athena.

Small files problem

A classic issue encountered with Hadoop ecosystem tools is known as the “small files problem.” Processing a large number of small files creates a lot of overhead for the system, causing job execution times to skyrocket and potentially fail. Pagely had approximately 4 TB of history across 30 million files. Of these files, 29.5 million represented only 1.2 TB of the data in S3.

To analyze this issue, we enabled S3 inventory reporting on the source data bucket. The report is delivered daily in an ORC (optimized row columnar) format. From there, it is very easy to create an Athena table to analyze the bucket contents using SQL.

We used Athena to identify S3 prefixes that were “hot spots,” that is, those having a large number of small files. We identified 14,000 prefixes with less than 1 GB of data that we could consolidate. So… 29.5 million files consolidated into 14,000 files.

The following query is a way to identify small-file hot spots. The GROUP BY expression can be adapted to suit your data. The example shows a way of grouping by the first “folder” in the bucket.

SELECT
  -- we are looking at the first string in a / delimited path
  -- if the key is path_to_data/2017-10-10.json it will group on path_to_data
  split_part(key,'/',1) AS prefix
  
  -- calculate the total size in mb for all files in prefix
  ,SUM(size)/CAST(1024*1024 AS DOUBLE) AS mb
  
  -- count of objects in the prefix
  ,COUNT(*) AS object_count
FROM
  pagely_gateway_logs
WHERE
  -- assumes that versioning is disabled
  -- you should use the latest date after
  -- refreshing all partitions
  dt = '2018-03-28-08-00'
GROUP BY
  1
HAVING
  -- only return prefixes with a total size of less than 1 gb
  -- and a file count of at least 8
  SUM(size)/CAST(1024*1024 AS DOUBLE) < 1024
  AND
  COUNT(*) >= 8

The results show prefixes in your object paths that can and should be consolidated. Any prefix holding less than 1 GB across eight or more files can then be consolidated into a single object, replacing the originals.

After the hot spots are identified, the small files must be consolidated. The results in the preceding image show that files with a prefix of 14184 total 33.7 MB spread across 502 files. To reduce the overhead of this many small files, we combine all of the files into a single file. Our files use gzip compression, which allows for combining them through simple concatenation, as opposed to decompressing, concatenating the raw JSON data, and then recompressing. There are many ways to achieve this, such as the Linux cat command, which is shown in the following example:

$ ls -1
14184_file1.gz
14184_file2.gz
14184_file3.gz
$ cat *.gz > combined.gz
$ gzip -d combined.gz

You can run these commands on any gzip files, and then test that the resulting file is a valid gzip by using the -d flag, which decompresses the file for you. For some use cases, you can use the Amazon S3 Multipart Copy API, but this approach requires that your small files be at least 5 MB in size.
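
The same property can be checked in a few lines of Python. This sketch compresses two hypothetical log lines separately, joins the compressed bytes exactly as cat would, and decompresses the result. It relies on the standard library's gzip.decompress, which reads every member of a multi-member gzip stream.

```python
import gzip

part_1 = gzip.compress(b'{"status": 200}\n')
part_2 = gzip.compress(b'{"status": 404}\n')

# Byte-level concatenation, equivalent to `cat part_1.gz part_2.gz`.
combined = part_1 + part_2

# Decompressing the combined stream yields both payloads in order.
restored = gzip.decompress(combined)
```

The restored bytes contain both JSON lines, one after the other, without the inputs ever being decompressed and recompressed during the merge.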

For the Pagely dataset, we wrote a shell script to pull down all the files with a given prefix, concatenate them into a single gzip, upload the concatenated file, validate the upload, and then delete the smaller files. This script was run using AWS Fargate containers, each of which would handle a single prefix. The details of this process would be a blog post on its own, but using a service like AWS Batch can make a job like this easier to manage. The total cost of concatenating all of the small historical files was $27.

Historical data

Daily data volumes for Pagely logs are in the tens of gigabytes, easily handled by the smallest AWS Glue configuration. Transforming the 4 TB of compressed (~28 TB uncompressed) historical data was a bit more challenging.

ConvergDB batches the data into smaller chunks. In the case of a very long-running historical transformation job failing, only the last batch is lost, resulting in around one hour of compute being lost. ConvergDB uses its own state-tracking mechanism to communicate the failure to the next run of the job, which cleans up any mess before trying to process the batch again. Batching is an automatic feature of the ETL job created by ConvergDB, based upon the size of the AWS Glue cluster.
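
The general idea of size-based batching can be sketched in Python as follows. This is not ConvergDB's implementation. The function name, the file sizes, and the byte budget are hypothetical, and the sketch leaves out the state tracking and cleanup described above.

```python
def make_batches(files, max_batch_bytes):
    """Group (key, size) pairs into consecutive batches under a size budget.

    A file larger than the budget is placed in a batch of its own.
    """
    batches, current, current_size = [], [], 0
    for key, size in files:
        # Close the current batch if adding this file would exceed the budget.
        if current and current_size + size > max_batch_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(key)
        current_size += size
    if current:
        batches.append(current)
    return batches


files = [("a.gz", 40), ("b.gz", 50), ("c.gz", 30), ("d.gz", 120), ("e.gz", 10)]
batches = make_batches(files, max_batch_bytes=100)
```

Because each batch is processed independently, a failure costs at most the work of the batch that was in flight rather than the whole historical run.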

Post-deployment at Pagely

Running our legacy report for a medium-size application (several gigabytes in S3) took 91 seconds. Now that our data lake is in production, the same report for a medium-size application takes 5 seconds with Athena—an 18x performance gain. Our largest dataset (~1 TB in S3) fails with our legacy process. It’s also not sufficiently performant when querying the raw JSON directly with Athena. However, the new Parquet-based tables using Athena complete the analysis in 24 seconds.

Customer         | Legacy process       | Athena with JSON | Athena with Parquet
Medium customer  | 1 minute, 31 seconds | 1 minute         | 6 seconds
Largest customer | > 8 hours            | > 30 minutes     | 24 seconds

Although these numbers are obviously important, the biggest advantage is that now we don’t have to worry about performance and cost, and the engineers can focus on solving problems. Just 15 minutes of writing queries, and the entire team now has access to new data. I was able to upgrade the legacy process with queries dispatched to Athena through the AWS SDK. This process can now run on any lightweight machine (like my laptop) while Athena does the heavy lifting.

About Pagely

Pagely, in their own words: Pagely is an AWS Advanced Technology, SaaS, and Public Sector Partner providing managed WordPress hosting. We service enterprise-level customers like BMC, UNICEF, Northwestern University, and the City of Boston, offering flexibility in our solutions and the industry’s best expert-only, tier-less support. Pagely uses a proprietary tech stack that accelerates WordPress sites through the use of our own ARES™ Web Application gateway, PressCACHE™ and PressCDN™ technologies, and open source tools such as Redis and NGINX.

About Beyondsoft Consulting, Inc.

Beyondsoft Consulting, Inc., in their own words: Beyondsoft Consulting, Inc. is an Amazon Web Services Advanced Consulting Partner with delivery centers across the US and Asia. Beyondsoft delivers IT solutions and services to leading technology companies and enterprises across many verticals. Our team of highly skilled professionals, coupled with our focus on customer success, truly separates us as a preferred AWS Partner for many of our clients.

Beyondsoft Amazon Partner Page

Contact: Eric Valenzuela

About the Authors

Joshua Eichorn is the CTO of Pagely and an engineering leader with experience leading small and large teams. As an individual contributor, manager, and director, he has done it all, from writing line one of a new application to a six-month rewrite of a massive site. Josh loves solving new challenges and building great products.

Jeremy Winters is the creator of ConvergDB, and has been working in Business Intelligence across a variety of industries for 18 years, with the past 8 years being focused on building data lakes, data warehouses, and other applications in AWS.

How Goodreads offloads Amazon DynamoDB tables to Amazon S3 and queries them using Amazon Athena

Post Syndicated from Joe Feeney original https://aws.amazon.com/blogs/big-data/how-goodreads-offloads-amazon-dynamodb-tables-to-amazon-s3-and-queries-them-using-amazon-athena/

At Goodreads, we’re currently in the process of decomposing our monolithic Rails application into microservices. For the vast majority of those services, we’ve decided to use Amazon DynamoDB as the primary data store. We like DynamoDB because it gives us consistent, single-digit-millisecond performance across a variety of our storage and throughput needs.

Although DynamoDB excels at high-throughput read and write workloads, it’s not optimized to support one-time, ad hoc queries or data warehouse workloads. However, by combining AWS Data Pipeline, Amazon S3, AWS Glue, and Amazon Athena we can export our dataset from DynamoDB to S3 and use Athena to run SQL queries against that dataset.

Architecture overview

Our architecture for this process is as follows:

  • AWS Data Pipeline scans data from a DynamoDB table and writes it to S3 as JSON. It broadcasts on an Amazon SNS topic when it is finished.
  • An AWS Glue job invoked by AWS Lambda converts the JSON data into Parquet.
  • An AWS Glue crawler adds the Parquet data in S3 to the AWS Glue Data Catalog, making it available to Athena for queries.

We could have queried the data in the JSON format, thus removing the extra step of the Parquet conversion. However, we decided to make the additional effort because Parquet is space-efficient and high-performing. For larger datasets, this approach translates not only into faster queries, but also cost savings.

Major architecture components

The major components of our architecture are as follows.

Data Pipeline is an orchestration service for spinning up Amazon EMR clusters and running fault-tolerant jobs using big data technology like Apache Pig, Apache Hive, or Apache Spark. Data Pipeline provides a template for exporting data from an arbitrary DynamoDB table to Amazon S3. We use a slightly modified version of the standard export template.

In this version, we add the ability to send success or failure messages on Amazon SNS. Doing this lets us use Lambda to kick off further processing outside of the Data Pipeline service.
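
As a sketch of that hand-off, the following Python function reads the SNS records in a Lambda event and starts an AWS Glue job for each successful export. It is not the code from our stack. The message layout (a JSON body with status and export_path fields), the job argument name, and the function name are assumptions for illustration. The glue_client parameter is expected to behave like a boto3 AWS Glue client and is passed in so that the logic can be exercised without AWS access.

```python
import json


def handle_export_notification(event, glue_client, job_name):
    """Start the conversion job for each SNS record that reports success.

    Returns the list of job run IDs that were started.
    """
    run_ids = []
    for record in event.get("Records", []):
        # SNS delivers the published message as a string.
        message = json.loads(record["Sns"]["Message"])
        if message.get("status") != "SUCCESS":      # hypothetical field
            continue
        response = glue_client.start_job_run(
            JobName=job_name,
            # Hypothetical job argument naming the export location.
            Arguments={"--export_path": message["export_path"]},
        )
        run_ids.append(response["JobRunId"])
    return run_ids
```

In a deployed function, you would create the client once with boto3.client("glue") and call this helper from the Lambda handler.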

AWS Glue is used in three different ways in this architecture:

  • A serverless Apache Spark environment runs a job that converts the JSON export from Data Pipeline into the Apache Parquet format.
  • An AWS Glue crawler automatically crawls and infers the schema of our dataset and adds it to the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is the metadata store for our dataset so we can query the data with Athena.

Athena is used after the data is in the AWS Glue Data Catalog. At this point, you can query it in Athena with ANSI SQL.

Setting up infrastructure

In this process, we use AWS CloudFormation to manage our AWS resources. We’ve split the various AWS resources across three stacks to make them more composable.

The reviews.yaml template defines an example DynamoDB table called Reviews. The common.yaml template contains IAM and S3 resources that are shared across stacks. The dynamodb-exports.yaml template defines a Data Pipeline, Lambda function, AWS Glue job, and AWS Glue crawlers.

Working with the Reviews stack

The reviews.yaml CloudFormation template contains a simple DynamoDB table definition for storing user reviews on books. We’re using a hash key and sort key structure that nests each review on a book under a particular user. This structure allows an application to check if a user has a review on a book in a simple get operation and also to list all reviews by a user.

Working with the DynamoDB table schema

The table defined in reviews.yaml is a hash key and sort key table. The User attribute is the hash key, and the Book attribute is the sort key. If you build an application on this table, you might add additional Global Secondary Indexes (GSIs) to accommodate other access patterns, for example showing the highest weighted reviews for a book.
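
To see why this key structure supports both access patterns, consider the following in-memory Python stand-in for the table. It is only an analogy, with made-up items and helper names. A lookup by the full (User, Book) key corresponds to a get operation, and collecting all items that share a User value corresponds to a query on the hash key.

```python
# In-memory stand-in for the Reviews table: (User, Book) identifies one item.
reviews = {
    ("Tristan", "Book A"): {"Rating": 5},
    ("Tristan", "Book B"): {"Rating": 3},
    ("Adeline", "Book A"): {"Rating": 4},
}


def get_review(user, book):
    """Point lookup by full primary key, analogous to a get operation."""
    return reviews.get((user, book))


def list_reviews(user):
    """All items sharing a hash key, ordered by sort key."""
    matches = [
        (book, item) for (owner, book), item in reviews.items() if owner == user
    ]
    return sorted(matches, key=lambda pair: pair[0])


has_review = get_review("Adeline", "Book B") is not None
tristan_books = [book for book, _ in list_reviews("Tristan")]
```

Neither helper can answer a question such as "all reviews for a given author" without visiting every item, which is the limitation the rest of this post works around.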

First, you create a CloudFormation stack:

  1. Click on this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. Choose Create in the Review

Next, you create test items in the Reviews table. After the ReviewsStack status is CREATE_COMPLETE, you can open up the DynamoDB console and explore the table. Let’s add a few items to the table:

  1. Open DynamoDB in the AWS Management Console.
  2. Choose Tables from the left navigation pane.
  3. Choose the Reviews table, and then choose the Items
  4. Choose Create item, and in the Create item box, for Tree choose Text.
  5. Remove the existing text, and copy and paste the following item into the text box.
{
  "User": "Tristan",
  "Book": "Harry Potter and the Philosopher's Stone",
  "Rating": 5,
  "Review": "A thrilling journey through the world of Hogwarts",
  "Author": "J.K. Rowling"
}
  6. Choose Save.

Now let’s add one more item.

{
  "User": "Adeline",
  "Book": "Harry Potter and the Sorcerer's Stone",
  "Rating": 4,
  "Review": "Harry is pretty brave, but Hermione is the clear hero",
  "Author": "J.K. Rowling"
}

You can see that we’ve added a few different fields that were not specified in the table schema. Notably, these are: Rating, Review, and Author. Because DynamoDB is a NoSQL database, we can add new attributes as our application evolves. However, to aggregate against those attributes efficiently, they either need to be a part of the primary key schema at table creation or defined on an index.

The Goodreads reviews table is not dissimilar from our example table. However, we have used our maximum of five Global Secondary Indexes (GSIs) to support the access patterns that our users need the most. It’s no longer an option for us to create short-lived GSIs to answer arbitrary questions we have about our data. Even if we could, we have so much data that creating a GSI takes a few days.

Now imagine that our product team wants to run queries over the reviews data for arbitrary authors. We can’t add an additional GSI, and the access pattern isn’t required in production. However, by using the architecture described in this blog post we can unlock our dataset for our product team.

Feel free to add more items to the table, because the more data you have in the table when we export it the more interesting SQL queries you can run in Athena.

Creating the common stack

The common.yaml CloudFormation template creates a variety of IAM and EC2 permissions that Data Pipeline, Lambda, and AWS Glue use. In addition, the template creates an S3 bucket to store DynamoDB exports. The resources that need to be referenced across stacks are declared in the Outputs section.

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. Choose Next at the bottom of the screen.
  3. On the Options screen, leave everything set to the default and choose Next at the bottom of the screen.
  4. In the Capabilities section of Review, choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create.

Creating the DynamoDB exports stack

The dynamodb-exports.yaml template is a self-contained template to create a Data Pipeline, SNS topics, Lambda trigger, AWS Glue job, and an AWS Glue crawler for any given DynamoDB table. If you have multiple DynamoDB tables you want to export, you can reuse the dynamodb-exports.yaml template and create a stack for each table.

The most interesting part of this stack is the AWS Glue job script that converts an arbitrary DynamoDB export file created by the Data Pipeline task into Parquet. It also removes DynamoDB type information from the raw JSON by using Boto3, which is available in the PySpark AWS Glue environment. The code is well-documented, so don’t hesitate to dive in here if you’re interested in how to write a custom AWS Glue job.
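To give a feel for what removing the type information involves, here is a minimal sketch. It is not the job script from the template: the real job uses Boto3 (which ships a TypeDeserializer class in boto3.dynamodb.types), whereas this version is plain Python so it runs without AWS dependencies. It assumes each exported attribute is wrapped in a single type key such as "s" or "n", and it accepts either letter case because the exact casing of the export format is an assumption here.

```python
from decimal import Decimal


def strip_types(value):
    """Convert one DynamoDB-typed value, e.g. {"s": "x"}, to a plain Python value."""
    (type_key, raw), = value.items()  # each typed value has exactly one key
    t = type_key.lower()
    if t == "s":
        return raw
    if t == "n":
        number = Decimal(raw)  # numbers are exported as strings
        return int(number) if number == number.to_integral_value() else float(number)
    if t == "bool":
        return bool(raw)
    if t == "null":
        return None
    if t == "l":
        return [strip_types(v) for v in raw]
    if t == "m":
        return {k: strip_types(v) for k, v in raw.items()}
    if t == "ss":
        return sorted(raw)
    if t == "ns":
        return sorted(strip_types({"n": n}) for n in raw)
    return raw  # binary and unknown types are passed through unchanged


def strip_item(item):
    """Strip the type wrappers from every attribute of one exported item."""
    return {name: strip_types(typed) for name, typed in item.items()}
```

For example, strip_item({"User": {"s": "Tristan"}, "Rating": {"n": "5"}}) yields a flat record with a string and an integer, which is the shape a columnar format such as Parquet expects.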

Create the CloudFormation stack as follows:

  1. Click on this Launch Stack button:
  2. For parameters, enter the following and then choose Next:
    • ExportTimeout: 1
    • MaxConsumedReadThroughput: 0.5
    • TableName: Reviews
  3. On the Options screen, leave everything set to default and then choose Next.
  4. In the Review section, scroll to the bottom and choose Create.

Watching your data flow from DynamoDB to the AWS Glue Data Catalog

The pipeline from DynamoDB to the Apache Hive–compatible AWS Glue Data Catalog is fully automated. After the CloudFormation stack that exports Reviews is deployed, the data pipeline starts running. You can query the data in Athena shortly afterward.

Monitor the data pipeline:

  1. Open AWS Data Pipeline in the console.
  2. Choose the pipeline with the name ReviewsExport.
  3. Monitor the pipeline as it goes through the various stages from provisioning a cluster to running your job.
  4. When the status is Finished, the data is in S3.

The pipeline sends a message on the export success SNS topic. Doing so triggers Lambda to invoke the AWS Glue job to convert the JSON export into Parquet.
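A Lambda function that bridges the SNS topic and the AWS Glue job could look roughly like the following. This is a sketch under assumptions, not the function from the template: the --export_message argument name is hypothetical, and the glue_client parameter exists only so the function can be exercised without AWS credentials. The start_job_run call and the Records/Sns/Message event layout are standard.

```python
GLUE_JOB_NAME = "ReviewsExportToParquet"  # job name shown later in this post


def handler(event, context, glue_client=None):
    """Start the Glue conversion job once for each SNS record in the event."""
    if glue_client is None:
        import boto3  # imported lazily so the sketch can be tested without AWS
        glue_client = boto3.client("glue")

    run_ids = []
    for record in event.get("Records", []):
        message = record["Sns"]["Message"]
        response = glue_client.start_job_run(
            JobName=GLUE_JOB_NAME,
            # Hypothetical argument name; Glue job arguments are "--key": "value" strings.
            Arguments={"--export_message": message},
        )
        run_ids.append(response["JobRunId"])
    return run_ids
```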

Let’s monitor the AWS Glue job:

  1. Open AWS Glue in the console.
  2. Choose Jobs under the ETL header in the left navigation pane.
  3. Choose the check box next to ReviewsExportToParquet to view the job’s run history and other details. At this point, Run status is in the Running state.
  4. The AWS Glue job is finished when the Run status reaches the Succeeded state.

Next, run the AWS Glue crawler:

  1. From the AWS Glue console page, choose Crawlers on the left navigation pane.
  2. Choose the check box next to ReviewsParquetCrawler.
  3. Choose Run crawler at the top of the page.

The first time the crawler runs, it adds the reviews table to the dynamodb-exports database in the AWS Glue Data Catalog. If you accumulate more export snapshots after you run the crawler, subsequent runs of the crawler add new partitions to the table.

Inspecting the reviews table in the AWS Glue Data Catalog

Next, look at the reviews table:

  1. From the AWS Glue console page, choose Tables.
  2. Choose reviews.

The AWS Glue Data Catalog is an Apache Hive–compatible metastore that stores the schema of the dataset. It stores properties such as object count and dataset location in S3, among other data.

Taking a look at the schema, you might notice the ddb_export_timestamp column, which wasn’t originally a part of the attributes that we added to the items in DynamoDB. Under the key column, ddb_export_timestamp is marked as Partition (0).  Partition columns are just like regular columns, and when they are used in WHERE clauses in Athena they allow you to restrict the amount of data scanned. The Athena partitions documentation is a great place to start if you want to know more.

The Lambda function that invokes the Parquet conversion script provides this extra metadata. So, when the AWS Glue crawler infers the schema, the partition is given a useful name, as opposed to the default partition_N name that is given if no partition name is present.
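AWS Glue crawlers pick up a partition name when the S3 folder is written in the Hive key=value style. The sketch below shows one way such an output prefix could be built. The bucket layout and the timestamp format are assumptions for illustration; the post does not show the exact path the sample uses.

```python
from datetime import datetime, timezone


def partition_prefix(base_prefix, export_time):
    """Return an S3 prefix whose last folder is a Hive-style key=value partition."""
    stamp = export_time.astimezone(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
    return f"{base_prefix.rstrip('/')}/ddb_export_timestamp={stamp}/"


# Hypothetical bucket and folder names.
example = partition_prefix(
    "s3://my-export-bucket/parquet/reviews/",
    datetime(2018, 6, 1, tzinfo=timezone.utc),
)
```

Without the ddb_export_timestamp= part of the folder name, the crawler would fall back to a generic partition_N column name.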

Using Athena to query the dataset

To use Athena to query the dataset, take these steps:

  1. Open Athena on the console.
  2. If you haven’t done so already, upgrade Athena to use the Hive Catalog.
  3. For Database on the left navigation pane, choose dynamodb-exports.
  4. Under Tables, you can see reviews.
  5. Choose the ellipsis at right of reviews, and choose Preview table.

You just ran a SQL query over your DynamoDB dataset! Let’s run an aggregation to count how many reviews J.K. Rowling has. As you might recall, this access pattern isn’t well-supported by our DynamoDB table design.

SELECT COUNT(author) as num_reviews FROM "dynamodb-exports"."reviews"
WHERE author = 'J.K. Rowling';

You might see different results if you added more items, but here are the results we see in our table.

With Athena, as your data grows in size or complexity, you can pull insights out of data from DynamoDB using ANSI SQL.
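If you would rather run the same aggregation from code than from the console, a sketch along these lines may help. It builds the query shown earlier, optionally restricted to a single export partition so that Athena scans less data, and submits it with the Boto3 start_query_execution call. The output location is a placeholder you would supply, and the athena_client parameter is there only to make the function testable without AWS access.

```python
def count_reviews_query(author, export_timestamp=None):
    """Build the aggregation from this post, optionally limited to one partition."""
    safe_author = author.replace("'", "''")  # escape single quotes for SQL
    sql = (
        'SELECT COUNT(author) AS num_reviews '
        'FROM "dynamodb-exports"."reviews" '
        f"WHERE author = '{safe_author}'"
    )
    if export_timestamp is not None:
        safe_ts = export_timestamp.replace("'", "''")
        sql += f" AND ddb_export_timestamp = '{safe_ts}'"
    return sql


def run_query(sql, output_location, athena_client=None):
    """Submit the query to Athena and return its execution ID."""
    if athena_client is None:
        import boto3  # imported lazily so the sketch can be tested without AWS
        athena_client = boto3.client("athena")
    response = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "dynamodb-exports"},
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```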

Next steps

Here are a few ways that you can extend this work:

  • Modify the Data Pipeline to run the DynamoDB export every night at midnight in your local time.
  • Run the AWS Glue Crawler every day at 4 a.m. local time so you always have the latest snapshot of your data in DynamoDB.
  • Use the export success topic to trigger more complex pipelines or aggregations.
  • Combine this approach with building a data lake in S3.

Conclusion

In this post, we show you how to export data from a DynamoDB table, convert it into a more efficient format with AWS Glue, and query the data with Athena. This approach gives you a way to pull insights from your data stored in DynamoDB.



About the Author

Joe Feeney is a Software Engineer on the Amazon Author team where he leverages all of Amazon’s data to provide Authors with unique, actionable insights. He enjoys losing to his wife and kids at Mario Kart, and building and modifying guitars.

Orchestrate Multiple ETL Jobs Using AWS Step Functions and AWS Lambda

Post Syndicated from Moataz Anany original https://aws.amazon.com/blogs/big-data/orchestrate-multiple-etl-jobs-using-aws-step-functions-and-aws-lambda/

Extract, transform, and load (ETL) operations collectively form the backbone of any modern enterprise data lake. They transform raw data into useful datasets and, ultimately, into actionable insight. An ETL job typically reads data from one or more data sources, applies various transformations to the data, and then writes the results to a target where data is ready for consumption. The sources and targets of an ETL job could be relational databases in Amazon Relational Database Service (Amazon RDS) or on-premises, a data warehouse such as Amazon Redshift, or object storage such as Amazon Simple Storage Service (Amazon S3) buckets. Amazon S3 as a target is especially commonplace in the context of building a data lake in AWS.

AWS offers AWS Glue, which is a service that helps author and deploy ETL jobs. AWS Glue is a fully managed extract, transform, and load service that makes it easy for customers to prepare and load their data for analytics. Other AWS services can also be used to implement and manage ETL jobs. They include AWS Database Migration Service (AWS DMS), Amazon EMR (using the Steps API), and even Amazon Athena.

The challenge of orchestrating an ETL workflow

How can we orchestrate an ETL workflow that involves a diverse set of ETL technologies? AWS Glue, AWS DMS, Amazon EMR, and other services support Amazon CloudWatch Events, which we could use to chain ETL jobs together. Amazon S3, the central data lake store, also supports CloudWatch Events. But relying on CloudWatch Events alone means that there’s no single visual representation of the ETL workflow. Also, tracing the overall ETL workflow’s execution status and handling error scenarios can become a challenge.

In this post, I show you how to use AWS Step Functions and AWS Lambda for orchestrating multiple ETL jobs involving a diverse set of technologies in an arbitrarily-complex ETL workflow. AWS Step Functions is a web service that enables you to coordinate the components of distributed applications and microservices using visual workflows. You build applications from individual components. Each component performs a discrete function, or task, allowing you to scale and change applications quickly.

Let’s look at an example ETL workflow.

Example datasets for the ETL workflow

For our example, we’ll use two publicly available Amazon QuickSight datasets.

The first dataset is a sales pipeline dataset, which contains a list of slightly more than 20,000 sales opportunity records for a fictitious business. Each record has fields that specify:

  • A date, potentially when an opportunity was identified.
  • The salesperson’s name.
  • A market segment to which the opportunity belongs.
  • Forecasted monthly revenue.

The second dataset is an online marketing metrics dataset. This dataset contains records of marketing metrics, aggregated by day. The metrics describe user engagement across various channels, such as websites, mobile, and social media, plus other marketing metrics. The two datasets are unrelated, but for the purpose of this example we’ll assume that they are related.

The example ETL workflow requirements

Imagine there’s a business user who needs to answer questions based on both datasets. Perhaps the user wants to explore the correlations between online user engagement metrics on the one hand, and forecasted sales revenue and opportunities generated on the other hand. The user engagement metrics include website visits, mobile users, and desktop users.

The steps in the ETL workflow are:

Process the Sales dataset (PSD). Read the Sales dataset. Group records by day, aggregating the Forecasted Monthly Revenue field. Rename fields to replace white space with underscores. Output the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Process the Marketing dataset (PMD). Read the Marketing dataset. Rename fields to replace white space with underscores. Send the intermediary results to Amazon S3 in compressed Parquet format. Overwrite any previous output.

Join Marketing and Sales datasets (JMSD). Read the output of the processed Sales and Marketing datasets. Perform an inner join of both datasets on the date field. Sort in ascending order by date. Send the final joined dataset to Amazon S3, and overwrite any previous output.
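The logic of the three steps can be sketched in a few lines of plain Python. This is not the AWS Glue code from the sample, which runs on Spark and writes Parquet to Amazon S3; it only illustrates the rename, aggregate, and join operations on in-memory records. The field names Date and Website Visits are assumptions chosen for the illustration.

```python
from collections import defaultdict


def rename_fields(record):
    """Replace white space in field names with underscores."""
    return {key.replace(" ", "_"): value for key, value in record.items()}


def process_sales(records):
    """PSD: total the forecasted monthly revenue for each day."""
    totals = defaultdict(float)
    for rec in map(rename_fields, records):
        totals[rec["Date"]] += rec["Forecasted_Monthly_Revenue"]
    return [{"Date": day, "Forecasted_Monthly_Revenue": total}
            for day, total in totals.items()]


def process_marketing(records):
    """PMD: the marketing data is already aggregated by day, so only rename fields."""
    return [rename_fields(rec) for rec in records]


def join_marketing_and_sales(marketing, sales):
    """JMSD: inner join on Date, sorted in ascending order by Date."""
    sales_by_date = {row["Date"]: row for row in sales}
    joined = [
        {**m, **sales_by_date[m["Date"]]}
        for m in marketing
        if m["Date"] in sales_by_date
    ]
    return sorted(joined, key=lambda row: row["Date"])
```

Because the join is an inner join, a day that appears in only one of the two datasets is dropped from the final output.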

So far, this ETL workflow can be implemented with AWS Glue, with the ETL jobs being chained by using job triggers. But you might have other requirements outside of AWS Glue that are part of your end-to-end data processing workflow, such as the following:

  • Both Sales and Marketing datasets are uploaded to an S3 bucket at random times in an interval of up to a week. The PSD job should start as soon as the Sales dataset file is uploaded. The PMD job should start as soon as the Marketing dataset file is uploaded. Parallel ETL jobs can start and finish anytime, but the final JMSD job can start only after all parallel ETL jobs are complete.
  • In addition to PSD and PMD jobs, the orchestration must support more parallel ETL jobs in the future that contribute to the final dataset aggregated by the JMSD job. The additional ETL jobs could be managed by AWS services, such as AWS Database Migration Service, Amazon EMR, or Amazon Athena, or by non-AWS services.

The data engineer takes these requirements and builds the following ETL workflow chart.

To fulfill the requirements, we need a generic ETL orchestration solution. A serverless solution is even better.

The ETL orchestration architecture and events

Let’s see how we can orchestrate an ETL workflow to fulfill the requirements using AWS Step Functions and AWS Lambda. The following diagram shows the ETL orchestration architecture and the main flow of events.

The main flow of events starts with an AWS Step Functions state machine. This state machine defines the steps in the orchestrated ETL workflow. A state machine can be triggered through Amazon CloudWatch based on a schedule, through the AWS Command Line Interface (AWS CLI), or using the various AWS SDKs in an AWS Lambda function or some other execution environment.

As the state machine execution progresses, it invokes the ETL jobs. As shown in the diagram, the invocation happens indirectly through intermediary AWS Lambda functions that you author and set up in your account. We’ll call this type of function an ETL Runner.

While the architecture in the diagram shows Amazon Athena, Amazon EMR, and AWS Glue, the accompanying code sample (aws-etl-orchestrator) includes a single ETL Runner, labeled AWS Glue Runner Function in the diagram. You can use this ETL Runner to orchestrate AWS Glue jobs. You can also follow the pattern and implement more ETL Runners to orchestrate other AWS services or non-AWS tools.

ETL Runners are invoked by activity tasks in Step Functions. Because of the way AWS Step Functions’ activity tasks work, ETL Runners need to periodically poll the AWS Step Functions state machine for tasks. The state machine responds by providing a Task object. The Task object contains inputs which enable an ETL Runner to run an ETL job.

As soon as an ETL Runner receives a task, it starts the respective ETL job. An ETL Runner maintains a state of active jobs in an Amazon DynamoDB table. Periodically, the ETL Runner checks the status of active jobs. When an active ETL job completes, the ETL Runner notifies the AWS Step Functions state machine. This allows the ETL workflow in AWS Step Functions to proceed to the next step.
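The poll, start, and notify cycle described above might be sketched as follows. This is not the code in aws-etl-orchestrator. The sfn and glue arguments stand for Boto3 Step Functions and AWS Glue clients, the active_jobs dictionary stands in for the DynamoDB table, and the GlueJobName input field and the worker name are hypothetical.

```python
import json


def poll_for_task(sfn, glue, activity_arn, active_jobs):
    """Fetch one pending activity task, if any, and start its Glue job."""
    task = sfn.get_activity_task(activityArn=activity_arn, workerName="glue-runner")
    token = task.get("taskToken")
    if not token:  # no task was waiting when the long poll ended
        return None
    job_name = json.loads(task["input"])["GlueJobName"]  # hypothetical input field
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    active_jobs[run_id] = {"job_name": job_name, "task_token": token}
    return run_id


def check_active_jobs(sfn, glue, active_jobs):
    """Report finished jobs to Step Functions and drop them from the active set."""
    for run_id, job in list(active_jobs.items()):
        run = glue.get_job_run(JobName=job["job_name"], RunId=run_id)
        state = run["JobRun"]["JobRunState"]
        if state == "SUCCEEDED":
            sfn.send_task_success(
                taskToken=job["task_token"],
                output=json.dumps({"GlueJobName": job["job_name"]}),
            )
        elif state in ("FAILED", "STOPPED", "TIMEOUT"):
            sfn.send_task_failure(
                taskToken=job["task_token"],
                error="GlueJobFailed",
                cause=f"Job run ended in state {state}",
            )
        else:
            continue  # still starting or running; check again on the next invocation
        del active_jobs[run_id]
```

Keeping the task token alongside the job run ID is what lets a later, separate invocation of the runner report the result for a job that an earlier invocation started.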

An important question may come up. Why does an ETL Runner run independently from your Step Functions state machine and poll for tasks? Can’t we instead directly invoke an AWS Lambda function from the Step Functions state machine? Then can’t we have that function start and monitor an ETL job until it completes?

The answer is that AWS Lambda functions have a maximum execution duration per request of 300 seconds, or 5 minutes. For more information, see AWS Lambda Limits. ETL jobs typically take more than 5 minutes to complete. If an ETL Runner function is invoked directly, it will likely time out before the ETL job completes. Thus, we follow the long-running worker approach with activity tasks. The worker in this code sample – the ETL Runner – is an AWS Lambda function that gets triggered on a schedule using CloudWatch Events. If you want to avoid managing the polling schedule through CloudWatch Events, you can implement a polling loop in your ETL workflow’s state machine. Check the AWS Big Data blog post Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy for an example.

Finally, let’s discuss how we fulfill the requirement of waiting for Sales and Marketing datasets to arrive in an S3 bucket at random times. We implement these waits as two separate activity tasks: Wait for Sales Data and Wait for Marketing Data. A state machine halts execution when it encounters either of these activity tasks. A CloudWatch Events event handler is then configured on an Amazon S3 bucket, so that when Sales or Marketing dataset files are uploaded to the bucket, Amazon S3 invokes an AWS Lambda function. The Lambda function then signals the waiting state machine to exit the activity task corresponding to the uploaded dataset. The subsequent ETL job is then invoked by the state machine.
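One way the upload-triggered Lambda function could release a waiting activity task is sketched below. It is an illustration rather than the sample's implementation: the key prefixes, the activity ARNs, and the worker name are all hypothetical. The function maps the uploaded object key to a wait activity, fetches that activity's pending task, and completes it.

```python
import json

# Hypothetical mapping from object-key prefix to the ARN of the matching wait activity.
WAIT_ACTIVITIES = {
    "sales/": "arn:aws:states:us-east-1:123456789012:activity:WaitForSalesData",
    "marketing/": "arn:aws:states:us-east-1:123456789012:activity:WaitForMarketingData",
}


def handler(event, context, sfn=None):
    """Complete the wait task that corresponds to each uploaded dataset file."""
    if sfn is None:
        import boto3  # imported lazily so the sketch can be tested without AWS
        sfn = boto3.client("stepfunctions")

    released = []
    for record in event.get("Records", []):
        key = record["s3"]["object"]["key"]
        for prefix, activity_arn in WAIT_ACTIVITIES.items():
            if not key.startswith(prefix):
                continue
            task = sfn.get_activity_task(activityArn=activity_arn, workerName="s3-signal")
            if task.get("taskToken"):  # a state machine execution is waiting
                sfn.send_task_success(
                    taskToken=task["taskToken"],
                    output=json.dumps({"uploaded_key": key}),
                )
                released.append(activity_arn)
    return released
```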

Set up your own ETL orchestration

The aws-etl-orchestrator GitHub repository provides source code you can customize to set up the ETL orchestration architecture in your AWS account. The following steps show what you need to do to start orchestrating your ETL jobs using the architecture shown in this post:

  1. Model the ETL orchestration workflow in AWS Step Functions
  2. Build your ETL Runners (or use an existing AWS Glue ETL Runner)
  3. Customize AWS CloudFormation templates and create stacks
  4. Invoke the ETL orchestration state machine
  5. Upload sample Sales and Marketing datasets to Amazon S3

Model the ETL orchestration workflow in AWS Step Functions.  Use AWS Step Functions to model the ETL workflow described in this post as a state machine. A state machine in Step Functions consists of a set of states and the transitions between these states. A state machine is defined in Amazon States Language, which is a JSON-based notation. For a few examples of state machine definitions, see Sample Projects.

The following snapshot from the AWS Step Functions console shows our example ETL workflow modeled as a state machine. This workflow is what we provide you in the code sample.

When you start an execution of this state machine, it will branch to run two ETL jobs in parallel: Process Sales Data (PSD) and Process Marketing Data (PMD). But, according to the requirements, both ETL jobs should not start until their respective datasets are uploaded to Amazon S3. Hence, we implement Wait activity tasks before both PSD and PMD. When a dataset file is uploaded to Amazon S3, this triggers an AWS Lambda function that notifies the state machine to exit the Wait states. When both PMD and PSD jobs are successful, the JMSD job runs to produce the final dataset.
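The shape of such a state machine can be sketched by building the Amazon States Language definition as a Python dictionary and serializing it to JSON. This is a simplified approximation, not the definition shipped in the code sample: the activity ARNs use a placeholder account and Region, and the state names are paraphrased from this post.

```python
import json

ARN = "arn:aws:states:us-east-1:123456789012:activity:"  # placeholder account and Region


def activity_task(activity_arn, next_state=None):
    """Build a Task state backed by an activity; it ends its scope unless next_state is given."""
    state = {"Type": "Task", "Resource": activity_arn}
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


def branch(wait_name, wait_arn, job_name, job_arn):
    """One parallel branch: wait for the dataset, then process it."""
    return {
        "StartAt": wait_name,
        "States": {
            wait_name: activity_task(wait_arn, next_state=job_name),
            job_name: activity_task(job_arn),
        },
    }


definition = {
    "Comment": "Sketch of the Marketing and Sales ETL workflow",
    "StartAt": "Process Datasets",
    "States": {
        "Process Datasets": {
            "Type": "Parallel",
            "Branches": [
                branch("Wait for Sales Data", ARN + "WaitForSalesData",
                       "Process Sales Data", ARN + "GlueRunnerActivity"),
                branch("Wait for Marketing Data", ARN + "WaitForMarketingData",
                       "Process Marketing Data", ARN + "GlueRunnerActivity"),
            ],
            "Next": "Join Marketing And Sales Data",
        },
        "Join Marketing And Sales Data": activity_task(ARN + "GlueRunnerActivity"),
    },
}

print(json.dumps(definition, indent=2))
```

A Parallel state moves on to its Next state only after every branch has finished, which is what enforces the rule that JMSD runs after both PSD and PMD.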

Finally, to have this ETL workflow execute once per week, you will need to configure a state machine execution to start once per week using a CloudWatch Event.

Build your ETL Runners (or use an existing AWS Glue ETL Runner). The code sample includes an AWS Glue ETL Runner. For simplicity, we implemented the ETL workflow using only AWS Glue jobs. However, nothing prevents you from using a different ETL technology to implement PMD or PSD jobs. You’ll need to build an ETL Runner for that technology, following the AWS Glue ETL Runner example.

Customize AWS CloudFormation templates and create stacks. The sample published in the aws-etl-orchestrator repository includes three separate AWS CloudFormation templates. We organized resources into three templates following AWS CloudFormation best practices. The three resource groups are logically distinct and likely to have separate lifecycles and ownerships. Each template has an associated AWS CloudFormation parameters file (“*-params.json” files). Parameters in those files must be customized. The details about the three AWS CloudFormation templates are as follows:

  1. A template responsible for setting up AWS Glue resources. For our example ETL workflow, the sample template creates three AWS Glue jobs: PSD, PMD, and JMSD. The scripts for these jobs are pulled by AWS CloudFormation from an Amazon S3 bucket that you own.
  2. A template where the AWS Step Functions state machine is defined. The state machine definition in Amazon States Language is embedded in a StateMachine resource within the Step Functions template.
  3. A template that sets up resources required by the ETL Runner for AWS Glue. The AWS Glue ETL Runner is a Python script that is written to be run as an AWS Lambda function.

Invoke the ETL orchestration state machine. Finally, it is time to start a new state machine execution in AWS Step Functions. For our ETL example, the AWS CloudFormation template creates a state machine named MarketingAndSalesETLOrchestrator. You can start an execution from the AWS Step Functions console, or through an AWS CLI command. When you start an execution, the state machine will immediately enter Wait for Data states, waiting for datasets to be uploaded to Amazon S3.

Upload sample Sales and Marketing datasets to Amazon S3

Upload the provided datasets to the S3 bucket that you specified in the code sample configuration. The uploaded datasets signal the state machine to continue execution.

The state machine may take a while to complete execution. You can monitor progress in the AWS Step Functions console. If the execution is successful, the output shown in the following diagram appears.

Congratulations! You’ve orchestrated the example ETL workflow to a successful completion.

Handling failed ETL jobs

What if a job in the ETL workflow fails? In such a case, there are error-handling strategies available to the ETL workflow developer, from simply notifying an administrator, to fully undoing the effects of the previous jobs through compensating ETL jobs. Detecting and responding to a failed ETL job can be implemented using the AWS Step Functions’ Catch mechanism. For more information, see Handling Error Conditions Using a State Machine. In the sample state machine, errors are handled by a do-nothing Pass state.

Try it out. Stop any of the example ETL workflow’s jobs while it is running, using the AWS Glue console or the AWS CLI. You’ll notice the state machine transitioning to the ETL Job Failed Fallback state.
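In Amazon States Language, this kind of error handling is expressed with a Catch field on the state that can fail. The sketch below adds one to a Task state and defines a do-nothing Pass state as the target. The helper name and the example ARN are illustrative, and the fallback state name is taken from this post.

```python
def with_fallback(task_state, fallback_name="ETL Job Failed Fallback"):
    """Return a copy of a Task state that routes any error to the fallback state."""
    guarded = dict(task_state)
    guarded["Catch"] = [{"ErrorEquals": ["States.ALL"], "Next": fallback_name}]
    return guarded


# A do-nothing fallback; a real workflow might notify an administrator instead.
fallback_state = {"Type": "Pass", "End": True}

process_sales = with_fallback({
    "Type": "Task",
    "Resource": "arn:aws:states:us-east-1:123456789012:activity:GlueRunnerActivity",
    "End": True,
})
```

Replacing the Pass state with a Task state that publishes a notification, or that runs a compensating ETL job, would implement the richer strategies mentioned above.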

Conclusion

In this post, I showed you how to implement your ETL logic as an orchestrated workflow. I presented a serverless solution for ETL orchestration that allows you to control ETL job execution using AWS Step Functions and AWS Lambda.  You can use the concepts and the code described in this post to build arbitrarily complex ETL state machines.

For more information and to download the source code, see the aws-etl-orchestrator GitHub repository. If you have questions about this post, send them our way in the Comments section below.


Additional Reading

If you found this post useful, be sure to check out Build a Data Lake Foundation with AWS Glue and Amazon S3 and Orchestrate Apache Spark applications using AWS Step Functions and Apache Livy.

 


About the Author

Moataz Anany is a senior solutions architect with AWS. He enjoys partnering with customers to help them leverage AWS and the cloud in creative ways. He dedicates most of his spare time to his wife and little ones. The rest is spent building and breaking things in side projects.


Build a blockchain analytic solution with AWS Lambda, Amazon Kinesis, and Amazon Athena

Post Syndicated from Jonathan Shapiro-Ward original https://aws.amazon.com/blogs/big-data/build-a-blockchain-analytic-solution-with-aws-lambda-amazon-kinesis-and-amazon-athena/

There are many potential benefits to using a blockchain. A blockchain is a distributed data structure that can record transactions in a verifiable and immutable manner. Depending upon the use case, there are opportunities for reducing costs, improving speed and efficiency, strengthening regulatory compliance, and increasing resilience and scalability.

Early adopters of the blockchain are finding innovative ways of using it in such areas as finance, healthcare, eGovernment, and non-profit organizations. The blockchain was originally pioneered as the key technology behind the cryptocurrency Bitcoin.

Many of the opportunities to use blockchains arise from their design. They are typically large-scale distributed systems that often consist of many thousands of nodes. It can be challenging to gain insight into user activity, events, anomalies, and other state changes on a blockchain. But AWS analytics services provide the ability to analyze blockchain applications and provide meaningful information about these areas.

Walkthrough

In this post, we’ll show you how to:

You can readily adapt this Ethereum deployment and the blockchain analytics for use with a wide range of blockchain scenarios.

Prerequisites

This post assumes that you are familiar with AWS and Ethereum. The following documentation provides background reading to help you perform the steps described in this post:

Additionally, it’s useful to be familiar with Amazon Kinesis, AWS Lambda, Amazon QuickSight, and Amazon Athena to get the most out of this blog post. For more information, see:

For an introduction to serverless computing with AWS Lambda, see Introduction to AWS Lambda – Serverless Compute on Amazon Web Services.

Blockchain 101

Before we proceed with the solution in this post, we’ll provide a short discussion regarding blockchains and Ethereum, which is the blockchain implementation used in this solution.

In short, blockchains are a means for achieving consensus. The motivation behind blockchain was in allowing the Bitcoin network to agree upon the order of financial transactions while resisting vulnerability, malicious threats, and omission errors. Other blockchain implementations are used to agree upon the state of generic computation. This is achieved through a process called mining, whereby an arbitrary computational problem is solved to make falsifying transactions computationally challenging.

Ethereum is a major blockchain implementation. Unlike Bitcoin and other earlier blockchain systems, Ethereum is not solely a cryptocurrency platform, though it does have its own cryptocurrency called Ether. Ethereum extends the blockchain concept by building an Ethereum virtual machine (VM) that is Turing-complete on top of the blockchain. This allows for the development of smart contracts, which are programs that run on the blockchain. The appeal of smart contracts is the ability to translate natural language contracts, such as insurance contracts, into code that can run on Ethereum. This allows contractual agreements to be built without the need for a centralized authority, such as a bank or notary, potentially decreasing time to market and reducing costs.

An overview of the blockchain solution

The following is an overview of the solution provided in this post. The solution consists of:

  • An Ethereum blockchain running on Amazon Elastic Container Service (Amazon ECS) via the AWS Blockchain Template
  • An Application Load Balancer, providing access to the various Ethereum APIs.
  • A Lambda function, which deploys a smart contract to the blockchain
  • A Lambda function, which runs transactions against the smart contract
  • A Lambda function, which listens for events on the smart contract and pushes those events to Amazon Kinesis
  • An Amazon DynamoDB table used to share the blockchain state between Lambda functions
  • A blockchain analytics pipeline that uses Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, Amazon Kinesis Data Streams, and Amazon Athena.
  • An analytics dashboard built using Amazon QuickSight

The solution is presented in the following architectural diagram:

As shown, the solution is comprised of two main portions:

  • The blockchain hosted on Amazon Elastic Compute Cloud (Amazon EC2) and the Lambda functions that interact with the blockchain.
  • The analytics pipeline based around Kinesis that consumes data from the blockchain.

The AWS CloudFormation template we provide deploys the left side of that architecture diagram up to and including Kinesis Data Streams. It is the right side of the diagram that we’re going to build in this post.

Create the initial resources

  1. First, download the AWS CloudFormation template from: https://s3.amazonaws.com/blockchainblog/blockchainblogpost.template
  2. Use AWS CloudFormation to launch the template. The AWS CloudFormation stack deploys a virtual private cloud (VPC), two subnets, and a series of Lambda functions, which interact with the blockchain. This provides a foundation on which to build the analytics pipeline. You can either provide your own CIDR blocks or use the default parameters. Each subnet must have at least eight IP addresses.
  3. Deploy the AWS Blockchain Templates. The AWS Blockchain Templates make it efficient to deploy Ethereum and Hyperledger blockchain networks on AWS. In this case, we’re deploying an Ethereum network into the VPC created by the AWS CloudFormation template in step 2.
  4. Launch the following AWS CloudFormation template: https://aws-blockchain-templates-us-east-1.s3.us-east-1.amazonaws.com/ethereum/templates/latest/ethereum-network.template.yaml This template requires a number of parameters:
  • Set the Initial List of Accounts to the following predefined accounts the Lambda functions use:
0x34db0A1D7FE9D482C389b191e703Bf0182E0baE3,0xB3Bbce5d76aF28EcE4318c28479565F802f96808,0x877108a8825222cf669Ca9bFA3397D6973fE1640,0xb272056E07C94C7E762F642685bE822df6d08D03,0x0c00e92343f7AA255e0BBC17b21a02f188b53D6C,0xaDf205a5fcb846C4f8D5e9f5228196e3c157e8E0,0x1373a92b9BEbBCda6B87a4B5F94137Bc64E47261,0x9038284431F878f17F4387943169d5263eA55650,0xe1cd3399F6b0A1Ef6ac8Cebe228D7609B601ca8a,0x0A67cCC3FD9d664D815D229CEA7EF215d4C00A0a
  • In VPC Network Configuration:
    • Set the VPC ID to the blockchainblog VPC created by the first AWS CloudFormation template.
    • Add the blockchainblog-public subnet to the list of subnets to use.
    • Add blockchainblog-public and blockchainblog-private to the list of ALB subnets.
  • In Security Configuration:
    • Choose your Amazon EC2 key pair.
    • Provide the blockchainblog security group.
    • Provide the blockchainblog-ec2-role for the Amazon EC2 role.
    • Provide the blockchainblog-ecs-role for the Amazon ECS role.
    • Set the ALB security group to the blockchainblog security group.
  5. Leave all other variables unchanged, create the stack, and wait for all resources to be deployed. This deploys an Ethereum blockchain, starts the mining process, and exposes the Web3 API through an Application Load Balancer.

After the resources are created, move on to deploying the smart contract.

Deploy a smart contract

To use the blockchain, deploy a smart contract to it. This smart contract is not complex — it provides the functions for holding an auction.

The auction contract represents a public auction, which is an auction whereby all parties involved can be identified. The user offering the item to be auctioned deploys the contract and other users can bid using the contract. The auction is considered completed after a pre-defined number of blocks have been mined. When the auction ends, losing bids can then be withdrawn and the funds returned to the bidders. Later, the user who created the auction can withdraw the funds of the winning bid.

Note that the contract does nothing to ensure that the winner receives the commodity in question. In fact, this contract is entirely separate from what is being auctioned. The contract could be extended to provide this functionality, but for the scope of this post, we’re keeping the contract simple.

The auction contract is located at https://s3.amazonaws.com/blockchainblog/Auction.sol.

Examine the auction contract

The auction contract is automatically pulled by our Lambda function and deployed to our blockchain. This contract is written in a domain-specific language called Solidity. The syntax is inspired by the C family of languages; however, unlike C it doesn’t compile to object code. Instead, it compiles to bytecode, which runs on the Ethereum VM.

This smart contract has two functions: bid and withdraw. Bid allows users to bid in the auction, and withdraw allows users to withdraw funds from the contract when the auction has finished. The auction owner can obtain the winning bid and the losers can recoup their funds. Note that the data structure BidEvent is similar to a C struct, and is how we’ll trigger Solidity events. The Solidity events are captured and sent to our analytics pipeline.
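To make the bid and withdraw behavior concrete, here is a minimal in-memory sketch in Python. It is not the Auction.sol source, which isn't reproduced in this post. The class name, field names, and the rule that a new bid must exceed the current highest bid are assumptions for illustration only.

```python
class Auction:
    """Toy model of the public auction described above (hypothetical, not Auction.sol)."""

    def __init__(self, owner, end_block):
        self.owner = owner
        self.end_block = end_block      # the auction closes once this block is mined
        self.highest_bidder = None
        self.highest_bid = 0
        self.pending_returns = {}       # funds owed back to bidders who were outbid
        self.owner_paid = False
        self.events = []                # stands in for emitted BidEvent records

    def bid(self, bidder, amount, current_block):
        if current_block >= self.end_block:
            raise ValueError("auction has ended")
        if amount <= self.highest_bid:
            raise ValueError("bid must exceed the current highest bid")
        if self.highest_bidder is not None:
            # The previous leader can reclaim these funds after the auction ends.
            owed = self.pending_returns.get(self.highest_bidder, 0)
            self.pending_returns[self.highest_bidder] = owed + self.highest_bid
        self.highest_bidder, self.highest_bid = bidder, amount
        self.events.append({"Bidder": bidder, "Amount": amount, "Block": current_block})

    def withdraw(self, caller, current_block):
        if current_block < self.end_block:
            raise ValueError("auction is still running")
        amount = self.pending_returns.pop(caller, 0)
        if caller == self.owner and not self.owner_paid:
            # The auction creator collects the winning bid exactly once.
            self.owner_paid = True
            amount += self.highest_bid
        return amount


auction = Auction(owner="0xowner", end_block=100)
auction.bid("0xalice", 5, current_block=10)
auction.bid("0xbob", 8, current_block=11)
loser_refund = auction.withdraw("0xalice", current_block=100)
owner_payout = auction.withdraw("0xowner", current_block=100)
```

In this sketch the outbid bidder recovers their funds and the owner receives the winning bid, while the winner has nothing left to withdraw.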

Now it’s time to deploy our smart contract, run transactions against it, and listen for events by using pre-built Lambda functions. The following diagram shows the interactions of these Lambda functions:

DeployContract is a Lambda function created by the AWS CloudFormation stack that we deployed earlier. This function takes our Solidity source code from the Amazon Simple Storage Service (Amazon S3) bucket, compiles it to EVM bytecode using the solc compiler, deploys that to our blockchain, and stores the blockchain address of the contract in a DynamoDB table. The function interacts with the Ethereum blockchain on our Amazon EC2 instance via the web3 1.0.0 API. You can see the source code for this function at https://s3.amazonaws.com/blockchainblog/DeployContract.zip.

After deploying the AWS CloudFormation template, wait about 5 minutes before deploying the contract to give the blockchain time to start the mining process. The majority of this time is the blockchain generating the initial directed acyclic graph (DAG).

DeployContract can be invoked in the Lambda console by testing it with an empty test event. Before invoking the function, provide it with the address of the blockchain. To do this, locate the output of the AWS Blockchain Template and obtain the EthJSONRPCURL value from the output. Then provide this value in an environment variable named BLOCKCHAIN_HOST for the DeployContract function, as shown in the following example:
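If you prefer to set the variable from a script rather than the console, the following sketch builds the request for the Lambda UpdateFunctionConfiguration API. The function name and the RPC URL are placeholders; the deployed function may carry a name generated by AWS CloudFormation. Note that the Environment parameter replaces the whole variable set, so the sketch merges in any existing variables you pass it.

```python
def build_env_update(function_name, rpc_url, existing=None):
    """Build kwargs for update_function_configuration, keeping existing variables."""
    variables = dict(existing or {})
    variables["BLOCKCHAIN_HOST"] = rpc_url
    return {"FunctionName": function_name, "Environment": {"Variables": variables}}


def apply_env_update(request):
    """Send the request; needs boto3 and AWS credentials, so it is not called here."""
    import boto3  # imported lazily so the sketch runs without the AWS SDK installed
    return boto3.client("lambda").update_function_configuration(**request)


# Placeholder values: substitute your function name and EthJSONRPCURL output.
request = build_env_update("DeployContract", "http://example-alb.invalid:8545")
```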

Now invoke the DeployContract function. It should print various states, including the blockchain address of the deployed contract and the JSON ABI of the contract. After the function completes, the contract is deployed to our private blockchain and available for use. If the function produces an error, it’s likely because the blockchain has not yet been initialized. Wait a few minutes after creating the AWS CloudFormation template before invoking DeployContract.

Execute Transactions

To have transaction data to analyze, we first need some transactions. To generate them, we use a second Lambda function named ExecuteTransactions.

In the smart contract, an event is specified at the start of the file. Events are a useful mechanism in Solidity that can be used as a callback to code outside of the blockchain. The final Lambda function, ListenForTransactions, listens for events occurring against the contract and then sends those events to Kinesis for analysis.

Ethereum currently does not support sending events directly to Kinesis, so we’ll run the ListenForTransactions function to pull events from the blockchain. We can do this manually by invoking the function with an empty test event. ListenForTransactions pulls all events from the blockchain since the last time it was run. However, if we wanted transactions to be pulled from the blockchain in real time, we’d want the function running perpetually. In the following section, you can optionally schedule the Lambda function to run at regular intervals. As with DeployContract, provide the address of the Ethereum RPC endpoint via the BLOCKCHAIN_HOST environment variable for both ListenForTransactions and ExecuteTransactions.
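The "pull everything since the last run" behavior can be sketched as a checkpoint on the block number. How the real ListenForTransactions function tracks its position isn't shown in this post, so the checkpoint scheme, the function name, and the sample events below are assumptions.

```python
def events_since(all_events, last_block):
    """Return events from blocks after last_block, plus the new checkpoint."""
    new_events = [e for e in all_events if e["Block"] > last_block]
    new_checkpoint = max((e["Block"] for e in new_events), default=last_block)
    return new_events, new_checkpoint


# Hypothetical events already recorded on the chain.
chain_events = [
    {"Block": 9200, "Bidder": "0xalice", "Amount": 5},
    {"Block": 9201, "Bidder": "0xbob", "Amount": 8},
    {"Block": 9203, "Bidder": "0xalice", "Amount": 9},
]

first_batch, checkpoint = events_since(chain_events, last_block=9200)
second_batch, checkpoint_after = events_since(chain_events, last_block=checkpoint)
```

Each run forwards only the events it has not seen before, which is why running the function less often delays events but does not lose them.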

Optional: Use an Amazon CloudWatch event to schedule ListenForTransactions

To have ListenForTransactions run continually, we’ll use Amazon CloudWatch Events as a trigger for our Lambda function. In the Lambda console, choose the Triggers tab, and add a new Amazon CloudWatch Events trigger with a schedule expression such as rate(5 minutes); a rate expression needs both a value and a unit. This keeps the function running at regular intervals, so events are sent to Kinesis soon after they occur and we can do near real-time processing of events against our smart contract. However, if we want to reduce costs, or if real-time analytics isn’t a key objective, we could run our ListenForTransactions function less frequently. Each run fetches all events since the last time it was run; however, this is less timely than having the function wait for events to occur.

To configure a CloudWatch event to trigger ListenForTransactions:

  1. In the Designer section on the Lambda console for ListenForTransactions, select CloudWatch Events.
  2. Choose Configure and scroll down to the CloudWatch Events configuration.
  3. Select Create a new rule from the Rule dropdown menu.
  4. Name the rule and provide a description.
  5. Select Schedule expression.
  6. Provide a rate expression with a value and a unit, for example rate(5 minutes).
  7. Select Enable trigger.
  8. Choose Add.
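CloudWatch Events rate expressions take the form rate(value unit), with a singular unit when the value is 1 and a plural unit otherwise. The small helper below (a hypothetical name, not part of any AWS SDK) builds a well-formed expression so that a missing unit is caught before the rule is created.

```python
VALID_UNITS = ("minute", "hour", "day")


def rate_expression(value, unit):
    """Build a schedule expression such as 'rate(5 minutes)'."""
    if not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    if unit not in VALID_UNITS:
        raise ValueError("unit must be one of: " + ", ".join(VALID_UNITS))
    suffix = "" if value == 1 else "s"   # singular for 1, plural otherwise
    return f"rate({value} {unit}{suffix})"


every_five_minutes = rate_expression(5, "minute")
hourly = rate_expression(1, "hour")
```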

After the function is scheduled, we can then generate some events against our contract. We can run ExecuteTransactions, with an empty test event. We can do this any number of times to generate more transactions to send to our analytics pipeline. ExecuteTransactions produces batches of 10 transactions at a time.

Analyze Transactions with Kinesis Data Analytics

Because our Lambda function is listening to events on our smart contract, all bidding activity is sent to a Kinesis data stream called BlockchainBlogEvents, which was already created by AWS CloudFormation.

Right now, all events go to Kinesis but no further. We’ll persist our events for analysis with Athena later on. To do so, navigate to the Kinesis Data Streams console and choose the BlockchainBlog stream that was created for you.

  1. In the upper right-hand corner, choose Connect to Firehose. This forwards all events to a Kinesis Data Firehose stream, which delivers them to an S3 bucket.
  2. Name the delivery stream, choose Next, and don’t enable record transformation.
  3. Provide an S3 bucket in which to store your results. Remember the bucket name so you can use it later with Athena.

All events coming from the blockchain should now be persisted to Amazon S3.

Now that our events are being persisted, we’ll use Kinesis Data Analytics to perform a series of real-time analytics on the Kinesis Data Stream. Later, we’ll perform batch analytics on the data stored in Amazon S3 via Athena.

First, look at Kinesis Data Analytics. Our ListenForTransactions Lambda function sends a message to a stream each time a transaction is run against our Auction smart contract.

The message is a JSON object. It contains the address of the bidder who initiated the transaction, how much they bid, the contract they bid on, when the transaction was run, and which block the transaction was added to.

Kinesis Data Analytics processes each incoming message to the stream and lets us perform analysis over the stream. In this example, we use Kinesis Data Analytics to:

  1. Calculate the amount of Ether being bid in each block within a sliding window of 15 seconds.
  2. Detect the number of unique bidders occurring within a sliding window of 15 seconds.

Our sliding window is 15 seconds because this is the Ethereum target block time. This is the measure of how long it takes to add a block to the blockchain. By setting the sliding window to 15 seconds, we can gain insight into transactions occurring within the mining interval. We could also set the window to be longer to learn how it pertains to our auction application.

To start with our real-time analytics, we must create a Kinesis Data Analytics application. To do so:

  1. Navigate to the Kinesis data analytics application console on the AWS Management Console.
  2. Create a new Kinesis data analytics application with an appropriate name and description, then specify the pre-made blockchainblog Kinesis Data Stream as the source.
  3. Run ExecuteTransactions to send a set of transactions to Kinesis and automatically discover the schema.
  4. Open the SQL editor for the application.

Next, we’re going to add SQL to our Kinesis data analytics application to find out the amount of Ether being sent in each block. This includes all bids sent to the contract and all funds withdrawn from a completed auction.

Copy the following SQL, paste it into the SQL editor in Kinesis Data Analytics, then execute it.

CREATE OR REPLACE STREAM "SPEND_PER_BLOCK_STREAM" (block INTEGER, spend INTEGER);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "SPEND_PER_BLOCK_STREAM"
    SELECT STREAM "Block", SUM("Amount") AS block_sum
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "Block", STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '15' SECOND);

This simple piece of SQL provides some insight into our smart contract. The output of SPEND_PER_BLOCK_STREAM yields the block number and the volume of funds, from our contract, in that block. This output explains how much cryptocurrency is spent in relation to our smart contract and when it’s spent.
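To see what the GROUP BY with STEP computes, here is a small Python sketch of the same aggregation. STEP rounds each row time down to a multiple of the interval, so rows fall into consecutive, non-overlapping 15-second buckets. The rowtime field (seconds) and the sample events are made up for illustration.

```python
from collections import defaultdict

WINDOW_SECONDS = 15


def spend_per_block(events):
    """Sum Amount per (Block, window start), mirroring GROUP BY "Block", STEP(...)."""
    totals = defaultdict(int)
    for event in events:
        # Round the arrival time down to the start of its 15-second bucket.
        window_start = event["rowtime"] - event["rowtime"] % WINDOW_SECONDS
        totals[(event["Block"], window_start)] += event["Amount"]
    return dict(totals)


# Hypothetical events: two bids in block 9200, one in block 9201.
sample_events = [
    {"Block": 9200, "Amount": 3, "rowtime": 1},
    {"Block": 9200, "Amount": 4, "rowtime": 9},
    {"Block": 9201, "Amount": 2, "rowtime": 16},
]
spend = spend_per_block(sample_events)
```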

Make sure that there is data for the Kinesis data analytics application to process by running the ExecuteTransactions and ListenForTransactions functions. You can run these functions either with an Amazon CloudWatch event or manually.

Now, we’ll modify our application to detect the number of unique bidders placing bids within a 15-second window. This is about the time required to add a block to the blockchain. To do so, add the following code to our Kinesis data analytics application:

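-- Count distinct bidders per tumbling window.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    NUMBER_OF_DISTINCT_ITEMS BIGINT
);

-- A separate pump name keeps this pump from replacing "STREAM_PUMP" above.
CREATE OR REPLACE PUMP "BIDDER_COUNT_PUMP" AS
   INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM *
      FROM TABLE(COUNT_DISTINCT_ITEMS_TUMBLING(
          CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
          'Bidder',  -- column whose distinct values are counted
          15         -- window size in seconds, matching the block time
      )
);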

The resulting output of this code is the count of unique bidders occurring within the 15-second window. This is useful in helping us understand who is running transactions against our contract, for example whether a large number of blockchain addresses are responsible for the bids or only a small number of addresses are bidding.
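The distinct-bidder count can be pictured with the following Python sketch, which keeps one set of addresses per window. It computes an exact count, whereas the service documentation describes the built-in function as returning an approximate count, so treat this as an illustration of the idea rather than of the implementation. The rowtime field and sample data are assumptions.

```python
def distinct_bidders_per_window(events, window_seconds=15):
    """Count unique Bidder values in each non-overlapping window."""
    bidders_by_window = {}
    for event in events:
        window_start = event["rowtime"] - event["rowtime"] % window_seconds
        bidders_by_window.setdefault(window_start, set()).add(event["Bidder"])
    return {start: len(b) for start, b in sorted(bidders_by_window.items())}


# Hypothetical bids: alice bids twice in the first window.
bids = [
    {"Bidder": "0xalice", "rowtime": 2},
    {"Bidder": "0xbob", "rowtime": 5},
    {"Bidder": "0xalice", "rowtime": 11},
    {"Bidder": "0xcarol", "rowtime": 20},
]
unique_counts = distinct_bidders_per_window(bids)
```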

Finally, as denoted in our architectural diagram, we can add a destination stream to our Kinesis data analytics application. We’ll send the output of our application to Kinesis Data Firehose to persist the results. Then we’ll enable the resulting data to be used in batch analytics with Athena or other tools. To send the output, create a destination for the analytics output stream and point it at a Kinesis Data Firehose stream.

This section has shown real-time analytics that can be run on blockchain transactions. The next section shows how to use Athena to run batch analytics against our stored transactions.

Analyze Transactions with Athena

In this section, we’ll create a table in Athena so we can query our transaction data in batch form.

  1. Create a database in Athena and then create a table, specifying the location that you provided earlier to Kinesis Data Firehose. Your configuration should look like the following example:

  2. Choose Next, choose JSON as the input format, then choose Next again.
  3. In Columns, provide the data types for each of our columns. Choose Bulk add columns, then copy and paste the following column information:

Block int, Blockhash string, Bidder string, Maxbidder string, Contractowner string, Amount int, Auction string, EventTimestamp string

The columns have the following meanings:

  • Block: The block that this event pertains to.
  • Auction: Which auction smart contract the event pertains to.
  • ContractOwner: The address of the owner of the contract.
  • Bidder: The address of the bidder.
  • BlockHash: The SHA hash of the block.
  • Address: The address of the transaction.
  • MaxBidder: The address of the currently winning bidder (current to when the event was generated).
  • Amount: The amount of the bid.

  4. Choose Next, and then create the table.

After you configure Athena, you can then explore the data. First, look at whether the user who created the auction has bid in their own auction. Most auctions typically disallow this bidding, but our smart contract doesn’t prohibit this. We could solve this by modifying the contract, but for now let’s see if we can detect this via Athena. Run the following query:

select * from events where contractowner=bidder

The result should resemble the following:

You should see at least one instance where the contract owner has bid on their own contract. Note that the Lambda function running transactions does this at random. Bidding on one’s own contract could be permissible or it might violate the terms of the auction. In that scenario, we can easily detect this violation.
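You can try the same check locally before running it in Athena. The sketch below loads a few made-up rows into an in-memory SQLite table with a reduced set of columns and runs the query unchanged. The addresses and amounts are placeholders.

```python
import sqlite3

self_bid_db = sqlite3.connect(":memory:")
self_bid_db.execute(
    "CREATE TABLE events (block INT, bidder TEXT, contractowner TEXT, amount INT)"
)
self_bid_db.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        (9200, "0xalice", "0xowner", 5),
        (9201, "0xowner", "0xowner", 6),   # the owner bids in their own auction
        (9202, "0xbob", "0xowner", 8),
    ],
)

# Same query as in the post: rows where the contract owner is also the bidder.
self_bids = self_bid_db.execute(
    "select * from events where contractowner=bidder"
).fetchall()
```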

This scenario is an example of using analytics to detect and enforce compliance in a blockchain-backed system. Compliance remains an open question for many blockchain users, as detecting regulatory and compliance issues involving smart contracts often involves significant complexity. Analytics is one way to gain insight and answer these regulatory questions.

Useful queries for analyzing transactions

This section provides some other queries that we can use to analyze our smart contract transactions.

Find the number of transactions per block

SELECT block, COUNT(amount) AS transactions FROM events GROUP BY block

This query yields results similar to the following:

Find the winning bid for each auction

SELECT DISTINCT t.auction, t.amount
    FROM events t
        INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                        FROM events
                        GROUP BY auction) q
            ON t.auction = q.auction
                AND t.amount = q.maxamount

This query yields a set of results such as the following:

The results show each auction that you’ve created on the blockchain and the resulting highest bid.
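Both queries in this section can be exercised on sample data with SQLite before you run them against Athena. The rows below are invented, and an ORDER BY is added only so the results come back in a predictable order.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE events (block INT, auction TEXT, bidder TEXT, amount INT)")
db.executemany(
    "INSERT INTO events VALUES (?, ?, ?, ?)",
    [
        (9200, "0xauctionA", "0xalice", 5),
        (9200, "0xauctionA", "0xbob", 8),
        (9201, "0xauctionB", "0xalice", 3),
    ],
)

# Number of transactions per block.
per_block = db.execute(
    "SELECT block, COUNT(amount) AS transactions FROM events "
    "GROUP BY block ORDER BY block"
).fetchall()

# Winning (highest) bid for each auction.
winning_bids = db.execute(
    """
    SELECT DISTINCT t.auction, t.amount
    FROM events t
    INNER JOIN (SELECT auction, MAX(amount) AS maxamount
                FROM events
                GROUP BY auction) q
        ON t.auction = q.auction AND t.amount = q.maxamount
    ORDER BY t.auction
    """
).fetchall()
```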

Visualize queries with Amazon QuickSight

Instead of querying data in plain SQL, it is often beneficial to have a graphical representation of your analysis. You can do this with Amazon QuickSight, which can use Athena as a data source. As a result, with little effort we can build a dashboard solution on top of what we’ve already built. We’re going to use Amazon QuickSight to visualize data stored in Amazon S3, via Athena.

In Amazon QuickSight, we can create a new data source and use the Athena database and table that we created earlier.

To create a new data source

  1. Open the Amazon QuickSight console, then choose New Dataset.
  2. From the list of data sources, choose Athena, then name your data source.
  3. Choose the database and table in Athena that you created earlier.
  4. Import the data into SPICE. SPICE is instrumental for faster querying and visualization of data, without having to go directly to the source data. For more information about SPICE, see the Amazon QuickSight Documentation.
  5. Choose Visualize to start investigating the data.

With Amazon QuickSight, we can visualize the behavior of our simulated blockchain users. We’ll choose Amount as our measurement and Auction as our dimension from the QuickSight side pane. This shows us how much ether has been bid in each auction. Doing so yields results similar to the following:

The amount depends on the number of times you ran the ExecuteTransactions function.

If we look at MaxBidder, we see a pie chart. In the chart, we can see which blockchain address (user) is most often our highest bidder. This looks like the following:

This sort of information can be challenging to obtain from within a blockchain-based application. But in Amazon QuickSight, with our analytics pipeline, getting the information can be easier.

Finally, we can look at the mining time in Amazon QuickSight by choosing Eventtimestamp as the x-axis, choosing block as the y-axis, and using the minimum aggregate function. This produces a line graph that resembles the following:

The graph shows that we start at around block 9200 and have a steady rate of mining occurring. This is roughly consistent with around a 15 to 20 second block mining time. Note that the time stamp is in Unix time.
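The block time can also be estimated numerically from the same two fields. The sketch below divides the elapsed Unix time by the number of blocks mined between the first and last data points. The sample points are invented and assume one (block, earliest timestamp) pair per block, as the minimum aggregate would produce.

```python
def average_block_time(points):
    """Estimate seconds per block from (block_number, unix_timestamp) pairs."""
    ordered = sorted(points)
    (first_block, first_ts), (last_block, last_ts) = ordered[0], ordered[-1]
    if last_block == first_block:
        raise ValueError("need data points from at least two different blocks")
    return (last_ts - first_ts) / (last_block - first_block)


# Hypothetical data: block 9202 produced no events, so it has no point.
sample_points = [(9200, 1528000000), (9201, 1528000016), (9203, 1528000051)]
seconds_per_block = average_block_time(sample_points)
```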

This section has shown analysis that can be performed on a blockchain event to understand the behavior of both the blockchain and the smart contracts deployed to it. Using the same methodology, you can build your own analytics pipelines that perform useful analytics that shed light on your blockchain-backed applications.

Conclusion

Blockchain is an emerging technology with a great deal of potential. AWS analytics services provide a means to gain insight into blockchain applications that run over thousands of nodes and deal with millions of transactions. This allows developers to better understand the complexities of blockchain applications and aid in the creation of new applications. Moreover, the analytics portion can all be done without provisioning servers, reducing the need for managing infrastructure. This allows you to focus on building the blockchain applications that you want.

Important: Remember to destroy the stacks created by AWS CloudFormation. Also delete the resources you deployed, including the scheduled Lambda function that listens for blockchain events.


Additional Reading

If you found this post useful, be sure to check out Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift, 10 visualizations to try in Amazon QuickSight with sample data, and Analyzing Bitcoin Data: AWS CloudFormation Support for AWS Glue.

 


About the Author

Dr. Jonathan Shapiro-Ward is an AWS Solutions Architect based in Toronto. He helps customers across Canada to build industry leading cloud solutions. He has a background in distributed systems and big data and holds a PhD from the University of St Andrews.

 

 

Amazon Neptune Generally Available

Post Syndicated from Randall Hunt original https://aws.amazon.com/blogs/aws/amazon-neptune-generally-available/

Amazon Neptune is now Generally Available in US East (N. Virginia), US East (Ohio), US West (Oregon), and EU (Ireland). Amazon Neptune is a fast, reliable, fully-managed graph database service that makes it easy to build and run applications that work with highly connected datasets. At the core of Neptune is a purpose-built, high-performance graph database engine optimized for storing billions of relationships and querying the graph with millisecond latencies. Neptune supports two popular graph models, Property Graph and RDF, through Apache TinkerPop Gremlin and SPARQL, allowing you to easily build queries that efficiently navigate highly connected datasets. Neptune can be used to power everything from recommendation engines and knowledge graphs to drug discovery and network security. Neptune is fully-managed with automatic minor version upgrades, backups, encryption, and fail-over. I wrote about Neptune in detail for AWS re:Invent last year and customers have been using the preview and providing great feedback that the team has used to prepare the service for GA.

Now that Amazon Neptune is generally available there are a few changes from the preview:

Launching an Amazon Neptune Cluster

Launching a Neptune cluster is as easy as navigating to the AWS Management Console and clicking create cluster. Of course you can also launch with CloudFormation, the CLI, or the SDKs.

You can monitor your cluster health and the health of individual instances through Amazon CloudWatch and the console.

Additional Resources

We’ve created two repos with some additional tools and examples here. You can expect continuous development on these repos as we add additional tools and examples.

  • Amazon Neptune Tools Repo
    This repo has a useful tool for converting GraphML files into Neptune compatible CSVs for bulk loading from S3.
  • Amazon Neptune Samples Repo
    This repo has a really cool example of building a collaborative filtering recommendation engine for video game preferences.

Purpose Built Databases

There’s an industry trend where we’re moving more and more onto purpose-built databases. Developers and businesses want to access their data in the format that makes the most sense for their applications. As cloud resources make transforming large datasets easier with tools like AWS Glue, we have a lot more options than we used to for accessing our data. With tools like Amazon Redshift, Amazon Athena, Amazon Aurora, Amazon DynamoDB, and more we get to choose the best database for the job or even enable entirely new use-cases. Amazon Neptune is perfect for workloads where the data is highly connected across data rich edges.

I’m really excited about graph databases and I see a huge number of applications. Looking for ideas of cool things to build? I’d love to build a web crawler in AWS Lambda that uses Neptune as the backing store. You could further enrich it by running Amazon Comprehend or Amazon Rekognition on the text and images found and creating a search engine on top of Neptune.

As always, feel free to reach out in the comments or on twitter to provide any feedback!

Randall

Analyze Apache Parquet optimized data using Amazon Kinesis Data Firehose, Amazon Athena, and Amazon Redshift

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyzing-apache-parquet-optimized-data-using-amazon-kinesis-data-firehose-amazon-athena-and-amazon-redshift/

Amazon Kinesis Data Firehose is the easiest way to capture and stream data into a data lake built on Amazon S3. This data can be anything—from AWS service logs like AWS CloudTrail log files, Amazon VPC Flow Logs, Application Load Balancer logs, and others. It can also be IoT events, game events, and much more. To efficiently query this data, a time-consuming ETL (extract, transform, and load) process is required to massage and convert the data to an optimal file format, which increases the time to insight. This situation is less than ideal, especially for real-time data that loses its value over time.

To solve this common challenge, Kinesis Data Firehose can now save data to Amazon S3 in Apache Parquet or Apache ORC format. These are optimized columnar formats that are highly recommended for best performance and cost-savings when querying data in S3. This feature directly benefits you if you use Amazon Athena, Amazon Redshift, AWS Glue, Amazon EMR, or any other big data tools that are available from the AWS Partner Network and through the open-source community.

Amazon Connect is a simple-to-use, cloud-based contact center service that makes it easy for any business to provide a great customer experience at a lower cost than common alternatives. Its open platform design enables easy integration with other systems. One of those systems is Amazon Kinesis—in particular, Kinesis Data Streams and Kinesis Data Firehose.

What’s really exciting is that you can now save events from Amazon Connect to S3 in Apache Parquet format. You can then perform analytics using Amazon Athena and Amazon Redshift Spectrum in real time, taking advantage of this key performance and cost optimization. Of course, Amazon Connect is only one example. This new capability opens the door for a great deal of opportunity, especially as organizations continue to build their data lakes.

Amazon Connect includes an array of analytics views in the Administrator dashboard. But you might want to run other types of analysis. In this post, I describe how to set up a data stream from Amazon Connect through Kinesis Data Streams and Kinesis Data Firehose and out to S3, and then perform analytics using Athena and Amazon Redshift Spectrum. I focus primarily on the Kinesis Data Firehose support for Parquet and its integration with the AWS Glue Data Catalog, Amazon Athena, and Amazon Redshift.

Solution overview

Here is how the solution is laid out:

 

 

The following sections walk you through each of these steps to set up the pipeline.

1. Define the schema

When Kinesis Data Firehose processes incoming events and converts the data to Parquet, it needs to know which schema to apply. The reason is that many times, incoming events contain all or some of the expected fields based on which values the producers are advertising. A typical process is to normalize the schema during a batch ETL job so that you end up with a consistent schema that can easily be understood and queried. Doing this introduces latency due to the nature of the batch process. To overcome this issue, Kinesis Data Firehose requires the schema to be defined in advance.

To see the available columns and structures, see Amazon Connect Agent Event Streams. For the purpose of simplicity, I opted to make all the columns of type String rather than create the nested structures. But you can definitely do that if you want.

The simplest way to define the schema is to create a table in the Amazon Athena console. Open the Athena console, and paste the following create table statement, substituting your own S3 bucket and prefix for where your event data will be stored. A Data Catalog database is a logical container that holds the different tables that you can create. The default database name shown here should already exist. If it doesn’t, you can create it or use another database that you’ve already created.

CREATE EXTERNAL TABLE default.kfhconnectblog (
  awsaccountid string,
  agentarn string,
  currentagentsnapshot string,
  eventid string,
  eventtimestamp string,
  eventtype string,
  instancearn string,
  previousagentsnapshot string,
  version string
)
STORED AS parquet
LOCATION 's3://your_bucket/kfhconnectblog/'
TBLPROPERTIES ("parquet.compression"="SNAPPY")

That’s all you have to do to prepare the schema for Kinesis Data Firehose.

2. Define the data streams

Next, you need to define the Kinesis data streams that will be used to stream the Amazon Connect events.  Open the Kinesis Data Streams console and create two streams.  You can configure them with only one shard each because you don’t have a lot of data right now.

3. Define the Kinesis Data Firehose delivery stream for Parquet

Let’s configure the Data Firehose delivery stream using the data stream as the source and Amazon S3 as the output. Start by opening the Kinesis Data Firehose console and creating a new data delivery stream. Give it a name, and associate it with the Kinesis data stream that you created in Step 2.

As shown in the following screenshot, enable Record format conversion (1) and choose Apache Parquet (2). As you can see, Apache ORC is also supported. Scroll down and provide the AWS Glue Data Catalog database name (3) and table names (4) that you created in Step 1. Choose Next.

To make things easier, the output S3 bucket and prefix fields are automatically populated using the values that you defined in the LOCATION parameter of the create table statement from Step 1. Pretty cool. Additionally, you have the option to save the raw events into another location as defined in the Source record S3 backup section. Don’t forget to add a trailing forward slash (/) so that Data Firehose creates the date partitions inside that prefix.

On the next page, in the S3 buffer conditions section, there is a note about configuring a large buffer size. The Parquet file format is highly efficient in how it stores and compresses data. Increasing the buffer size allows you to pack more rows into each output file, which is preferred and gives you the most benefit from Parquet.

Compression using Snappy is automatically enabled for both Parquet and ORC. You can modify the compression algorithm by using the Kinesis Data Firehose API to update the OutputFormatConfiguration.
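As a rough sketch, the request for the Firehose UpdateDestination API could be assembled as follows. The stream name, version ID, and destination ID are placeholders that you would read from DescribeDeliveryStream. Whether the service accepts a partial DataFormatConversionConfiguration like this one, or needs the full existing configuration repeated, is an assumption to verify against the API documentation.

```python
PARQUET_COMPRESSION = ("UNCOMPRESSED", "GZIP", "SNAPPY")


def build_compression_update(stream_name, version_id, destination_id, compression):
    """Build kwargs for update_destination that change the Parquet compression."""
    if compression not in PARQUET_COMPRESSION:
        raise ValueError("compression must be one of: " + ", ".join(PARQUET_COMPRESSION))
    return {
        "DeliveryStreamName": stream_name,
        "CurrentDeliveryStreamVersionId": version_id,
        "DestinationId": destination_id,
        "ExtendedS3DestinationUpdate": {
            "DataFormatConversionConfiguration": {
                "OutputFormatConfiguration": {
                    "Serializer": {"ParquetSerDe": {"Compression": compression}}
                }
            }
        },
    }


def apply_compression_update(request):
    """Send the request; needs boto3 and AWS credentials, so it is not called here."""
    import boto3  # imported lazily so the sketch runs without the AWS SDK installed
    return boto3.client("firehose").update_destination(**request)


# Placeholder identifiers for illustration only.
update_request = build_compression_update(
    "my-delivery-stream", "1", "destinationId-000000000001", "GZIP"
)
```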

Be sure to also enable Amazon CloudWatch Logs so that you can debug any issues that you might run into.

Lastly, finalize the creation of the Firehose delivery stream, and continue on to the next section.

4. Set up the Amazon Connect contact center

After setting up the Kinesis pipeline, you now need to set up a simple contact center in Amazon Connect. The Getting Started page provides clear instructions on how to set up your environment, acquire a phone number, and create an agent to accept calls.

After setting up the contact center, in the Amazon Connect console, choose your Instance Alias, and then choose Data Streaming. Under Agent Event, choose the Kinesis data stream that you created in Step 2, and then choose Save.

At this point, your pipeline is complete.  Agent events from Amazon Connect are generated as agents go about their day. Events are sent via Kinesis Data Streams to Kinesis Data Firehose, which converts the event data from JSON to Parquet and stores it in S3. Athena and Amazon Redshift Spectrum can simply query the data without any additional work.

So let’s generate some data. Go back into the Administrator console for your Amazon Connect contact center, and create an agent to handle incoming calls. In this example, I creatively named mine Agent One. After it is created, Agent One can get to work and log into their console and set their availability to Available so that they are ready to receive calls.

To make the data a bit more interesting, I also created a second agent, Agent Two. I then made some incoming and outgoing calls and caused some failures to occur, so I now have enough data available to analyze.

5. Analyze the data with Athena

Let’s open the Athena console and run some queries. One thing you’ll notice is that when we created the schema for the dataset, we defined some of the fields as Strings even though in the documentation they were complex structures.  The reason for doing that was simply to show some of the flexibility of Athena to be able to parse JSON data. However, you can define nested structures in your table schema so that Kinesis Data Firehose applies the appropriate schema to the Parquet file.
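Because the snapshot columns hold JSON text, each query has to parse that text and walk a path to the value it needs. The Python sketch below is a rough analogue of that lookup for simple dotted paths; it is not how Athena implements json_extract_scalar, and the sample snapshot values are invented.

```python
import json


def extract_scalar(json_text, path):
    """Follow a dotted path such as 'agentstatus.name'; return None if absent or not scalar."""
    node = json.loads(json_text)
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node if isinstance(node, (str, int, float, bool)) else None


# Hypothetical snapshot stored as a string column.
snapshot = json.dumps({
    "agentstatus": {"name": "Available", "starttimestamp": "2018-05-01T12:00:00.000Z"},
    "configuration": {"username": "agentone"},
})

status = extract_scalar(snapshot, "agentstatus.name")
username = extract_scalar(snapshot, "configuration.username")
missing = extract_scalar(snapshot, "configuration.lastname")
```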

Let’s run the first query to see which agents have logged into the system.

The query might look complex, but it’s fairly straightforward:

WITH dataset AS (
  SELECT 
    from_iso8601_timestamp(eventtimestamp) AS event_ts,
    eventtype,
    -- CURRENT STATE
    json_extract_scalar(
      currentagentsnapshot,
      '$.agentstatus.name') AS current_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        currentagentsnapshot,
        '$.agentstatus.starttimestamp')) AS current_starttimestamp,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.firstname') AS current_firstname,
    json_extract_scalar(
      currentagentsnapshot,
      '$.configuration.lastname') AS current_lastname,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.username') AS current_username,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') AS current_outboundqueue,
    json_extract_scalar(
      currentagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as current_inboundqueue,
    -- PREVIOUS STATE
    json_extract_scalar(
      previousagentsnapshot, 
      '$.agentstatus.name') as prev_status,
    from_iso8601_timestamp(
      json_extract_scalar(
        previousagentsnapshot, 
       '$.agentstatus.starttimestamp')) as prev_starttimestamp,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.firstname') as prev_firstname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.lastname') as prev_lastname,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.username') as prev_username,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.defaultoutboundqueue.name') as prev_outboundqueue,
    json_extract_scalar(
      previousagentsnapshot, 
      '$.configuration.routingprofile.inboundqueues[0].name') as prev_inboundqueue
  from kfhconnectblog
  where eventtype <> 'HEART_BEAT'
)
SELECT
  current_status as status,
  current_username as username,
  event_ts
FROM dataset
WHERE eventtype = 'LOGIN' AND current_username <> ''
ORDER BY event_ts DESC

The query output looks something like this:

Here is another query that shows the sessions that each agent engaged in. It tells us whether the calls were incoming or outgoing, whether they were completed, and where there were missed or failed calls.
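The query below relies on CROSS JOIN UNNEST (c, p), which expands the two contact arrays side by side, pairing elements by position and padding the shorter array with NULL. As a rough Python analogy (the sample contacts are made up for illustration), this behaves like `itertools.zip_longest`, and the final WHERE c_contactid = p_contactid filter keeps only the pairs that describe the same contact:

```python
from itertools import zip_longest

# Hypothetical contact arrays from one event's current and previous snapshots.
current = [{"contactid": "c-1", "state": "CONNECTED"},
           {"contactid": "c-2", "state": "INCOMING"}]
previous = [{"contactid": "c-1", "state": "INCOMING"}]

def pair_contacts(c_items, p_items):
    """Pair items by position, then keep pairs with matching contact IDs."""
    rows = []
    for c_item, p_item in zip_longest(c_items, p_items):
        if c_item is None or p_item is None:
            continue  # a NULL on either side never satisfies the equality
        if c_item["contactid"] == p_item["contactid"]:
            rows.append((c_item["contactid"], p_item["state"], c_item["state"]))
    return rows

print(pair_contacts(current, previous))
```

Because the pairing is positional, a contact that appears at different positions in the two arrays would not be matched by this approach.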

WITH src AS (
  SELECT
     eventid,
     json_extract_scalar(currentagentsnapshot, '$.configuration.username') as username,
     cast(json_extract(currentagentsnapshot, '$.contacts') AS ARRAY(JSON)) as c,
     cast(json_extract(previousagentsnapshot, '$.contacts') AS ARRAY(JSON)) as p
  from kfhconnectblog
),
src2 AS (
  SELECT *
  FROM src CROSS JOIN UNNEST (c, p) AS contacts(c_item, p_item)
),
dataset AS (
SELECT 
  eventid,
  username,
  json_extract_scalar(c_item, '$.contactid') as c_contactid,
  json_extract_scalar(c_item, '$.channel') as c_channel,
  json_extract_scalar(c_item, '$.initiationmethod') as c_direction,
  json_extract_scalar(c_item, '$.queue.name') as c_queue,
  json_extract_scalar(c_item, '$.state') as c_state,
  from_iso8601_timestamp(json_extract_scalar(c_item, '$.statestarttimestamp')) as c_ts,
  
  json_extract_scalar(p_item, '$.contactid') as p_contactid,
  json_extract_scalar(p_item, '$.channel') as p_channel,
  json_extract_scalar(p_item, '$.initiationmethod') as p_direction,
  json_extract_scalar(p_item, '$.queue.name') as p_queue,
  json_extract_scalar(p_item, '$.state') as p_state,
  from_iso8601_timestamp(json_extract_scalar(p_item, '$.statestarttimestamp')) as p_ts
FROM src2
)
SELECT 
  username,
  c_channel as channel,
  c_direction as direction,
  p_state as prev_state,
  c_state as current_state,
  c_ts as current_ts,
  c_contactid as id
FROM dataset
WHERE c_contactid = p_contactid
ORDER BY id DESC, current_ts ASC

The query output looks similar to the following:

6. Analyze the data with Amazon Redshift Spectrum

With Amazon Redshift Spectrum, you can query data directly in S3 using your existing Amazon Redshift data warehouse cluster. Because the data is already in Parquet format, Redshift Spectrum gets the same great benefits that Athena does.

Here is a simple query to show querying the same data from Amazon Redshift. Note that to do this, you need to first create an external schema in Amazon Redshift that points to the AWS Glue Data Catalog.

SELECT 
  eventtype,
  json_extract_path_text(currentagentsnapshot,'agentstatus','name') AS current_status,
  json_extract_path_text(currentagentsnapshot, 'configuration','firstname') AS current_firstname,
  json_extract_path_text(currentagentsnapshot, 'configuration','lastname') AS current_lastname,
  json_extract_path_text(
    currentagentsnapshot,
    'configuration','routingprofile','defaultoutboundqueue','name') AS current_outboundqueue
FROM default_schema.kfhconnectblog

The following shows the query output:

Summary

In this post, I showed you how to use Kinesis Data Firehose to ingest and convert data to columnar file format, enabling real-time analysis using Athena and Amazon Redshift. This great feature enables a level of optimization in both cost and performance that you need when storing and analyzing large amounts of data. This feature is equally important if you are investing in building data lakes on AWS.

 


Additional Reading

If you found this post useful, be sure to check out Analyzing VPC Flow Logs with Amazon Kinesis Firehose, Amazon Athena, and Amazon QuickSight and Work with partitioned data in AWS Glue.


About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan who enjoys cheering his team on and hanging out with his family.

 

 

 

Spring 2018 AWS SOC Reports are Now Available with 11 Services Added in Scope

Post Syndicated from Chris Gile original https://aws.amazon.com/blogs/security/spring-2018-aws-soc-reports-are-now-available-with-11-services-added-in-scope/

Since our last System and Organization Control (SOC) audit, our service and compliance teams have been working to increase the number of AWS services in scope, prioritized based on customer requests. Today, we’re happy to report 11 services are newly SOC compliant, which is a 21 percent increase in the last six months.

With the addition of the following 11 new services, you can now select from a total of 62 SOC-compliant services. To see the full list, go to our Services in Scope by Compliance Program page:

• Amazon Athena
• Amazon QuickSight
• Amazon WorkDocs
• AWS Batch
• AWS CodeBuild
• AWS Config
• AWS OpsWorks Stacks
• AWS Snowball
• AWS Snowball Edge
• AWS Snowmobile
• AWS X-Ray

Our latest SOC 1, 2, and 3 reports covering the period from October 1, 2017 to March 31, 2018 are now available. The SOC 1 and 2 reports are available on-demand through AWS Artifact by logging into the AWS Management Console. The SOC 3 report can be downloaded here.

Finally, prospective customers can read our SOC 1 and 2 reports by reaching out to AWS Compliance.

Want more AWS Security news? Follow us on Twitter.

Analyze data in Amazon DynamoDB using Amazon SageMaker for real-time prediction

Post Syndicated from YongSeong Lee original https://aws.amazon.com/blogs/big-data/analyze-data-in-amazon-dynamodb-using-amazon-sagemaker-for-real-time-prediction/

Many companies across the globe use Amazon DynamoDB to store and query historical user-interaction data. DynamoDB is a fast NoSQL database used by applications that need consistent, single-digit millisecond latency.

Often, customers want to turn their valuable data in DynamoDB into insights by analyzing a copy of their table stored in Amazon S3. Doing this separates their analytical queries from their low-latency critical paths. This data can be the primary source for understanding customers’ past behavior, predicting future behavior, and generating downstream business value. Customers often turn to DynamoDB because of its great scalability and high availability. After a successful launch, many customers want to use the data in DynamoDB to predict future behaviors or provide personalized recommendations.

DynamoDB is a good fit for low-latency reads and writes, but it’s not practical to scan all data in a DynamoDB database to train a model. In this post, I demonstrate how you can use DynamoDB table data copied to Amazon S3 by AWS Data Pipeline to predict customer behavior. I also demonstrate how you can use this data to provide personalized recommendations for customers using Amazon SageMaker. You can also run ad hoc queries using Amazon Athena against the data. DynamoDB recently released on-demand backups to create full table backups with no performance impact. However, that feature isn’t suitable for our purposes in this post, so I chose AWS Data Pipeline instead to create managed backups that are accessible from other services.

To do this, I describe how to read the DynamoDB backup file format in Data Pipeline. I also describe how to convert the objects in S3 to a CSV format that Amazon SageMaker can read. In addition, I show how to schedule regular exports and transformations using Data Pipeline. The sample data used in this post is the Bank Marketing Data Set from the UCI Machine Learning Repository.

The solution that I describe provides the following benefits:

  • Separates analytical queries from production traffic on your DynamoDB table, preserving your DynamoDB read capacity units (RCUs) for important production requests
  • Automatically updates your model to get real-time predictions
  • Optimizes for performance (so it doesn’t compete with DynamoDB RCUs after the export) and for cost (using data you already have)
  • Makes it easier for developers of all skill levels to use Amazon SageMaker

All code and the dataset used in this post are available in this .zip file.

Solution architecture

The following diagram shows the overall architecture of the solution.

The steps that data follows through the architecture are as follows:

  1. Data Pipeline regularly copies the full contents of a DynamoDB table as JSON into an S3 bucket.
  2. Exported JSON files are converted to comma-separated value (CSV) format to use as a data source for Amazon SageMaker.
  3. Amazon SageMaker renews the model artifact and updates the endpoint.
  4. The converted CSV is available for ad hoc queries with Amazon Athena.
  5. Data Pipeline controls this flow and repeats the cycle based on the schedule defined by customer requirements.

Building the auto-updating model

This section discusses details about how to read the DynamoDB exported data in Data Pipeline and build automated workflows for real-time prediction with a regularly updated model.

Download sample scripts and data

Before you begin, take the following steps:

  1. Download sample scripts in this .zip file.
  2. Unzip the src.zip file.
  3. Find the automation_script.sh file and edit it for your environment. For example, replace 's3://<your bucket>/<datasource path>/' with your own S3 path to the data source for Amazon SageMaker. Throughout the script, replace any text enclosed in angle brackets (< and >) with your own values.
  4. Upload the json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar file to your S3 path so that the ADD jar command in Apache Hive can refer to it.

For this solution, the banking.csv file should be imported into a DynamoDB table.

Export a DynamoDB table

To export the DynamoDB table to S3, open the Data Pipeline console and choose the Export DynamoDB table to S3 template. In this template, Data Pipeline creates an Amazon EMR cluster and performs an export in the EMRActivity activity. Set proper intervals for backups according to your business requirements.

One core node (m3.xlarge) provides the default capacity for the EMR cluster and should be suitable for the solution in this post. Leave the option to resize the cluster before running enabled in the TableBackupActivity activity to let Data Pipeline scale the cluster to match the table size. The process of converting to CSV format and renewing models happens in this EMR cluster.

For a more in-depth look at how to export data from DynamoDB, see Export Data from DynamoDB in the Data Pipeline documentation.

Add the script to an existing pipeline

After you export your DynamoDB table, you add an additional EMR step to EMRActivity by following these steps:

  1. Open the Data Pipeline console and choose the ID for the pipeline that you want to add the script to.
  2. For Actions, choose Edit.
  3. In the editing console, choose the Activities category and add an EMR step using the custom script downloaded in the previous section, as shown below.

Paste the following command into the new step after the data upload step:

s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,s3://<your bucket name>/automation_script.sh,#{output.directoryPath},#{myDDBRegion}

The element #{output.directoryPath} references the S3 path where the data pipeline exports DynamoDB data as JSON. The path should be passed to the script as an argument.
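As a small illustration of how the pieces of that step line up with the script's positional parameters, the following Python sketch splits the step on commas. It assumes that script-runner forwards everything after the script path to the script unchanged, which matches how the automation script later reads $1 and $2. The `describe_step` helper is hypothetical and exists only for this illustration.

```python
# The step string from the post, with the placeholder bucket name left as-is.
step = ("s3://#{myDDBRegion}.elasticmapreduce/libs/script-runner/script-runner.jar,"
        "s3://<your bucket name>/automation_script.sh,"
        "#{output.directoryPath},#{myDDBRegion}")

def describe_step(step_text):
    """Split a comma-separated EMR step into jar, script, and script arguments."""
    jar, script, *script_args = step_text.split(",")
    # Positional parameters as the bash script sees them: $1, $2, ...
    positional = {f"${i}": arg for i, arg in enumerate(script_args, start=1)}
    return {"jar": jar, "script": script, "args": positional}

for name, value in describe_step(step)["args"].items():
    print(name, "=", value)
```

Under that assumption, the exported S3 path arrives in the script as $1 and the Region as $2.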

The bash script has two goals: converting data formats and renewing the Amazon SageMaker model. Subsequent sections discuss the contents of the automation script.

Automation script: Convert JSON data to CSV with Hive

We use Apache Hive to transform the data into a new format. The Hive QL script to create an external table and transform the data is included in the custom script that you added to the Data Pipeline definition.

When you run the Hive scripts, do so with the -e option. Also, define the Hive table with the 'org.openx.data.jsonserde.JsonSerDe' row format to parse and read JSON format. The SQL creates a Hive EXTERNAL table, and it reads the DynamoDB backup data on the S3 path passed to it by Data Pipeline.

Note: You should create the table with the “EXTERNAL” keyword to avoid the backup data being accidentally deleted from S3 if you drop the table.

The full automation script for converting follows. Add your own bucket name and data source path in the highlighted areas.

#!/bin/bash
hive -e "
ADD jar s3://<your bucket name>/json-serde-1.3.6-SNAPSHOT-jar-with-dependencies.jar ; 
DROP TABLE IF EXISTS blog_backup_data ;
CREATE EXTERNAL TABLE blog_backup_data (
 customer_id map<string,string>,
 age map<string,string>, job map<string,string>, 
 marital map<string,string>,education map<string,string>, 
 default map<string,string>, housing map<string,string>,
 loan map<string,string>, contact map<string,string>, 
 month map<string,string>, day_of_week map<string,string>, 
 duration map<string,string>, campaign map<string,string>,
 pdays map<string,string>, previous map<string,string>, 
 poutcome map<string,string>, emp_var_rate map<string,string>, 
 cons_price_idx map<string,string>, cons_conf_idx map<string,string>,
 euribor3m map<string,string>, nr_employed map<string,string>, 
 y map<string,string> ) 
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
LOCATION '$1/';

INSERT OVERWRITE DIRECTORY 's3://<your bucket name>/<datasource path>/' 
SELECT concat( customer_id['s'],',', 
 age['n'],',', job['s'],',', 
 marital['s'],',', education['s'],',', default['s'],',', 
 housing['s'],',', loan['s'],',', contact['s'],',', 
 month['s'],',', day_of_week['s'],',', duration['n'],',', 
 campaign['n'],',',pdays['n'],',',previous['n'],',', 
 poutcome['s'],',', emp_var_rate['n'],',', cons_price_idx['n'],',',
 cons_conf_idx['n'],',', euribor3m['n'],',', nr_employed['n'],',', y['n'] ) 
FROM blog_backup_data
WHERE customer_id['s'] > 0 ;
"

After creating an external table, you need to read data. You then use the INSERT OVERWRITE DIRECTORY ~ SELECT command to write CSV data to the S3 path that you designated as the data source for Amazon SageMaker.
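To illustrate what the Hive statement does to each record, here is a minimal Python sketch. It assumes that every exported line is a JSON object whose attributes are wrapped in a one-entry type map keyed by s (string) or n (number), which is what the map<string,string> columns and the ['s'] and ['n'] lookups in the script imply. The sample line and the shortened column list are made up for illustration.

```python
import json

# Hypothetical exported record; real lines carry all attributes of the table.
line = '{"customer_id":{"s":"1001"},"age":{"n":"34"},"job":{"s":"admin."},"y":{"n":"1"}}'

# Shortened, illustrative column list in the order the CSV should use.
COLUMNS = ["age", "job", "y"]

def to_csv_row(json_line, columns):
    """Flatten one typed-attribute record into a comma-separated row."""
    record = json.loads(json_line)
    values = []
    for name in columns:
        typed = record.get(name, {})
        # Each attribute is a one-entry map such as {"n": "34"} or {"s": "admin."}.
        values.append(next(iter(typed.values()), ""))
    return ",".join(values)

print(to_csv_row(line, COLUMNS))
```

Like the concat call in the Hive script, this sketch does no CSV quoting, so it is only safe when attribute values contain no commas.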

Depending on your requirements, you can eliminate or process the columns in the SELECT clause in this step to optimize data analysis. For example, you might remove columns that have unpredictable correlations with the target value, because keeping the wrong columns might expose your model to “overfitting” during training. In this post, the customer_id column is removed. Overfitting can weaken your predictions. For more information about overfitting, see the topic Model Fit: Underfitting vs. Overfitting in the Amazon ML documentation.

Automation script: Renew the Amazon SageMaker model

After the CSV data is replaced and ready to use, create a new model artifact for Amazon SageMaker with the updated dataset on S3. To renew the model artifact, you must create a new training job. You can run training jobs using the AWS SDK (for example, the Amazon SageMaker client in boto3), the Amazon SageMaker Python SDK (which you can install with the “pip install sagemaker” command), or the AWS CLI for Amazon SageMaker, which is the approach described in this post.

In addition, consider how to renew your existing model smoothly without service impact, because applications call your model in real time. To do this, first create a new endpoint configuration, and then update the current endpoint with the endpoint configuration that you just created.

#!/bin/bash
## Define variable 
REGION=$2
DTTIME=`date +%Y-%m-%d-%H-%M-%S`
ROLE="<your AmazonSageMaker-ExecutionRole>" 


# Select containers image based on region.  
case "$REGION" in
"us-west-2" )
    IMAGE="174872318107.dkr.ecr.us-west-2.amazonaws.com/linear-learner:latest"
    ;;
"us-east-1" )
    IMAGE="382416733822.dkr.ecr.us-east-1.amazonaws.com/linear-learner:latest" 
    ;;
"us-east-2" )
    IMAGE="404615174143.dkr.ecr.us-east-2.amazonaws.com/linear-learner:latest" 
    ;;
"eu-west-1" )
    IMAGE="438346466558.dkr.ecr.eu-west-1.amazonaws.com/linear-learner:latest" 
    ;;
 *)
    echo "Invalid Region Name"
    exit 1 ;  
esac

# Start training job and creating model artifact 
TRAINING_JOB_NAME=TRAIN-${DTTIME} 
S3OUTPUT="s3://<your bucket name>/model/" 
INSTANCETYPE="ml.m4.xlarge"
INSTANCECOUNT=1
VOLUMESIZE=5 
aws sagemaker create-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --algorithm-specification TrainingImage=${IMAGE},TrainingInputMode=File --role-arn ${ROLE}  --input-data-config '[{ "ChannelName": "train", "DataSource": { "S3DataSource": { "S3DataType": "S3Prefix", "S3Uri": "s3://<your bucket name>/<datasource path>/", "S3DataDistributionType": "FullyReplicated" } }, "ContentType": "text/csv", "CompressionType": "None" , "RecordWrapperType": "None"  }]'  --output-data-config S3OutputPath=${S3OUTPUT} --resource-config  InstanceType=${INSTANCETYPE},InstanceCount=${INSTANCECOUNT},VolumeSizeInGB=${VOLUMESIZE} --stopping-condition MaxRuntimeInSeconds=120 --hyper-parameters feature_dim=20,predictor_type=binary_classifier  

# Wait until job completed 
aws sagemaker wait training-job-completed-or-stopped --training-job-name ${TRAINING_JOB_NAME}  --region ${REGION}

# Get newly created model artifact and create model
MODELARTIFACT=`aws sagemaker describe-training-job --training-job-name ${TRAINING_JOB_NAME} --region ${REGION}  --query 'ModelArtifacts.S3ModelArtifacts' --output text `
MODELNAME=MODEL-${DTTIME}
aws sagemaker create-model --region ${REGION} --model-name ${MODELNAME}  --primary-container Image=${IMAGE},ModelDataUrl=${MODELARTIFACT}  --execution-role-arn ${ROLE}

# create a new endpoint configuration 
CONFIGNAME=CONFIG-${DTTIME}
aws sagemaker  create-endpoint-config --region ${REGION} --endpoint-config-name ${CONFIGNAME}  --production-variants  VariantName=Users,ModelName=${MODELNAME},InitialInstanceCount=1,InstanceType=ml.m4.xlarge

# create or update the endpoint
STATUS=`aws sagemaker describe-endpoint --endpoint-name  ServiceEndpoint --query 'EndpointStatus' --output text --region ${REGION} `
if [[ "$STATUS" != "InService" ]] ;
then
    aws sagemaker  create-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}    
else
    aws sagemaker  update-endpoint --endpoint-name  ServiceEndpoint  --endpoint-config-name ${CONFIGNAME} --region ${REGION}
fi
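For readers who prefer the AWS SDK over the CLI, the create-or-update branch at the end of the script could be sketched with boto3 roughly as follows. This is a sketch under assumptions rather than the script's exact logic: it treats any failure of describe_endpoint as "the endpoint does not exist yet", and the function name is made up for illustration. The client is passed in so that the function itself needs no credentials to define.

```python
def create_or_update_endpoint(client, endpoint_name, config_name):
    """Point an endpoint at a new endpoint configuration, creating it if absent.

    `client` is expected to behave like boto3.client("sagemaker").
    """
    try:
        client.describe_endpoint(EndpointName=endpoint_name)
    except client.exceptions.ClientError:
        # Assumption: any describe failure means the endpoint does not exist yet.
        client.create_endpoint(EndpointName=endpoint_name,
                               EndpointConfigName=config_name)
        return "created"
    client.update_endpoint(EndpointName=endpoint_name,
                           EndpointConfigName=config_name)
    return "updated"

# Example call (requires boto3 and AWS credentials):
#   import boto3
#   sm = boto3.client("sagemaker", region_name="<your region>")
#   create_or_update_endpoint(sm, "ServiceEndpoint", "<your endpoint config name>")
```

Updating an endpoint in place keeps its name stable, so client applications that call ServiceEndpoint do not need to change when the model is renewed.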

Grant permission

Before you execute the script, you must grant proper permission to Data Pipeline. Data Pipeline uses the DataPipelineDefaultResourceRole role by default. I added the following policy to DataPipelineDefaultResourceRole to allow Data Pipeline to create, delete, and update the Amazon SageMaker model and data source in the script.

{
 "Version": "2012-10-17",
 "Statement": [
 {
 "Effect": "Allow",
 "Action": [
 "sagemaker:CreateTrainingJob",
 "sagemaker:DescribeTrainingJob",
 "sagemaker:CreateModel",
 "sagemaker:CreateEndpointConfig",
 "sagemaker:DescribeEndpoint",
 "sagemaker:CreateEndpoint",
 "sagemaker:UpdateEndpoint",
 "iam:PassRole"
 ],
 "Resource": "*"
 }
 ]
}

Use real-time prediction

After you deploy a model into production using Amazon SageMaker hosting services, your client applications use the InvokeEndpoint API to get inferences from the model hosted at the specified endpoint. This approach is useful for interactive web, mobile, or desktop applications.

The following simple Python code example calls the Amazon SageMaker endpoint by its name (“ServiceEndpoint”) and uses the response for real-time prediction.

=== Python sample for real-time prediction ===

#!/usr/bin/env python
import boto3
import json 

client = boto3.client('sagemaker-runtime', region_name ='<your region>' )
new_customer_info = '34,10,2,4,1,2,1,1,6,3,190,1,3,4,3,-1.7,94.055,-39.8,0.715,4991.6'
response = client.invoke_endpoint(
    EndpointName='ServiceEndpoint',
    Body=new_customer_info, 
    ContentType='text/csv'
)
result = json.loads(response['Body'].read().decode())
print(result)
--- output(response) ---
{u'predictions': [{u'score': 0.7528127431869507, u'predicted_label': 1.0}]}
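An application usually needs only the label and the score from that response. As a minimal sketch, assuming the response always has the shape shown in the sample output above, a hypothetical `summarize` helper might look like this:

```python
# Parsed response in the shape shown in the sample output above.
result = {"predictions": [{"score": 0.7528127431869507, "predicted_label": 1.0}]}

def summarize(prediction_result):
    """Return (will_subscribe, score) for the first prediction in the result."""
    first = prediction_result["predictions"][0]
    return first["predicted_label"] == 1.0, first["score"]

will_subscribe, score = summarize(result)
print(will_subscribe, round(score, 3))
```

The score can also be compared against a threshold of your own choosing if the default label does not match your business needs.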

Solution summary

The solution takes the following steps:

  1. Data Pipeline exports DynamoDB table data into S3. Keep the original JSON data so that you can recover the table in the rare event that this is needed. Data Pipeline then converts JSON to CSV so that Amazon SageMaker can read the data. Note: You should select only meaningful attributes when you convert to CSV. For example, if you judge that the “campaign” attribute is not correlated with the target, you can eliminate this attribute from the CSV.
  2. Train the Amazon SageMaker model with the new data source.
  3. When a new customer comes to your site, you can judge how likely it is for this customer to subscribe to your new product based on the “score” value returned by Amazon SageMaker.
  4. If the new user subscribes to your new product, your application must update the attribute “y” to the value 1 (for yes). This updated data is provided for the next model renewal as a new data source. It serves to improve the accuracy of your prediction. With each new entry, your application can become smarter and deliver better predictions.
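The update in step 4 could be sketched with boto3 as follows. This is an illustration under assumptions: it supposes that customer_id is the table's partition key and is stored as a string, and that y is stored as a number, as the Hive table definition earlier in this post suggests. The function name is hypothetical, and the table object is passed in so that the sketch does not depend on a particular table name.

```python
def mark_subscribed(table, customer_id):
    """Set the target attribute y to 1 for one customer.

    `table` is expected to behave like boto3.resource("dynamodb").Table(...).
    """
    return table.update_item(
        Key={"customer_id": customer_id},      # assumed partition key
        UpdateExpression="SET #y = :yes",
        ExpressionAttributeNames={"#y": "y"},  # alias keeps the expression unambiguous
        ExpressionAttributeValues={":yes": 1},
    )

# Example call (requires boto3 and AWS credentials):
#   import boto3
#   table = boto3.resource("dynamodb").Table("<your table name>")
#   mark_subscribed(table, "1001")
```

The next scheduled export then picks up the changed value, so the renewed model is trained on the latest outcomes.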

Running ad hoc queries using Amazon Athena

Amazon Athena is a serverless query service that makes it easy to analyze large amounts of data stored in Amazon S3 using standard SQL. Athena is useful for examining data and collecting statistics or informative summaries about data. You can also use the powerful analytic functions of Presto, as described in the topic Aggregate Functions of Presto in the Presto documentation.

With the Data Pipeline scheduled activity, recent CSV data is always located in S3 so that you can run ad hoc queries against the data using Amazon Athena. I show this with example SQL statements following. For an in-depth description of this process, see the post Interactive SQL Queries for Data in Amazon S3 on the AWS News Blog. 

Creating an Amazon Athena table and querying it

You can create an EXTERNAL table for the CSV data on S3 in the Amazon Athena console.

=== Table Creation ===
CREATE EXTERNAL TABLE datasource (
 age int, 
 job string, 
 marital string , 
 education string, 
 default string, 
 housing string, 
 loan string, 
 contact string, 
 month string, 
 day_of_week string, 
 duration int, 
 campaign int, 
 pdays int , 
 previous int , 
 poutcome string, 
 emp_var_rate double, 
 cons_price_idx double,
 cons_conf_idx double, 
 euribor3m double, 
 nr_employed double, 
 y int 
)
ROW FORMAT DELIMITED 
FIELDS TERMINATED BY ',' ESCAPED BY '\\' LINES TERMINATED BY '\n' 
LOCATION 's3://<your bucket name>/<datasource path>/';

The following query calculates the correlation coefficient between the target attribute and other attributes using Amazon Athena.

=== Sample Query ===

SELECT corr(age,y) AS correlation_age_and_target, 
 corr(duration,y) AS correlation_duration_and_target, 
 corr(campaign,y) AS correlation_campaign_and_target,
 corr(contact,y) AS correlation_contact_and_target
FROM ( SELECT age , duration , campaign , y , 
 CASE WHEN contact = 'telephone' THEN 1 ELSE 0 END AS contact 
 FROM datasource 
 ) datasource ;
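To show what corr computes, here is a small pure-Python sketch of the Pearson correlation coefficient, applied to made-up rows with the same 1/0 encoding of the contact column that the CASE expression in the query uses. The data values are illustrative only and are not taken from the sample dataset.

```python
from math import sqrt

def corr(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)

# Made-up rows: contact channel and target label.
contact = ["telephone", "cellular", "telephone", "cellular", "cellular"]
y = [0, 1, 0, 1, 0]

# Same encoding as the CASE expression: telephone -> 1, anything else -> 0.
contact_flag = [1 if c == "telephone" else 0 for c in contact]
print(round(corr(contact_flag, y), 3))
```

A coefficient near 1 or -1 suggests a strong linear relationship with the target, whereas a value near 0 marks an attribute as a candidate for removal from the CSV.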

Conclusion

In this post, I introduce an example of how to analyze data in DynamoDB by using table data in Amazon S3 to optimize DynamoDB table read capacity. You can then use the analyzed data as a new data source to train an Amazon SageMaker model for accurate real-time prediction. In addition, you can run ad hoc queries against the data on S3 using Amazon Athena. I also present how to automate these procedures by using Data Pipeline.

You can adapt this example to your specific use case, and hopefully this post helps you accelerate your development. You can find more examples and use cases for Amazon SageMaker in the video AWS 2017: Introducing Amazon SageMaker on the AWS website.

 


Additional Reading

If you found this post useful, be sure to check out Serving Real-Time Machine Learning Predictions on Amazon EMR and Analyzing Data in S3 using Amazon Athena.

 


About the Author

Yong Seong Lee is a Cloud Support Engineer for AWS Big Data Services. He is interested in every technology related to data/databases and helping customers who have difficulties in using AWS services. His motto is “Enjoy life, be curious and have maximum experience.”

 

 

10 visualizations to try in Amazon QuickSight with sample data

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/10-visualizations-to-try-in-amazon-quicksight-with-sample-data/

If you’re not already familiar with building visualizations for quick access to business insights using Amazon QuickSight, consider this your introduction. In this post, we’ll walk through some common scenarios with sample datasets to provide an overview of how you can connect your data, perform advanced analysis, and access the results from any web browser or mobile device.

The following visualizations are built from the public datasets available in the links below. Before we jump into that, let’s take a look at the supported data sources, file formats and a typical QuickSight workflow to build any visualization.

Which data sources does Amazon QuickSight support?

At the time of publication, you can use the following data methods:

  • Connect to AWS data sources, including:
    • Amazon RDS
    • Amazon Aurora
    • Amazon Redshift
    • Amazon Athena
    • Amazon S3
  • Upload Excel spreadsheets or flat files (CSV, TSV, CLF, and ELF)
  • Connect to on-premises databases like Teradata, SQL Server, MySQL, and PostgreSQL
  • Import data from SaaS applications like Salesforce and Snowflake
  • Use big data processing engines like Spark and Presto

This list is constantly growing. For more information, see Supported Data Sources.

Answers in instants

SPICE is the Amazon QuickSight super-fast, parallel, in-memory calculation engine, designed specifically for ad hoc data visualization. SPICE stores your data in a system architected for high availability, where it is saved until you choose to delete it. Improve the performance of database datasets by importing the data into SPICE instead of using a direct database query. To calculate how much SPICE capacity your dataset needs, see Managing SPICE Capacity.

Typical Amazon QuickSight workflow

When you create an analysis, the typical workflow is as follows:

  1. Connect to a data source, and then create a new dataset or choose an existing dataset.
  2. (Optional) If you created a new dataset, prepare the data (for example, by changing field names or data types).
  3. Create a new analysis.
  4. Add a visual to the analysis by choosing the fields to visualize. Choose a specific visual type, or use AutoGraph and let Amazon QuickSight choose the most appropriate visual type, based on the number and data types of the fields that you select.
  5. (Optional) Modify the visual to meet your requirements (for example, by adding a filter or changing the visual type).
  6. (Optional) Add more visuals to the analysis.
  7. (Optional) Add scenes to the default story to provide a narrative about some aspect of the analysis data.
  8. (Optional) Publish the analysis as a dashboard to share insights with other users.

The following graphic illustrates a typical Amazon QuickSight workflow.

Visualizations created in Amazon QuickSight with sample datasets

Visualizations for a data analyst

Source:  https://data.worldbank.org/

Download and Resources:  https://datacatalog.worldbank.org/dataset/world-development-indicators

Data catalog:  The World Bank invests in multiple development projects at the national, regional, and global levels. It’s a great source of information for data analysts.

The following graph shows the percentage of the population that has access to electricity (rural and urban) during 2000 in Asia, Africa, the Middle East, and Latin America.

The following graph shows the share of healthcare costs that are paid out-of-pocket (private vs. public). You can also hover over the graph to get detailed statistics at a glance.

Visualizations for a trading analyst

Source:  Deutsche Börse Public Dataset (DBG PDS)

Download and resources:  https://aws.amazon.com/public-datasets/deutsche-boerse-pds/

Data catalog:  The DBG PDS project makes real-time data derived from Deutsche Börse’s trading market systems available to the public for free. This is the first time that such detailed financial market data has been shared freely and continually from the source provider.

The following graph shows the market trend of max trade volume for different EU banks. It builds on the data available on XETRA engines, which is made up of a variety of equities, funds, and derivative securities. This graph can be scrolled to visualize trade for a period of an hour or more.

The following graph shows the common stock beating the rest of the maximum trade volume over a period of time, grouped by security type.

Visualizations for a data scientist

Source:  https://catalog.data.gov/

Download and resources:  https://catalog.data.gov/dataset/road-weather-information-stations-788f8

Data catalog:  Data derived from different sensor stations placed on the city bridges and surface streets are a core information source. The road weather information station has a temperature sensor that measures the temperature of the street surface. It also has a sensor that measures the ambient air temperature at the station each second.

The following graph shows the present max air temperature in Seattle from different RWI station sensors.

The following graph shows the minimum temperature of the road surface at different times, which helps predict road conditions at a particular time of the year.

Visualizations for a data engineer

Source:  https://www.kaggle.com/

Download and resources:  https://www.kaggle.com/datasnaek/youtube-new/data

Data catalog:  Kaggle provides a platform where people can donate open datasets. Data engineers and other community members have open access to these datasets and can contribute to the open data movement. The platform has more than 350 datasets in total, with more than 200 as featured datasets. It hosts a few interesting datasets that are not available elsewhere, and it’s a place to connect with other data enthusiasts.

The following graph shows the trending YouTube videos and presents the max likes for the top 20 channels. This is one of the most popular datasets for data engineers.

The following graph shows the YouTube daily statistics for the max views of video titles published during a specific time period.

Visualizations for a business user

Source:  New York Taxi Data

Download and resources:  https://data.cityofnewyork.us/Transportation/2016-Green-Taxi-Trip-Data/hvrh-b6nb

Data catalog: NYC Open data hosts some very popular open data sets for all New Yorkers. This platform allows you to get involved in dive deep into the data set to pull some useful visualizations. 2016 Green taxi trip dataset includes trip records from all trips completed in green taxis in NYC in 2016. Records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts.

The following graph presents maximum fare amount grouped by the passenger count during a period of time during a day. This can be further expanded to follow through different day of the month based on the business need.

The following graph shows the New York taxi data from January 2016, with a dip in the number of rides on January 23, 2016 across all types of taxis.

A quick search for that date and location shows you the following news report:

Summary

Using Amazon QuickSight, you can see patterns across time-series data by building visualizations, performing ad hoc analysis, and quickly generating insights. We hope you’ll give it a try today!

Additional Reading

If you found this post useful, be sure to check out Amazon QuickSight Adds Support for Combo Charts and Row-Level Security and Visualize AWS Cloudtrail Logs Using AWS Glue and Amazon QuickSight.


Karthik Odapally is a Sr. Solutions Architect in AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

Pranabesh Mandal is a Solutions Architect in AWS. He has over a decade of IT experience. He is passionate about cloud technology and focuses on analytics. In his spare time, he likes to hike and explore the nature and wildlife of national parks around the United States alongside his wife.

Implement continuous integration and delivery of serverless AWS Glue ETL applications using AWS Developer Tools

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/implement-continuous-integration-and-delivery-of-serverless-aws-glue-etl-applications-using-aws-developer-tools/

AWS Glue is an increasingly popular way to develop serverless ETL (extract, transform, and load) applications for big data and data lake workloads. Organizations that transform their ETL applications to cloud-based, serverless ETL architectures need a seamless, end-to-end continuous integration and continuous delivery (CI/CD) pipeline: from source code, to build, to deployment, to product delivery. Having a good CI/CD pipeline can help your organization discover bugs before they reach production and deliver updates more frequently. It can also help developers write quality code and automate the ETL job release management process, mitigate risk, and more.

AWS Glue is a fully managed data catalog and ETL service. It simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. AWS Glue crawls your data sources and constructs a data catalog using pre-built classifiers for popular data formats and data types, including CSV, Apache Parquet, JSON, and more.

When you are developing ETL applications using AWS Glue, you might come across some of the following CI/CD challenges:

  • Iterative development with unit tests
  • Continuous integration and build
  • Pushing the ETL pipeline to a test environment
  • Pushing the ETL pipeline to a production environment
  • Testing ETL applications using real data (live test)
  • Exploring and validating data

In this post, I walk you through a solution that implements a CI/CD pipeline for serverless AWS Glue ETL applications supported by AWS Developer Tools (including AWS CodePipeline, AWS CodeCommit, and AWS CodeBuild) and AWS CloudFormation.

Solution overview

The following diagram shows the pipeline workflow:

This solution uses AWS CodePipeline, which lets you orchestrate and automate the test and deploy stages for ETL application source code. The solution consists of a pipeline that contains the following stages:

1.) Source Control: In this stage, the AWS Glue ETL job source code and the AWS CloudFormation template file for deploying the ETL jobs are both committed to version control. I chose to use AWS CodeCommit for version control.

To get the ETL job source code and AWS CloudFormation template, download the gluedemoetl.zip file. This solution is developed based on a previous post, Build a Data Lake Foundation with AWS Glue and Amazon S3.

2.) LiveTest: In this stage, all resources—including AWS Glue crawlers, jobs, S3 buckets, roles, and other resources that are required for the solution—are provisioned, deployed, live tested, and cleaned up.

The LiveTest stage includes the following actions:

  • Deploy: In this action, all the resources that are required for this solution (crawlers, jobs, buckets, roles, and so on) are provisioned and deployed using an AWS CloudFormation template.
  • AutomatedLiveTest: In this action, all the AWS Glue crawlers and jobs are executed and data exploration and validation tests are performed. These validation tests include, but are not limited to, record counts in both raw tables and transformed tables in the data lake and any other business validations. I used AWS CodeBuild for this action.
  • LiveTestApproval: This action is included for the cases in which a pipeline administrator approval is required to deploy/promote the ETL applications to the next stage. The pipeline pauses in this action until an administrator manually approves the release.
  • LiveTestCleanup: In this action, all the LiveTest stage resources, including test crawlers, jobs, roles, and so on, are deleted using the AWS CloudFormation template. This action helps minimize cost by ensuring that the test resources exist only for the duration of the AutomatedLiveTest and LiveTestApproval actions.

3.) DeployToProduction: In this stage, all the resources are deployed using the AWS CloudFormation template to the production environment.
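The stage and action ordering described above can be written down as plain data, which makes it easy to see which actions run unattended before the pipeline pauses. This is only an illustrative sketch: the LiveTest action names come from the post, while the stage name `Source` and the action names in the first and last stages are hypothetical placeholders, and nothing here calls AWS.

```python
from typing import List, Set, Tuple

# Ordered (stage, [actions]) pairs. LiveTest action names are from the post;
# "Source", "CodeCommitSource", and the production "Deploy" action are
# hypothetical labels used only for illustration.
PIPELINE_STAGES: List[Tuple[str, List[str]]] = [
    ("Source", ["CodeCommitSource"]),
    ("LiveTest", ["Deploy", "AutomatedLiveTest",
                  "LiveTestApproval", "LiveTestCleanup"]),
    ("DeployToProduction", ["Deploy"]),
]

# Actions that wait for a person instead of running automatically.
MANUAL_ACTIONS: Set[Tuple[str, str]] = {("LiveTest", "LiveTestApproval")}


def execution_order(stages: List[Tuple[str, List[str]]]) -> List[Tuple[str, str]]:
    """Flatten the stages into the order in which actions would run."""
    return [(stage, action) for stage, actions in stages for action in actions]


def actions_before_first_pause(
    stages: List[Tuple[str, List[str]]],
    manual: Set[Tuple[str, str]],
) -> List[Tuple[str, str]]:
    """Return the actions that run unattended before the first manual gate."""
    completed = []
    for step in execution_order(stages):
        if step in manual:
            break
        completed.append(step)
    return completed


if __name__ == "__main__":
    for stage, action in actions_before_first_pause(PIPELINE_STAGES, MANUAL_ACTIONS):
        print(f"{stage}: {action}")
```

Keeping the cleanup action after the approval gate in this ordering reflects the point made above: test resources live only as long as the automated test and the approval step need them.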

Try it out

This pipeline takes approximately 20 minutes to complete the LiveTest stage up to the LiveTestApproval action, where manual approval is required.

To get started with this solution, choose Launch Stack:

This creates the CI/CD pipeline with all of its stages, as described earlier. It performs an initial commit of the sample AWS Glue ETL job source code to trigger the first release change.

In the AWS CloudFormation console, choose Create. After the template finishes creating resources, you see the pipeline name on the stack Outputs tab.

After that, open the CodePipeline console and select the newly created pipeline. Initially, your pipeline’s CodeCommit stage shows that the source action failed.

Allow a few minutes for your new pipeline to detect the initial commit applied by the CloudFormation stack creation. As soon as the commit is detected, your pipeline starts. You will see the successful stage completion status as soon as the CodeCommit source stage runs.
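If you would rather wait for the source stage from a script than refresh the console, a polling loop along the following lines might work. It is a sketch under assumptions: `client` is expected to be a boto3 CodePipeline client (for example, `boto3.client("codepipeline")`), the stage name `Source` and the pipeline name are hypothetical, and the polling interval is arbitrary. Because the source action can initially show as failed, the loop waits specifically for `Succeeded` rather than stopping at the first terminal status.

```python
import time
from typing import Any, Dict, Optional


def stage_status(pipeline_state: Dict[str, Any], stage_name: str) -> Optional[str]:
    """Extract the latest execution status of one stage from a
    get_pipeline_state response, or None if the stage has not run."""
    for stage in pipeline_state.get("stageStates", []):
        if stage.get("stageName") == stage_name:
            return stage.get("latestExecution", {}).get("status")
    return None


def wait_for_stage_success(
    client: Any,
    pipeline_name: str,
    stage_name: str,
    poll_seconds: float = 30.0,
    max_polls: int = 20,
) -> bool:
    """Poll the pipeline until the stage reports Succeeded.

    Returns True on success and False if the stage has not succeeded
    after max_polls checks. An early "Failed" status is not treated as
    final, because the first source run may fail before the initial
    commit is detected.
    """
    for attempt in range(max_polls):
        state = client.get_pipeline_state(name=pipeline_name)
        if stage_status(state, stage_name) == "Succeeded":
            return True
        if attempt < max_polls - 1:
            time.sleep(poll_seconds)
    return False


# Example usage (requires boto3 and AWS credentials; names are hypothetical):
#   import boto3
#   ok = wait_for_stage_success(boto3.client("codepipeline"),
#                               "my-glue-cicd-pipeline", "Source")
```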

In the CodeCommit console, choose Code in the navigation pane to view the solution files.

Next, you can watch the pipeline go through the Deploy and AutomatedLiveTest actions of the LiveTest stage until it reaches the LiveTestApproval action.

At this point, if you check the AWS CloudFormation console, you can see that a new template has been deployed as part of the LiveTest deploy action.

Next, make sure that the AWS Glue crawlers and the AWS Glue job ran successfully. Also check whether the corresponding databases and external tables have been created in the AWS Glue Data Catalog. Then validate the data using Amazon Athena, as described in the following steps.

Open the AWS Glue console, and choose Databases in the navigation pane. You will see the following databases in the Data Catalog:

Open the Amazon Athena console, and run the following queries. Verify that the record counts match.

SELECT count(*) FROM "nycitytaxi_gluedemocicdtest"."data";
SELECT count(*) FROM "nytaxiparquet_gluedemocicdtest"."datalake";
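The same record-count comparison could also be scripted, which is roughly the kind of check an automated live test might perform. The following is a minimal sketch, not the post's actual test code: it assumes `athena` is a boto3 Athena client (for example, `boto3.client("athena")`), and the S3 output location shown in the usage comment is a hypothetical placeholder. The two queries are the ones shown above.

```python
import time
from typing import Any, Tuple

RAW_QUERY = 'SELECT count(*) FROM "nycitytaxi_gluedemocicdtest"."data"'
TRANSFORMED_QUERY = 'SELECT count(*) FROM "nytaxiparquet_gluedemocicdtest"."datalake"'


def athena_count(athena: Any, query: str, output_location: str,
                 poll_seconds: float = 2.0) -> int:
    """Run a single-value count query on Athena and return it as an int."""
    started = athena.start_query_execution(
        QueryString=query,
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]

    # Athena queries are asynchronous, so poll until a terminal state.
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError(f"Athena query {query_id} ended in state {state}")
        time.sleep(poll_seconds)

    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # Row 0 is the column header; row 1 holds the count value.
    return int(rows[1]["Data"][0]["VarCharValue"])


def counts_match(athena: Any, output_location: str) -> Tuple[bool, int, int]:
    """Compare raw and transformed record counts.

    Returns (match, raw_count, transformed_count).
    """
    raw = athena_count(athena, RAW_QUERY, output_location)
    transformed = athena_count(athena, TRANSFORMED_QUERY, output_location)
    return raw == transformed, raw, transformed


# Example usage (requires boto3 and AWS credentials; bucket is hypothetical):
#   import boto3
#   ok, raw, transformed = counts_match(boto3.client("athena"),
#                                       "s3://example-bucket/athena-results/")
```

Passing the client in as an argument keeps the comparison logic testable without an AWS account, since a stub object with the same three methods can stand in for Athena.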

The following shows the raw data:

The following shows the transformed data:

The pipeline pauses at this action until the release is approved. After validating the data, manually approve the revision on the LiveTestApproval action in the CodePipeline console.

Add comments as needed, and choose Approve.
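The approval can also be submitted through the CodePipeline API instead of the console. The sketch below assumes `client` is a boto3 CodePipeline client and that the stage and action are named `LiveTest` and `LiveTestApproval` as in this post; the pipeline name in the usage comment is hypothetical. It reads the pending approval token from the pipeline state and then calls `put_approval_result` with it.

```python
from typing import Any, Dict, Optional


def find_approval_token(pipeline_state: Dict[str, Any],
                        stage_name: str, action_name: str) -> Optional[str]:
    """Return the token of a pending manual approval, or None if the
    action is not currently waiting for approval."""
    for stage in pipeline_state.get("stageStates", []):
        if stage.get("stageName") != stage_name:
            continue
        for action in stage.get("actionStates", []):
            if action.get("actionName") == action_name:
                return action.get("latestExecution", {}).get("token")
    return None


def approve_live_test(client: Any, pipeline_name: str, summary: str,
                      stage_name: str = "LiveTest",
                      action_name: str = "LiveTestApproval") -> str:
    """Approve the pending manual approval action and return its token."""
    state = client.get_pipeline_state(name=pipeline_name)
    token = find_approval_token(state, stage_name, action_name)
    if token is None:
        raise RuntimeError(
            f"No pending approval for {stage_name}/{action_name} "
            f"in pipeline {pipeline_name}"
        )
    client.put_approval_result(
        pipelineName=pipeline_name,
        stageName=stage_name,
        actionName=action_name,
        result={"summary": summary, "status": "Approved"},
        token=token,
    )
    return token


# Example usage (requires boto3 and AWS credentials; name is hypothetical):
#   import boto3
#   approve_live_test(boto3.client("codepipeline"),
#                     "my-glue-cicd-pipeline",
#                     "Record counts match in Athena")
```

Scripting the approval only makes sense once the validation itself is trusted; the manual gate exists so that a person can review the data first.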

The LiveTestApproval action now appears as Approved on the console.

After the revision is approved, the pipeline proceeds to use the AWS CloudFormation template to destroy the resources that were deployed in the LiveTest deploy action. This helps reduce cost and ensures a clean test environment on every deployment.

Production deployment is the final stage. In this stage, all the resources—AWS Glue crawlers, AWS Glue jobs, Amazon S3 buckets, roles, and so on—are provisioned and deployed to the production environment using the AWS CloudFormation template.

After successfully running the whole pipeline, feel free to experiment with it by changing the source code stored in AWS CodeCommit. For example, if you modify the AWS Glue ETL job to generate an error, the AutomatedLiveTest action should fail. Or if you change the AWS CloudFormation template so that its creation fails, the LiveTest Deploy action should fail. The objective of the pipeline is to ensure that every change deployed to production works as expected.

Conclusion

In this post, you learned how easy it is to implement CI/CD for serverless AWS Glue ETL solutions with AWS developer tools like AWS CodePipeline and AWS CodeBuild at scale. Implementing such solutions can help you accelerate ETL development and testing at your organization.

If you have questions or suggestions, please comment below.

Additional Reading

If you found this post useful, be sure to check out Implement Continuous Integration and Delivery of Apache Spark Applications using AWS and Build a Data Lake Foundation with AWS Glue and Amazon S3.

About the Authors

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable big data, machine learning, artificial intelligence, and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to technologies such as advanced edge computing and machine learning at the edge. In his spare time, he enjoys spending time with his family.

Luis Caro is a Big Data Consultant for AWS Professional Services. He works with our customers to provide guidance and technical assistance on big data projects, helping them improve the value of their solutions when using AWS.