Tag Archives: AWS Glue

Perform biomedical informatics without a database using MIMIC-III data and Amazon Athena

Post Syndicated from James Wiggins original https://aws.amazon.com/blogs/big-data/perform-biomedical-informatics-without-a-database-using-mimic-iii-data-and-amazon-athena/

Biomedical researchers require access to accurate, detailed data. The MIT MIMIC-III dataset is a popular resource. Using Amazon Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Your analyses always reference the most recent version of the MIMIC-III dataset.

This post describes how to make the MIMIC-III dataset available in Athena and provide automated access to an analysis environment for MIMIC-III on AWS. We also compare a MIMIC-III reference bioinformatics study using a traditional database to that same study using Athena.

Overview

A dataset capturing a variety of measures longitudinally, across many patients, can drive analytics and machine learning toward research discovery and improved clinical decision-making. These features describe the MIT Laboratory of Computational Physiology (LCP) MIMIC-III dataset. In the words of LCP researchers:

“MIMIC-III is a large, publicly available database comprising de-identified health-related data associated with approximately 60K admissions of patients who stayed in critical care units of the Beth Israel Deaconess Medical Center between 2001 and 2012. …MIMIC supports a diverse range of analytic studies spanning epidemiology, clinical decision-rule improvement, and electronic tool development. It is notable for three factors: it is publicly and freely available, it encompasses a diverse and large population of ICU patients, and it contains high temporal resolution data including lab results, electronic documentation, and bedside monitor trends and waveforms.”

Recently, the Registry of Open Data on AWS (RODA) program made the MIMIC-III dataset available through the AWS Cloud. You can now use the MIMIC-III dataset without having to download, copy, or pay to store it. Instead, you can analyze the MIMIC-III dataset in the AWS Cloud using AWS services like Amazon EC2, Athena, AWS Lambda, or Amazon EMR. AWS Cloud availability enables quicker and cheaper research into the dataset.

Services like Athena also offer you new analytical approaches to the MIMIC-III dataset. Using Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Because you can reference the MIMIC-III dataset hosted in the RODA program, your analyses always reference the most recent version of the MIMIC-III dataset. Live hosting reduces upfront time and effort, eliminates data synchronization issues, improves data analysis, and reduces overall study costs.

Transforming MIMIC-III data

Historically, the MIMIC team distributed the MIMIC-III dataset in compressed (gzipped) CSV format. The choice of CSV format reflects the loading of MIMIC-III data into a traditional relational database for analysis.

In contrast, the MIMIC team provides the MIMIC-III dataset to the RODA program in the following formats:

  • The traditional CSV format
  • An Apache Parquet format optimized for modern data processing technologies such as Athena

Apache Parquet stores data by column, so queries that fetch specific columns can run without reading the whole table. This optimization helps improve the performance and lower the cost of many query types common to biomedical informatics.
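As a rough illustration of why a columnar layout helps, the following sketch contrasts row-wise and column-wise storage of a tiny table in plain Python. It is a toy model under stated assumptions, not how Parquet is implemented. The rows and column names are invented for illustration.

```python
# Toy model of row-oriented vs column-oriented storage (not real Parquet).
# The rows and column names below are invented for illustration.
rows = [
    {"subject_id": 1, "heart_rate": 82, "note": "long free-text note"},
    {"subject_id": 2, "heart_rate": 95, "note": "another long note"},
    {"subject_id": 3, "heart_rate": 77, "note": "yet another note"},
]

# Row layout: computing one average still touches every field of every row.
values_touched_row_layout = sum(len(row) for row in rows)

# Column layout: each column is stored contiguously and can be read alone.
columns = {name: [row[name] for row in rows] for name in rows[0]}
heart_rates = columns["heart_rate"]
values_touched_column_layout = len(heart_rates)

mean_heart_rate = sum(heart_rates) / len(heart_rates)
```

In this toy case the column layout reads three values instead of nine. The gap widens as tables gain wide text columns, such as clinical notes, that a query never references.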

To convert the original MIMIC-III CSV dataset to Apache Parquet, we created a data transformation job using AWS Glue. The MIMIC team schedules the AWS Glue job to run as needed, updating the Parquet files in the RODA program with any changes to the CSV dataset. These scheduled updates keep the Parquet files current without interfering with the MIMIC team’s CSV dataset creation procedures. Find the code for this AWS Glue job in the mimic-code GitHub repo. Use the same code as a basis to develop other CSV-to-Parquet conversions.

The code that converts the MIMIC-III NOTEEVENTS table offers a valuable resource. This table stores long medical notes made up of multiple lines, with commas, double-quotes, and other characters that can be challenging to transform. For example, you can use the following Spark read statement to interpret that CSV file:

df = spark.read.csv(
    's3://' + mimiccsvinputbucket + '/NOTEEVENTS.csv.gz',
    header=True,
    schema=schema,
    multiLine=True,  # a single note can span many lines
    quote='"',
    escape='"',      # quotes inside a note are escaped by doubling them
)
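To see why the multi-line and quoting options matter, here is a minimal sketch that parses a similar record with Python's standard csv module instead of Spark. The sample record is made up. The csv module's doublequote setting plays roughly the role that escape='"' plays in the Spark call.

```python
import csv
import io

# One header row and one record whose TEXT field spans two lines and
# contains a comma and doubled double-quotes (made-up sample data).
raw = (
    'ROW_ID,TEXT\n'
    '1,"Patient reports ""mild"" pain,\n'
    'no fever."\n'
)

# newline="" lets the csv module handle embedded line breaks itself.
reader = csv.reader(io.StringIO(raw, newline=""), quotechar='"', doublequote=True)
records = list(reader)

for record in records:
    print(record)
```

The parser returns two records, the header and a single note, even though the input has three physical lines. A reader without multi-line and quote handling would split that note into broken rows.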

Executing the indwelling arterial catheter study using MIMIC-III

The MIMIC team provides code for the MIMIC indwelling arterial catheter (IAC) study, referred to here as the aline study. The study uses the MIMIC-III dataset to reproduce findings from the published study, The Association Between Indwelling Arterial Catheters and Mortality in Hemodynamically Stable Patients With Respiratory Failure.

You can obtain the aline study code, as well as many other analytic examples, in the MIT Laboratory of Computational Physiology MIMIC Code Repository in GitHub. Learn more about the provided code in the JAMA paper, The MIMIC Code Repository: enabling reproducibility in critical care research.

The aline study code comprises about 1400 lines of SQL. It was developed for a PostgreSQL database, and is executed and further analyzed by Python and R code. We ran the aline study against the Parquet MIMIC-III dataset using Athena instead of PostgreSQL to answer the following questions:

  • What modifications would be required, if any, to run the study using Athena, instead of PostgreSQL?
  • How would performance differ?
  • How would cost differ?

Some SQL features used in the study work differently in Athena than they do in PostgreSQL. As a result, about 5% of the SQL statements needed modification. Specifically, the SQL statements require the following types of modification for use with Athena:

  • Instead of using materialized views, in Athena, create a new table. For instance, the CREATE MATERIALIZED VIEW statement can be written as CREATE TABLE.

  • Instead of using schema context statements like SET SEARCH_PATH, in Athena, explicitly declare the schema of referenced tables. For instance, instead of having a SET SEARCH_PATH statement before your query and referencing tables without a schema as follows:

SET SEARCH_PATH TO mimiciii;

left join chartevents ce

You declare the schema as a part of every table reference:

left join mimiciii.chartevents ce

  • Epoch time, as well as timestamp arithmetic, works differently in Athena than in PostgreSQL. Instead of timestamp arithmetic, use the date_diff() function and specify seconds as the unit. For instance, consider the following expression:

extract(epoch from endtime-starttime)/24.0/60.0/60.0 

The preceding statement can be written as:

date_diff('second',starttime, endtime)/24.0/60.0/60.0

  • Athena uses the “double” data type instead of the “numeric” data type.

ROUND( cast(f.height_first as numeric), 2) AS height_first,

The preceding statement can be written as:

ROUND( cast(f.height_first as double), 2) AS height_first,

  • If VARCHAR fields are empty, they are not detected as NULL. Instead, compare a VARCHAR to an empty string. For instance, consider the following condition:

where ce.value IS NOT NULL

If ce.value is a VARCHAR, it can be written as the following:

where ce.value <> ''

After making these minor edits to the aline study SQL statements, the study executes successfully using Athena instead of PostgreSQL. The output produced from both methods is identical.
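The textual rewrites listed above can be sketched as a small helper. This is a naive, regex-based illustration, not the tooling used for the study and not a SQL parser. The function name, the default schema, and the table list are assumptions made for the example. It does not handle the VARCHAR change, which depends on knowing each column's type.

```python
import re

def postgres_to_athena(sql, schema="mimiciii", tables=("chartevents",)):
    """Apply a few naive PostgreSQL-to-Athena rewrites (hypothetical helper)."""
    flags = re.IGNORECASE
    # Materialized views become plain tables.
    sql = re.sub(r"CREATE\s+MATERIALIZED\s+VIEW", "CREATE TABLE", sql, flags=flags)
    # Drop schema context statements such as SET SEARCH_PATH.
    sql = re.sub(r"SET\s+SEARCH_PATH\s+TO\s+[^;]+;\s*", "", sql, flags=flags)
    # Qualify bare references to known tables with the schema name.
    for table in tables:
        sql = re.sub(
            rf"\b(from|join)\s+{re.escape(table)}\b",
            rf"\1 {schema}.{table}",
            sql,
            flags=flags,
        )
    # extract(epoch from end-start) becomes date_diff('second', start, end).
    sql = re.sub(
        r"extract\s*\(\s*epoch\s+from\s+(\w+)\s*-\s*(\w+)\s*\)",
        r"date_diff('second', \2, \1)",
        sql,
        flags=flags,
    )
    # numeric casts become double casts.
    sql = re.sub(r"\bas\s+numeric\b", "as double", sql, flags=flags)
    return sql

print(postgres_to_athena("extract(epoch from endtime-starttime)/24.0/60.0/60.0"))
```

A helper like this only speeds up the mechanical part of the port. Each rewritten statement still needs review, because regular expressions cannot see column types or nested expressions.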

Next, consider the speed of Athena compared to PostgreSQL in running the aline study. As shown in the following graph, the aline study queries run 10 times faster in Athena against the Parquet-formatted MIMIC-III dataset than against the same data in an Amazon RDS PostgreSQL database.

Compare the costs of running the aline study in Athena to running it in PostgreSQL. RDS PostgreSQL databases bill in one-second increments for the time the database runs, while Athena queries bill per gigabyte of data each query scans. Because of this fundamental pricing difference, you must make some assumptions to compare costs.

You could assume that running the aline study in RDS PostgreSQL involves the following steps, taking up an eight-hour working day:

  1. Deploy a new RDS PostgreSQL database.
  2. Load the MIMIC-III dataset.
  3. Connect a Jupyter notebook.
  4. Run the aline study.
  5. Terminate the RDS database.

You’d then compare the cost of running an RDS PostgreSQL database for eight hours (at $0.37 per hour, db.m5.xlarge with 100-GB EBS storage) to the cost of running the aline study against a dataset using Athena (9.93 GB of data scanned at $0.005/GB). As the following graph shows, this results in an almost 60x cost savings.
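The arithmetic behind that comparison can be checked with a few lines of Python. The figures are the ones quoted above. The variable names are only for illustration.

```python
# Figures quoted in the text above.
rds_hourly_rate_usd = 0.37       # db.m5.xlarge with 100-GB EBS storage
rds_hours = 8                    # one working day
athena_gb_scanned = 9.93
athena_rate_usd_per_gb = 0.005

rds_cost = rds_hourly_rate_usd * rds_hours
athena_cost = athena_gb_scanned * athena_rate_usd_per_gb
savings_factor = rds_cost / athena_cost

print(f"RDS: ${rds_cost:.2f}  Athena: ${athena_cost:.3f}  ratio: {savings_factor:.1f}x")
```

Under these assumptions, the RDS run costs about $2.96 and the Athena run about $0.05, a ratio of roughly 59.6.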

Working with MIMIC-III in AWS

MIMIC-III is a publicly available dataset containing detailed information about the clinical care of patients. For this reason, the MIMIC team requires that you complete a training course and submit a formal request before gaining access. For more information, see Requesting access.

After you obtain access to the MIMIC-III dataset, log in to the PhysioNet website and, under your profile settings, provide your AWS account ID. Then click the request access link, under Files, on the MIMIC-III Clinical Database page. You then have access to the MIMIC-III dataset in AWS.

Choose Launch Stack below to begin working with the MIMIC-III dataset in your AWS account. This launches an AWS CloudFormation template, which deploys an index of the MIMIC-III Parquet-formatted dataset into your AWS Glue Data Catalog. It also deploys an Amazon SageMaker notebook instance pre-loaded with the aline study code and many other MIMIC-provided analytic examples.

After the AWS CloudFormation template deploys, choose Outputs, and follow the link to access Jupyter Notebooks. From there, you can open and execute the aline study code by browsing the filesystem to the path ./mimic-code/notebooks/aline-aws/aline-awsathena.ipynb.
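Outside the provided notebook, a query could also be submitted to Athena from Python with boto3. The following is a minimal sketch under stated assumptions: the database name mimiciii matches the schema used in the SQL examples above, the results bucket is a placeholder, and the function names are hypothetical. The request builder runs without AWS access. Only run_query needs boto3 and credentials.

```python
def build_athena_request(sql, database="mimiciii",
                         output_location="s3://example-athena-results/"):
    """Assemble keyword arguments for Athena's StartQueryExecution call.

    The database name and output bucket are assumptions for illustration.
    """
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        "ResultConfiguration": {"OutputLocation": output_location},
    }

def run_query(sql):
    """Submit the query and return its execution ID (needs AWS credentials)."""
    import boto3  # imported here so the builder above works without boto3
    athena = boto3.client("athena")
    response = athena.start_query_execution(**build_athena_request(sql))
    return response["QueryExecutionId"]

request = build_athena_request("SELECT count(*) FROM mimiciii.admissions")
```

Athena runs queries asynchronously, so a caller would then poll the execution ID and read the results from the output location.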

After you’ve completed your analysis, you can stop your Jupyter Notebook instance to suspend charges for compute. Any time that you want to continue working, just start the instance and continue where you left off.

Conclusion

The RODA program provides quick and inexpensive access to the global biomedical research resources of the MIMIC-III dataset. Cloud-native analytic tools like AWS Glue and Athena accelerate research, and groups like the MIT Laboratory of Computational Physiology are pioneering data availability in modern data formats like Apache Parquet.

The analytic approaches and code demonstrated in this post can be applied generally to help increase agility and decrease cost. Consider using them as you enhance existing or perform new biomedical informatics research.

About the Authors


James Wiggins is a senior healthcare solutions architect at AWS. He is passionate about using technology to help organizations positively impact world health. He also loves spending time with his wife and three children.

Alistair Johnson is a research scientist at the Massachusetts Institute of Technology. Alistair has extensive experience and expertise in working with ICU data, having published the MIMIC-III and eICU-CRD datasets. His current research focuses on clinical epidemiology and machine learning for decision support.

Build, secure, and manage data lakes with AWS Lake Formation

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/building-securing-and-managing-data-lakes-with-aws-lake-formation/

A data lake is a centralized store of a variety of data types for analysis by multiple analytics approaches and groups. Many organizations are moving their data into a data lake. In this post, I explore how you can use AWS Lake Formation to build, secure, and manage data lakes.

Traditionally, organizations have kept data in a rigid, single-purpose system, such as an on-premises data warehouse appliance. Similarly, they have analyzed data using a single method, such as predefined BI reports. Moving data between databases or for use with different approaches, like machine learning (ML) or improvised SQL querying, required “extract, transform, load” (ETL) processing before analysis. At best, these traditional methods have created inefficiencies and delays. At worst, they have complicated security.

By contrast, cloud-based data lakes open structured and unstructured data for more flexible analysis. Any amount of data can be aggregated, organized, prepared, and secured by IT staff in advance. Analysts and data scientists can then access it in place with the analytics tools of their choice, in compliance with appropriate usage policies.

Data lakes let you combine analytics methods, offering valuable insights unavailable through traditional data storage and analysis. In a retail scenario, ML methods discovered detailed customer profiles and cohorts on non-personally identifiable data gathered from web browsing behavior, purchase history, support records, and even social media. The exercise showed the deployment of ML models on real-time, streaming, interactive customer data.

Such models could analyze shopping baskets and serve up “next best offers” in the moment, or deliver instant promotional incentives. Marketing and support staff could explore customer profitability and satisfaction in real time and define new tactics to improve sales. Around a data lake, combined analytics techniques like these can unify diverse data streams, providing insights unobtainable from siloed data.

The challenges of building data lakes

Unfortunately, the complex and time-consuming process for building, securing, and starting to manage a data lake often takes months. Even building a data lake in the cloud requires many manual and time-consuming steps:

  • Setting up storage.
  • Moving, cleaning, preparing, and cataloging data.
  • Configuring and enforcing security policies for each service.
  • Manually granting access to users.

You want data lakes to centralize data for processing and analysis with multiple services. But organizing and securing the environment requires patience.

Currently, IT staff and architects spend too much time creating the data lake, configuring security, and responding to data requests. They could spend this time acting as curators of data resources, or as advisors to analysts and data scientists. Analysts and data scientists must wait for access to needed data throughout the setup.

The following diagram shows the data lake setup process:

Setting up storage

Data lakes hold massive amounts of data. Before doing anything else, you must set up storage to hold all that data. If you are using AWS, configure Amazon S3 buckets and partitions. If you are building the data lake on premises, acquire hardware and set up large disk arrays to store all the data.

Moving data

Connect to different data sources, both on premises and in the cloud, and collect data from IoT devices. Next, collect and organize the relevant datasets from those sources, crawl the data to extract the schemas, and add metadata tags to the catalog. You can use a collection of file transfer and ETL tools for this work.

Cleaning and preparing data

Next, collected data must be carefully partitioned, indexed, and transformed to columnar formats to optimize for performance and cost. You must clean, de-duplicate, and match related records.

Today, organizations accomplish these tasks using rigid and complex SQL statements that perform unreliably and are difficult to maintain. This complex process of collecting, cleaning, and transforming the incoming data requires manual monitoring to avoid errors. Many customers use AWS Glue for this task.

Configuring and enforcing policies

Customers and regulators require that organizations secure sensitive data. Compliance involves creating and applying data access, protection, and compliance policies. For example, you restrict access to personally identifiable information (PII) at the table, column, or row level, encrypt all data, and keep audit logs of who is accessing the data.

Today, you can secure data using access control lists on S3 buckets or third-party encryption and access control software. You create and maintain data access, protection, and compliance policies for each analytics service requiring access to the data. For example, if you are running analysis against your data lake using Amazon Redshift and Amazon Athena, you must set up access control rules for each of these services.

Many customers use AWS Glue Data Catalog resource policies to configure and control metadata access to their data. Some choose to use Apache Ranger. But these approaches can be painful and limiting. S3 policies provide at best table-level access. And you must maintain data and metadata policies separately. With Apache Ranger, you can configure metadata access to only one cluster at a time. Also, policies can become wordy as the number of users and teams accessing the data lake grows within an organization.

Making it easy to find data

Users with different needs, like analysts and data scientists, may struggle to find and trust relevant datasets in the data lake. To make it easy for users to find relevant and trusted data, you must clearly label the data in a data lake catalog. Provide users with the ability to access and analyze this data without making requests to IT.

Today, each of these steps involves a lot of manual work. Customer labor includes building data access and transformation workflows, mapping security and policy settings, and configuring tools and services for data movement, storage, cataloging, security, analytics, and ML. With all these steps, a fully productive data lake can take months to implement.

The wide range of AWS services provides all the building blocks of a data lake, including many choices for storage, computing, analytics, and security. In the nearly 13 years that AWS has been operating Amazon S3 with exabytes of data, S3 has also become the clear first choice for data lakes. AWS Glue adds a data catalog and serverless transformation capabilities. Amazon EMR brings managed big data processing frameworks like Apache Spark and Apache Hadoop. Amazon Redshift Spectrum offers data warehouse functions directly on data in Amazon S3. Athena brings serverless SQL querying.

With all these services available, customers have been building data lakes on AWS for years. AWS runs over 10,000 data lakes on top of S3, many using AWS Glue for the shared AWS Glue Data Catalog and data processing with Apache Spark.

AWS has learned from the thousands of customers running analytics on AWS that most customers who want to do analytics also want to build a data lake. But many of you want this process to be easier and faster than it is today.

AWS Lake Formation (now generally available)

At AWS re:Invent 2018, AWS introduced Lake Formation: a new managed service to help you build a secure data lake in days. If you missed it, watch Andy Jassy’s keynote announcement. Lake Formation has several advantages:

  • Identify, ingest, clean, and transform data: With Lake Formation, you can move, store, catalog, and clean your data faster.
  • Enforce security policies across multiple services: After your data sources are set up, you then define security, governance, and auditing policies in one place, and enforce those policies for all users and all applications.
  • Gain and manage new insights: With Lake Formation, you build a data catalog that describes available datasets and their appropriate business uses. This catalog makes your users more productive by helping them find the right dataset to analyze.

The following screenshot illustrates Lake Formation and its capabilities.

How to create a data lake

S3 forms the storage layer for Lake Formation. If you already use S3, you typically begin by registering existing S3 buckets that contain your data. Lake Formation creates new buckets for the data lake and imports data into them. AWS always stores this data in your account, and only you have direct access to it.

There is no lock-in to Lake Formation for your data. Because AWS stores data in standard formats like CSV, ORC, or Parquet, it can be used with a wide variety of AWS or third-party analytics tools.

Lake Formation also optimizes the partitioning of data in S3 to improve performance and reduce costs. The raw data you load may reside in partitions that are too small (requiring extra reads) or too large (reading more data than needed). Lake Formation organizes your data by size, time, or relevant keys to allow fast scans and parallel, distributed reads for the most commonly used queries.
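To make time-based partitioning concrete, the sketch below builds a key=value style S3 prefix for one day of data. This Hive-style layout is a common convention and an assumption here. The post does not specify the exact layout Lake Formation produces, and the bucket and function names are hypothetical.

```python
from datetime import date

def partition_prefix(base, event_date):
    """Return a Hive-style key=value prefix for one day of data (assumed layout)."""
    return (
        f"{base}/year={event_date.year}"
        f"/month={event_date.month:02d}/day={event_date.day:02d}/"
    )

# Hypothetical bucket and dataset names.
prefix = partition_prefix("s3://example-data-lake/orders", date(2019, 8, 8))
print(prefix)
```

A query that filters on a single day then only needs to read objects under that one prefix, which is how partitioning reduces both scan time and cost.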

How to load data and catalog metadata

Lake Formation uses the concept of blueprints for loading and cataloging data. You can run blueprints one time for an initial load or set them up to be incremental, adding new data and making it available.

With Lake Formation, you can import data from MySQL, Postgres, SQL Server, MariaDB, and Oracle databases running in Amazon RDS or hosted in Amazon EC2. You can also import from on-premises databases by connecting with Java Database Connectivity (JDBC).

Point Lake Formation to the data source, identify the location to load it into the data lake, and specify how often to load it. Blueprints discover the source table schema, automatically convert data to the target data format, partition the data based on the partitioning schema, and track data that was already processed. You can customize all these actions.

Blueprints rely on AWS Glue as a support service. AWS Glue crawlers connect and discover the raw data that is to be ingested. AWS Glue code generation and jobs generate the ingest code to bring that data into the data lake. Lake Formation uses the same data catalog for organizing the metadata. AWS Glue stitches together crawlers and jobs and allows monitoring of individual workflows. In these ways, Lake Formation is a natural extension of AWS Glue capabilities.

The following graphics show the Blueprint Workflow and Import screens:

How to transform and prepare data for analysis

In addition to supporting all the same ETL capabilities as AWS Glue, Lake Formation introduces new Amazon ML Transforms. This feature includes a fuzzy logic blocking algorithm that can de-duplicate 400M+ records in less than 2.5 hours, which is orders of magnitude better than earlier approaches.

To match and de-duplicate your data using Amazon ML Transforms, first merge related datasets. Amazon ML Transforms divides these sets into training and testing samples, then scans for exact and fuzzy matches. You can provide more data and examples for greater accuracy, putting these into production to process new data as it arrives in your data lake. The partitioning algorithm requires minimal tuning. The confidence level reflects the quality of the grouping, improving on earlier, more improvised algorithms. The following diagram shows this matching and de-duplicating workflow.

Amazon.com is currently using and vetting Amazon ML Transforms internally, at scale, for retail workloads. Lake Formation now makes these algorithms available to customers, so you can avoid the frustration of creating complex and fragile SQL statements to handle record matching and de-duplication. Amazon ML Transforms help improve data quality before analysis. For more information, see Fuzzy Matching and Deduplicating Data with Amazon ML Transforms for AWS Lake Formation.

How to set access control permissions

Lake Formation lets you define policies and control data access by granting and revoking permissions on data at granular levels. You can assign permissions to IAM users, roles, groups, and Active Directory users using federation. You specify permissions on catalog objects (like tables and columns) rather than on buckets and objects.
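As an illustration of column-level permissions on catalog objects, the following sketch assembles a request for the Lake Formation GrantPermissions API as exposed by boto3. The role ARN, database, table, and column names are placeholders, and the builder function is a hypothetical helper. The request is only constructed here, so the example runs without AWS access.

```python
def build_column_grant(principal_arn, database, table, columns):
    """Keyword arguments for Lake Formation's GrantPermissions API call."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }

# Placeholder principal and catalog names, for illustration only.
grant = build_column_grant(
    "arn:aws:iam::111122223333:role/ExampleAnalystRole",
    "sales",
    "customers",
    ["customer_id", "region"],
)

# With credentials configured, the grant could be applied with:
#   import boto3
#   boto3.client("lakeformation").grant_permissions(**grant)
```

Because the grant names only two columns, the analyst role in this sketch could query those columns while any other columns in the table, such as ones holding PII, stay inaccessible.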

You can easily view and audit all the data policies granted to a user—in one place. Search and view the permissions granted to a user, role, or group through the dashboard; verify permissions granted; and when necessary, easily revoke policies for a user. The following screenshots show the Grant permissions console:

How to make data available for analytics

Lake Formation offers unified, text-based, faceted search across all metadata, giving users self-serve access to the catalog of datasets available for analysis. This catalog includes discovered schemas (as discussed previously) and lets you add attributes like data owners, stewards, and other business-specific attributes as table properties.

At a more granular level, you can also add data sensitivity level, column definitions, and other attributes as column properties. You can explore data by any of these properties. But access is subject to user permissions. See the following screenshot of the AWS Glue tables tab:

How to monitor activity

With Lake Formation, you can also see detailed alerts in the dashboard, and then download audit logs for further analytics.

Amazon CloudWatch publishes all data ingestion events and catalog notifications. In this way, you can identify suspicious behavior or demonstrate compliance with rules.

To monitor and control access using Lake Formation, first define the access policies, as described previously. Users who want to conduct analysis access data directly through an AWS analytics service, such as Amazon EMR for Spark, Amazon Redshift, or Athena. Or, they access data indirectly with Amazon QuickSight or Amazon SageMaker.

A service forwards the user credentials to Lake Formation for the validation of access permissions. Then Lake Formation returns temporary credentials granting access to the data in S3, as shown in the following diagrams. After a user gains access, actual reads and writes of data operate directly between the analytics service and S3. This approach removes the need for an intermediary in the critical data-processing path.

The following screenshot and diagram show how to monitor and control access using Lake Formation.

Conclusion

With just a few steps, you can set up your data lake on S3 and start ingesting data that is readily queryable. To get started, go to the Lake Formation console and add your data sources. Lake Formation crawls those sources and moves the data into your new S3 data lake.

Lake Formation can automatically lay out the data in S3 partitions; change it into formats for faster analytics, like Apache Parquet and ORC; and increase data quality through machine-learned record matching and de-duplication.

From a single dashboard, you can set up all the permissions for your data lake. Those permissions are implemented for every service accessing this data – including analytics and ML services (Amazon Redshift, Athena, and Amazon EMR for Apache Spark workloads). Lake Formation saves you the hassle of redefining policies across multiple services and provides consistent enforcement of and compliance with those policies.

Learn how to start using AWS Lake Formation.


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at AWS. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

Prajakta Damle is a principal product manager at Amazon Web Services.

AWS Lake Formation – Now Generally Available

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-lake-formation-now-generally-available/

As soon as companies started to have data in digital format, it was possible for them to build a data warehouse, collecting data from their operational systems, such as customer relationship management (CRM) and enterprise resource planning (ERP) systems, and use this information to support their business decisions.

The reduction in costs of storage, together with an even greater reduction in complexity for managing large quantities of data, made possible by services such as Amazon S3, has allowed companies to retain more information, including raw data that is not structured, such as logs, images, video, and scanned documents.

This is the idea of a data lake: to store all your data in one, centralized repository, at any scale. We are seeing this approach with customers like Netflix, Zillow, NASDAQ, Yelp, iRobot, FINRA, and Lyft. They can run their analytics on this larger dataset, from simple aggregations to complex machine learning algorithms, to better discover patterns in their data and understand their business.

Last year at re:Invent we introduced in preview AWS Lake Formation, a service that makes it easy to ingest, clean, catalog, transform, and secure your data and make it available for analytics and machine learning. I am happy to share that Lake Formation is generally available today!

With Lake Formation you have a central console to manage your data lake, for example to configure the jobs that move data from multiple sources, such as databases and logs, to your data lake. Having such a large and diversified amount of data makes configuring the right access permission also critical. You can secure access to metadata in the Glue Data Catalog and data stored in S3 using a single set of granular data access policies defined in Lake Formation. These policies allow you to define table and column-level data access.

One thing I like most about Lake Formation is that it works with your data already in S3! You can easily register your existing data with Lake Formation, and you don’t need to change existing processes loading your data to S3. Since data remains in your account, you have full control.

You can also use Glue ML Transforms to easily deduplicate your data. Deduplication is important to reduce the amount of storage you need, but also to make analyzing your data more efficient because you have neither the overhead nor the possible confusion of looking at the same data twice. This problem is trivial if duplicate records can be identified by a unique key, but becomes very challenging when you have to do a “fuzzy match”. A similar approach can be used for record linkage, that is, when you are looking for similar items in different tables, for example to do a “fuzzy join” of two databases that do not share a unique key.
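To give a feel for what a fuzzy match involves, here is a minimal sketch that groups near-duplicate records by string similarity using Python's standard difflib module. This is a swapped-in, simple technique for illustration and is not the algorithm behind Glue ML Transforms. The records, the threshold, and the function names are made up.

```python
from difflib import SequenceMatcher

def similarity(a, b):
    """Case-insensitive similarity ratio between two strings (0.0 to 1.0)."""
    return SequenceMatcher(None, a.lower(), b.lower()).ratio()

def group_duplicates(records, threshold=0.85):
    """Greedily group records whose similarity to a group's first member
    meets the threshold. The threshold is an arbitrary illustrative value."""
    groups = []
    for record in records:
        for group in groups:
            if similarity(record, group[0]) >= threshold:
                group.append(record)
                break
        else:
            groups.append([record])
    return groups

# Made-up sample records with no shared unique key.
records = [
    "Jon Smith, 12 Main St",
    "John Smith, 12 Main Street",
    "Alice Wong, 5 Oak Ave",
]
groups = group_duplicates(records)
```

The first two records land in one group and the third in its own. Pairwise comparison like this scales poorly, which is part of why matching at the scale of a data lake calls for more sophisticated, learned approaches.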

In this way, implementing a data lake from scratch is much faster, and managing a data lake is much easier, making these technologies available to more customers.

Creating a Data Lake

Let’s build a data lake using the Lake Formation console. First I register the S3 buckets that are going to be part of my data lake. Then I create a database and grant permission to the IAM users and roles that I am going to use to manage my data lake. The database is registered in the Glue Data Catalog and holds the metadata required to analyze the raw data, such as the structure of the tables that are going to be automatically generated during data ingestion.

Managing permissions is one of the most complex tasks for a data lake. Consider for example the huge amount of data that can be part of it, the sensitive, mission-critical nature of some of the data, and the different structured, semi-structured, and unstructured formats in which data can reside. Lake Formation makes it easier with a central location where you can give IAM users, roles, groups, and Active Directory users (via federation) access to databases, tables, optionally allowing or denying access to specific columns within a table.

To simplify data ingestion, I can use blueprints that create the necessary workflows, crawlers and jobs on AWS Glue for common use cases. Workflows enable orchestration of your data loading workloads by building dependencies between Glue entities, such as triggers, crawlers and jobs, and allow you to track visually the status of the different nodes in the workflows on the console, making it easier to monitor progress and troubleshoot issues.

Database blueprints help load data from operational databases. For example, if you have an e-commerce website, you can ingest all your orders in your data lake. You can load a full snapshot from an existing database, or incrementally load new data. In case of an incremental load, you can select a table and one or more of its columns as bookmark keys (for example, a timestamp in your orders) to determine previously imported data.
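The bookmark idea can be sketched in a few lines of Python. This is only an illustration of the concept, not the code that the blueprint generates: the orders, the updated_at column, and the incremental_rows helper are hypothetical.

```python
from datetime import datetime


def incremental_rows(rows, bookmark_key, last_bookmark):
    """Return the rows newer than the bookmark, plus the new bookmark value."""
    new_rows = [
        row for row in rows
        if last_bookmark is None or row[bookmark_key] > last_bookmark
    ]
    # If nothing new arrived, the bookmark stays where it was.
    new_bookmark = max((row[bookmark_key] for row in new_rows), default=last_bookmark)
    return new_rows, new_bookmark


orders = [
    {"order_id": 1, "updated_at": datetime(2019, 8, 1, 9, 0)},
    {"order_id": 2, "updated_at": datetime(2019, 8, 1, 10, 30)},
    {"order_id": 3, "updated_at": datetime(2019, 8, 2, 8, 15)},
]

# The first run has no bookmark, so it behaves like a full snapshot.
first_batch, bookmark = incremental_rows(orders, "updated_at", None)

# A later run only picks up what arrived after the stored bookmark.
orders.append({"order_id": 4, "updated_at": datetime(2019, 8, 3, 12, 0)})
second_batch, bookmark = incremental_rows(orders, "updated_at", bookmark)
```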

Log file blueprints simplify ingesting logging formats used by Application Load Balancers, Elastic Load Balancers, and AWS CloudTrail. Let’s look at how that works in more depth.

Security is always a top priority, and I want to be able to have a forensic log of all management operations across my account, so I choose the CloudTrail blueprint. As source, I select a trail collecting my CloudTrail logs from all regions into an S3 bucket. In this way, I’ll be able to query account activity across all my AWS infrastructure. This works similarly for a larger organization having multiple AWS accounts: when configuring the trail in the CloudTrail console, they just need to apply the trail to their whole organization.

I then select the target database, and the S3 location for the data lake. As data format I use Parquet, a columnar storage format that will make querying the data faster and cheaper. The import frequency can be hourly to monthly, with the option to choose the day of the week and the time. For now, I want to run the workflow on demand. I can do that from the console or programmatically, for example using any AWS SDK or the AWS Command Line Interface (CLI).
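As a rough sketch of the programmatic option, the AWS SDK for Python (boto3) exposes a start_workflow_run call on the Glue client. The workflow name below is a placeholder, and the FakeGlueClient class (including the run ID it returns) is a made-up stand-in so that the sketch runs without AWS credentials.

```python
def start_workflow(glue_client, workflow_name):
    """Start an AWS Glue workflow run and return its run ID."""
    response = glue_client.start_workflow_run(Name=workflow_name)
    return response["RunId"]


class FakeGlueClient:
    """Stand-in for boto3.client("glue"); the run ID format is invented."""

    def start_workflow_run(self, Name):
        return {"RunId": f"wr_{Name}_0001"}


# Placeholder workflow name. With real credentials you would pass
# boto3.client("glue") instead of the fake client.
run_id = start_workflow(FakeGlueClient(), "my-cloudtrail-workflow")
```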

Finally, I give the workflow a name, the IAM role to use during execution, and a prefix for the tables that will be automatically created by this workflow.

I start the workflow from the Lake Formation console and select to view the workflow graph. This opens the AWS Glue console, where I can visually see the steps of the workflow and monitor the progress of this run.

When the workflow is completed, a new table is available in my data lake database. The source data remain as logs in the S3 bucket output of CloudTrail, but now I have them consolidated, in Parquet format and partitioned by date, in my data lake S3 location. To optimize costs, I can set up an S3 lifecycle policy that automatically expires data in the source S3 bucket after a safe amount of time has passed.
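For illustration, such a lifecycle policy could look like the following Python sketch, which builds the configuration in the shape expected by the S3 put_bucket_lifecycle_configuration call in boto3. The AWSLogs/ prefix, the 90-day retention, and the rule ID are assumptions; pick values that match your own trail and retention requirements.

```python
def expiration_rule(prefix, days, rule_id="expire-source-logs"):
    """Build an S3 lifecycle configuration that expires objects under a prefix."""
    if days <= 0:
        raise ValueError("days must be a positive number")
    return {
        "Rules": [
            {
                "ID": rule_id,
                "Filter": {"Prefix": prefix},
                "Status": "Enabled",
                "Expiration": {"Days": days},
            }
        ]
    }


# Assumed prefix and retention period, for illustration only.
lifecycle = expiration_rule("AWSLogs/", 90)

# With real credentials, the configuration could be applied like this:
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="<source bucket>", LifecycleConfiguration=lifecycle)
```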

Securing Access to the Data Lake
Lake Formation provides secure and granular access to data stores in the data lake, via a new grant/revoke permissions model that augments IAM policies. It is simple to set up these permissions, for example using the console:

I simply select the IAM user or role I want to grant access to. Then I select the database and optionally the tables and the columns I want to provide access to. It is also possible to select which type of access to provide. For this demo, simple select permissions are sufficient.

Accessing the Data Lake
Now I can query the data using tools like Amazon Athena or Amazon Redshift. For example, I open the query editor in the Athena console. First, I want to use my new data lake to look into which source IP addresses are most common in my AWS Account activity:

SELECT sourceipaddress, count(*)
FROM my_trail_cloudtrail
GROUP BY  sourceipaddress
ORDER BY  2 DESC;

Looking at the result of the query, you can see which are the AWS API endpoints that I use the most. Then, I’d like to check which user identity types are used. That information is stored in JSON format inside one of the columns. I can use some of the JSON functions available with Amazon Athena to get that information in my SQL statements:

SELECT json_extract_scalar(useridentity, '$.type'), count(*)
FROM "mylake"."my_trail_cloudtrail"
GROUP BY  json_extract_scalar(useridentity, '$.type')
ORDER BY  2 DESC;

Most of the time, AWS services are the ones creating activities in my trail. These queries are just an example, but they quickly give me deeper insight into what is happening in my AWS account.

Think of what a similar impact could mean for your business! Using database and logs blueprints, you can quickly create workflows to ingest data from multiple sources within your organization, set the right permissions at the column level to control who can access the information collected, clean and prepare your data using machine learning transforms, and correlate and visualize the information using tools like Amazon Athena, Amazon Redshift, and Amazon QuickSight.

Customizing Data Access with Column-Level Permissions
To follow data privacy guidelines and compliance requirements, the mission-critical data stored in a data lake requires creating custom views for different stakeholders inside the company. Let’s compare the visibility of two IAM users in my AWS account, one that has full permissions on a table, and one that has only select access to a subset of the columns of the same table.

I already have a user with full access to the table containing my CloudTrail data, called danilop. I create a new limitedview IAM user and I give it access to the Athena console. In the Lake Formation console, I only give this new user select permissions on three of the columns.
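The same kind of grant can also be expressed programmatically. The following Python sketch builds the arguments for the Lake Formation grant_permissions call in boto3. The account ID is a placeholder, and the three column names are assumptions, because the columns chosen in the console are not listed in this post.

```python
def column_grant(principal_arn, database, table, columns):
    """Build arguments for a SELECT grant restricted to specific columns."""
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        },
        "Permissions": ["SELECT"],
    }


grant = column_grant(
    "arn:aws:iam::123456789012:user/limitedview",  # placeholder account ID
    "mylake",
    "my_trail_cloudtrail",
    ["eventtime", "eventname", "awsregion"],  # assumed column names
)

# With real credentials:
# boto3.client("lakeformation").grant_permissions(**grant)
```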

To verify the different access to the data in the table, I log in with one user at a time and go to the Athena console. On the left I can explore which tables and columns the logged-in user can see in the Glue Data Catalog. Here’s a comparison for the two users, side-by-side:

The limited user has access only to the three columns that I explicitly configured, and to the four columns used for partitioning the table, whose access is required to see any data. When I query the table in the Athena console with a select * SQL statement, logged in as the limitedview user, I only see data from those seven columns:

Available Now
There is no additional cost for using AWS Lake Formation; you pay for the use of the underlying services such as Amazon S3 and AWS Glue. One of the core benefits of Lake Formation is the security policies it introduces. Previously you had to use separate policies to secure data and metadata access, and these policies only allowed table-level access. Now you can give each user access, from a central location, only to the columns they need to use.

AWS Lake Formation is now available in US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), and Asia Pacific (Tokyo). Redshift integration with Lake Formation requires Redshift cluster version 1.0.8610 or higher; your clusters should have been automatically updated by the time you read this. Support for Apache Spark with Amazon EMR will follow over the next few months.

I only scratched the surface of what you can do with Lake Formation. Building and managing a data lake for your business is now much easier. Let me know how you are using these new capabilities!

Danilo

Extracting Salesforce.com data using AWS Glue and analyzing with Amazon Athena

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/extracting-salesforce-com-data-using-aws-glue-and-analyzing-with-amazon-athena/

Salesforce is a popular and widely used customer relationship management (CRM) platform. It lets you store and manage prospect and customer information—like contact info, accounts, leads, and sales opportunities—in one central location. You can derive a lot of useful information by combining the prospect information stored in Salesforce with other structured and unstructured data in your data lake.

In this post, I show you how to use AWS Glue to extract data from a Salesforce.com account object and save it to Amazon S3. You then use Amazon Athena to generate a report by joining the account object data from Salesforce.com with the orders data from a separate order management system.

Preparing your data

I signed up for a free Salesforce.com account, which comes with a handful of sample records populated with many of the Salesforce.com objects. You can use your organization’s development Salesforce.com account and pull data from multiple objects at the same time by modifying the SOQL query in your AWS Glue code. To demonstrate extracting data from these objects, I use only the Account object to keep the query simple.

To demonstrate joining Salesforce.com data with data from another system using Amazon Athena, you create a sample data file showing orders coming from an order management system.

Setting up an AWS Glue job

Use the open source springml library to connect Apache Spark with Salesforce.com. The library comes with plenty of handy features that allow you to read, write, and update Salesforce.com objects using the Apache Spark framework.

You can compile the JAR files from the springml GitHub repo or download them with their dependencies from the Maven repo. Upload these JAR files to your S3 bucket and make a note of the full path for each.

force-partner-api-40.0.0.jar
force-wsc-40.0.0.jar
salesforce-wave-api-1.0.9.jar
spark-salesforce_2.11-1.1.1.jar 

In the AWS Management Console, choose AWS Glue in the Region where you want to run the service. Choose Jobs, Add Job. Follow the wizard by filling in the necessary details.

Under the Security configuration, script libraries, and job parameters (optional) section, for Dependent jars path, list the paths for the four JAR files listed previously, separated by commas.

For this job, I allocated Maximum capacity as “2.” This field defines the number of AWS Glue data processing units (DPUs) that the system can allocate when this job runs. A DPU is a relative measure of processing power that consists of four vCPUs of compute capacity and 16 GB of memory. When you specify an Apache Spark ETL job, you can allocate 2–100 DPUs. The default is 10 DPUs.
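The DPU definition above translates directly into compute and memory figures. The following short Python sketch only does that arithmetic; the dpu_resources helper is a hypothetical name.

```python
VCPUS_PER_DPU = 4        # from the DPU definition above
MEMORY_GB_PER_DPU = 16   # from the DPU definition above


def dpu_resources(dpus):
    """Return the vCPUs and memory that a number of DPUs corresponds to."""
    if not 2 <= dpus <= 100:
        raise ValueError("an Apache Spark ETL job can be allocated 2-100 DPUs")
    return {
        "vcpus": dpus * VCPUS_PER_DPU,
        "memory_gb": dpus * MEMORY_GB_PER_DPU,
    }


# The maximum capacity of 2 used for this job.
job_capacity = dpu_resources(2)
```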

Execute the AWS Glue job to extract data from the Salesforce.com object

The following Scala code extracts a few fields from the Account object in Salesforce.com and writes them as a table to S3 in Apache Parquet file format.

import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job
import com.amazonaws.services.glue.util.JsonOptions
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}
import org.apache.spark.SparkContext
import scala.collection.JavaConverters.mapAsJavaMapConverter

object SfdcExtractData {
  def main(sysArgs: Array[String]): Unit = {

    val sparkContext: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(sparkContext)
    val sparkSession = glueContext.getSparkSession

    // Resolve the job name passed in by AWS Glue and initialize the job.
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)

    // Read a few Account fields from Salesforce.com into a Spark DataFrame.
    // Replace the username and password placeholders with your own values.
    val soql = "select name, accountnumber, industry, type, billingaddress, sic from account"
    val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()

    // Wrap the DataFrame in a DynamicFrame so that the AWS Glue sink can write it.
    val datasource0 = DynamicFrame(df, glueContext).withName("datasource0").withTransformationContext("datasource0")

    // Write the data to S3 in Parquet format, partitioned by Industry.
    val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)

    Job.commit()
  }
}

This code relies on a few key components:

val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()

This code example establishes a Salesforce.com connection, submits a SOQL-compatible query for the Account object, and loads the returned records into a Spark DataFrame. Don’t forget to replace username with your Salesforce.com username, and password with the combination of your password and the security token of your profile.

Best practices suggest storing and retrieving the password using AWS Secrets Manager instead of hardcoding it. For simplicity, I left it hardcoded in this example.
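As a rough illustration of that best practice, the lookup could work as in the following sketch. It is written in Python rather than in the Scala used by the job, and it assumes a secret stored as a JSON document with username, password, and security_token keys; that layout and the secret name are hypothetical. The get_secret_value call is the one exposed by the boto3 Secrets Manager client, and FakeSecretsClient is a stand-in so that the sketch runs without AWS credentials.

```python
import json


def salesforce_credentials(secrets_client, secret_id):
    """Return (username, password + security token) read from a JSON secret."""
    response = secrets_client.get_secret_value(SecretId=secret_id)
    secret = json.loads(response["SecretString"])
    # Salesforce.com expects the security token appended to the password.
    return secret["username"], secret["password"] + secret["security_token"]


class FakeSecretsClient:
    """Stand-in for boto3.client("secretsmanager") with made-up values."""

    def get_secret_value(self, SecretId):
        return {
            "SecretString": json.dumps({
                "username": "user@example.com",
                "password": "example-password",
                "security_token": "EXAMPLETOKEN",
            })
        }


# Hypothetical secret name.
username, password = salesforce_credentials(FakeSecretsClient(), "salesforce/glue-job")
```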

Keep in mind that this query is simple and returns only a handful of records. For large volumes of data, you might want to limit the results returned by your query or use other techniques like bulk query and chunking. Check the springml page to learn more about the functionality that Salesforce.com supports.

val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)  

This code does all the writing to your S3 bucket. In this example, you want to aggregate data by Industry segments. Because of that, you should partition the data by the Industry field.

Also, the code writes in Parquet format. Athena charges you by the amount of data scanned per query. You can save on costs and get better performance when you partition the data, compress data, or convert it to columnar formats like Parquet.

After you run this code in AWS Glue, you can go to your S3 bucket where the sink points and find something like the following structure:

Query the data with Athena

After the code drops your Salesforce.com data into your S3 bucket with the correct partition and format, AWS Glue can crawl the dataset. It creates the appropriate schema in the AWS Glue Data Catalog. Wait for AWS Glue to create the table. Then, Athena can query the table and join with other tables in the catalog.

First, use the AWS Glue crawler to discover the Salesforce.com Account data that you previously stored in the S3 bucket. For details about how to use the crawler, see populating the AWS Glue Data Catalog.

In this example, point the crawler to the S3 output prefix where you stored your Salesforce.com Account data, and run it. The crawler creates a new catalog table before it finally stops.

The AWS Glue Data Catalog table automatically captures the column names, types, and partition column of the data stored in your S3 bucket in Parquet file format. You can now query this table with Athena. A simple SELECT query on that table shows the results of scanning the data from the S3 bucket.

Now your Salesforce.com data is ready for Athena to query. For this example, join this data with the sample orders from the order management system, stored in S3. After your AWS Glue crawler finishes cataloging the sample orders data, Athena can query it.

Finally, use Athena to join both tables in an aggregation query.
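The shape of such an aggregation query can be sketched locally. The following Python example uses an in-memory SQLite database as a stand-in for Athena; the table names, column names, and sample rows are all hypothetical, and your real tables are named by the AWS Glue crawlers.

```python
import sqlite3

# In-memory database standing in for the two cataloged tables.
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE sfdc_account (name TEXT, accountnumber TEXT, industry TEXT);
CREATE TABLE orders (accountnumber TEXT, order_total REAL);
INSERT INTO sfdc_account VALUES
  ('Acme', 'A-1', 'Energy'),
  ('Globex', 'A-2', 'Energy'),
  ('Initech', 'A-3', 'Banking');
INSERT INTO orders VALUES
  ('A-1', 100.0), ('A-1', 50.0), ('A-2', 25.0), ('A-3', 300.0);
""")

# Join accounts to their orders and aggregate sales by industry.
JOIN_QUERY = """
SELECT a.industry, COUNT(*) AS order_count, SUM(o.order_total) AS total_sales
FROM sfdc_account a
JOIN orders o ON o.accountnumber = a.accountnumber
GROUP BY a.industry
ORDER BY total_sales DESC
"""
sales_by_industry = conn.execute(JOIN_QUERY).fetchall()
```

The SELECT statement itself uses only standard SQL, so a query of the same shape should carry over to the Athena query editor once the table and column names are adjusted.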

Conclusion

In this post, I showed a simple example for extracting any Salesforce.com object data using AWS Glue and Apache Spark, and saving it to S3. You can then catalog your S3 data in AWS Glue Data Catalog, allowing Athena to query it. With this mechanism in place, you can easily incorporate Salesforce data into your AWS based data lake.

If you have comments or feedback, please leave them below.

About the Author

Behram Irani is a Data Architect at Amazon Web Services.

Loading ongoing data lake changes with AWS DMS and AWS Glue

Post Syndicated from Rajiv Gupta original https://aws.amazon.com/blogs/big-data/loading-ongoing-data-lake-changes-with-aws-dms-and-aws-glue/

Building a data lake on Amazon S3 provides an organization with countless benefits. It allows you to access diverse data sources, determine unique relationships, build AI/ML models to provide customized customer experiences, and accelerate the curation of new datasets for consumption. However, capturing and loading continuously changing updates from operational data stores—whether on-premises or on AWS—into a data lake can be time-consuming and difficult to manage.

The following post demonstrates how to deploy a solution that loads ongoing changes from popular database sources—such as Oracle, SQL Server, PostgreSQL, and MySQL—into your data lake. The solution streams new and changed data into Amazon S3. It also creates and updates appropriate data lake objects, providing a source-similar view of the data based on a schedule you configure. The AWS Glue Data Catalog then exposes the newly updated and de-duplicated data for analytics services to use.

Solution overview

I divide this solution into two AWS CloudFormation stacks. You can download the AWS CloudFormation templates I reference in this post from a public S3 bucket, or you can launch them using the links featured later. You can likewise download the AWS Glue jobs referenced later in this post.

The first stack contains reusable components. You only have to deploy it one time. It launches the following AWS resources:

  • AWS Glue jobs: Manage the workflow of the load process from the raw S3 files to the de-duped and optimized Parquet files.
  • Amazon DynamoDB table: Persists the state of data load for each data lake table.
  • IAM role: Runs these services and accesses S3. This role contains policies with elevated privileges. Only attach this role to these services and not to IAM users or groups.
  • AWS DMS replication instance: Runs replication tasks to migrate ongoing changes via AWS DMS.

The second stack contains objects that you should deploy for each source you bring in to your data lake. It launches the following AWS resources:

  • AWS DMS replication task: Reads changes from the source database transaction logs for each table and streams that data into an S3 bucket.
  • S3 buckets: Store raw AWS DMS initial load and update objects, as well as query-optimized data lake objects.
  • AWS Glue trigger: Schedules the AWS Glue jobs.
  • AWS Glue crawler: Builds and updates the AWS Glue Data Catalog on a schedule.

Stack parameters

The AWS CloudFormation stack requires that you input parameters to configure the ingestion and transformation pipeline:

  • DMS source database configuration: The database connection settings that the DMS connection object needs, such as the DB engine, server, port, user, and password.
  • DMS task configuration: The settings the AWS DMS task needs, such as the replication instance ARN, table filter, schema filter, and the AWS DMS S3 bucket location. The table filter and schema filter allow you to choose which objects the replication task syncs.
  • Data lake configuration: The settings your stack passes to the AWS Glue job and crawler, such as the S3 data lake location, data lake database name, and run schedule.

Post-deployment

After you deploy the solution, the AWS CloudFormation template starts the DMS replication task and populates the DynamoDB controller table. Data does not propagate to your data lake until you review and update the DynamoDB controller table.

In the DynamoDB console, configure the fields shown in the following table to control the data load process:

Field | Description
ActiveFlag | Required. When set to true, it enables this table for loading.
PrimaryKey | A comma-separated list of column names. When set, the AWS Glue job uses these fields for processing update and delete transactions. When set to “null,” the AWS Glue job only processes inserts.
PartitionKey | A comma-separated list of column names. When set, the AWS Glue job uses these fields to partition the output files into multiple subfolders in S3. Partitions can be valuable when querying and processing larger tables but may overcomplicate smaller tables. When set to “null,” the AWS Glue job only loads data into one partition.
LastFullLoadDate | The date of the last full load. The AWS Glue job compares this to the date of the DMS-created full load file. Setting this field to an earlier value triggers AWS Glue to reprocess the full load file.
LastIncrementalFile | The file name of the last incremental file. The AWS Glue job compares this to any new DMS-created incremental files. Setting this field to an earlier value triggers AWS Glue to reprocess any files with a larger name.
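If you prefer to script these updates instead of using the console, the following Python sketch builds the arguments for an update_item call on a boto3 DynamoDB Table resource. Several details are assumptions that you should check against your deployed table: the table name, the key attribute used to identify a row, the column name used as a primary key, and the representation of ActiveFlag as the string "true".

```python
def controller_update(item_key, primary_key=None, partition_key=None, active=True):
    """Build update_item arguments for one row of the controller table."""
    return {
        "Key": item_key,
        "UpdateExpression": "SET #active = :active, #pk = :pk, #part = :part",
        "ExpressionAttributeNames": {
            "#active": "ActiveFlag",
            "#pk": "PrimaryKey",
            "#part": "PartitionKey",
        },
        "ExpressionAttributeValues": {
            # Assumption: the flag is stored as a string.
            ":active": "true" if active else "false",
            # Comma-separated column names, or "null" when not set.
            ":pk": ",".join(primary_key) if primary_key else "null",
            ":part": ",".join(partition_key) if partition_key else "null",
        },
    }


# Hypothetical key attribute, key value, and column name.
update = controller_update({"path": "sampledb/product"}, primary_key=["product_id"])

# With real credentials (hypothetical table name):
# boto3.resource("dynamodb").Table("DMSCDC_Controller").update_item(**update)
```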

At this point, the setup is complete. At the next scheduled interval, the AWS Glue job processes any initial and incremental files and loads them into your data lake. At the next scheduled AWS Glue crawler run, AWS Glue loads the tables into the AWS Glue Data Catalog for use in your downstream analytical applications.

Amazon Athena and Amazon Redshift

Your pipeline now automatically creates and updates tables. If you use Amazon Athena, you can begin to query these tables right away. If you use Amazon Redshift, you can expose these tables as an external schema and begin to query.

You can analyze these tables directly or join them to tables already in your data warehouse, or use them as inputs to an extract, transform, and load (ETL) process. For more information, see Creating External Schemas for Amazon Redshift Spectrum.

AWS Lake Formation

At the time of writing this post, AWS Lake Formation has been announced but not released. AWS Lake Formation makes it easy to set up a secure data lake. To incorporate Lake Formation in this solution, add the S3 location specified during launch as a “data lake storage” location and use Lake Formation to vend credentials to your IAM users.

AWS Lake Formation eliminates the need to grant S3 access via user, group, or bucket policies and instead provides a centralized console for granting and auditing access to your data lake.

Key features

A few built-in AWS CloudFormation key configurations make this solution possible. Understanding these features helps you replicate this strategy for other purposes or customize the application for your needs.

AWS DMS

  • The first AWS CloudFormation template deploys an AWS DMS replication instance. Before launching the second AWS CloudFormation template, ensure that the replication instance connects to your on-premises data source.
  • The AWS DMS endpoint for the S3 target has an extra connection attribute: addColumnName=true. This attribute tells DMS to add column headers to the output files. The process uses this header to build the metadata for the Parquet files and the AWS Glue Data Catalog.
  • When the AWS DMS replication task begins, the initial load process writes files to the following location: s3://<bucket>/<schema>/<table>/. It writes one file per table for the initial load named LOAD00000001.csv. It writes up to one file per minute for any data changes named <datetime>.csv. The load process uses these file names to process new data incrementally.
  • The AWS DMS change data capture (CDC) process adds an additional field, “Op,” to the dataset. This field indicates the last operation for a given key. The change detection logic uses this field, along with the primary key stored in the DynamoDB table, to determine which operation to perform on the incoming data. The process passes this field along to your data lake, and you can see it when querying data.
  • The AWS CloudFormation template deploys two roles specific to DMS (DMS-CloudWatch-logs-role, DMS-VPC-role) that may already be in place if you previously used DMS. If the stack fails to build because of these roles, you can safely remove these roles from the template.
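The file naming convention described above is what makes incremental processing possible: because change files are named after their timestamp, comparing names is enough to find what is new. The following Python sketch illustrates that selection logic. It is not the code of the AWS Glue jobs in this solution, and the sample file names are made up.

```python
def files_to_process(file_names, last_incremental_file):
    """Return the change files not yet processed, in the order to apply them."""
    # The initial load file is handled separately from the change files.
    incremental = [name for name in file_names if not name.startswith("LOAD")]
    return sorted(
        name for name in incremental
        if last_incremental_file is None or name > last_incremental_file
    )


# Hypothetical listing of one table folder in the raw S3 bucket.
folder = [
    "LOAD00000001.csv",
    "20190801-101500000.csv",
    "20190801-101400000.csv",
    "20190801-101600000.csv",
]
pending = files_to_process(folder, "20190801-101400000.csv")
```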

AWS Glue

  • AWS Glue has two types of jobs: Python shell and Apache Spark. The Python shell job allows you to run small tasks using a fraction of the compute resources and at a fraction of the cost. The Apache Spark job allows you to run medium- to large-sized tasks that are more compute- and memory-intensive by using a distributed processing framework. This solution uses the Python shell jobs to determine which files to process and to maintain the state in the DynamoDB table. It also uses Spark jobs for data processing and loading.
  • As changes stream in from your relational database, you may see new transactions appear as new files within a given folder. This load process behavior minimizes the impact on already loaded data. If this causes inconsistency in your file sizes or query performance, consider incorporating a compaction (file merging) process.
  • Between job runs, AWS Glue sequences duplicate transactions to the same primary key (for example, insert, then update) by file name and order. It determines the last transaction and uses it to re-write the impacted object to S3.
  • Configuration settings allow the Spark-type AWS Glue jobs a maximum of two DPUs of processing power. If your load jobs underperform, consider increasing this value. Increasing the job DPUs is most effective for tables set up with a partition key or when the DMS process generates multiple files between executions.
  • If your organization already has a long-running Amazon EMR cluster in place, consider replacing the AWS Glue jobs with Apache Spark jobs running within your EMR cluster to optimize your expenses.
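To illustrate how the “Op” field and the primary key drive the merge, here is a minimal Python sketch. It assumes the I, U, and D operation codes that AWS DMS writes for inserts, updates, and deletes, and it uses plain dictionaries instead of Spark; the product rows are hypothetical and this is not the code of the AWS Glue jobs in this solution.

```python
def apply_changes(current_rows, changes, primary_key):
    """Apply ordered CDC records to a snapshot; the last operation per key wins."""
    table = {row[primary_key]: row for row in current_rows}
    for change in changes:  # already ordered by file name and row position
        key = change[primary_key]
        if change["Op"] == "D":
            table.pop(key, None)
        else:
            # "I" and "U" both leave the latest version of the row in place.
            table[key] = change
    return list(table.values())


snapshot = [
    {"product_id": 1, "name": "widget", "price": 10},
    {"product_id": 2, "name": "gadget", "price": 20},
]
changes = [
    {"Op": "U", "product_id": 1, "name": "widget", "price": 12},
    {"Op": "D", "product_id": 2, "name": "gadget", "price": 20},
    {"Op": "I", "product_id": 3, "name": "gizmo", "price": 5},
    {"Op": "U", "product_id": 3, "name": "gizmo", "price": 6},
]
merged = apply_changes(snapshot, changes, "product_id")
```

In this example the insert and the later update to product 3 collapse into a single row, which mirrors the sequencing of duplicate transactions described above.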

IAM

  • The solution deploys an IAM role named DMSCDC_Execution_Role. The role is attached to AWS services and is associated with AWS managed policies as well as an inline policy.
  • The AssumeRolePolicyDocument trust document for the role lists the following service principals, which allows the AWS Lambda, AWS Glue, and AWS DMS services to assume the role so that the jobs have the necessary permissions to execute. AWS CloudFormation custom resources, backed by AWS Lambda, also use this role to initialize the environment.
       Principal :
         Service :
           - lambda.amazonaws.com
           - glue.amazonaws.com
           - dms.amazonaws.com
       Action :
         - sts:AssumeRole
    

  • The IAM role includes the following AWS managed policies. For more information, see Managed Policies and Inline Policies.
    ManagedPolicyArns:
         - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole
         - arn:aws:iam::aws:policy/AmazonS3FullAccess
         - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole

  • The IAM role includes the following inline policy. This policy includes permissions to execute the Lambda-backed AWS CloudFormation custom resources, initialize and manage the DynamoDB table, and initialize the DMS replication task.
       Action:
         - lambda:InvokeFunction
         - dynamodb:PutItem
         - dynamodb:CreateTable
         - dynamodb:UpdateItem
         - dynamodb:UpdateTable
         - dynamodb:GetItem
         - dynamodb:DescribeTable
         - iam:GetRole
         - iam:PassRole
         - dms:StartReplicationTask
         - dms:TestConnection
         - dms:StopReplicationTask
       Resource:
         - arn:aws:dynamodb:${AWS::Region}:${AWS::AccountId}:table/DMSCDC_*
         - arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:DMSCDC_*
         - arn:aws:iam::${AWS::AccountId}:role/DMSCDC_*
         - arn:aws:dms:${AWS::Region}:${AWS::AccountId}:*:*
       Action:
         - dms:DescribeConnections
         - dms:DescribeReplicationTasks
       Resource: '*'

Sample database

The following example illustrates what you see after deploying this solution using a sample database.

The sample database includes three tables: product, store, and productorder. After deploying the AWS CloudFormation templates, you should see a folder created for each table in your raw S3 bucket.

Each folder contains an initial load file.

The table list populates the DynamoDB table.

Set the active flag, primary key, and partition key values for these tables. In this example, I set the primary key for the product and store tables to ensure it processes the updates. I leave the primary key for the productorder table alone, because I do not expect update transactions. However, I set the partition key to ensure it partitions data by date.

When the next scheduled AWS Glue job runs, it creates a folder for each table in your data lake S3 bucket.

When the next scheduled AWS Glue crawler runs, your AWS Glue Data Catalog lists these tables. You can now query them using Athena.

Similarly, you can query the data lake from within your Amazon Redshift cluster after first cataloging the external database.

On subsequent AWS Glue job runs, the process compares the timestamp of the initial file with the “LastFullLoadDate” field in the DynamoDB table to determine if it should process the initial file again. It also compares the new incremental file names with the “LastIncrementalFile” field in the DynamoDB table to determine if it should process any incremental files. In the following example, it created a new incremental file for the product table.

Examining the file shows two transactions: an update and a delete.

When the AWS Glue job runs again, the DynamoDB table updates to list a new value for the “LastIncrementalFile.”

Finally, the solution reprocesses the Parquet file. You can query the data to see the new values for the updated record and ensure that it removes the deleted record.

Summary

In this post, I provided a set of AWS CloudFormation templates that allow you to quickly and easily sync transactional databases with your AWS data lake. With data in your AWS data lake, you can perform analysis on data from multiple data sources, build machine learning models, and produce rich analytics for your data consumers.

If you have questions or suggestions, please comment below.

About the Author

Rajiv Gupta is a data warehouse specialist solutions architect with Amazon Web Services.

Detect fraudulent calls using Amazon QuickSight ML insights

Post Syndicated from Guy Ben Baruch original https://aws.amazon.com/blogs/big-data/detect-fraudulent-calls-using-amazon-quicksight-ml-insights/

The financial impact of fraud in any industry is massive. According to the Financial Times article Fraud Costs Telecoms Industry $17bn a Year (paid subscription required), fraud costs the telecommunications industry $17 billion in lost revenues every year.

Fraudsters constantly look for new technologies and devise new techniques. This changes fraud patterns and makes detection difficult. Companies commonly combat this with a rules-based fraud detection system. However, once the fraudsters realize their current techniques or tools are being identified, they quickly find a way around it. Also, rules-based detection systems tend to struggle and slow down with a lot of data. This makes it difficult to detect fraud and act quickly, resulting in loss of revenue.

Overview

There are several AWS services that implement anomaly detection and could be used to combat fraud, but let’s focus on the following three:

When trying to detect fraud, there are two high-level challenges:

  • Scale – The amount of data to be analyzed. For example, each call generates a call detail record (CDR) event. These CDRs include many pieces of information such as originating and terminating phone numbers, and duration of call. Multiply these CDR events by the number of telephone calls placed each day and you can get an idea of the scale that operators must manage.
  • Machine learning knowledge and skill – The right set of skills to help solve business problems with machine learning. Developing these skills or hiring qualified data scientists with adequate domain knowledge is not simple.

Introducing Amazon QuickSight ML Insights

Amazon QuickSight is a fast, cloud-powered BI service that makes it easy for everyone in an organization to get business insights from their data through rich, interactive dashboards. With pay-per-session pricing and a dashboard that can be embedded into your applications, BI is now even more cost-effective and accessible to everyone.

However, as the volume of data that customers generate grows daily, it’s becoming more challenging to harness their data for business insights. This is where machine learning comes in. Amazon is a pioneer in using machine learning to automate and scale various aspects of business analytics in the supply chain, marketing, retail, and finance.

ML Insights integrates proven Amazon technologies into Amazon QuickSight to provide customers with ML-powered insights beyond visualizations.

  • ML-powered anomaly detection to help customers uncover hidden insights by continuously analyzing across billions of data points.
  • ML-powered forecasting and what-if analysis to predict key business metrics with point-and-click simplicity.
  • Auto-narratives to help customers tell the story of their dashboard in a plain-language narrative.

In this post, I demonstrate how a Telecom provider with little to no ML expertise can use Amazon QuickSight ML capabilities to detect fraudulent calls.

Prerequisites

To implement this solution, you need the following resources:

  • Amazon S3 to stage a ‘ribbon’ call detail record sample in a CSV format.
  • AWS Glue running an ETL job in PySpark.
  • AWS Glue crawlers to discover the schema of the tables and update the AWS Glue Data Catalog.
  • Amazon Athena to query the Amazon QuickSight dataset.
  • Amazon QuickSight to build visualizations and perform anomaly detection using ML Insights.

Diagram of fraudulent call-detecting architecture, using a PySpark script to prepare the data and transform it into Parquet and an AWS Glue crawler to build the AWS Glue Data Catalog.

The dataset

For this post, I use a synthetic dataset, thanks to Ribbon Communications. The data was generated by call test generators, and is not customer or sensitive data.

Inspecting the data

The example below is a typical CDR. The STOP CDR shown below is generated after a call has been terminated.


As you can see, there are a lot of values here. Most of them are not relevant for fraud identification or prevention.

Revenue share fraud

Revenue share fraud is one of the most common fraud schemes threatening the telecom industry today. It involves using fraudulent or stolen numbers to repeatedly call a premium rate B-number, whose owner then shares the revenue generated with the fraudster.

Say that you’d like to detect national and international revenue share fraud using Amazon QuickSight ML. Consider the typical traits of a revenue share fraud phone call. The pattern for revenue share fraud is multiple A-numbers calling the same B-number or a range of B-numbers with the same prefix. The call duration is usually higher than average and could be up to two hours, which is the maximum length of time international switches allow. Generally, the calls originate from one cell or a group of cells.

One SIM may make short test calls to a variety of B-numbers as a precursor to the fraud itself, which most often happens when the risk of detection is lowest, for example, Friday night, weekends, or holidays. Conference calling may be used to make several concurrent calls from one A-number.

Often, SIMs used for this type of fraud are sold or activated in bulk from the same distributor or group of distributors. SIMs could be topped up using fraudulent online or IVR payments, such as using stolen credit card numbers. Both PAYG credit and bundles may be used.

Based on the above use case, the following pieces of information are most relevant to detecting fraud:

  • Call duration
  • Calling number (A number)
  • Called number (B number)
  • Start time of the call
  • Accounting ID

You can use this reference to help identify those fields in a CDR.

Figure 2: Decoded CDR data, highlighting the relevant fields.

I identified the columns that I need out of 235 columns in the CDR.
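With those fields isolated, the revenue share pattern described earlier (many A-numbers calling one B-number with long calls) can be expressed as a simple rule. The following is only a minimal sketch for intuition, not the method used in this post. The function name, the tuple layout, and both thresholds are assumptions chosen for illustration.

```python
from collections import defaultdict


def flag_suspect_b_numbers(cdrs, min_callers=3, min_avg_duration=1800):
    """Return B-numbers called by many distinct A-numbers with long average calls.

    cdrs: iterable of (calling_number, called_number, duration_seconds).
    The default thresholds are illustrative assumptions, not tuned values.
    """
    callers = defaultdict(set)
    durations = defaultdict(list)
    for a_number, b_number, duration in cdrs:
        callers[b_number].add(a_number)
        durations[b_number].append(duration)

    suspects = []
    for b_number, a_numbers in callers.items():
        average = sum(durations[b_number]) / len(durations[b_number])
        if len(a_numbers) >= min_callers and average >= min_avg_duration:
            suspects.append(b_number)
    return sorted(suspects)


# Made-up sample records: three callers with long calls to "9000".
sample = [
    ("1001", "9000", 3600),
    ("1002", "9000", 4000),
    ("1003", "9000", 7000),
    ("1001", "5555", 60),
    ("1004", "5555", 120),
]
print(flag_suspect_b_numbers(sample))
```

A fixed rule like this is exactly what fraudsters learn to work around, which is why the rest of the post relies on ML-based anomaly detection instead.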

Inspecting the raw sample data, I quickly see that it’s missing a header.

To make life easier, I converted the raw CSV data, added the column names, and converted to Parquet.
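For a quick local look at the data, the same idea (name the columns and keep only the relevant ones) can be sketched with the Python standard library. This is an illustration rather than the conversion used here. It assumes that the positions match the zero-based col2, col5, and similar names that appear in the AWS Glue mapping later in this post, and the output column names are hypothetical.

```python
import csv
import io

# Zero-based positions assumed from the col2/col5/... names in the Glue mapping.
# The header names on the right are illustrative.
RELEVANT = {
    2: "accounting_id",
    5: "start_date",
    6: "start_time",
    13: "call_service_duration",
    19: "calling_number",
    20: "called_number",
}


def select_columns(raw_csv_text):
    """Return CSV text with a header row and only the relevant columns."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    indexes = sorted(RELEVANT)
    writer.writerow([RELEVANT[i] for i in indexes])
    for row in csv.reader(io.StringIO(raw_csv_text)):
        if len(row) <= indexes[-1]:
            continue  # skip rows too short to contain every relevant field
        writer.writerow([row[i] for i in indexes])
    return out.getvalue()
```

Converting the result to Parquet still needs a tool such as the PySpark job shown later.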

Discovering the data

In the AWS Glue console, set up a crawler and name it CDR_CRAWLER.

Point the crawler to s3://telco-dest-bucket/blog where the Parquet CDR data resides.

Next, create a new IAM role to be used by the AWS Glue crawler.

For Frequency, leave the default definition of Run on Demand.

Next, choose Add database and define the name of the database. This database contains the table discovered by the AWS Glue crawler.

Choose Next and review the crawler settings. When you’re satisfied, choose Finish.

Next, choose Crawlers, select the crawler that you just created (CDR_CRAWLER), and choose Run crawler.

The AWS Glue crawler starts crawling the database. This can take one minute or more to complete.

When it’s complete, under Data catalog, choose Databases.  You should be able to see the new database created by the AWS Glue crawler. In this case, the name of the database is blog.

To view the tables created under this database, select the relevant database and choose Tables. The crawler’s table also points to the location of the Parquet format CDRs.

To see the table’s schema, select the table created by the crawler.
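The console steps above can also be scripted. The following sketch uses the boto3 Glue client calls create_crawler and start_crawler. The role ARN is a hypothetical placeholder, and the code assumes that the role and the blog database settings match what you chose in the console. Omitting a schedule leaves the crawler to run on demand.

```python
def crawler_request(name, role_arn, database, s3_path):
    """Build keyword arguments for the Glue create_crawler call."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": database,
        "Targets": {"S3Targets": [{"Path": s3_path}]},
    }


def create_and_run_crawler(request):
    """Create the crawler and start one run (requires AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    glue = boto3.client("glue")
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


request = crawler_request(
    name="CDR_CRAWLER",
    role_arn="arn:aws:iam::111111111111:role/ExampleGlueCrawlerRole",  # hypothetical
    database="blog",
    s3_path="s3://telco-dest-bucket/blog",
)
```

Calling create_and_run_crawler(request) performs the same actions as choosing Finish and then Run crawler in the console.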

Data preparation

You have defined the relevant dimensions to use in the ML model to detect fraud. Now, you can use a PySpark script that I built earlier using an Amazon SageMaker notebook and an AWS Glue endpoint. The script covers the following tasks:

  • Reduce the dataset and focus only on the relevant columns.
  • Create a timestamp column, which you need for creating an analysis using Amazon QuickSight.
  • Transform files from CSV to Parquet for improved performance.

You can run the PySpark script on the raw CSV format of the CDRs that you are using. Here is the location of the raw CSV format:

s3://telco-source-bucket/machine-learning-for-all/v1.0.0/data/cdr-stop/cdr_stop.csv

Here is the PySpark script that I created.

import sys    
from awsglue.transforms import *    
from awsglue.utils import getResolvedOptions    
from pyspark.context import SparkContext    
from awsglue.context import GlueContext    
from awsglue.job import Job    
import pyspark.sql.functions as fn    
from awsglue.dynamicframe import DynamicFrame    
    
    
sc = SparkContext.getOrCreate()    
glueContext = GlueContext(sc)    
spark = glueContext.spark_session    
    
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "datasource0")    
#apply mapping from source table to destination , we pick only the relevant columns     
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col2", "string", "Accounting_ID", "string"), ("col13", "long", "Call_service_duration", "long"), ("col5", "string", "Start_Time_(MM/DD/YYYY)", "string"), ("col6", "string", "Start_Time_(HH/MM/SSs)", "string"), ("col19", "long", "Calling number", "string"), ("col20", "long", "Called number", "string")], transformation_ctx = "applymapping1")    
    
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")    
resolvechoice2.printSchema()    
    
resolvechoice3 = ResolveChoice.apply(frame = resolvechoice2, choice = "MATCH_CATALOG", database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "resolvechoice3")    
resolvechoice3.printSchema()    
    
customDF = resolvechoice3.toDF()    
#create timestamp column    
customDF = customDF.withColumn('timestamp', fn.concat(fn.col("Start_Time_(MM/DD/YYYY)"),fn.lit(" "),fn.col("Start_Time_(HH/MM/SSs)")))    
    
#create timestamp2 column which is a substring of timestamp column    
customDF = customDF.withColumn('timestamp2',fn.substring(fn.col("timestamp"),1,19))    
#create Date column     
customDF =customDF.withColumn("Date",fn.unix_timestamp(fn.col("timestamp2"),"MM/dd/yyyy HH:mm:ss").cast("timestamp"))    
    
#remove temporary fields     
customDF = customDF.drop('timestamp','timestamp2')    
    
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")    
#transform to parquet format and land in S3 path    
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://telco-dest-bucket/blog/"}, format = "parquet", transformation_ctx = "datasink4")    
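The timestamp handling in the script can be checked locally without Spark. The sketch below mirrors the concat, substring, and parse steps in plain Python. The sample time value with a fractional second is an assumption about how the CDR field looks, not a value taken from the dataset. Note that Spark’s substring is 1-based, so substring(col, 1, 19) corresponds to the slice [:19].

```python
from datetime import datetime


def parse_start(date_part, time_part):
    """Combine the date and time fields and parse the first 19 characters."""
    combined = f"{date_part} {time_part}"
    trimmed = combined[:19]  # "MM/DD/YYYY HH:MM:SS" is exactly 19 characters
    return datetime.strptime(trimmed, "%m/%d/%Y %H:%M:%S")


print(parse_start("08/29/2018", "14:05:33.1"))
```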

The dataset has been cataloged in AWS Glue Data Catalog and is queryable using Athena.

Amazon QuickSight and anomaly detection

Next, build out anomaly detection using Amazon QuickSight. To get started, follow these steps.

  1. In the Amazon QuickSight console, choose New analysis.
  2. Choose Create new data set.
  3. Choose Athena.
  4. Enter a data source name.
  5. Choose Create data source.
  6. From the drop-down list, select the relevant database and table that were created by the AWS Glue crawlers, and choose Select.
  7. Select Directly query your data, and choose Visualize.

Visualizing the data using Amazon QuickSight

  1. Under visual types, choose Line chart.
  2. Drag call_service_duration to the Value field well.
  3. Drag timestamp_new to the X axis field well.

Amazon QuickSight generates a dashboard, as in the following screenshot.

The x-axis is the timestamp. By default, it’s based on the aggregates of one day. This can be changed by choosing a different value.

Because the timestamp is currently aggregated by day, the call duration shown for each point is the sum of the durations of all call records within that day. I can begin the search by looking for days where the total call duration is high.
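To make the one-day aggregation concrete, here is a small sketch of what the line chart computes: one total per calendar day. The function name and the sample records are made up for illustration. Amazon QuickSight performs this aggregation for you.

```python
from collections import defaultdict
from datetime import datetime


def daily_totals(records):
    """Sum call durations per calendar day.

    records: iterable of (timestamp, duration_seconds) pairs.
    """
    totals = defaultdict(int)
    for timestamp, duration in records:
        totals[timestamp.date()] += duration
    return dict(totals)


# Made-up sample: two calls on August 29 and one on August 30.
calls = [
    (datetime(2018, 8, 29, 9, 0), 120),
    (datetime(2018, 8, 29, 23, 30), 7200),
    (datetime(2018, 8, 30, 1, 15), 60),
]
for day, total in sorted(daily_totals(calls).items()):
    print(day, total)
```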

Anomaly detection

Now look at how to start using the ML insights anomaly detection feature.

  1. On the top of the Insights panel, choose Add anomaly to sheet. This creates an insights visual for anomaly detection.
  2. On the top of the screen, choose Field Wells and add at least one field to the Categories, as in the following example. I added the calling/called number, as those become relevant for fraud use cases; for example, one A-number calling multiple B-numbers or multiple A-numbers calling B-numbers.
    The categories represent the dimensional values by which Amazon QuickSight splits the metric. For example, you can analyze anomalies on sales across all product categories and product SKUs, assuming there are 10 product categories, each with 10 product SKUs. Amazon QuickSight splits the metric by the 100 unique combinations and runs anomaly detection on each of the split metrics.
  3. To configure the anomaly detection job, choose Get Started.
  4. On the anomaly detection configuration screen, set up the following options:
  • Analyze all combinations of these categories – By default, if you have selected three categories, Amazon QuickSight runs anomaly detection on the following combinations hierarchically: A, AB, ABC. If you select this option, QuickSight analyzes all combinations: A, AB, ABC, BC, AC. If your data is not hierarchical, check this option.
  • Schedule – Set this option to run anomaly detection on your data hourly, daily, weekly, or monthly, depending on your data and needs. For Start schedule on and Timezone, enter values and choose OK.
    Important: The schedule does not take effect until you publish the analysis as a dashboard. Within the analysis, you have the option to run the anomaly detection manually (without the schedule).
  • Contribution analysis on anomaly – You can select up to four additional dimensions for Amazon QuickSight to analyze the top contributors when an anomaly is detected. For example, Amazon QuickSight can show you the top customers that contributed to a spike in sales. In my current example, I added one additional dimension: the accounting ID. If you think about a telecom fraud case, you can also consider fields like charging time or cell ID as additional dimensions.
  1. After setting the configuration, choose Run Now to execute the job manually, which includes the “Detecting anomalies… This may take a while…” message. Depending on the size of your dataset, this may take a few minutes or up to an hour.
  2. When the anomaly detection job is complete, any anomalies are called out in the insights visual. By default, only the top anomalies for the latest time period in the data are shown in the insights visuals.

    Anomaly detection reveals several B numbers being called from multiple A numbers with a high call service duration on August 29, 2018. That looks interesting!
  3. To explore all anomalies for this insight, select the menu on the top-right corner of the visual and choose Explore Anomalies.
  4. On the Anomalies detailed page, you can see all the anomalies for the latest period.
    In the view, you can see that two anomalies were detected, showing two time series. The title of the visuals represents the metric that is run on the unique combination of the categorical fields. In this case:
  • [ALL] | 9645000024
  • 3512000024 | [ALL]
    So the system detected anomalies for multiple A-numbers calling 9645000024, and 3512000024 calling multiple B-numbers. In both cases, it observed a high call duration. The labeled data point on the chart represents the most recent anomaly that is detected for that time series.
  1. To expose a date picker, choose Show anomalies by date at the top-right corner. This chart shows the number of anomalies that were detected for each day (or hour, depending on your anomaly detection configuration). You can select a particular day to see the anomalies detected for that day. For example, selecting August 10, 2018 on the top chart shows the anomalies for that day:

    Important:
    The first 32 points in the dataset are used for training and are not scored by the anomaly detection algorithm. You may not see any anomalies on the first 32 data points.
    You can expand the filter controls on the top of the screen. With the filter controls, you can change the anomaly threshold to show high, medium, or low significance anomalies. You can choose to show only anomalies that are higher than expected or lower than expected. You can also filter by the categorical values that are present in your dataset to look at anomalies only for those categories.
  2. Look at the contributors columns. When you configured the anomaly detection, you defined the accounting ID as another dimension. If this were real call traffic instead of practice data, you would be able to single out specific accounting IDs that contribute to the anomaly.
  3. When you’re done, choose Back to analysis.
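To make the category splitting from the steps above more tangible, the following sketch enumerates the default hierarchical combinations (A, AB, ABC) and counts the series produced by the product example (10 categories with 10 SKUs each). It only illustrates the arithmetic described in this post. The function names are hypothetical, and it does not model how Amazon QuickSight works internally.

```python
from itertools import product


def hierarchical_combinations(categories):
    """Return each prefix of the ordered category list: A, AB, ABC."""
    return [tuple(categories[: i + 1]) for i in range(len(categories))]


def split_count(values_per_category):
    """Count the series created when the metric is split by every category."""
    return len(list(product(*values_per_category)))


print(hierarchical_combinations(["A", "B", "C"]))
print(split_count([range(10), range(10)]))
```

Each additional category multiplies the number of time series to analyze, so the choice of categories affects how long the job runs.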

Summary

In this post, I explored a common fraud pattern called revenue share fraud. I looked at how to extract the relevant data for training the anomaly detection model in Amazon QuickSight. I then used this data to detect anomalies based on call duration, calling party, and called party, looking at additional contributors like Accounting ID. The entire process used serverless technologies and required little to no machine learning experience.

For more information about options and strategies, see Amazon QuickSight Announces General Availability of ML Insights.

If you have questions or suggestions, please comment below.


About the Author

Guy Ben Baruch is a solutions architect with Amazon Web Services.


How to export an Amazon DynamoDB table to Amazon S3 using AWS Step Functions and AWS Glue

Post Syndicated from Joe Feeney original https://aws.amazon.com/blogs/big-data/how-to-export-an-amazon-dynamodb-table-to-amazon-s3-using-aws-step-functions-and-aws-glue/

In typical AWS fashion, not a week had gone by after I published How Goodreads offloads Amazon DynamoDB tables to Amazon S3 and queries them using Amazon Athena on the AWS Big Data blog when the AWS Glue team released the ability for AWS Glue crawlers and AWS Glue ETL jobs to read from DynamoDB tables natively. I was actually pretty excited about this. Less code means fewer bugs. The original architecture had been around for at least 18 months and could be simplified significantly with a little bit of work.

Refactoring the data pipeline

The AWS Data Pipeline architecture outlined in my previous blog post is just under two years old now. We had used data pipelines as a way to back up Amazon DynamoDB data to Amazon S3 in case of a catastrophic developer error. However, with DynamoDB point-in-time recovery we have a better, native mechanism for disaster recovery. Additionally, with data pipelines we still own the operations associated with the clusters themselves, even if they are transient. A common challenge is keeping our clusters up with recent releases of Amazon EMR to help mitigate any outstanding bugs. Another is the inefficiency of needing to spin up an EMR cluster for each DynamoDB table.

I decided to take a step back and list the capabilities I wanted to have in the next iteration:

  • Export tables using AWS Glue instead of EMR.
    • AWS Glue provides a serverless ETL environment where I don’t have to worry about the underlying infrastructure. This minimizes operational tasks like keeping up with the EMR release tags.
  • Use a workflow solution that works across services like AWS Glue and Amazon Athena.
    • In the first iteration, the workflow was spread across various services. Unless you had the entire pipeline in your head, it was difficult to get a bird’s-eye view of how the pipeline was progressing.
  • Ability to select different formats.
    • For data engineering, I prefer Apache Parquet. However, customers might prefer a different format.
  • Add exported data to Athena.
    • I find that the easier it is for the data to be queried, the more likely it’s used.

Architecture overview

At a high level, this is the architecture:

  • We’re using AWS Step Functions as the workflow engine.
    • Each step is either a built-in Step Functions state, a service integration, or a simple Python AWS Lambda function. For example, GlueStartJobRun uses the synchronous job run service integration, as discussed in the documentation.
    • We get a visual representation of the entire pipeline.
    • It’s quick to onboard new developers.
  • An event in Amazon CloudWatch Events, which is disabled to start, triggers a Step Functions state machine with a JSON payload that contains the following:
    • AWS Glue job name
    • Export destination
    • DynamoDB table name
    • Desired read percentage
    • AWS Glue crawler name
  • AWS Glue exports a DynamoDB table in your preferred format to S3 as snapshots_your_table_name. The data is partitioned by snapshot_timestamp.
  • An AWS Glue crawler adds or updates your data’s schema and partitions in the AWS Glue Data Catalog.
  • Finally, we create an Athena view that only has data from the latest export snapshot.
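As an illustration of the JSON payload listed above, here is a sketch that assembles the five pieces of metadata. The key names and the sample values are assumptions made for this example. The real payload is defined by the CloudFormation stack and may use different keys.

```python
import json


def export_payload(job_name, destination, table_name, read_percentage, crawler_name):
    """Serialize the export metadata as a JSON string.

    All key names below are hypothetical placeholders.
    """
    return json.dumps(
        {
            "glue_job_name": job_name,
            "output_prefix": destination,
            "table_name": table_name,
            "read_percentage": str(read_percentage),
            "crawler_name": crawler_name,
        },
        indent=2,
    )


payload = export_payload(
    job_name="ExampleExportJob",            # hypothetical
    destination="s3://example-bucket/exports/reviews",  # hypothetical
    table_name="Reviews",
    read_percentage=0.25,
    crawler_name="ExampleExportCrawler",    # hypothetical
)
print(payload)
```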

A simple AWS Glue ETL job

The script that I created accepts AWS Glue ETL job arguments for the table name, read throughput, output, and format. Behind the scenes, AWS Glue scans the DynamoDB table. AWS Glue makes sure that every top-level attribute makes it into the schema, no matter how sparse your attributes are (as discussed in the DynamoDB documentation).

Here’s the script:

import sys
import datetime
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext

ARG_TABLE_NAME = "table_name"
ARG_READ_PERCENT = "read_percentage"
ARG_OUTPUT = "output_prefix"
ARG_FORMAT = "output_format"

PARTITION = "snapshot_timestamp"

args = getResolvedOptions(sys.argv,
  [
    'JOB_NAME',
    ARG_TABLE_NAME,
    ARG_READ_PERCENT,
    ARG_OUTPUT,
    ARG_FORMAT
  ]
)

table_name = args[ARG_TABLE_NAME]
read = args[ARG_READ_PERCENT]
output_prefix = args[ARG_OUTPUT]
fmt = args[ARG_FORMAT]

print("Table name:", table_name)
print("Read percentage:", read)
print("Output prefix:", output_prefix)
print("Format:", fmt)

date_str = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M')
output = "%s/%s=%s" % (output_prefix, PARTITION, date_str)

sc = SparkContext()
glueContext = GlueContext(sc)

table = glueContext.create_dynamic_frame.from_options(
  "dynamodb",
  connection_options={
    "dynamodb.input.tableName": table_name,
    "dynamodb.throughput.read.percent": read
  }
)

glueContext.write_dynamic_frame.from_options(
  frame=table,
  connection_type="s3",
  connection_options={
    "path": output
  },
  format=fmt,
  transformation_ctx="datasink"
)

There’s not a lot here. We’re creating a DynamicFrameReader of connection type dynamodb and passing in the table name and desired maximum read throughput consumption. We pass the resulting DynamicFrame to a DynamicFrameWriter that writes the table to S3 in the specified format.
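The one piece of logic worth a closer look is how the output path encodes the partition. The sketch below repeats the script’s string formatting with a fixed timestamp so that the result is reproducible. The bucket name is a hypothetical placeholder.

```python
from datetime import datetime, timezone

PARTITION = "snapshot_timestamp"


def snapshot_path(output_prefix, now):
    """Build the Hive-style partition path used for one export run."""
    date_str = now.strftime("%Y-%m-%dT%H:%M")
    return "%s/%s=%s" % (output_prefix, PARTITION, date_str)


print(
    snapshot_path(
        "s3://example-bucket/reviews",  # hypothetical prefix
        datetime(2019, 1, 11, 23, 26, tzinfo=timezone.utc),
    )
)
```

Because the path follows the key=value convention, an AWS Glue crawler can register snapshot_timestamp as a partition column.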

Athena views

Most teams at Amazon own applications that have multiple DynamoDB tables, including my own team. Our current application uses five primary tables. Ideally, at the end of an export workflow you can write simple, obvious queries across a consistent view of your tables. However, each exported table is partitioned by the timestamp from when the table was exported. This makes querying across one or more tables very cumbersome, because you have to add a WHERE snapshot_timestamp = clause to every table reference in your query. Additionally, each table might have a different snapshot_timestamp value for any given day!

The final step in this export workflow creates an Athena view that adds that WHERE clause for you. This means that you can interact with your DynamoDB exports as if they were one sane view of your exported DynamoDB tables.
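One plausible shape for such a view is sketched below: it selects only the rows whose snapshot_timestamp equals the maximum value in the snapshots table. This is an assumption about how the view could be written, not the statement that the workflow actually generates. The function name is hypothetical.

```python
def latest_snapshot_view_sql(database, table):
    """Return a CREATE VIEW statement limited to the latest snapshot."""
    snapshots = f'"{database}"."snapshots_{table}"'
    view = f'"{database}"."{table}"'
    return (
        f"CREATE OR REPLACE VIEW {view} AS "
        f"SELECT * FROM {snapshots} "
        f"WHERE snapshot_timestamp = "
        f"(SELECT max(snapshot_timestamp) FROM {snapshots})"
    )


print(latest_snapshot_view_sql("dynamodb_exports", "reviews"))
```

Taking max over the partition value works here because timestamps in the YYYY-MM-DDTHH:MM format sort in chronological order when compared as strings.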

Setting up the infrastructure

I split the AWS CloudFormation templates into two stacks. The common stack contains shared infrastructure, and you need only one of these per AWS Region. The table stack is designed so that you can create one per table-format combination in any given AWS Region. It contains the CloudWatch event logic and AWS Glue components needed to export and transform DynamoDB tables.

Creating the common stack

The common stack contains the majority of the infrastructure. That includes the Step Functions state machine and Lambda functions to trigger and check the state of asynchronous jobs. It also includes IAM roles that the export stacks use, and the S3 bucket to store the exports.

To create the common stack, do the following:

  1. Choose this Launch Stack button.
  2. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create Stack.

Creating the table export stack

If you don’t have a DynamoDB table to export, follow the original blog post. Start with the Working with the Reviews stack section and continue until you’ve added the two Items to the table. Otherwise, feel free to point this CloudFormation stack at your favorite DynamoDB table that is using provisioned throughput. Tables that use on-demand throughput are not currently supported.

Because so much of this architecture is shareable, there’s not much in the table export stack. This stack defines the CloudWatch event used to trigger the Step Functions state machine with a JSON payload containing all the necessary metadata. Additionally, it contains the AWS Glue ETL job that exports the table and the AWS Glue Crawler that updates metadata in the AWS Glue Data Catalog.

Technically, you can define the AWS Glue ETL job in the common stack because it’s already parameterized. However, the default limit for concurrent runs for an AWS Glue job is three. This is a soft limit, but with this architecture you have headroom to export up to 25 tables before asking for a limit increase.

To create the table export stack, do the following:

  1. Choose this Launch Stack button.
  2. Choose an output format from the list. All the available formats are supported by Athena natively.
  3. Enter your DynamoDB table name.
  4. Enter the percentage of Read Capacity Units (RCUs) that the job should consume from your table’s currently provisioned throughput. This percentage is expressed as a float between 0.1 and 1.0 inclusive. The default is 0.25 (25 percent). For example, if your table’s RCUs are set to 100 and you use the default of 0.25 (25 percent), the AWS Glue job consumes 25 RCUs while running.
  5. Choose Create.
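The read percentage arithmetic is simple enough to check before you create the stack. This small helper is a sketch with a hypothetical name. It applies the 0.1 to 1.0 range stated above and returns the RCUs that the job would consume.

```python
def consumed_rcus(provisioned_rcus, read_percentage=0.25):
    """Return the RCUs the export job consumes at the given read percentage."""
    if not 0.1 <= read_percentage <= 1.0:
        raise ValueError("read_percentage must be between 0.1 and 1.0 inclusive")
    return provisioned_rcus * read_percentage


print(consumed_rcus(100))        # default of 0.25
print(consumed_rcus(100, 0.5))
```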

Kicking off a state machine execution

To demonstrate how this works, we run the DynamoDB export state machine manually by passing it the JSON payload that the CloudWatch event would pass to Step Functions.

Getting the JSON payload from CloudWatch Events

To get the JSON payload, do the following:

  1. Open CloudWatch in the AWS Management Console.
  2. In the left column under Events, choose Rules.
  3. Choose your rule from the list. It is prefixed by AWSBigDataBlog-.
  4. For Actions, choose Edit.
  5. Copy the JSON payload from the Configure input section of Targets.
  6. Choose Cancel to exit edit mode.

Starting a state machine execution

To start an execution of the state machine, take the following steps:

  1. Open Step Functions in the console.
  2. Choose the DynamoDBExportAndAthenaLoad state machine.
  3. Choose Start execution.
  4. Paste the JSON payload into the Input field.
  5. Choose Start execution.
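If you prefer to start the execution from code, the steps above map to the boto3 Step Functions call start_execution. The sketch below first checks that the copied payload is valid JSON. The state machine ARN is a hypothetical placeholder that you would replace with the ARN shown in the console.

```python
import json


def execution_request(state_machine_arn, payload_text):
    """Validate the payload and build keyword arguments for start_execution."""
    json.loads(payload_text)  # raises ValueError if the text is not valid JSON
    return {"stateMachineArn": state_machine_arn, "input": payload_text}


def start_export(request):
    """Start the state machine execution (requires AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    sfn = boto3.client("stepfunctions")
    return sfn.start_execution(**request)["executionArn"]


request = execution_request(
    # Hypothetical ARN; copy the real one from the Step Functions console.
    "arn:aws:states:us-east-1:111111111111:stateMachine:DynamoDBExportAndAthenaLoad",
    '{"example_key": "example_value"}',
)
```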

There are a few ways to follow along with the execution. As steps are entered and exited, entries are added to the Execution event history list. This is a great way to see what state (event in Lambda speak) is passed to each step, in case you need to debug.

You can also expand the Visual workflow. It’s a great high-level view to see how the workflow is progressing.

After the workflow is finished, you see two new tables under the dynamodb_exports database in your AWS Glue Data Catalog. Your DynamoDB snapshots table name is prefixed with snapshots_. The schema is formatted for the AWS Glue Data Catalog (lowercase and hyphens transformed to underscores). You also have a view table with the same table name formatted for AWS Glue Data Catalog but without the snapshots_ prefix.

Querying your data

To showcase how having a separate view table of the most recent snapshot of a table is useful, I use the Reviews table from the previous blog post. The table has two items. I have also run the export workflow twice. As you can see when you preview the table, there are four items total. That’s because each snapshot contains two items.

From the items, the latest snapshot_timestamp is 2019-01-11T23:26. When I run the same preview query against the view table reviews, we see that there are only two items, which is what we expect. The view takes care of specifying the where snapshot_timestamp=… clause so you don’t have to.

Wrapping up

In this post, I showed you how to use AWS Glue’s DynamoDB integration and AWS Step Functions to create a workflow to export your DynamoDB tables to S3 in Parquet. I also showed how to create an Athena view for each table’s latest snapshot, giving you a consistent view of your DynamoDB table exports.


About the Author

Joe Feeney is a Software Engineer at Amazon Go, where he does secret stuff and he’s quite chuffed with that. He enjoys embarrassing his family by taking Mario Kart entirely too seriously.


Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.
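The filtering rule can be sketched in a few lines: keep the objects whose replication status is empty (pre-existing) or FAILED. The example assumes an inventory CSV with exactly three columns in the order bucket, key, replication status. Your inventory configuration may include more fields.

```python
import csv
import io

NEEDS_COPY = {"", "FAILED"}


def objects_to_copy(inventory_csv_text):
    """Return (bucket, key) pairs that have no replication status or FAILED."""
    rows = csv.reader(io.StringIO(inventory_csv_text))
    return [(bucket, key) for bucket, key, status in rows if status in NEEDS_COPY]


# Made-up sample rows in the assumed three-column layout.
inventory = (
    '"crr-preexisting-demo-source","preexisting-1.txt",""\n'
    '"crr-preexisting-demo-source","failed-1.txt","FAILED"\n'
    '"crr-preexisting-demo-source","replicated-1.txt","COMPLETED"\n'
)
print(objects_to_copy(inventory))
```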

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.
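For intuition, the following sketch shows a single copy-in-place request and a thread pool as a small-scale stand-in for the Spark job. It is not the code that runs on EMR. It assumes boto3 and relies on the fact that Amazon S3 rejects a copy of an object onto itself unless something changes, so it re-supplies the user metadata with MetadataDirective set to REPLACE. Content type, ACLs, and encryption settings would need additional handling, and copy_object handles objects only up to 5 GB.

```python
from concurrent.futures import ThreadPoolExecutor


def copy_in_place_request(bucket, key, metadata):
    """Build copy_object arguments that copy an object over itself."""
    return {
        "Bucket": bucket,
        "Key": key,
        "CopySource": {"Bucket": bucket, "Key": key},
        "Metadata": metadata,            # re-supply the existing user metadata
        "MetadataDirective": "REPLACE",  # makes the self-copy a legal request
    }


def make_boto3_copier():
    """Return a function that copies one (bucket, key) pair in place."""
    import boto3  # imported lazily; requires AWS credentials when called

    s3 = boto3.client("s3")

    def copy_one(obj):
        bucket, key = obj
        head = s3.head_object(Bucket=bucket, Key=key)
        s3.copy_object(**copy_in_place_request(bucket, key, head["Metadata"]))
        return key

    return copy_one


def copy_all(objects, copy_one, max_workers=8):
    """Run copy_one over all objects concurrently, preserving input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(copy_one, objects))
```

Calling copy_all(pairs, make_boto3_copier()) runs the copies against S3. Passing a stub function instead lets you test the concurrency logic offline.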

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.
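The same query can be submitted outside the console. The helper below is a hedged sketch that starts an Athena query through a boto3 client and polls until it reaches a terminal state. The helper name, the database name, and the output location in the usage comment are assumptions for illustration.

```python
import time


def run_athena_query(athena, sql, database, output_location, poll_seconds=2.0):
    """Start a query, wait for a terminal state, and return (query_id, state)."""
    started = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = started["QueryExecutionId"]
    while True:
        execution = athena.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return query_id, state
        time.sleep(poll_seconds)


# Example usage (hypothetical database and output prefix):
# import boto3
# run_athena_query(
#     boto3.client("athena"),
#     "SELECT * FROM crr_preexisting_demo WHERE dt='2019-02-24-04-00'",
#     "default",
#     "s3://crr-preexisting-demo-inventory/athena-results/",
# )
```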

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ cd amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed using the create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

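Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or "" (empty). The comparison values include the double quotes because the CSV inventory wraps each field in quotes.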

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.
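To illustrate how a per-key call like this might be fanned out across Spark partitions, here is a small sketch. It is not the code from the repository: copy_partition, clean_inventory_value, and the copy_fn callable are hypothetical names, and the sketch assumes that inventory values arrive wrapped in double quotes with percent-encoded keys.

```python
from urllib.parse import unquote


def clean_inventory_value(value):
    """Strip the double quotes around a CSV inventory field and
    percent-decode it (assumption: keys are percent-encoded)."""
    return unquote(value.strip('"'))


def copy_partition(rows, copy_fn):
    """Call copy_fn(bucket, key) for each (bucket, key) row in a partition
    and yield one result record per object."""
    for bucket, key in rows:
        bucket = clean_inventory_value(bucket)
        key = clean_inventory_value(key)
        try:
            detail = copy_fn(bucket, key)
            yield {"bucket": bucket, "key": key, "status": "COPIED", "detail": detail}
        except Exception as error:
            # Record the failure and continue with the remaining keys.
            yield {"bucket": bucket, "key": key, "status": "ERROR", "detail": str(error)}


# Example usage with the DataFrame from the query (copier is hypothetical):
# results = crr_failed.rdd.mapPartitions(
#     lambda rows: copy_partition(rows, copier)
# ).collect()
```

Working per partition rather than per row lets each task create its S3 client once and reuse it for every key in that partition.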

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
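As a sketch of such a lifecycle policy, the following boto3 snippet expires noncurrent versions after a number of days. The rule ID, the empty prefix (whole bucket), and the retention period are assumptions to adapt to your own requirements.

```python
def build_noncurrent_expiration_rule(days, prefix=""):
    """Build a lifecycle rule that expires noncurrent object versions."""
    return {
        "ID": "expire-noncurrent-versions",  # hypothetical rule name
        "Status": "Enabled",
        "Filter": {"Prefix": prefix},
        "NoncurrentVersionExpiration": {"NoncurrentDays": days},
    }


def apply_noncurrent_expiration(bucket, days):
    """Apply the rule. This replaces the bucket's existing lifecycle rules."""
    import boto3  # imported here so the builder above works without boto3

    boto3.client("s3").put_bucket_lifecycle_configuration(
        Bucket=bucket,
        LifecycleConfiguration={"Rules": [build_noncurrent_expiration_rule(days)]},
    )
```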

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.

Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.

 

 

 

Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/build-and-automate-a-serverless-data-lake-using-an-aws-glue-trigger-for-the-data-catalog-and-etl-jobs/

Today, data is flowing from everywhere, whether it is unstructured data from resources like IoT sensors, application logs, and clickstreams, or structured data from transaction applications, relational databases, and spreadsheets. Data has become a crucial part of every business. This has resulted in a need to maintain a single source of truth and to automate the entire pipeline, from data ingestion to transformation and analytics, so that you can extract value from the data quickly.

There is a growing concern over the complexity of data analysis as data volume, velocity, and variety increase. The concern stems from the number and complexity of steps it takes to get data to a state that is usable by business users. Data engineering teams often spend most of their time building and optimizing extract, transform, and load (ETL) pipelines. Automating the entire process can reduce the time to value and the cost of operations. In this post, we describe how to create a fully automated data cataloging and ETL pipeline to transform your data.

Architecture

In this post, you learn how to build and automate the following architecture.

You build your serverless data lake with Amazon Simple Storage Service (Amazon S3) as the primary data store. Given the scalability and high availability of Amazon S3, it is best suited as the single source of truth for your data.

You can use various techniques to ingest and store data in Amazon S3. For example, you can use Amazon Kinesis Data Firehose to ingest streaming data. You can use AWS Database Migration Service (AWS DMS) to ingest relational data from existing databases. And you can use AWS DataSync to ingest files from an on-premises Network File System (NFS).

Ingested data lands in an Amazon S3 bucket that we refer to as the raw zone. To make that data available, you have to catalog its schema in the AWS Glue Data Catalog. You can do this using an AWS Lambda function invoked by an Amazon S3 trigger to start an AWS Glue crawler that catalogs the data. When the crawler is finished creating the table definition, you invoke a second Lambda function using an Amazon CloudWatch Events rule. This step starts an AWS Glue ETL job to process and output the data into another Amazon S3 bucket that we refer to as the processed zone.

The AWS Glue ETL job converts the data to Apache Parquet format and stores it in the processed S3 bucket. You can modify the ETL job to achieve other objectives, like more granular partitioning, compression, or enriching of the data. Monitoring and notification are integral parts of the automation process. As soon as the ETL job finishes, another CloudWatch rule sends you an email notification using an Amazon Simple Notification Service (Amazon SNS) topic. This notification indicates that your data was successfully processed.

In summary, this pipeline classifies and transforms your data, sending you an email notification upon completion.

Deploy the automated data pipeline using AWS CloudFormation

First, you use AWS CloudFormation templates to create all of the necessary resources. This removes opportunities for manual error, increases efficiency, and ensures consistent configurations over time.

Launch the AWS CloudFormation template with the following Launch stack button.

Be sure to choose the US East (N. Virginia) Region (us-east-1). Then enter the appropriate stack name, email address, and AWS Glue crawler name to create the Data Catalog. Add the AWS Glue database name to save the metadata tables. Acknowledge the IAM resource creation as shown in the following screenshot, and choose Create.

Note: Be sure to enter a valid email address so that you get a notification when the ETL job is finished.

This AWS CloudFormation template creates the following resources in your AWS account:

  • Two Amazon S3 buckets to store both the raw data and processed Parquet data.
  • Two AWS Lambda functions: one to create the AWS Glue Data Catalog and another function to publish topics to Amazon SNS.
  • An Amazon Simple Queue Service (Amazon SQS) queue for maintaining the retry logic.
  • An Amazon SNS topic to inform you that your data has been successfully processed.
  • Two CloudWatch Events rules: one rule on the AWS Glue crawler and another on the AWS Glue ETL job.
  • AWS Identity and Access Management (IAM) roles for accessing AWS Glue, Amazon SNS, Amazon SQS, and Amazon S3.

When the AWS CloudFormation stack is ready, check your email and confirm the SNS subscription. Choose the Resources tab and find the details.

Follow these steps to verify your email subscription so that you receive an email alert as soon as your ETL job finishes.

  1. On the Amazon SNS console, in the navigation pane, choose Topics. An SNS topic named SNSProcessedEvent appears in the display.
  2. Choose the ARN. The topic details page appears, listing the email subscription as Pending confirmation. Be sure to confirm the subscription for your email address as provided in the Endpoint column.

If you don’t see an email address, or the link is showing as not valid in the email, choose the corresponding subscription endpoint. Then choose Request confirmation to confirm your subscription. Be sure to check your email junk folder for the request confirmation link.

Configure an Amazon S3 bucket event trigger

In this section, you configure a trigger on the raw S3 bucket so that when new data lands in the bucket, it invokes GlueTriggerLambda, which was created in the AWS CloudFormation deployment.

To configure notifications:

  1. Open the Amazon S3 console.
  2. Choose the source bucket. In this case, the bucket name contains raws3bucket, for example, <stackname>-raws3bucket-1k331rduk5aph.
  3. Go to the Properties tab, and under Advanced settings, choose Events.
  4. Choose Add notification and configure a notification with the following settings:
  • Name – Enter a name of your choice. In this example, it is crawlerlambdaTrigger.
  • Events – Select the All object create events check box to create the AWS Glue Data Catalog when you upload the file.
  • Send to – Choose Lambda function.
  • Lambda – Choose the Lambda function that was created in the deployment section. Its name should contain the string GlueTriggerLambda.

See the following screenshot for all the settings. When you’re finished, choose Save.

For more details on configuring events, see How Do I Enable and Configure Event Notifications for an S3 Bucket? in the Amazon S3 Console User Guide.
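If you want to script the notification instead of using the console, the settings above map onto a boto3 call roughly as follows. This is a sketch under assumptions: the function names are illustrative, and when you configure the notification through the API you must separately grant Amazon S3 permission to invoke the Lambda function.

```python
def build_notification_configuration(lambda_arn, name="crawlerlambdaTrigger"):
    """Invoke the given Lambda function for all object-created events."""
    return {
        "LambdaFunctionConfigurations": [
            {
                "Id": name,
                "LambdaFunctionArn": lambda_arn,
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }


def configure_trigger(raw_bucket, lambda_arn):
    """Apply the notification. This replaces the bucket's existing notifications."""
    import boto3  # imported here so the builder above works without boto3

    boto3.client("s3").put_bucket_notification_configuration(
        Bucket=raw_bucket,
        NotificationConfiguration=build_notification_configuration(lambda_arn),
    )
```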

Download the dataset

For this post, you use a publicly available New York green taxi dataset in CSV format. You upload monthly data to your raw zone and perform automated data cataloging using an AWS Glue crawler. After cataloging, an automated AWS Glue ETL job triggers to transform the monthly green taxi data to Parquet format and store it in the processed zone.

You can download the raw dataset from the NYC Taxi & Limousine Commission trip record data site. Download the monthly green taxi dataset and upload only one month of data. For example, first upload only the green taxi January 2018 data to the raw S3 bucket.

Automate the Data Catalog with an AWS Glue crawler

One of the important aspects of a modern data lake is to catalog the available data so that it’s easily discoverable. To run ETL jobs or ad hoc queries against your data lake, you must first determine the schema of the data along with other metadata information like location, format, and size. An AWS Glue crawler makes this process easy.

After you upload the data into the raw zone, the Amazon S3 trigger that you created earlier in the post invokes the GlueTriggerLambda function. This function creates an AWS Glue Data Catalog that stores metadata information inferred from the data that was crawled.
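The deployed function's code is not shown in this post, but a Lambda function of this kind can be quite small. The following is a hypothetical sketch, not the function from the CloudFormation template: the CRAWLER_NAME environment variable is an assumption, and the retry logic that the template implements with Amazon SQS is omitted.

```python
import os


def start_crawler_for_event(event, glue, crawler_name):
    """Start the crawler once per S3 event and return a short status string."""
    keys = [record["s3"]["object"]["key"] for record in event.get("Records", [])]
    if not keys:
        return "no objects in event"
    try:
        glue.start_crawler(Name=crawler_name)
    except glue.exceptions.CrawlerRunningException:
        # A crawl is already in progress; a real function might retry later.
        return "crawler already running"
    return "crawler started for {} object(s)".format(len(keys))


def lambda_handler(event, context):
    """Lambda entry point; the crawler name comes from an assumed env variable."""
    import boto3  # available in the Lambda Python runtime

    glue = boto3.client("glue")
    return start_crawler_for_event(event, glue, os.environ["CRAWLER_NAME"])
```

Passing the Glue client in as a parameter keeps the core logic easy to exercise locally with a stub.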

Open the AWS Glue console. You should see the database, table, and crawler that were created using the AWS CloudFormation template. Your AWS Glue crawler should appear as follows.

Browse to the table using the left navigation, and you will see the table in the database that you created earlier.

Choose the table name, and further explore the metadata discovered by the crawler, as shown following.

You can also view the columns, data types, and other details. In the following screenshot, the AWS Glue crawler has created a schema from the files available in Amazon S3 by determining each column name and its data type. You can use this schema to create an external table.

Author ETL jobs with AWS Glue

AWS Glue provides a managed Apache Spark environment to run your ETL job without maintaining any infrastructure, using a pay-as-you-go model.

Open the AWS Glue console and choose Jobs under the ETL section to start authoring an AWS Glue ETL job. Give the job a name of your choice, and note the name because you’ll need it later. Choose the already created IAM role with the name containing <stackname>-GlueLabRole, as shown following. Keep the other default options.

AWS Glue generates the required Python or Scala code, which you can customize as per your data transformation needs. In the Advanced properties section, choose Enable in the Job bookmark list to avoid reprocessing old data.

On the next page, choose your raw Amazon S3 bucket as the data source, and choose Next. On the Data target page, choose the processed Amazon S3 bucket as the data target path, and choose Parquet as the Format.

On the next page, you can make schema changes as required, such as changing column names, dropping ones that you’re less interested in, or even changing data types. AWS Glue generates the ETL code accordingly.

Lastly, review your job parameters, and choose Save Job and Edit Script, as shown following.

On the next page, you can modify the script further as per your data transformation requirements. For this post, you can leave the script as is. In the next section, you automate the execution of this ETL job.

Automate ETL job execution

As the frequency of data ingestion increases, you will want to automate the ETL job to transform the data. Automating this process helps reduce operational overhead and free your data engineering team to focus on more critical tasks.

AWS Glue is optimized for processing data in batches. You can configure it to process data in batches on a set time interval. How often you run a job is determined by how recent the end user expects the data to be and the cost of processing. For information about the different methods, see Triggering Jobs in AWS Glue in the AWS Glue Developer Guide.

First, you need to make one-time changes and configure your ETL job name in the Lambda function and the CloudWatch Events rule. On the console, open the ETLJobLambda Lambda function, which was created using the AWS CloudFormation stack.

Choose the Lambda function link that appears, and explore the code. Change the JobName value to the ETL job name that you created in the previous step, and then choose Save.
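For orientation, a function like ETLJobLambda could look roughly like the sketch below. This is an illustration rather than the deployed code: it assumes the function receives the crawler's state-change event with the state in detail.state, and the job name "my-etl-job" in the usage comment is a placeholder.

```python
def start_etl_job(event, glue, job_name):
    """Start the Glue job if the crawler event reports success.

    Returns the job run ID, or None when the crawler did not succeed.
    """
    state = event.get("detail", {}).get("state")
    if state != "Succeeded":
        return None
    response = glue.start_job_run(JobName=job_name)
    return response["JobRunId"]


# Example Lambda entry point (the job name is a placeholder):
# def lambda_handler(event, context):
#     import boto3
#     return start_etl_job(event, boto3.client("glue"), "my-etl-job")
```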

As shown in the following screenshot, the Amazon CloudWatch Events rule CrawlerEventRule is associated with an AWS Lambda function. When the rule receives a success status from the crawler, it triggers the ETLJobLambda Lambda function.

Now you are all set to trigger your AWS Glue ETL job as soon as you upload a file in the raw S3 bucket. Before testing your data pipeline, set up the monitoring and alerts.

Monitoring and notification with Amazon CloudWatch Events

Suppose that you want to receive a notification over email when your AWS Glue ETL job is completed. To achieve that, the CloudWatch Events rule OpsEventRule was deployed from the AWS CloudFormation template in the data pipeline deployment section. This CloudWatch Events rule monitors the status of the AWS Glue ETL job and sends an email notification using an SNS topic upon successful completion of the job.

As the following image shows, you configure your AWS Glue job name in the Event pattern section in CloudWatch. The event triggers an SNS topic configured as a target when the AWS Glue job state changes to SUCCEEDED. This SNS topic sends an email notification to the email address that you provided in the deployment section to receive notification.
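An event pattern of this kind can be expressed as a small JSON document. The sketch below builds one in Python and includes a simplified matcher to show how the rule decides whether an event qualifies. The matcher is only an illustration of exact-value matching and is not how CloudWatch Events is implemented; the job name is a placeholder.

```python
import json


def build_job_succeeded_pattern(job_name):
    """Event pattern that matches a successful run of one AWS Glue job."""
    return {
        "source": ["aws.glue"],
        "detail-type": ["Glue Job State Change"],
        "detail": {"jobName": [job_name], "state": ["SUCCEEDED"]},
    }


def matches(pattern, event):
    """Simplified matcher: each pattern leaf lists the allowed values."""
    for field, expected in pattern.items():
        actual = event.get(field)
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# Print the pattern as JSON, for example to paste into the rule editor.
print(json.dumps(build_job_succeeded_pattern("my-etl-job"), indent=2))
```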

Let’s make one-time configuration changes in the CloudWatch Events rule OpsEventRule to capture the status of the AWS Glue ETL job.

  1. Open the CloudWatch console.
  2. In the navigation pane, under Events, choose Rules. Choose the rule name that contains OpsEventRule, as shown following.
  3. In the upper-right corner, choose Actions, Edit.
  4. Replace Your-ETL-jobName with the ETL job name that you created in the previous step.
  5. Scroll down and choose Configure details. Then choose Update rule.

Now that you have set up an entire data pipeline in an automated way with the appropriate notifications and alerts, it’s time to test your pipeline. If you upload new monthly data to the raw Amazon S3 bucket (for example, upload the NY green taxi February 2018 CSV), it triggers the GlueTriggerLambda AWS Lambda function. You can navigate to the AWS Glue console, where you can see that the AWS Glue crawler is running.

Upon completion of the crawler, the CloudWatch Events rule CrawlerEventRule triggers your ETLJobLambda Lambda function. You can now see that the AWS Glue ETL job is running.

When the ETL job is successful, the CloudWatch Events rule OpsEventRule sends an email notification to you using an Amazon SNS topic, as shown following, hence completing the automation cycle.

Be sure to check your processed Amazon S3 bucket, where you will find transformed data processed by your automated ETL pipeline. Now that the processed data is ready in Amazon S3, you need to run the AWS Glue crawler on this Amazon S3 location. The crawler creates a metadata table with the relevant schema in the AWS Glue Data Catalog.

After the Data Catalog table is created, you can execute standard SQL queries using Amazon Athena and visualize the data using Amazon QuickSight. To learn more, see the blog post Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.

Conclusion

Having an automated serverless data lake architecture lessens the burden of managing data from its source to destination—including discovery, audit, monitoring, and data quality. With an automated data pipeline across organizations, you can identify relevant datasets and extract value much faster than before. The advantage of reducing the time to analysis is that businesses can analyze the data as it becomes available in real time. From the BI tools, queries return results much faster for a single dataset than for multiple databases.

Business analysts can now get their job done faster, and data engineering teams can free themselves from repetitive tasks. You can extend this architecture further by loading your data into a data warehouse like Amazon Redshift or making it available for machine learning via Amazon SageMaker.

Additional resources

See the following resources for more information:

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Luis Lopez Soria is a partner solutions architect and serverless specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys doing sports in addition to traveling around the world exploring new foods and cultures.

Chirag Oswal is a partner solutions architect and AR/VR specialist working with global systems integrators. He works with AWS partners and customers to help them with adoption of the cloud operating model at a large scale. He enjoys video games and travel.

How to visualize Amazon GuardDuty findings: serverless edition

Post Syndicated from Ben Romano original https://aws.amazon.com/blogs/security/how-to-visualize-amazon-guardduty-findings-serverless-edition/

Note: This blog provides an alternate solution to Visualizing Amazon GuardDuty Findings, in which the authors describe how to build an Amazon Elasticsearch Service-powered Kibana dashboard to ingest and visualize Amazon GuardDuty findings.

Amazon GuardDuty is a managed threat detection service powered by machine learning that can monitor your AWS environment with just a few clicks. GuardDuty can identify threats such as unusual API calls or potentially unauthorized users attempting to access your servers. Many customers also like to visualize their findings in order to generate additional meaningful insights. For example, you might track resources affected by security threats to see how they evolve over time.

In this post, we provide a solution to ingest, process, and visualize your GuardDuty finding logs in a completely serverless fashion. Serverless applications automatically run and scale in response to events you define, rather than requiring you to provision, scale, and manage servers. Our solution covers how to build a pipeline that ingests findings into Amazon Simple Storage Service (Amazon S3), transforms their nested JSON structure into tabular form using Amazon Athena and AWS Glue, and creates visualizations using Amazon QuickSight. We aim to provide both an easy-to-implement and cost-effective solution for consuming and analyzing your GuardDuty findings, and to more generally showcase a repeatable example for processing and visualizing many types of complex JSON logs.

Many customers already maintain centralized logging solutions using Amazon Elasticsearch Service (Amazon ES). If you want to incorporate GuardDuty findings with an existing solution, we recommend referencing this blog post to get started. If you don’t have an existing solution or previous experience with Amazon ES, if you prefer to use serverless technologies, or if you’re familiar with more traditional business intelligence tools, read on!

Before you get started

To follow along with this post, you’ll need to enable GuardDuty in order to start generating findings. See Setting Up Amazon GuardDuty for details if you haven’t already done so. Once enabled, GuardDuty will automatically generate findings as events occur. If you have public-facing compute resources in the same region in which you’ve enabled GuardDuty, you may soon find that they are being scanned quite often. All the more reason to continue reading!

You’ll also need Amazon QuickSight enabled in your account for the visualization sections of this post. You can find instructions in Setting Up Amazon QuickSight.

Architecture from end to end

 

Figure 1: Complete architecture from findings to visualization

Figure 1 highlights the solution architecture, from finding generation all the way through final visualization. The steps are as follows:

  1. Deliver GuardDuty findings to Amazon CloudWatch Events
  2. Push GuardDuty Events to S3 using Amazon Kinesis Data Firehose
  3. Use AWS Lambda to reorganize S3 folder structure
  4. Catalog your GuardDuty findings using AWS Glue
  5. Configure Views with Amazon Athena
  6. Build a GuardDuty findings dashboard in Amazon QuickSight

Below, we’ve included an AWS CloudFormation template to launch a complete ingest pipeline (Steps 1-4) so that we can focus this post on the steps dedicated to building the actual visualizations (Steps 5-6). We cover steps 1-4 briefly in the next section to provide context, and we provide links to the pertinent pages in the documentation for those of you interested in building your own pipeline.
 
Select this image to open a link that starts building the CloudFormation stack

Ingest (Steps 1-4): Get Amazon GuardDuty findings into Amazon S3 and AWS Glue Data Catalog

 

Figure 2: In this section, we’ll cover the services highlighted in blue

Step 1: Deliver GuardDuty findings to Amazon CloudWatch Events

GuardDuty integrates with Amazon CloudWatch Events and can deliver findings to it. To set this up manually, follow the instructions in Creating a CloudWatch Events Rule and Target for GuardDuty.

Step 2: Push GuardDuty events to Amazon S3 using Kinesis Data Firehose

Amazon CloudWatch Events can write to a Kinesis Data Firehose delivery stream to store your GuardDuty events in S3, where you can use AWS Lambda, AWS Glue, and Amazon Athena to build the queries you’ll need to visualize the data. You can create your own delivery stream by following the instructions in Creating a Kinesis Data Firehose Delivery Stream and then adding it as a target for CloudWatch Events.

Step 3: Use AWS Lambda to reorganize Amazon S3 folder structure

Kinesis Data Firehose will automatically create a datetime-based file hierarchy to organize the findings as they come in. Due to the variability of the GuardDuty finding types, we recommend reorganizing the file hierarchy with a folder for each finding type, with separate datetime subfolders for each. This will make it easier to target findings that you want to focus on in your visualization. The provided AWS CloudFormation template utilizes an AWS Lambda function to rewrite the files in a new hierarchy as new files are written to S3. You can use the code provided in it along with Using AWS Lambda with S3 to trigger your own function that reorganizes the data. Once the Lambda function has run, the S3 bucket structure should look similar to the structure we show in figure 3.
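To make the reorganization concrete, here is a hypothetical sketch of how a new object key could be derived from a single finding. It is not the code from the CloudFormation template: the folder naming, the hourly datetime layout, and the timestamp format are assumptions, and a real function would also need to split delivered files that contain several findings.

```python
from datetime import datetime


def reorganized_key(finding, original_key):
    """Build '<finding-type>/YYYY/MM/DD/HH/<file>' for one finding event."""
    finding_type = finding["detail"]["type"]
    # Replace characters that would create extra folders or awkward names.
    folder = finding_type.replace(":", "_").replace("/", "_").lower()
    # Assumption: the event time looks like 2019-02-24T04:00:00Z.
    event_time = datetime.strptime(finding["time"], "%Y-%m-%dT%H:%M:%SZ")
    filename = original_key.rsplit("/", 1)[-1]
    return "{}/{:%Y/%m/%d/%H}/{}".format(folder, event_time, filename)


# Made-up example event, trimmed to the two fields the function reads.
sample = {
    "time": "2019-02-24T04:00:00Z",
    "detail": {"type": "Recon:EC2/PortProbeUnprotectedPort"},
}
print(reorganized_key(sample, "2019/02/24/04/delivery-stream-1-abc"))
```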
 

Figure 3: Sample S3 bucket structure

Step 4: Catalog the GuardDuty findings using AWS Glue

With the reorganized findings stored in S3, use an AWS Glue crawler to scan and catalog each finding type. The CloudFormation template we provided schedules the crawler to run once a day. You can also run it on demand as needed. To build your own crawler, refer to Cataloging Tables with a Crawler. Assuming GuardDuty has generated findings in your account, you can navigate to the GuardDuty findings database in the AWS Glue Data Catalog. It should look something like figure 4:
 

Figure 4: List of finding type tables in the AWS Glue Catalog

Note: Because AWS Glue crawlers will attempt to combine similar data into one table, you might need to generate sample findings to ensure enough variability for each finding type to have its own table. If you only intend to build your dashboard from a small subset of finding types, you can opt to just edit the crawler to have multiple data sources and specify the folder path for each desired finding type.

Explore the table structure

Before moving on to the next step, take some time to explore the schema structure of the tables. Selecting one of the tables will bring you to a page that looks like what’s shown in figure 5.
 

Figure 5: Schema information for a single finding table

You should see that most of the columns contain basic information about each finding, but there’s a column named detail that is of type struct. Select it to expand, as shown in figure 6.
 

Figure 6: The “detail” column expanded

Ah, this is where the interesting information is tucked away! The tables for each finding may differ slightly, but in all cases the detail column will hold the bulk of the information you’ll want to visualize. See GuardDuty Active Finding Types for information on what you should expect to find in the logs for each finding type. In the next step, we’ll focus on unpacking detail to prepare it for visualization!

Process (Step 5): Unpack nested JSON and configure views with Amazon Athena

 

Figure 7: In this section, we'll cover the services highlighted in blue

Note: This step picks up where the CloudFormation template finishes

Explore the table structure (again) in the Amazon Athena console

Begin by navigating to Athena from the AWS Management Console. Once there, you should see a drop-down menu with a list of databases. These are the same databases that are available in the AWS Glue Data Catalog. Choose the database with your GuardDuty findings and expand a table.
 

Figure 8: Expanded table in the Athena console

This should look very similar to the table information you explored in step 4, including the detail struct!

You’ll need a method to unpack the struct in order to effectively visualize the data. There are many methods and tools to approach this problem. One that we recommend (and will show) is to use SQL queries within Athena to construct tabular views. This approach will allow you to push the bulk of the processing work to Athena. It will also allow you to simplify building visualizations when using Amazon QuickSight by providing a more conventional tabular format.

Extract details for use in visualization using SQL

The following examples contain the SQL statements you need to extract the relevant fields from the detail struct of the Recon:EC2/PortProbeUnprotectedPort finding and build the Amazon QuickSight dashboard we showcase in the next section. The examples also cover most of the operations you’ll need to work with the elements found in GuardDuty findings (such as deeply nested data with lists), and they serve as a good starting point for constructing your own custom queries. In general, you’ll want to traverse the nested layers (for example, root.detail.service.count) and use the UNNEST function to create a new record for each item in an embedded list that you want to target. See this blog for even more examples of constructing queries on complex JSON data using Amazon Athena.

Simply copy the SQL statements that you want into the Athena query field to build the port_probe_geo and affected_instances views. Run each CREATE VIEW statement as its own query.

Note: If your account has yet to generate Recon:EC2/PortProbeUnprotectedPort findings, you can generate sample findings to follow along.


CREATE OR REPLACE VIEW "port_probe_geo" AS

WITH getportdetails AS (
    SELECT id, portdetails
    FROM by_finding_type
    CROSS JOIN UNNEST(detail.service.action.portProbeAction.portProbeDetails) WITH ORDINALITY AS p (portdetails, portdetailsindex)
)

SELECT 
    root.id AS id,
    root.region AS region,
    root.time AS time,
    root.detail.type AS type,
    root.detail.service.count AS count, 
    portdetails.localportdetails.port AS localport, 
    portdetails.localportdetails.portname AS localportname, 
    portdetails.remoteipdetails.geolocation.lon AS longitude, 
    portdetails.remoteipdetails.geolocation.lat AS latitude, 
    portdetails.remoteipdetails.country.countryname AS country, 
    portdetails.remoteipdetails.city.cityname AS city 

FROM 
    by_finding_type AS root, getportdetails
    
WHERE 
    root.id = getportdetails.id;

CREATE OR REPLACE VIEW "affected_instances" AS

SELECT 
    max(root.detail.service.count) AS count,
    date_parse(root.time,'%Y-%m-%dT%H:%i:%sZ') as time,
    root.detail.resource.instancedetails.instanceid

FROM 
    recon_ec2_portprobeunprotectedport  AS root

GROUP BY  
    root.detail.resource.instancedetails.instanceid, 
    time
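
If it helps to see what the UNNEST step in port_probe_geo does to a single finding, here's a rough Python equivalent. It's only a sketch: the sample finding is made up, and the camel-case key names are our assumption about the raw JSON (the lowercase column names in the SQL above are how they appear in the Athena table).

```python
def unnest_port_probes(finding):
    """Yield one flat row per entry in portProbeDetails, like CROSS JOIN UNNEST."""
    detail = finding["detail"]
    probes = detail["service"]["action"]["portProbeAction"]["portProbeDetails"]
    for probe in probes:
        remote = probe["remoteIpDetails"]
        yield {
            "id": finding["id"],
            "region": finding["region"],
            "type": detail["type"],
            "count": detail["service"]["count"],
            "localport": probe["localPortDetails"]["port"],
            "localportname": probe["localPortDetails"]["portName"],
            "longitude": remote["geoLocation"]["lon"],
            "latitude": remote["geoLocation"]["lat"],
            "country": remote["country"]["countryName"],
            "city": remote["city"]["cityName"],
        }


def make_probe(port, name, lon, lat, country, city):
    """Build one made-up portProbeDetails entry for the sample finding."""
    return {
        "localPortDetails": {"port": port, "portName": name},
        "remoteIpDetails": {
            "geoLocation": {"lon": lon, "lat": lat},
            "country": {"countryName": country},
            "city": {"cityName": city},
        },
    }


# Made-up finding with two probes in its embedded list.
sample = {
    "id": "finding-1",
    "region": "us-east-1",
    "detail": {
        "type": "Recon:EC2/PortProbeUnprotectedPort",
        "service": {
            "count": 7,
            "action": {"portProbeAction": {"portProbeDetails": [
                make_probe(22, "SSH", 2.35, 48.85, "France", "Paris"),
                make_probe(80, "HTTP", 139.69, 35.68, "Japan", "Tokyo"),
            ]}},
        },
    },
}

rows = list(unnest_port_probes(sample))
for row in rows:
    print(row)
```

One finding with two probes becomes two rows that share the same id, region, type, and count, which is the tabular shape Amazon QuickSight works with most easily.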

Visualize (Step 6): Build a GuardDuty findings dashboard in Amazon QuickSight

 

Figure 9: In this section we will cover the services highlighted in blue

Now that you’ve created tabular views using Athena, you can jump into Amazon QuickSight from the AWS Management Console and begin visualizing! If you haven’t already done so, enable Amazon QuickSight in your account by following the instructions for Setting Up Amazon QuickSight.

For this example, we’ll use the port_probe_geo view to build a geographic visualization and see the locations from which nefarious actors are launching port probes.

Creating an analysis

In the upper left-hand corner of the Amazon QuickSight console select New analysis and then New data set.
 

Figure 10: Create a new analysis

To utilize the views you built in the previous step, select Athena as the data source. Give your data source a name (in our example, we use “port probe geo”), and select the database that contains the views you created in the previous section. Then select Visualize.
 

Figure 11: Available data sources in Amazon QuickSight. Be sure to choose Athena!

 

Figure 12: Select the "port_probe_geo" view you created in step 5

Viz time!

From the Visual types menu in the bottom left corner, select the globe icon to create a map. Then select the latitude and longitude geospatial coordinates. Choose count (with a max aggregation) for size. Finally, select localportname to break the data down by color.
 

Figure 13: A visual containing a map of port probe scans in Amazon QuickSight

Voila! A detailed map of your environment’s attackers!

Build out a dashboard

Once you like how everything looks, you can move on to adding more visuals to create a full monitoring dashboard.

To add another visual to the analysis, select Add and then Add visual.
 

Figure 14: Add another visual using the 'Add' option from the Amazon QuickSight menu bar

If the new visual will use the same dataset, then you can immediately start selecting fields to build it. If you want to create a visual from a different data set (our example dashboard below adds the affected_instances view), follow the Creating Data Sets guide to add a new data set. Then return to the current analysis and associate the data set with the analysis by selecting the pencil icon shown below and selecting Add data set.
 

Figure 15: Adding a new data set to your Amazon QuickSight analysis

Repeat this process until you’ve built out everything you need in your monitoring dashboard. Once it’s completed, you can publish the dashboard by selecting Share and then Publish dashboard.
 

Figure 16: Publish your dashboard using the "Share" option of the Amazon QuickSight menu

Here’s an example of a dashboard we created using the port_probe_geo and affected_instances views:
 

Figure 17: An example dashboard created using the "port_probe_geo" and "affected_instances" views

What does something like this cost?

To get an idea of the scale of the cost, we’ve provided a small pricing example (accurate as of the writing of this blog) that assumes 10,000 GuardDuty findings per month with an average payload size of 5KB.

Service | Pricing structure | Amount consumed | Total cost
Amazon CloudWatch Events | $1 per million events | 10,000 events | $0.01
Amazon Kinesis Data Firehose | $0.029 per GB ingested | 0.05 GB ingested | $0.00145
Amazon S3 | $0.023 per GB stored per month | 0.1 GB stored | $0.00230
AWS Lambda | First million invocations free | ~200 invocations | $0
Amazon Athena | $5 per TB scanned | 0.003 TB scanned (assume 2 full data scans per day to refresh views) | $0.015
AWS Glue | $0.44 per DPU hour (2 DPU minimum and 10 minute minimum) = $0.15 per crawler run | 30 crawler runs | $4.50
Total processing cost | | | $4.53

Oh, the joys of a consumption-based model: Less than five dollars per month for all of that processing!
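
If you want to plug in your own volumes, the arithmetic behind the table is easy to reproduce. The sketch below just multiplies each unit price by the amount consumed. The prices are the ones from our example at the time of writing, not a live price list, and the S3 rate of $0.023 per GB is the value that reproduces the $0.00230 line total.

```python
# (service, unit price in USD, quantity consumed per month)
line_items = [
    ("Amazon CloudWatch Events", 1.00 / 1_000_000, 10_000),  # per event
    ("Amazon Kinesis Data Firehose", 0.029, 0.05),           # per GB ingested
    ("Amazon S3", 0.023, 0.1),                               # per GB stored per month
    ("AWS Lambda", 0.0, 200),                                # within the free tier
    ("Amazon Athena", 5.00, 0.003),                          # per TB scanned
    # 0.44 USD x 2 DPUs x (10 / 60) hours is about 0.15 USD per crawler run
    ("AWS Glue", 0.15, 30),
]


def monthly_total(items):
    """Sum unit price times quantity across all line items."""
    return sum(price * quantity for _, price, quantity in items)


total = monthly_total(line_items)
for name, price, quantity in line_items:
    print(f"{name:<30} ${price * quantity:.5f}")
print(f"{'Total':<30} ${total:.2f}")
```

Almost all of the total comes from the daily crawler runs, so running the crawler less often is the quickest way to bring the bill down further.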

From here, all that remains are your visualization costs using Amazon QuickSight. This pricing is highly dependent upon your number of users and their respective usage patterns. See the Amazon QuickSight pricing page for more specific details.

Summary

In this post, we demonstrated how you can ingest your GuardDuty findings into S3, process them with AWS Glue and Amazon Athena, and visualize with Amazon QuickSight. All serverless! Each portion of what we showed can be used in tandem or on its own for this or many other data sets. Go launch the template and get started monitoring your AWS environment!

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Ben Romano

Ben is a Solutions Architect in AWS supporting customers in their journey to the cloud with a focus on big data solutions. Ben loves to delight customers by diving deep on AWS technologies and helping them achieve their business and technology objectives.

Author

Jimmy Boyle

Jimmy is a Solutions Architect in AWS with a background in software development. He enjoys working with all things serverless because he doesn’t have to maintain infrastructure. Jimmy enjoys delighting customers to drive their business forward and design solutions that will scale as their business grows.

Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing clickstream events in real time can be difficult. As the number of users and the number of web and mobile assets you have increase, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of new event messages per second. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze them in real time so that you can take action faster.

To perform sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules, queries, and aggregations, batch processing can take more resources and time because each aggregation works over large amounts of data. Performing sessionization in Kinesis Data Analytics takes less time and reduces the latency between when events occur and when sessions are generated. You can also trigger real-time alerts with AWS Lambda functions based on conditions, such as a session time that is shorter than 20 seconds, or invoke a machine learning endpoint.
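
As a rough illustration of the alerting idea, here is a sketch of a Lambda function that inspects each session record and flags the short ones. It assumes the function is configured as an output of the Kinesis Data Analytics application, that records arrive base64-encoded as JSON, and that the duration column is named duration_sec; the alert itself is only a placeholder print.

```python
import base64
import json

SHORT_SESSION_SECONDS = 20  # threshold taken from the example in the text


def is_short_session(session):
    """True when the session lasted less than the alert threshold."""
    return session["duration_sec"] < SHORT_SESSION_SECONDS


def lambda_handler(event, context):
    """Check each session record and acknowledge it back to the application."""
    results = []
    for record in event["records"]:
        raw = json.loads(base64.b64decode(record["data"]))
        # Assumption: column names may arrive upper-cased, so normalize them.
        session = {key.lower(): value for key, value in raw.items()}
        if is_short_session(session):
            # Placeholder: publish an alert or a metric here.
            print(f"Short session detected: {session.get('session_id')}")
        results.append({"recordId": record["recordId"], "result": "Ok"})
    return {"records": results}
```

Every record is acknowledged with a result of Ok whether or not it raised an alert, so the application does not retry records that were handled successfully.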

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions produce a series of events that occur in sequence, with a start and an end; that series is a session. The start and end of a session can be difficult to determine, and are often defined by a period of time in which no relevant event associated with the user or device arrives. A session starts when a new event arrives after a specified “lag” time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.
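
To make the lag rule concrete, here is a small batch-style sketch in Python. It is not how Kinesis Data Analytics implements sessions; it only applies the definition above to a list of timestamps for one user. The 60-second lag and the sample timestamps are assumptions for illustration.

```python
from datetime import datetime, timedelta


def split_sessions(timestamps, lag):
    """Group event times into sessions; a gap longer than `lag` starts a new one."""
    sessions = []
    for ts in sorted(timestamps):
        if sessions and ts - sessions[-1][-1] <= lag:
            sessions[-1].append(ts)   # still within the lag: same session
        else:
            sessions.append([ts])     # lag exceeded (or first event): new session
    return sessions


base = datetime(2018, 11, 29, 23, 35, 10)
events = [base + timedelta(seconds=s) for s in (0, 12, 34, 400, 410)]
sessions = split_sessions(events, lag=timedelta(seconds=60))

for number, session in enumerate(sessions, start=1):
    print(number, session[0].time(), session[-1].time(), len(session))
```

With these sample events, the first three clicks land in one session and the two clicks that arrive several minutes later form a second one.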

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user as described in the following simplified example. In this example, I use distinct navigation patterns from three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from data, as shown following:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This gives a 34-second session, starting with action “B_10” and ending with action “A_02.” These “actions” identify the application’s buttons in this example.

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them as running in the same session? A user can abort a navigation or start a new one. Also, applications often have timeouts. You have to decide on the maximum session length, after which new events count as a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.
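
The same per-user statistics can be sketched in a few lines of Python. Only the first and last “User ID 20” rows below come from the example; the middle row and its action name are made up so that there is something to aggregate.

```python
from datetime import datetime

# Only the first and last rows come from the example; the middle one is made up.
events = [
    (20, "2018-11-29 23:35:10", "B_10"),
    (20, "2018-11-29 23:35:21", "C_07"),
    (20, "2018-11-29 23:35:44", "A_02"),
]


def summarize_by_user(rows):
    """Return first/last action, event count, and duration per user ID."""
    parsed = sorted(
        (user_id, datetime.strptime(ts, "%Y-%m-%d %H:%M:%S"), action)
        for user_id, ts, action in rows
    )
    summary = {}
    for user_id, ts, action in parsed:
        entry = summary.setdefault(
            user_id, {"begin_action": action, "start": ts, "events": 0}
        )
        entry["end"] = ts             # rows are sorted, so the last one wins
        entry["end_action"] = action
        entry["events"] += 1
    for entry in summary.values():
        entry["duration_sec"] = int((entry["end"] - entry["start"]).total_seconds())
    return summary


stats = summarize_by_user(events)
print(stats[20]["begin_action"], stats[20]["end_action"], stats[20]["duration_sec"])
```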

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics are used to trigger real-time events on Lambda and then send them to Kinesis Data Firehose. Kinesis Data Firehose sends data to an Amazon S3 bucket, where it is ingested to a table by an AWS Glue crawler and made available for running queries with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics using an AWS Lambda function. The function triggers two events: one real-time dashboard in Amazon CloudWatch and a second one to persist data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.
  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as send SMS alerts or roll back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch, and build a real-time dashboard.

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

The following is the code for the Lambda function payload generator, which runs on a schedule using CloudWatch Events:

...
def getReferrer():
    x = random.randint(1,5)
    x = x*50 
    y = x+30 
    data = {}
    data['user_id'] = random.randint(x,y)
    data['device_id'] = random.choice(['mobile','computer', 'tablet', 'mobile','computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav','beer_checkout','beer_product_detail',
    'beer_products','beer_selection','beer_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

def lambda_handler(event, context):
...
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, the following payloads are sent to Kinesis Data Analytics:
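
To get a feel for the shape of a single payload without deploying anything, you can run the generator logic locally. This sketch repeats the field logic from getReferrer and prints one JSON record instead of sending it to Kinesis. The function name make_click is mine, and the values are random, so your output will differ on every run.

```python
import datetime
import json
import random


def make_click():
    """Build one random clickstream payload with the same fields as getReferrer."""
    base = random.randint(1, 5) * 50
    return {
        "user_id": random.randint(base, base + 30),
        "device_id": random.choice(
            ["mobile", "computer", "tablet", "mobile", "computer"]
        ),
        "client_event": random.choice([
            "beer_vitrine_nav", "beer_checkout", "beer_product_detail",
            "beer_products", "beer_selection", "beer_cart",
        ]),
        "client_timestamp": datetime.datetime.now().isoformat(),
    }


payload = json.dumps(make_click())
print(payload)
```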

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three available options for windowed query functions in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger window because it has some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event that matches a partition key condition arrives. So for each key, it evaluates its particular window as opposed to the other window functions that evaluate one unique window for all the partition keys matched.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream; what matters is when each event was generated. Stagger windows handle the arrival of out-of-order events well. A window opens when the first matching event arrives, and it closes based on the age specified, which is measured from the time when the window opened.
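
The grouping idea can be sketched outside of Kinesis Data Analytics. The Python below only imitates the partition key (user, device, and the event timestamp rounded down to 30 seconds), to show that grouping by event time puts a late-arriving click in the right bucket. It does not model how a stagger window opens and closes based on age, and the sample events are made up.

```python
from collections import defaultdict
from datetime import datetime


def step_30s(ts):
    """Round a timestamp down to the nearest 30 seconds."""
    return ts.replace(second=ts.second - ts.second % 30, microsecond=0)


def group_by_event_time(events):
    """Group events by (user_id, device_id, rounded event time), in any arrival order."""
    groups = defaultdict(list)
    for user_id, device_id, ts in events:
        groups[(user_id, device_id, step_30s(ts))].append(ts)
    return groups


# Listed in arrival order; the second event was generated before the first.
arrivals = [
    (120, "mobile", datetime(2018, 11, 29, 23, 35, 44)),
    (120, "mobile", datetime(2018, 11, 29, 23, 35, 31)),
    (120, "mobile", datetime(2018, 11, 29, 23, 36, 2)),
]

groups = group_by_event_time(arrivals)
for key, stamps in sorted(groups.items()):
    print(key[2].time(), len(stamps), min(stamps).time(), max(stamps).time())
```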

To partition by the timestamp, I chose to write two distinct SQL functions.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following function creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following function creates the PUMP and inserts it as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- beginning minute and second  
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates the PUMP and inserts as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- beginning minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the resulting data transformed by the SQL, with the session identification and information. Session_ID is built by concatenating User_ID, an underscore, the first three characters of DEVICE_ID, and the rounded Unix timestamp in seconds (without the milliseconds).
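
Here is a sketch of the same session ID construction in Python, in case you need to rebuild or parse these IDs downstream. It follows the expression used in the pumps above, and it assumes the rounded timestamp is in UTC; the sample user and device are made up.

```python
from datetime import datetime, timezone


def make_session_id(user_id, device_id, rounded_ts):
    """Concatenate user ID, '_', first 3 device characters, and epoch seconds."""
    # Assumption: the rounded timestamp is interpreted as UTC.
    epoch_seconds = int(rounded_ts.replace(tzinfo=timezone.utc).timestamp())
    return f"{user_id}_{device_id[:3]}{epoch_seconds}".upper()


session_id = make_session_id(120, "mobile", datetime(2018, 11, 29, 23, 35, 30))
print(session_id)
```

Because the timestamp part is the rounded window start, every event from the same user and device in the same 30-second or 1-minute step ends up with the same session ID.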

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code on your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign in to the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 (1 MB/s per shard).
  • Bucket Name: Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds. This is the buffering hint that Kinesis Data Firehose uses before it sends the data to Amazon S3.
  • Buffer Size: 1–128 MB per file, if the interval is not achieved first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).
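
If you launch the stack from a script rather than the console, it can be worth checking the buffering values before you submit them. This is a minimal sketch that only encodes the two ranges listed above; the function name is hypothetical and is not part of the template.

```python
def validate_firehose_buffering(interval_seconds, size_mb):
    """Return a list of problems with the buffering hints (empty if they are valid)."""
    errors = []
    if not 60 <= interval_seconds <= 900:
        errors.append("Buffer Interval must be between 60 and 900 seconds")
    if not 1 <= size_mb <= 128:
        errors.append("Buffer Size must be between 1 and 128 MB")
    return errors


print(validate_firehose_buffering(60, 5))     # valid values: no problems reported
print(validate_firehose_buffering(30, 256))   # both values are out of range
```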

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

 

Step 8: Check the Destination tab to view the AWS Lambda function as the destination to your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and the “session duration” behavior over a timeframe. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query data with the sessions grouped by the session duration ordered by sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

 


About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.

 

 

 

 

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. Rather, it was about shifting our mindset to use cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple the data warehouse from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, simply switching to another relational database would have given our customers the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. Things like the following were considerations:

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.
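As a rough illustration of the direct PUT path described above, the following Python sketch serializes one event as a line of JSON and hands it to Kinesis Data Firehose. The stream name, the event fields, and the helper names are hypothetical. `put_record` is the boto3 Firehose client call; the client is passed in as an argument so that the functions can be exercised without AWS credentials.

```python
import json


def to_firehose_record(event):
    """Serialize one event as a newline-terminated JSON line.

    Firehose concatenates records as-is, so the trailing newline keeps
    the objects that land in S3 parseable one line at a time.
    """
    line = json.dumps(event, separators=(",", ":"), sort_keys=True) + "\n"
    return {"Data": line.encode("utf-8")}


def send_event(firehose_client, stream_name, event):
    """Send a single event with the Firehose PutRecord API."""
    return firehose_client.put_record(
        DeliveryStreamName=stream_name,
        Record=to_firehose_record(event),
    )
```

In a service, a call might look like `send_event(boto3.client("firehose"), "orders-stream", {"order_id": 42, "status": "shipped"})`, where `orders-stream` is an assumed delivery stream name.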

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. From there, the data follows the same path as the direct PUT records and is delivered to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.
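The post does not show the cleaning function itself. The following is a minimal sketch of what such a function could look like, assuming that "cleaning" means stripping the DynamoDB type tags (S, N, BOOL, and so on) from each new item image. The function names and the default stream name are hypothetical, and only the common type tags are handled.

```python
import json
from decimal import Decimal


def unmarshal(value):
    """Convert one DynamoDB-typed attribute value to a plain Python value.

    Covers only the common type tags (S, N, BOOL, NULL, L, M); sets and
    binary values are left out of this sketch.
    """
    tag, inner = next(iter(value.items()))
    if tag in ("S", "BOOL"):
        return inner
    if tag == "N":
        number = Decimal(inner)
        return int(number) if number == number.to_integral_value() else float(number)
    if tag == "NULL":
        return None
    if tag == "L":
        return [unmarshal(item) for item in inner]
    if tag == "M":
        return {key: unmarshal(item) for key, item in inner.items()}
    raise ValueError(f"unsupported DynamoDB type tag: {tag}")


def clean_records(event):
    """Yield plain dicts for inserted or modified items in a stream event."""
    for record in event.get("Records", []):
        image = record.get("dynamodb", {}).get("NewImage")
        if image is not None:  # REMOVE events carry no NewImage
            yield {key: unmarshal(val) for key, val in image.items()}


def handler(event, context, firehose_client=None, stream_name="example-stream"):
    """Lambda-style entry point; the client is injectable for local testing."""
    if firehose_client is None:
        import boto3  # preinstalled in the Lambda Python runtime
        firehose_client = boto3.client("firehose")
    batch = [
        {"Data": (json.dumps(item) + "\n").encode("utf-8")}
        for item in clean_records(event)
    ]
    # PutRecordBatch accepts at most 500 records per call, so send in chunks.
    for start in range(0, len(batch), 500):
        firehose_client.put_record_batch(
            DeliveryStreamName=stream_name, Records=batch[start:start + 500]
        )
    return len(batch)
```

boto3 also ships a `TypeDeserializer` class in `boto3.dynamodb.types` that performs this conversion; the hand-written version here only keeps the sketch free of dependencies.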

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.
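To make the cross-account move more concrete, here is a hedged Python sketch of the Lambda side. It assumes that the source objects use the default Kinesis Data Firehose key layout (YYYY/MM/DD/HH/file) and that the target is a Hive-style `year=/month=/day=` layout; the post does not spell out the actual partition structure, so treat both, along with the function names, as illustrative. The S3 client is passed in so that the logic can be run locally.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sns(event):
    """Yield (bucket, key) pairs from an SNS-delivered S3 event notification."""
    for sns_record in event["Records"]:
        s3_event = json.loads(sns_record["Sns"]["Message"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            # Object keys arrive URL-encoded in S3 notifications.
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            yield bucket, key


def partitioned_key(table, source_key):
    """Map a Firehose-style key (YYYY/MM/DD/HH/file) to a partitioned path."""
    year, month, day, _hour, filename = source_key.split("/")[-5:]
    return f"{table}/year={year}/month={month}/day={day}/{filename}"


def move_to_warehouse(s3_client, event, dest_bucket, table):
    """Copy each notified object into the warehouse bucket; return new keys."""
    copied = []
    for bucket, key in s3_objects_from_sns(event):
        dest_key = partitioned_key(table, key)
        s3_client.copy_object(
            Bucket=dest_bucket,
            Key=dest_key,
            CopySource={"Bucket": bucket, "Key": key},
            ACL="bucket-owner-full-control",  # let the destination account own the copy
        )
        copied.append(dest_key)
    return copied
```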

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;"

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV. This approach conflicted with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • Series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation.
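The shared library itself is not shown in this post. The following plain-Python sketch only illustrates the kind of deduplication rule that such a library might centralize: keep the most recent row for each key. In an AWS Glue job, the same rule would be expressed on a Spark DataFrame instead of a list of dicts. The field names `id` and `updated_at` are hypothetical.

```python
def deduplicate(rows, key_fields, order_field):
    """Keep, for each key, the row with the greatest value of order_field.

    When two rows tie on order_field, the one that appears later wins.
    """
    latest = {}
    for row in rows:
        key = tuple(row[field] for field in key_fields)
        current = latest.get(key)
        if current is None or row[order_field] >= current[order_field]:
            latest[key] = row
    return list(latest.values())


raw_rows = [
    {"id": 1, "status": "new", "updated_at": "2019-01-01T10:00:00"},
    {"id": 2, "status": "new", "updated_at": "2019-01-01T10:05:00"},
    {"id": 1, "status": "shipped", "updated_at": "2019-01-02T09:00:00"},
]
clean_rows = deduplicate(raw_rows, key_fields=["id"], order_field="updated_at")
```

Keeping a rule like this in one function means that every job applies the same definition of "duplicate", which is the point of centralizing the logic.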

We also migrated complex jobs used to create reporting tables with business metrics:

  • The AWS Glue use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.
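A minimal sketch of the "write to a temporary location, then move into place" step might look like the following. It uses the boto3 S3 calls `list_objects_v2` (through a paginator), `copy_object`, and `delete_object`; the function name, bucket, and prefixes are hypothetical, and the client is passed in so that the function can be tested without AWS access.

```python
def promote_prefix(s3_client, bucket, temp_prefix, final_prefix):
    """Copy every object under temp_prefix to final_prefix, then delete the original.

    Returns the list of keys that now exist under final_prefix.
    """
    moved = []
    paginator = s3_client.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=temp_prefix):
        for obj in page.get("Contents", []):  # empty prefixes have no Contents
            source_key = obj["Key"]
            target_key = final_prefix + source_key[len(temp_prefix):]
            s3_client.copy_object(
                Bucket=bucket,
                Key=target_key,
                CopySource={"Bucket": bucket, "Key": source_key},
            )
            s3_client.delete_object(Bucket=bucket, Key=source_key)
            moved.append(target_key)
    return moved
```

Inside a Glue job, this could be called as `promote_prefix(boto3.client("s3"), "my-warehouse-bucket", "tmp/orders/", "curated/orders/")` after the Spark write finishes. A single `copy_object` call handles objects up to 5 GB; larger files would need a multipart copy.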

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

Building Simpler Genomics Workflows on AWS Step Functions

Post Syndicated from Christie Gifrin original https://aws.amazon.com/blogs/compute/building-simpler-genomics-workflows-on-aws-step-functions/

This post is courtesy of Ryan Ulaszek, AWS Genomics Partner Solutions Architect and Aaron Friedman, AWS Healthcare and Life Sciences Partner Solutions Architect

In 2017, we published a four-part blog series on how to build a genomics workflow on AWS. In part 1, we introduced a general architecture highlighting three common layers: job, batch, and workflow. In part 2, we described building the job layer with Docker and Amazon Elastic Container Registry (Amazon ECR). In part 3, we tackled the batch layer and built a batch engine using AWS Batch. In part 4, we built out the workflow layer using AWS Step Functions and AWS Lambda.

Since then, we’ve worked with many AWS customers and APN partners to implement this solution in genomics as well as in other workloads of interest. Today, we want to highlight a new feature in Step Functions that simplifies how customers and partners can build high-throughput genomics workflows on AWS.

Step Functions now supports native integration with AWS Batch, which simplifies how you can create an AWS Batch state that submits an asynchronous job and waits for that job to finish.

Before, you needed to build a state machine building block that submitted a job to AWS Batch, and then polled and checked its execution. Now, you can just submit the job to AWS Batch using the new AWS Batch task type.  Step Functions waits to proceed until the job is completed. This reduces the complexity of your state machine and makes it easier to build a genomics workflow with asynchronous AWS Batch steps.

The new integrations include support for the following API actions:

  • AWS Batch SubmitJob
  • Amazon SNS Publish
  • Amazon SQS SendMessage
  • Amazon ECS RunTask
  • AWS Fargate RunTask
  • Amazon DynamoDB
    • PutItem
    • GetItem
    • UpdateItem
    • DeleteItem
  • Amazon SageMaker
    • CreateTrainingJob
    • CreateTransformJob
  • AWS Glue
    • StartJobRun

You can also pass parameters to the service API.  To use the new integrations, the role that you assume when running a state machine needs to have the appropriate permissions.  For more information, see the AWS Step Functions Developer Guide.

Using a job status poller

In our 2017 post series, we created a job poller “pattern” with two separate Lambda functions. When the job finishes, the state machine proceeds to the next step and operates according to the necessary business logic.  This is a useful pattern to manage asynchronous jobs when a direct integration is unavailable.

The steps in this building block state machine are as follows:

  1. A job is submitted through a Lambda function.
  2. The state machine queries the AWS Batch API for the job status in another Lambda function.
  3. The job status is checked to see if the job has completed. If the job status equals SUCCEEDED, the final job status is logged. If the job status equals FAILED, the execution of the state machine ends. In all other cases, wait 30 seconds and go back to Step 2.

Both of the Submit Job and Get Job Lambda functions are available as example Lambda functions in the console.  The job status poller is available in the Step Functions console as a sample project.
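For readers who have not seen the example functions, the following Python sketch shows roughly what the two Lambda functions and the polling loop do. It is not the console sample code. The event field names (`jobName`, `jobQueue`, `jobDefinition`, `jobId`) are assumptions, and the AWS Batch client is passed in as a third argument so that the logic can be run locally; in a real Lambda function, the client would be created at module level with `boto3.client("batch")`. `submit_job` and `describe_jobs` are the boto3 AWS Batch calls.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED"}


def submit_job_handler(event, context, batch_client):
    """Step 1: submit the job and hand its ID to the next state."""
    response = batch_client.submit_job(
        jobName=event["jobName"],
        jobQueue=event["jobQueue"],
        jobDefinition=event["jobDefinition"],
    )
    return {"jobId": response["jobId"]}


def get_job_status_handler(event, context, batch_client):
    """Step 2: look up the current status of the submitted job."""
    response = batch_client.describe_jobs(jobs=[event["jobId"]])
    return response["jobs"][0]["status"]


def poll_until_done(batch_client, job_id, wait_seconds=30, sleep=time.sleep):
    """Step 3 as a local loop: check, wait, and check again until terminal."""
    while True:
        status = get_job_status_handler({"jobId": job_id}, None, batch_client)
        if status in TERMINAL_STATES:
            return status
        sleep(wait_seconds)
```

In the state machine, the loop in `poll_until_done` is what the Choice and Wait states implement; the function is included here only to show the control flow in one place.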

Here is the JSON representing this state machine.

{
  "Comment": "A simple example that submits a job to AWS Batch",
  "StartAt": "SubmitJob",
  "States": {
    "SubmitJob": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchSubmitJob",
      "Next": "GetJobStatus"
    },
    "GetJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "Next": "CheckJobStatus",
      "InputPath": "$",
      "ResultPath": "$.status"
    },
    "CheckJobStatus": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.status",
          "StringEquals": "FAILED",
          "Next": "JobFailed"
        },
        {
          "Variable": "$.status",
          "StringEquals": "SUCCEEDED",
          "Next": "GetFinalJobStatus"
        }
      ],
      "Default": "Wait30Seconds"
    },
    "Wait30Seconds": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "GetJobStatus"
    },
    "JobFailed": {
      "Type": "Fail",
      "Cause": "AWS Batch job failed"
    },
    "GetFinalJobStatus": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:<account-id>:function:batchGetJobStatus",
      "End": true
    }
  }
}

With Step Functions Service Integrations

With Step Functions service integrations, it is now simpler to submit and wait for an AWS Batch job, or any other supported service.

The following code block is the JSON representing the new state machine for an asynchronous batch job. If you are familiar with the AWS Batch SubmitJob API action, you may notice that the parameters are consistent with what you would see in that API call. You can also use the optional AWS Batch parameters in addition to JobDefinition, JobName, and JobQueue.

{
  "StartAt": "RunIsaacJob",
  "States": {
    "RunIsaacJob": {
      "Type": "Task",
      "Resource": "arn:aws:states:::batch:submitJob.sync",
      "Parameters": {
        "JobDefinition": "Isaac",
        "JobName.$": "$.isaac.JobName",
        "JobQueue": "HighPriority",
        "Parameters.$": "$.isaac"
      },
      "TimeoutSeconds": 900,
      "HeartbeatSeconds": 60,
      "InputPath": "$",
      "ResultPath": "$.status",
      "Retry": [
        {
          "ErrorEquals": [ "States.Timeout" ],
          "IntervalSeconds": 3,
          "MaxAttempts": 2,
          "BackoffRate": 1.5
        }
      ],
      "End": true
    }
  }
}

Here is an example of the workflow input JSON.  Pass all of the container parameters that were being constructed in the submit job Lambda function.

{
  "isaac": {
    "WorkingDir": "/scratch",
    "JobName": "isaac-1",
    "FastQ1S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz",
    "BAMS3FolderPath": "s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam",
    "FastQ2S3Path": "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz",
    "ReferenceS3Path": "s3://aws-batch-genomics-resources/reference/isaac/"
  }
}
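If you start the workflow from code rather than from the console, a sketch along the following lines could build that input and start an execution. `start_execution` is the boto3 Step Functions call; the helper names and the state machine ARN in the usage note are hypothetical. Because AWS Batch job parameters are a string-to-string map, the sketch rejects non-string values up front.

```python
import json


def build_workflow_input(job_name, container_params):
    """Nest the container parameters under the "isaac" key read by the state machine."""
    payload = {"JobName": job_name, **container_params}
    for key, value in payload.items():
        if not isinstance(value, str):
            raise TypeError(
                f"parameter {key!r} must be a string, got {type(value).__name__}"
            )
    return {"isaac": payload}


def start_workflow(sfn_client, state_machine_arn, workflow_input):
    """Start one execution, passing the workflow input as a JSON string."""
    return sfn_client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(workflow_input),
    )
```

A call might look like `start_workflow(boto3.client("stepfunctions"), "<state-machine-arn>", build_workflow_input("isaac-1", {"WorkingDir": "/scratch"}))`.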

When you deploy the job definition, add the command attribute that was previously being constructed in the Lambda function launching the AWS Batch job.

IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

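The key-value pairs passed into the workflow are mapped through Parameters.$ onto the parameters of the job definition by key. Values supplied by the workflow replace the job definition defaults, and each Ref:: placeholder in the command is substituted with the resulting value. The Docker run looks like the following: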

docker run <isaac_container_uri> --bam_s3_folder_path s3://batch-genomics-pipeline-jobresultsbucket-1kzdu216m2b0k/NA12878_states_3/bam \
                                 --fastq1_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz \
                                 --fastq2_s3_path s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz \
                                 --reference_s3_path s3://aws-batch-genomics-resources/reference/isaac/ \
                                 --working_dir /scratch
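The substitution can be mimicked locally to see how the command above is assembled. The following Python sketch is a simplified model, not AWS Batch code: it merges the job definition defaults with the values submitted by the workflow (submitted values win) and replaces command tokens that consist entirely of a Ref:: placeholder. The function name is hypothetical, `<results-bucket>` stands in for the real bucket name, and raising an error on an unknown placeholder is a choice made for this sketch.

```python
def resolve_command(command, defaults, overrides=None):
    """Substitute Ref::Name tokens using defaults overlaid with overrides."""
    params = {**defaults, **(overrides or {})}
    resolved = []
    for token in command:
        if token.startswith("Ref::"):
            name = token[len("Ref::"):]
            resolved.append(params[name])  # KeyError if the name is unknown
        else:
            resolved.append(token)
    return resolved


job_definition_defaults = {
    "BAMS3FolderPath": "s3://<results-bucket>/NA12878_states_1/bam",
    "WorkingDir": "/scratch",
}
command_template = [
    "--bam_s3_folder_path", "Ref::BAMS3FolderPath",
    "--working_dir", "Ref::WorkingDir",
]
workflow_values = {"BAMS3FolderPath": "s3://<results-bucket>/NA12878_states_3/bam"}

final_command = resolve_command(command_template, job_definition_defaults, workflow_values)
```

Here `final_command` carries the `NA12878_states_3` path from the workflow input, while `WorkingDir` falls back to the job definition default.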

Genomics workflow: Before and after

Overall, service integrations dramatically simplify your genomics workflow. The following workflow is a simple genomics secondary analysis pipeline, which we highlighted in our original post series.

The first step aligns the sample against a reference genome.  When alignment is complete, variant calling and QA metrics are calculated in two parallel steps.  When variant calling is complete, variant annotation is performed.  Before, our genomics workflow looked like this:

Now it looks like this:

Here is the new workflow JSON:

{
   "Comment":"A simple genomics secondary-analysis workflow",
   "StartAt":"RunIsaacJob",
   "States":{
      "RunIsaacJob":{
         "Type":"Task",
         "Resource":"arn:aws:states:::batch:submitJob.sync",
         "Parameters":{
            "JobDefinition":"Isaac",
            "JobName.$":"$.isaac.JobName",
            "JobQueue":"HighPriority",
            "Parameters.$": "$.isaac"
         },
         "TimeoutSeconds": 900,
         "HeartbeatSeconds": 60,
         "Next":"Parallel",
         "InputPath":"$",
         "ResultPath":"$.status",
         "Retry" : [
            {
              "ErrorEquals": [ "States.Timeout" ],
              "IntervalSeconds": 3,
              "MaxAttempts": 2,
              "BackoffRate": 1.5
            }
         ]
      },
      "Parallel":{
         "Type":"Parallel",
         "Next":"FinalState",
         "Branches":[
            {
               "StartAt":"RunStrelkaJob",
               "States":{
                  "RunStrelkaJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"Strelka",
                        "JobName.$":"$.strelka.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.strelka"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Next":"RunSnpEffJob",
                     "InputPath":"$",
                     "ResultPath":"$.status",
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  },
                  "RunSnpEffJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SNPEff",
                        "JobName.$":"$.snpeff.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.snpeff"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ],
                     "End":true
                  }
               }
            },
            {
               "StartAt":"RunSamtoolsStatsJob",
               "States":{
                  "RunSamtoolsStatsJob":{
                     "Type":"Task",
                     "Resource":"arn:aws:states:::batch:submitJob.sync",
                     "Parameters":{
                        "JobDefinition":"SamtoolsStats",
                        "JobName.$":"$.samtools.JobName",
                        "JobQueue":"HighPriority",
                        "Parameters.$": "$.samtools"
                     },
                     "TimeoutSeconds": 900,
                     "HeartbeatSeconds": 60,
                     "End":true,
                     "Retry" : [
                        {
                          "ErrorEquals": [ "States.Timeout" ],
                          "IntervalSeconds": 3,
                          "MaxAttempts": 2,
                          "BackoffRate": 1.5
                        }
                     ]
                  }
               }
            }
         ]
      },
      "FinalState":{
         "Type":"Pass",
         "End":true
      }
   }
}

Here is the new AWS CloudFormation template for deploying the AWS Batch job definitions for each tool:

AWSTemplateFormatVersion: 2010-09-09

Description: Batch job definitions for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys VPC for genomic workflow"
    Type: String
  JobResultsBucket:
    Description: "Bucket that holds workflow job results"
    Type: String

Resources:
  IsaacJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Isaac"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3FolderPath: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam"
        FastQ1S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_1.fastq.gz"
        FastQ2S3Path: "s3://aws-batch-genomics-resources/fastq/SRR1919605_2.fastq.gz"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/isaac/"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/isaac"
        Vcpus: 32
        Memory: 80000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_folder_path"
          - "Ref::BAMS3FolderPath"
          - "--fastq1_s3_path"
          - "Ref::FastQ1S3Path"
          - "--fastq2_s3_path"
          - "Ref::FastQ2S3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  StrelkaJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "Strelka"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAIS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.bai"
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        ReferenceIndexS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa.fai"
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/strelka"
        Vcpus: 32
        Memory: 32000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bai_s3_path"
          - "Ref::BAIS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--reference_index_s3_path"
          - "Ref::ReferenceIndexS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SnpEffJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SNPEff"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        VCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/variants/genome.vcf.gz"
        AnnotatedVCFS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/vcf/genome.anno.vcf"
        CommandArgs: " -t hg38 "
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/snpeff"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--annotated_vcf_s3_path"
          - "Ref::AnnotatedVCFS3Path"
          - "--vcf_s3_path"
          - "Ref::VCFS3Path"
          - "--cmd_args"
          - "Ref::CommandArgs"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"

  SamtoolsStatsJobDefinition:
    Type: AWS::Batch::JobDefinition
    Properties:
      JobDefinitionName: "SamtoolsStats"
      Type: container
      RetryStrategy:
        Attempts: 1
      Parameters:
        ReferenceS3Path: "s3://aws-batch-genomics-resources/reference/hg38.fa"
        BAMS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam"
        BAMStatsS3Path: !Sub "s3://${JobResultsBucket}/NA12878_states_1/bam/sorted.bam.stats"
        WorkingDir: "/scratch"
      ContainerProperties:
        Image: "rulaszek/samtools-stats"
        Vcpus: 4
        Memory: 10000
        JobRoleArn:
          Fn::ImportValue: !Sub "${RoleStackName}:ECSTaskRole"
        Command:
          - "--bam_s3_path"
          - "Ref::BAMS3Path"
          - "--bam_stats_s3_path"
          - "Ref::BAMStatsS3Path"
          - "--reference_s3_path"
          - "Ref::ReferenceS3Path"
          - "--working_dir"
          - "Ref::WorkingDir"
        MountPoints:
          - ContainerPath: "/scratch"
            ReadOnly: false
            SourceVolume: docker_scratch
        Volumes:
          - Name: docker_scratch
            Host:
              SourcePath: "/docker_scratch"
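
The job definitions above rely on AWS Batch parameter substitution: each `Ref::Name` placeholder in `Command` is replaced with the matching entry from `Parameters`, and values supplied when the job is submitted override the defaults in the job definition. The following is a minimal local sketch of that behavior, not AWS Batch code. The function name `resolve_command` and the bucket name `example-results-bucket` are hypothetical, and leaving an unmatched placeholder untouched is an assumption of this sketch.

```python
# Default parameters and command copied from the SamtoolsStats job definition.
# "example-results-bucket" stands in for the ${JobResultsBucket} substitution.
SAMTOOLS_DEFAULTS = {
    "ReferenceS3Path": "s3://aws-batch-genomics-resources/reference/hg38.fa",
    "BAMS3Path": "s3://example-results-bucket/NA12878_states_1/bam/sorted.bam",
    "BAMStatsS3Path": "s3://example-results-bucket/NA12878_states_1/bam/sorted.bam.stats",
    "WorkingDir": "/scratch",
}

SAMTOOLS_COMMAND = [
    "--bam_s3_path", "Ref::BAMS3Path",
    "--bam_stats_s3_path", "Ref::BAMStatsS3Path",
    "--reference_s3_path", "Ref::ReferenceS3Path",
    "--working_dir", "Ref::WorkingDir",
]


def resolve_command(command, defaults, overrides=None):
    """Return the command with Ref::Name tokens replaced by parameter values.

    Values in `overrides` (submit-time parameters) win over `defaults`
    (job definition parameters).
    """
    params = {**defaults, **(overrides or {})}
    resolved = []
    for token in command:
        if token.startswith("Ref::"):
            name = token[len("Ref::"):]
            # Assumption of this sketch: unknown placeholders are left as-is.
            resolved.append(params.get(name, token))
        else:
            resolved.append(token)
    return resolved


if __name__ == "__main__":
    # Override only the working directory; everything else uses the defaults.
    print(resolve_command(SAMTOOLS_COMMAND, SAMTOOLS_DEFAULTS, {"WorkingDir": "/tmp"}))
```

This is why the state machine only needs to pass the values that differ from the defaults for a given run.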

Here is the new CloudFormation script that deploys the new workflow:

AWSTemplateFormatVersion: 2010-09-09

Description: State Machine for batch genomics

Parameters:
  RoleStackName:
    Description: "Stack that deploys roles for genomic workflow"
    Type: String
  VPCStackName:
    Description: "Stack that deploys VPC for genomic workflow"
    Type: String

Resources:
  # Step Functions state machine
  GenomicWorkflow:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn:
        Fn::ImportValue: !Sub "${RoleStackName}:StatesExecutionRole"
      DefinitionString: !Sub |-
        {
           "Comment":"A simple example that submits a job to AWS Batch",
           "StartAt":"RunIsaacJob",
           "States":{
              "RunIsaacJob":{
                 "Type":"Task",
                 "Resource":"arn:aws:states:::batch:submitJob.sync",
                 "Parameters":{
                    "JobDefinition":"Isaac",
                    "JobName.$":"$.isaac.JobName",
                    "JobQueue":"HighPriority",
                    "Parameters.$": "$.isaac"
                 },
                 "TimeoutSeconds": 900,
                 "HeartbeatSeconds": 60,
                 "Next":"Parallel",
                 "InputPath":"$",
                 "ResultPath":"$.status",
                 "Retry" : [
                    {
                      "ErrorEquals": [ "States.Timeout" ],
                      "IntervalSeconds": 3,
                      "MaxAttempts": 2,
                      "BackoffRate": 1.5
                    }
                 ]
              },
              "Parallel":{
                 "Type":"Parallel",
                 "Next":"FinalState",
                 "Branches":[
                    {
                       "StartAt":"RunStrelkaJob",
                       "States":{
                          "RunStrelkaJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"Strelka",
                                "JobName.$":"$.strelka.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.strelka"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Next":"RunSnpEffJob",
                             "InputPath":"$",
                             "ResultPath":"$.status",
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          },
                          "RunSnpEffJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SNPEff",
                                "JobName.$":"$.snpeff.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.snpeff"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ],
                             "End":true
                          }
                       }
                    },
                    {
                       "StartAt":"RunSamtoolsStatsJob",
                       "States":{
                          "RunSamtoolsStatsJob":{
                             "Type":"Task",
                             "Resource":"arn:aws:states:::batch:submitJob.sync",
                             "Parameters":{
                                "JobDefinition":"SamtoolsStats",
                                "JobName.$":"$.samtools.JobName",
                                "JobQueue":"HighPriority",
                                "Parameters.$": "$.samtools"
                             },
                             "TimeoutSeconds": 900,
                             "HeartbeatSeconds": 60,
                             "End":true,
                             "Retry" : [
                                {
                                  "ErrorEquals": [ "States.Timeout" ],
                                  "IntervalSeconds": 3,
                                  "MaxAttempts": 2,
                                  "BackoffRate": 1.5
                                }
                             ]
                          }
                       }
                    }
                 ]
              },
              "FinalState":{
                 "Type":"Pass",
                 "End":true
              }
           }
        }

Outputs:
  GenomicsWorkflowArn:
    Description: GenomicWorkflow ARN
    Value: !Ref GenomicWorkflow
  StackName:
    Description: StackName
    Value: !Sub ${AWS::StackName}
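
The state machine above reads its job names and AWS Batch parameters from the execution input: each task takes `JobName` from a JSON path such as `$.isaac.JobName` and passes the whole sub-object (for example `$.isaac`) as the job's parameters. The following is a minimal sketch of building such an input and starting an execution with boto3. The helper names, the job-name format, and the choice of which parameters to override are assumptions for illustration; the S3 key layout mirrors the defaults in the job definitions. Because the whole sub-object is passed as parameters, `JobName` also reaches AWS Batch as a parameter, and all values must be strings.

```python
import json


def build_execution_input(sample_id, results_bucket):
    """Build the execution input expected by the GenomicWorkflow state machine.

    Only the parameters that differ per sample are set here; anything omitted
    falls back to the defaults in the AWS Batch job definitions.
    """
    prefix = f"s3://{results_bucket}/{sample_id}"
    return {
        "isaac": {"JobName": f"isaac-{sample_id}"},
        "strelka": {
            "JobName": f"strelka-{sample_id}",
            "BAMS3Path": f"{prefix}/bam/sorted.bam",
            "VCFS3Path": f"{prefix}/vcf",
        },
        "snpeff": {
            "JobName": f"snpeff-{sample_id}",
            "VCFS3Path": f"{prefix}/vcf/variants/genome.vcf.gz",
            "AnnotatedVCFS3Path": f"{prefix}/vcf/genome.anno.vcf",
        },
        "samtools": {
            "JobName": f"samtools-{sample_id}",
            "BAMS3Path": f"{prefix}/bam/sorted.bam",
            "BAMStatsS3Path": f"{prefix}/bam/sorted.bam.stats",
        },
    }


def start_workflow(state_machine_arn, payload):
    """Start one execution; requires boto3 and AWS credentials."""
    import boto3  # imported here so the builder above works without boto3

    client = boto3.client("stepfunctions")
    return client.start_execution(
        stateMachineArn=state_machine_arn,
        input=json.dumps(payload),
    )


if __name__ == "__main__":
    # "example-results-bucket" is a placeholder bucket name.
    print(json.dumps(build_execution_input("NA12878_states_1", "example-results-bucket"), indent=2))
```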

Conclusion

AWS Step Functions service integrations are a great way to simplify creating complex workflows with asynchronous steps. While we highlighted the use case with AWS Batch today, there are many other ways that healthcare and life sciences customers can use this new feature, such as with message processing.

For more information about how AWS can enable your genomics workloads, be sure to check out the AWS Genomics page.

We’ve updated the open-source project to take advantage of the new AWS Batch integration in Step Functions. You can find the changes in the aws-batch-genomics/tree/v2.0.0 folder.

Original posts in this four-part series:

Happy coding!

Create cross-account and cross-region AWS Glue connections

Post Syndicated from Pankaj Malhotra original https://aws.amazon.com/blogs/big-data/create-cross-account-and-cross-region-aws-glue-connections/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load data for analytics. AWS Glue uses connections to access certain types of source and target data stores, as described in the AWS Glue documentation.

By default, you can use AWS Glue to create connections to data stores in the same AWS account and AWS Region as the one where you have AWS Glue resources. In this blog post, we describe how to access data stores in an account or AWS Region different from the one where you have AWS Glue resources.

AWS Glue connections

AWS Glue uses a connection to crawl and catalog a data store’s metadata in the AWS Glue Data Catalog, as the documentation describes. AWS Glue ETL jobs also use connections to connect to source and target data stores. AWS Glue supports connections to Amazon Redshift, Amazon RDS, and JDBC data stores.

A connection contains the properties needed by AWS Glue to access a data store. These properties might include connection information such as user name and password, data store subnet IDs, and security groups.

If the data store is located inside an Amazon VPC, AWS Glue uses the VPC subnet ID and security group ID connection properties to set up elastic network interfaces in the VPC containing the data store. Doing this enables ETL jobs and crawlers to connect securely to the data store in the VPC.

AWS Glue can create this elastic network interface setup if the VPC containing the data store is in the same account and AWS Region as the AWS Glue resources. The security groups specified in a connection’s properties are applied on each of the network interfaces. The security group rules and network ACLs associated with the subnet control network traffic through the subnet. Correct rules for allowing outbound traffic through the subnet ensure that AWS Glue can establish network connectivity with all subnets in the VPC containing the data store, and therefore access the source or target data store.

VPC components can be interlinked only if they are present in the same AWS Region. Therefore, AWS Glue cannot create the elastic network interfaces inside a VPC in another region. If the VPC containing the data store is in another region, you have to add the network routes and create additional network interfaces which allow network interfaces set up by AWS Glue to establish network connectivity with the data store.

In this blog post, we describe how to configure the networking routes and interfaces to give AWS Glue access to a data store in an AWS Region different from the one with your AWS Glue resources. In our example, we connect AWS Glue, located in Region A, to an Amazon Redshift data warehouse located in Region B.

Note: The examples here assume that the Amazon Redshift cluster is in a different AWS Region, but belongs to the same account. The same setup and instructions are also valid for an Amazon Redshift cluster in a different account.

Setting up VPC components for AWS Glue

AWS Glue requires a VPC with networking routes to the data stores to which it connects. In our solution, the security groups and route tables are configured to enable elastic network interfaces set up by AWS Glue in a private subnet to reach the internet or connect to data stores outside the VPC. The following diagram shows the necessary components and the network traffic flow.

Required components for VPC setup:

  • AWS Glue resources in a private subnet in Region A.
  • A NAT gateway with an Elastic IP address attached to it in a public subnet in Region A.
  • A private route table containing a route allowing outbound network traffic from the private subnet to pass through the NAT gateway.
  • An internet gateway in Region A.
  • A public route table with a route allowing outbound network traffic from the public subnet to pass through the internet gateway.

Note: We must update the default security group of the VPC to include a self-referencing inbound rule and an outbound rule to allow all traffic from all ports. Later in the example, we attach this security group to an AWS Glue connection to let network interfaces set up by AWS Glue communicate with each other within a private subnet.

Network traffic flow through the components:

Outbound network traffic from AWS Glue resources in the private subnet to any destination or data store outside the private subnet is routed through the NAT gateway.

The NAT gateway is present in a public subnet and has an associated Elastic IP address. It forwards network traffic from AWS Glue resources to the internet by using an internet gateway.

When AWS Glue tries to establish a connection with a data store outside of the private subnet, the incoming network traffic on the data store side appears to come from the NAT Gateway.

On the data store side, you allow the data store or its security group to accept incoming network traffic from the Elastic IP address attached to the NAT gateway. This is shown in the section “Allow Amazon Redshift to accept network traffic from AWS Glue,” following.

Creating VPC components using AWS CloudFormation

You can automate the creation of a VPC and all the components described preceding using the vpc_setup.yaml CloudFormation template, hosted on GitHub. Follow these step-by-step instructions to create the stack in your AWS account:

  1. Deploy the stack in the US Oregon (us-west-2) Region:

Note: In this example, we create the AWS Glue resources and connection in the us-west-2 Region. You can change this to the AWS Region where you have your AWS Glue connection and resources.

You are directed to the AWS CloudFormation console, with the stack name and URL template fields pre-filled.

  2. Choose Next.
  3. Use the default IP ranges and choose Next.
  4. Skip this step and choose Next.
  5. Review and choose Create.
  6. Wait for stack creation to complete. After completion, all the VPC components and necessary setup required are created.
  7. Navigate to the VPC console and copy the Elastic IP address for the newly created NAT.
    Note: This IP address is used for outbound network flow from AWS Glue resources and so should be whitelisted on the data store side. For more detail, see “Allow Amazon Redshift to accept network traffic from AWS Glue,” following.

Before creating and testing an AWS Glue connection to your data store, you need an IAM role that lets AWS Glue access the VPC components that you just created.

Creating an IAM role to let AWS Glue access Amazon VPC components

For this example, we create a role called TestAWSGlueConnectionIAMRole with a managed IAM policy AWSGlueServiceRole attached to it.

  1. Choose the Roles tab from the AWS Identity and Access Management (IAM) console.
  2. Choose Create role and select AWS Glue as a trusted entity.
  3. Attach an IAM policy to the role that allows AWS Glue to access the VPC components. In this example, we are using the default AWSGlueServiceRole policy, which contains all the required permissions for the setup.
  4. Name the role TestAWSGlueConnectionIAMRole.

Note: The default AWSGlueServiceRole policy that we attached to our custom role TestAWSGlueConnectionIAMRole has permissions for accessing VPC components. If you use a custom policy instead of the default one, it must contain the same permissions to be able to access VPC components.
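
The console steps above can also be scripted. The following is a minimal sketch with boto3, assuming the role name used in this post. The trust policy names `glue.amazonaws.com` as the service allowed to assume the role, which is what selecting AWS Glue as the trusted entity does in the console. The helper names are hypothetical.

```python
import json

# ARN of the AWS managed policy referred to in the text as AWSGlueServiceRole.
GLUE_SERVICE_POLICY_ARN = "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"


def glue_trust_policy():
    """Trust policy that lets the AWS Glue service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "glue.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


def create_glue_role(role_name="TestAWSGlueConnectionIAMRole"):
    """Create the role and attach the managed policy; requires boto3 and IAM permissions."""
    import boto3  # imported here so the policy builder works without boto3

    iam = boto3.client("iam")
    iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(glue_trust_policy()),
    )
    iam.attach_role_policy(RoleName=role_name, PolicyArn=GLUE_SERVICE_POLICY_ARN)


if __name__ == "__main__":
    print(json.dumps(glue_trust_policy(), indent=2))
```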

Creating an Amazon Redshift cluster using AWS CloudFormation

For this example, we create a sample Amazon Redshift cluster in a VPC in the US N. Virginia (us-east-1) Region. Follow these step-by-step instructions to create the stack in your AWS account:

  1. Navigate to the CloudFormation console in region us-east-1 and create a new stack using this CloudFormation template, described in the documentation.
  2. Provide the configuration for the cluster, including MasterUsername and MasterUserPassword. MasterUserPassword must satisfy the following constraints:
    • It must be 8–64 characters in length.
    • It must contain at least one uppercase letter, one lowercase letter, and one number.
    • It can use any printable ASCII characters (ASCII code 33–126) except ' (single quote), " (double quote), :, \, /, @, or space.
  3. Choose Next and proceed with the stack creation.
  4. Review the configuration and choose Create.
  5. Wait for stack creation to complete, which can take a few minutes.
  6. Navigate to the Amazon Redshift console and choose the cluster name to see the cluster properties.
  7. Note the JDBC URL for the cluster and the attached security group for later use.

Note: We created a sample Amazon Redshift cluster in a public subnet present inside a VPC in Region B. We recommend that you follow the best practices for increased security and availability while setting up a new Amazon Redshift cluster, as shown in our samples on GitHub.

Creating an AWS Glue connection

Now you have the required VPC setup, Amazon Redshift cluster, and IAM role in place. Next, you can create an AWS Glue connection and test it as follows:

  1. Choose Add Connection under the Connections tab in the AWS Glue console. The AWS Region in which we are creating this connection is the same as for our VPC setup, that is, US Oregon (us-west-2).
  2. Choose a JDBC connection type. You can choose to enforce JDBC SSL or not, depending on the configuration for your data store.
  3. Add the connection-specific configuration. Note the URL for our Amazon Redshift cluster. It shows that the Amazon Redshift cluster is present in us-east-1.

Note: We use the VPC (VPCForGlue) and the private subnet (GluePrivateSubnet) we created for this connection. For security groups, we use the default security group for the VPC. This security group has a self-referencing inbound rule and an outbound rule that allows all traffic.

  4. Review configuration and choose Finish.

The AWS Glue console should now show that the connection was created successfully.

Note: Completing this step just means that an AWS Glue connection was created. It doesn’t guarantee that AWS Glue can actually connect to your data store. Before we test the connection, we also need to allow Amazon Redshift to accept network traffic coming from AWS Glue.
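
For readers who prefer to script this step, the connection can also be created with the AWS Glue `create_connection` API. The following is a minimal sketch of the request, assuming a JDBC connection to Amazon Redshift. The connection name matches the one tested later in this post; the cluster endpoint, database name, credentials, subnet ID, security group ID, and Availability Zone are placeholders, and the helper names are hypothetical.

```python
def build_connection_input(name, jdbc_url, username, password,
                           subnet_id, security_group_ids, availability_zone):
    """Build the ConnectionInput structure for a JDBC AWS Glue connection."""
    return {
        "Name": name,
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": jdbc_url,
            "USERNAME": username,
            "PASSWORD": password,
        },
        # The subnet and security groups tell AWS Glue where to create
        # its elastic network interfaces (the private subnet in Region A).
        "PhysicalConnectionRequirements": {
            "SubnetId": subnet_id,
            "SecurityGroupIdList": list(security_group_ids),
            "AvailabilityZone": availability_zone,
        },
    }


def create_connection(connection_input, region="us-west-2"):
    """Create the connection; requires boto3 and AWS Glue permissions."""
    import boto3  # imported here so the builder works without boto3

    glue = boto3.client("glue", region_name=region)
    return glue.create_connection(ConnectionInput=connection_input)


if __name__ == "__main__":
    # All values below are placeholders, not real resources.
    example = build_connection_input(
        name="TestAWSGlueConnection",
        jdbc_url="jdbc:redshift://example-cluster.abc123.us-east-1.redshift.amazonaws.com:5439/dev",
        username="example_user",
        password="example_password",
        subnet_id="subnet-0123456789abcdef0",
        security_group_ids=["sg-0123456789abcdef0"],
        availability_zone="us-west-2a",
    )
    print(example["ConnectionProperties"]["JDBC_CONNECTION_URL"])
```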

Allow Amazon Redshift to accept network traffic from AWS Glue

The Amazon Redshift cluster in a different AWS Region (us-east-1) from AWS Glue must allow incoming network traffic from AWS Glue.

For this, we update the security group attached to the Amazon Redshift cluster, and whitelist the Elastic IP address attached to the NAT gateway for the AWS Glue VPC.
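
As a minimal sketch, the inbound rule can be expressed as the `IpPermissions` structure accepted by the Amazon EC2 `authorize_security_group_ingress` API. The rule admits only the NAT gateway's Elastic IP address (as a /32 range) on the Amazon Redshift port. Port 5439 is the Amazon Redshift default and is an assumption about your cluster; the IP address and security group ID in the example are placeholders, and the helper names are hypothetical.

```python
import ipaddress


def redshift_ingress_rule(nat_elastic_ip, port=5439):
    """Build an inbound rule that admits a single IP address on one TCP port."""
    ip = ipaddress.ip_address(nat_elastic_ip)  # raises ValueError if malformed
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "IpRanges": [
                {"CidrIp": f"{ip}/32", "Description": "AWS Glue NAT gateway"}
            ],
        }
    ]


def allow_glue_traffic(security_group_id, nat_elastic_ip, region="us-east-1"):
    """Apply the rule to the cluster's security group; requires boto3."""
    import boto3  # imported here so the rule builder works without boto3

    ec2 = boto3.client("ec2", region_name=region)
    return ec2.authorize_security_group_ingress(
        GroupId=security_group_id,
        IpPermissions=redshift_ingress_rule(nat_elastic_ip),
    )


if __name__ == "__main__":
    # 203.0.113.10 is a documentation-only placeholder address.
    print(redshift_ingress_rule("203.0.113.10"))
```

Restricting the source to a /32 range keeps the cluster closed to every address except the NAT gateway.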

Testing the AWS Glue connection

As a best practice, before you use a data store connection in an ETL job, choose Test connection. AWS Glue uses the parameters in your connection to confirm that it can access your data store and reports back any errors.

  1. Select the connection TestAWSGlueConnection that we just created and choose Test Connection.
  2. Select the TestAWSGlueConnectionIAMRole that we created for allowing AWS Glue resources to access VPC components.
  3. After you choose the Test connection button in the previous step, it can take a few seconds for AWS Glue to successfully connect to the data store. When it does, the console shows a message saying it “connected successfully to your instance.”

Conclusion

By creating a VPC setup similar to the one we describe, you can let AWS Glue connect to a data store in a different account or AWS Region. By doing this, you establish network connectivity between AWS Glue resources and your data store. You can now use this AWS Glue connection in ETL jobs and AWS Glue crawlers to connect with the data store.

If you have questions or suggestions, please leave a comment following.


Additional Reading

If you found this post helpful, be sure to check out Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC, and How to access and analyze on-premises data stores using AWS Glue.

 


About the Author

Pankaj Malhotra is a Software Development Engineer at Amazon Web Services. He enjoys solving problems related to cloud infrastructure and distributed systems. He specializes in developing multi-regional, resilient services using serverless technologies.

 

 

 

Connecting to and running ETL jobs across multiple VPCs using a dedicated AWS Glue VPC

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/connecting-to-and-running-etl-jobs-across-multiple-vpcs-using-a-dedicated-aws-glue-vpc/

Many organizations use a setup that includes multiple VPCs based on the Amazon VPC service, with databases isolated in separate VPCs for security, auditing, and compliance purposes. This blog post shows how you can use AWS Glue to perform extract, transform, load (ETL) and crawler operations for databases located in multiple VPCs.

The solution presented here uses a dedicated AWS Glue VPC and subnet to perform the following operations on databases located in different VPCs:

  • Scenario 1: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon Redshift data warehouse.
  • Scenario 2: Ingest data from an Amazon RDS for MySQL database, transform it in AWS Glue, and output the results to an Amazon RDS for PostgreSQL database.

In this blog post, we’ll go through the steps needed to build an ETL pipeline that consumes data from a source in one VPC and writes it to a target in a different VPC. We’ll set up multiple VPCs to reproduce a situation where your database instances are in separate VPCs for isolation related to security, audit, or other purposes.

For this solution, we create a VPC dedicated to AWS Glue. Next, we set up VPC peering between the AWS Glue VPC and all of the other database VPCs. Then we configure an Amazon S3 endpoint, route tables, security groups, and IAM so that AWS Glue can function properly. Lastly, we create AWS Glue connections and an AWS Glue job to perform the task at hand.

Step 1: Set up a VPC

To simulate these scenarios, we create four VPCs with their respective IPv4 CIDR ranges. (Note: CIDR ranges can’t overlap when you use VPC peering.)

VPC     Service                     IPv4 CIDR range
VPC 1   Amazon Redshift             172.31.0.0/16
VPC 2   Amazon RDS for MySQL        172.32.0.0/16
VPC 3   Amazon RDS for PostgreSQL   172.33.0.0/16
VPC 4   AWS Glue                    172.30.0.0/16
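
Because overlapping ranges prevent VPC peering, it can be worth checking a planned layout before creating anything. The following is a minimal sketch using Python's standard `ipaddress` module with the four ranges listed above. The function name `find_overlaps` is hypothetical.

```python
import ipaddress
from itertools import combinations

# CIDR ranges from the table above.
VPC_CIDRS = {
    "VPC 1 (Amazon Redshift)": "172.31.0.0/16",
    "VPC 2 (Amazon RDS for MySQL)": "172.32.0.0/16",
    "VPC 3 (Amazon RDS for PostgreSQL)": "172.33.0.0/16",
    "VPC 4 (AWS Glue)": "172.30.0.0/16",
}


def find_overlaps(cidrs):
    """Return every pair of names whose CIDR ranges overlap."""
    networks = {name: ipaddress.ip_network(cidr) for name, cidr in cidrs.items()}
    return [
        (a, b)
        for a, b in combinations(networks, 2)
        if networks[a].overlaps(networks[b])
    ]


if __name__ == "__main__":
    overlaps = find_overlaps(VPC_CIDRS)
    print("Overlapping ranges:", overlaps if overlaps else "none")
```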

Key configuration notes:

  1. The AWS Glue VPC needs at least one private subnet for AWS Glue to use.
  2. Ensure that DNS hostnames are enabled for all of your VPCs (unless you plan to refer to your databases by IP address later on, which isn’t recommended).

Step 2: Set up a VPC peering connection

Next, we peer our VPCs together to ensure that AWS Glue can communicate with all of the database targets and sources. This approach is necessary because AWS Glue resources are created with private addresses only. Thus, they can’t use an internet gateway to communicate with public addresses, such as public database endpoints. If your database endpoints are public, you can alternatively use a network address translation (NAT) gateway with AWS Glue rather than peer across VPCs.

Create the following peering connections.

Peering   Requester               Accepter
Peer 1    172.30.0.0/16 - VPC 4   172.31.0.0/16 - VPC 1
Peer 2    172.30.0.0/16 - VPC 4   172.32.0.0/16 - VPC 2
Peer 3    172.30.0.0/16 - VPC 4   172.33.0.0/16 - VPC 3

These peering connections can be across separate AWS Regions if needed. The database VPCs are not peered together; they are all peered with the AWS Glue VPC instead. We do this because AWS Glue connects to each database from its own VPC. The databases don’t connect to each other.

Key configuration notes:

  1. Create a VPC peering connection, as described in the Amazon VPC documentation. Select the AWS Glue VPC as the requester and the VPC for your database as the accepter.
  2. Accept the VPC peering request. If you are peering to a different AWS Region, switch to that AWS Region to accept the request.

Important: Enable Domain Name Service (DNS) settings for each of the peering connections. Doing this ensures that AWS Glue can retrieve the private IP address of your database endpoints. Otherwise, AWS Glue resolves your database endpoints to public IP addresses. AWS Glue can’t connect to public IP addresses without a NAT gateway.

Step 3: Create an Amazon S3 endpoint for the AWS Glue subnet

We need to add an Amazon S3 endpoint to the AWS Glue VPC (VPC 4). During setup, associate the endpoint with the route table that your private subnet uses. For more details on creating an S3 endpoint for AWS Glue, see Amazon VPC Endpoints for Amazon S3 in the AWS Glue documentation.

AWS Glue uses S3 to store your scripts and temporary data to load into Amazon Redshift.

Step 4: Create a route table configuration

Add the following routes to the route tables used by the respective services’ subnets. These routes are configured along with existing settings.

VPC 4 (AWS Glue) route table
Destination             Target
172.33.0.0/16 - VPC 3   Peer 3
172.31.0.0/16 - VPC 1   Peer 1
172.32.0.0/16 - VPC 2   Peer 2

VPC 1 (Amazon Redshift) route table
Destination             Target
172.30.0.0/16 - VPC 4   Peer 1

VPC 2 (Amazon RDS MySQL) route table
Destination             Target
172.30.0.0/16 - VPC 4   Peer 2

VPC 3 (Amazon RDS PostgreSQL) route table
Destination             Target
172.30.0.0/16 - VPC 4   Peer 3

Key configuration notes:

  • The route table for the AWS Glue VPC has peering connections to all VPCs. It has these so that AWS Glue can initiate connections to all of the databases.
  • All of the database VPCs have a peering connection back to the AWS Glue VPC. They have these connections to allow return traffic to reach AWS Glue.
  • Ensure that your S3 endpoint is present in the route table for the AWS Glue VPC.
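
The route tables above follow a hub-and-spoke pattern: the AWS Glue VPC gets one route per database VPC, and each database VPC gets a single route back. The following is a minimal sketch that derives both sets of routes from the peering list. The peering names are labels from this post rather than real peering connection IDs, and the function name is hypothetical. Each generated entry corresponds to one Amazon EC2 `create_route` call with a `DestinationCidrBlock` and a `VpcPeeringConnectionId`.

```python
GLUE_CIDR = "172.30.0.0/16"

# Peering label -> CIDR range of the database VPC on the other side.
PEERINGS = {
    "Peer 1": "172.31.0.0/16",  # VPC 1, Amazon Redshift
    "Peer 2": "172.32.0.0/16",  # VPC 2, Amazon RDS for MySQL
    "Peer 3": "172.33.0.0/16",  # VPC 3, Amazon RDS for PostgreSQL
}


def build_routes(glue_cidr, peerings):
    """Return (routes for the AWS Glue VPC, routes per database VPC CIDR)."""
    # Outbound: AWS Glue reaches each database VPC through its own peering.
    glue_routes = [
        {"DestinationCidrBlock": cidr, "Target": peer}
        for peer, cidr in peerings.items()
    ]
    # Return path: each database VPC routes the AWS Glue range back
    # through the same peering connection.
    database_routes = {
        cidr: [{"DestinationCidrBlock": glue_cidr, "Target": peer}]
        for peer, cidr in peerings.items()
    }
    return glue_routes, database_routes


if __name__ == "__main__":
    glue_routes, database_routes = build_routes(GLUE_CIDR, PEERINGS)
    for route in glue_routes:
        print("AWS Glue VPC:", route)
    for cidr, routes in database_routes.items():
        print(cidr, "->", routes)
```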

Step 5: Update the database security groups

Each database’s security group must allow traffic to its listening port (3306, 5432, 5439, and so on) from the AWS Glue VPC for AWS Glue to be able to connect to it. It’s also a good idea to restrict the range of source IP addresses as much as possible.

There are two ways to accomplish this. If your AWS Glue job will be in the same AWS Region as the resource, you can define the source as the security group that you use for AWS Glue. If you are using AWS Glue to connect across AWS Regions, specify the IP range from the private subnet in the AWS Glue VPC instead. The examples following use a security group as the source, because our AWS Glue job and data sources are all in the same AWS Region.

In addition to configuring the database’s security groups, AWS Glue requires a special security group that allows all inbound traffic from itself. Because it isn’t secure to allow traffic from 0.0.0.0/0, we create a self-referencing rule that simply allows all traffic originating from the security group. You can create a new security group for this purpose, or you can modify an existing security group. In the example following, we create a new security group to use later when AWS Glue connections are created.
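
As a minimal sketch, both kinds of rules described in this step can be written as `IpPermissions` structures for the Amazon EC2 `authorize_security_group_ingress` API. A self-referencing rule names the security group itself as the traffic source; a database rule names the AWS Glue security group as the source for one listening port. The security group IDs are placeholders and the function names are hypothetical.

```python
def self_referencing_rule(security_group_id):
    """Allow all protocols and ports, but only from members of the same group."""
    return [
        {
            "IpProtocol": "-1",  # -1 means all protocols and all ports
            "UserIdGroupPairs": [{"GroupId": security_group_id}],
        }
    ]


def database_ingress_from_group(glue_security_group_id, port):
    """Allow one TCP port from resources that use the AWS Glue security group."""
    return [
        {
            "IpProtocol": "tcp",
            "FromPort": port,
            "ToPort": port,
            "UserIdGroupPairs": [{"GroupId": glue_security_group_id}],
        }
    ]


if __name__ == "__main__":
    glue_sg = "sg-0123456789abcdef0"  # placeholder ID
    print(self_referencing_rule(glue_sg))
    # Listening ports named in this step: MySQL, PostgreSQL, Amazon Redshift.
    for port in (3306, 5432, 5439):
        print(database_ingress_from_group(glue_sg, port))
```

Each structure would be passed as `IpPermissions` together with the `GroupId` of the security group being modified.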

The security group for Amazon RDS for MySQL needs to allow traffic from AWS Glue:

Amazon RDS for PostgreSQL allows traffic to its listening port from the same:

Amazon Redshift does it as so:

AWS Glue does it as so:

Step 6: Set up IAM

Make sure that you have an AWS Glue IAM role with access to Amazon S3. You might want to provide your own policy for access to specific Amazon S3 resources. Data sources require s3:ListBucket and s3:GetObject permissions. Data targets require s3:ListBucket, s3:PutObject, and s3:DeleteObject permissions. For more information on creating an Amazon S3 policy for your resources, see Policies and Permissions in the IAM documentation.

The role should look like this:

Or you can create an S3 policy that’s more restricted to suit your use case.
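
A more restricted policy might look like the following minimal sketch, which grants only the permissions listed above: `s3:ListBucket` on the buckets themselves, `s3:GetObject` on objects in the source bucket, and `s3:PutObject` and `s3:DeleteObject` on objects in the target bucket. The bucket names are placeholders and the function name is hypothetical; adjust the resources to match where your scripts and temporary data actually live.

```python
import json


def glue_s3_policy(source_bucket, target_bucket):
    """Build an IAM policy document scoped to one source and one target bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # ListBucket applies to the bucket ARN, not to objects.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [
                    f"arn:aws:s3:::{source_bucket}",
                    f"arn:aws:s3:::{target_bucket}",
                ],
            },
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject"],
                "Resource": [f"arn:aws:s3:::{source_bucket}/*"],
            },
            {
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{target_bucket}/*"],
            },
        ],
    }


if __name__ == "__main__":
    # Placeholder bucket names.
    print(json.dumps(glue_s3_policy("example-source-bucket", "example-target-bucket"), indent=2))
```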

Step 7: Set up an AWS Glue connection

The Amazon RDS for MySQL connection in AWS Glue should look like this:

The Amazon Redshift connection should look like this:

The Amazon RDS for PostgreSQL connection should look like this:

Step 8: Set up an AWS Glue job

Key configuration notes:

  1. Create a crawler to import table metadata from the source database (Amazon RDS for MySQL) into the AWS Glue Data Catalog. The scenario includes a database in the catalog named gluedb, to which the crawler adds the sample tables from the source Amazon RDS for MySQL database.
  2. Use either the source connection or destination connection to create a sample job as shown following. (This step is required for the AWS Glue job to establish a network connection and create the necessary elastic network interfaces with the databases’ VPCs and peered connections.)
  3. This scenario uses PySpark code and performs the load operation from Amazon RDS for MySQL to Amazon Redshift. The ingest from Amazon RDS for MySQL to Amazon RDS for PostgreSQL uses a similar job.
  4. After running the job, verify that the table exists in the target database and that the counts match.

The following screenshots show the steps to create a job in the AWS Glue Management Console.

Following are some examples of loading data from source tables to target instances. These are simple one-to-one mappings, with no transformations applied. Notice that the data source and data sink (target) connection configurations access multiple VPCs from a single AWS Glue job.

Sample script 1 (Amazon RDS for MySQL to Amazon Redshift)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job arguments passed in by AWS Glue.
# TempDir is the S3 staging path used when loading into Amazon Redshift.
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'TempDir'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table that the crawler cataloged from Amazon RDS for MySQL.
datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

# Write to Amazon Redshift through the "Redshift" AWS Glue connection.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "Redshift", connection_options = {"dbtable": "mysqldb_events", "database": "dmartblog"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink")

job.commit()

Sample script 2 (Amazon RDS for MySQL to Amazon RDS for PostgreSQL; the target can also be another RDS endpoint)

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

# Resolve the job arguments passed in by AWS Glue.
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Read the source table that the crawler cataloged from Amazon RDS for MySQL.
datasource = glueContext.create_dynamic_frame.from_catalog(database = "gluedb", table_name = "mysqldb_events", transformation_ctx = "datasource")

# Write the same frame to PostgreSQL through the "vpc-pgsql" AWS Glue connection.
datasink = glueContext.write_dynamic_frame.from_jdbc_conf(frame = datasource, catalog_connection = "vpc-pgsql", connection_options = {"dbtable": "mysqldb_events", "database": "mypgsql"}, transformation_ctx = "datasink")

job.commit()

Summary

In this blog post, you learned how to configure AWS Glue to run in a separate VPC so that it can execute jobs for databases located in multiple VPCs.

The benefits of doing this include the following:

  • A separate VPC and a dedicated pool of resources for running AWS Glue jobs, isolated from database and compute nodes.
  • Dedicated ETL developer access to a single VPC for better security control and provisioning.

 


Additional Reading

If you found this post useful, be sure to check out Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies, and Using Amazon Redshift Spectrum, Amazon Athena, and AWS Glue with Node.js in Production.

 


About the Author

Nivas Shankar is a Senior Big Data Consultant at Amazon Web Services. He works closely with enterprise customers building big data applications on the AWS platform. He holds a master's degree in physics and is highly passionate about theoretical physics concepts. He enjoys spending time with his wife and two adorable kids. In his spare time, he takes his kids to tennis and football practice.

 

 

Ian Eberhart is a Cloud Support Engineer on the Big Data team for AWS Premium Support. He works with customers on a daily basis to find solutions for moving and sorting their data on the AWS platform. In his spare time, Ian enjoys seeing independent and weird movies, riding his bike, and hiking in the mountains.

 

Chasing earthquakes: How to prepare an unstructured dataset for visualization via ETL processing with Amazon Redshift

Post Syndicated from Ian Funnell original https://aws.amazon.com/blogs/big-data/chasing-earthquakes-how-to-prepare-an-unstructured-dataset-for-visualization-via-etl-processing-with-amazon-redshift/

As organizations expand analytics practices and hire data scientists and other specialized roles, big data pipelines are growing increasingly complex. Sophisticated models are being built using the troves of data being collected every second.

The bottleneck today is often not the know-how of analytical techniques. Rather, it’s the difficulty of building and maintaining ETL (extract, transform, and load) jobs using tools that might be unsuitable for the cloud.

In this post, I demonstrate a solution to this challenge. I start with a noisy semistructured dataset of seismic events, spanning several years and recorded at different locations across the globe. I set out to obtain broad insights about the nature of the rocks forming the Earth’s surface itself—the tectonic plate structure—to be visualized using the mapping capability in Amazon QuickSight.

To accomplish this, I use several AWS services (Amazon S3, Amazon Athena, Amazon Redshift, and Amazon QuickSight), orchestrated using Matillion ETL for Amazon Redshift.

Tectonic plate structure context

An earthquake is caused by a buildup of pressure that gets suddenly released. Earthquakes tend to be more severe at the boundaries of destructive tectonic plates. These boundaries are formed when a heavier and denser oceanic plate collides with a lighter continental plate, or when two oceanic plates collide. Due to the difference in density, the oceanic lithosphere is pushed underneath the continental plate, forming what is called a subduction zone. (See the following diagram.) In subduction zones, earthquakes can occur at depths as great as 700 kilometers.

Photography by KDS4444 [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)], from Wikimedia Commons

For our analysis, we ultimately want to visualize the depth of an earthquake focus to locate subduction zones, and therefore find places on earth with the most severe earthquakes.

Seismographic data source

The data source is from the International Federation of Digital Seismograph Networks (FDSN). The event data is in JSON format (from the European Mediterranean Seismological Centre, or EMSC). An external process accumulates files daily into an Amazon S3 bucket, as shown following.

Each individual file contains all the seismic events for one day—usually several hundred—in an embedded array named “features,” as shown in the following example:

{
  "type": "FeatureCollection",
  "metadata": {
    "totalCount": 103
  },
  "features": [
    {
      "geometry": {
        "type": "Point",
        "coordinates": [26.76, 45.77, -140]
      },
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    },
    {
      "geometry": {
        "type": "Point",

Architecture overview

Athena reads and flattens the S3 data and makes it available for Matillion ETL to load into Amazon Redshift via JDBC. Matillion orchestrates this data movement, and it also provides a graphical framework to design and build the more complex data enrichment and aggregation steps to be performed by Amazon Redshift. Finally, the prepared data is queried by Amazon QuickSight for visualization.

Amazon Athena setup

You can use Athena to query data in S3 using standard SQL, via a serverless infrastructure that is managed entirely by AWS on your behalf. Before you can query your data, start by creating an external table. By doing this, you are defining the schema to apply to the data when it is being queried.

You can choose to use an AWS Glue crawler to assist in automatically discovering the schema and format of your source data files.

The following is the CREATE TABLE statement that you can copy and paste into the Athena console to create the schema needed to query the seismic data. Make sure that you substitute the correct S3 path to your seismic data in the LOCATION field of the statement.

CREATE EXTERNAL TABLE `sp_events`(
  `type` string COMMENT 'from deserializer', 
  `metadata` struct<totalcount:int> COMMENT 'from deserializer', 
  `features` array<struct<geometry:struct<type:string,coordinates:array<double>>,type:string,id:string,properties:struct<lastupdate:string,magtype:string,evtype:string,lon:double,auth:string,lat:double,depth:double,unid:string,mag:double,time:string,source_id:string,source_catalog:string,flynn_region:string>>> COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket/SeismicPortal'

After the table is created, you are ready to query it. Athena uses Presto, an in-memory, distributed SQL engine. It provides the ability to unnest arrays, which you use next.

Transfer to Amazon Redshift

The embedded array in every source record gets flattened out and converted to individual records during the JDBC export (download the .jar file) into Amazon Redshift. You use a Matillion ETL Database Query component to assist with the data transfer during this step, as shown in the following image.

This component simplifies ETL by automating the following steps:

  1. Runs the SQL SELECT statement (shown in the following example).
  2. Streams query results across the network from Athena and into temporary storage in S3.
  3. Performs a bulk data load into Amazon Redshift.

Athena executes the following SQL statement:

SELECT f.id,
       f.properties.time AS event_time,
       f.properties.lastupdate,
       f.properties.lon,
       f.properties.lat,
       f.properties.depth,
       f.properties.mag,
       f.properties.flynn_region
FROM "seismic"."sp_events"
CROSS JOIN UNNEST (features) AS t(f)

The CROSS JOIN UNNEST syntax flattens the embedded array, generating hundreds of individual event records per day.
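To make the flattening concrete outside Athena, the same reshaping can be sketched in plain Python. This is only an illustration of what CROSS JOIN UNNEST does to one daily file, not part of the Matillion job; the function name flatten_events and the one-event sample document are assumptions modeled on the JSON example shown earlier.

```python
import json


def flatten_events(day_document):
    """Yield one flat record per entry in the document's "features" array."""
    for feature in day_document.get("features", []):
        props = feature.get("properties", {})
        yield {
            "id": feature.get("id"),
            "event_time": props.get("time"),
            "lastupdate": props.get("lastupdate"),
            "lon": props.get("lon"),
            "lat": props.get("lat"),
            "depth": props.get("depth"),
            "mag": props.get("mag"),
            "flynn_region": props.get("flynn_region"),
        }


# Hypothetical one-event file, shaped like the sample shown earlier.
sample = json.loads("""
{
  "type": "FeatureCollection",
  "metadata": {"totalCount": 1},
  "features": [
    {
      "type": "Feature",
      "id": "20180302_0000103",
      "geometry": {"type": "Point", "coordinates": [26.76, 45.77, -140]},
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    }
  ]
}
""")

records = list(flatten_events(sample))
print(records[0]["id"], records[0]["depth"])
```

Each element of the array becomes its own record, which is the same row-per-event shape that the SELECT statement produces.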

Now that the data has been copied and flattened into individual event records (shown in the following image), it’s ready for enrichment and aggregation.

Data enrichment

Earthquakes occur along a continuous range of spatial coordinates. In order to aggregate them, as we’ll be doing very soon, it’s necessary to first group them together. A convenient method is to assign every event into a Universal Transverse Mercator (UTM) zone. These zones are six-degree bands of longitudes that convert the spherical latitude/longitude coordinates into a 2D representation. Performing this conversion provides good granularity for visualization later.
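As a rough illustration of the six-degree banding, the zone number alone can be derived from the longitude with simple arithmetic. The sketch below is not the UDF used in this post: it covers only the standard zone number, ignores the special-case zones around Norway and Svalbard, and does not compute easting or northing. The function name utm_zone_number is an assumption.

```python
import math


def utm_zone_number(lon):
    """Return the standard UTM zone number (1-60) for a longitude in degrees."""
    if not -180.0 <= lon <= 180.0:
        raise ValueError("longitude must be between -180 and 180 degrees")
    # Zone 1 starts at 180 degrees west; each zone spans six degrees.
    # The modulo maps longitude +180 to zone 1, the same meridian as -180.
    return int(math.floor((lon + 180.0) / 6.0)) % 60 + 1


# The Romanian event from the sample file has longitude 26.76.
print(utm_zone_number(26.76))  # prints 35
```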

The calculation to convert a spherical longitude/latitude coordinate into a two-dimensional UTM coordinate is complex, which makes it a good fit for an Amazon Redshift user-defined function (UDF). I chose a UDF for the ability to invoke it, via a Matillion component, in the next step.

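CREATE OR REPLACE FUNCTION f_ll_utm (Lat float, Long float)
      RETURNS VARCHAR
STABLE
AS $$
from math import pi, sin, cos, tan, sqrt

# Conversion factors between degrees and radians
_deg2rad = pi / 180.0
_rad2deg = 180.0 / pi

# Indexes into the _ellipsoid list below
_EquatorialRadius = 1
_eccentricitySquared = 2
# Ellipsoid name, equatorial radius in meters, eccentricity squared
_ellipsoid = ["WGS-84", 6378137, 0.00669438]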

The UDF has to return three pieces of information:

  • UTM Zone code
  • Easting (x-axis measurement in meters)
  • Northing (y-axis measurement in meters)

A scalar UDF can only return a single value. Therefore, the three results are returned as a single pipe-delimited string.

To bring the values out into individual fields, the UDF is first invoked using a Matillion ETL Calculator component, followed by a field splitter and a Calculator to perform data type conversion and rounding.
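The split-and-convert step can be pictured with a short Python sketch. The field order (zone, easting, northing), the function name split_utm_result, and the sample value are all assumptions made for illustration; the numbers are not output from the real UDF.

```python
def split_utm_result(packed):
    """Split 'zone|easting|northing' into a zone code and two rounded integers."""
    zone, easting, northing = packed.split("|")
    # Convert the numeric parts from text and round to whole meters.
    return zone, round(float(easting)), round(float(northing))


# Hypothetical packed value in the assumed field order.
zone, easting, northing = split_utm_result("35T|481345.7|5068904.2")
print(zone, easting, northing)
```

In the Matillion job, the field splitter and the second Calculator component play the roles of split() and the float/round conversion.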

Data aggregation

To reiterate, we’re interested in the depth of earthquake focus specifically on destructive plate boundaries. Knowing the depth helps us estimate the potential severity of earthquakes.

We need to find the average event depth within each UTM zone, in the expectation that a spatial pattern will appear that will highlight the subduction zones.

The last three steps in the Matillion transformation (shown in the following diagram) perform the necessary aggregation, add a depth quartile, and create an output table from the resulting data.

The “Aggregate to UTM ref” step gets Amazon Redshift to perform a GROUP BY aggregation in SQL, which approximates every event to the appropriate UTM zone. While doing this aggregation, you simultaneously do the following:

  • Count the events (which determines the size of the visual representation).
  • Find the average depth (which determines the color of the visual representation).
  • Determine the average latitude and longitude (which approximates to the center of the UTM zone, and determines the position of the visual representation).
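The three aggregations in the list above can be sketched in Python to show what the GROUP BY produces for each zone. This is a minimal sketch, assuming each event record carries a utm_zone key alongside depth, lat, and lon; the key names, the function name aggregate_by_zone, and the sample events are hypothetical.

```python
from collections import defaultdict


def aggregate_by_zone(events):
    """Return event count and average depth/lat/lon for each UTM zone."""
    totals = defaultdict(lambda: {"count": 0, "depth": 0.0, "lat": 0.0, "lon": 0.0})
    for event in events:
        bucket = totals[event["utm_zone"]]
        bucket["count"] += 1
        bucket["depth"] += event["depth"]
        bucket["lat"] += event["lat"]
        bucket["lon"] += event["lon"]
    return {
        zone: {
            "event_count": t["count"],            # size of the map marker
            "avg_depth": t["depth"] / t["count"],  # color of the map marker
            "avg_lat": t["lat"] / t["count"],      # position of the map marker
            "avg_lon": t["lon"] / t["count"],
        }
        for zone, t in totals.items()
    }


# Made-up events for illustration only.
events = [
    {"utm_zone": "35T", "depth": 140, "lat": 45.5, "lon": 26.0},
    {"utm_zone": "35T", "depth": 100, "lat": 46.5, "lon": 27.0},
    {"utm_zone": "54S", "depth": 30, "lat": 36.0, "lon": 141.0},
]
summary = aggregate_by_zone(events)
print(summary["35T"])
```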

The following image shows the aggregation type for each column:

Average depth is a useful measure, but to maximize the visual impact of the final presentation, we also take the opportunity to rank the results into quartiles. This allows the zones with the deepest quartile to stand out consistently on the map.

NTILE(4) OVER (ORDER BY "avg_depth")

Amazon Redshift is great at performing this type of analytics, which is delivered inside another Matillion ETL Calculator component.
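For readers unfamiliar with NTILE, the following Python sketch mimics how NTILE(4) assigns quartile numbers after sorting by average depth. It is an approximation for illustration, not Redshift's implementation: when the row count does not divide evenly, the earlier buckets receive the extra rows, and ties are broken here by input order, whereas SQL leaves tie order unspecified. The function name ntile and the depth values are assumptions.

```python
def ntile(values, buckets=4):
    """Return the NTILE bucket number (1..buckets) for each value, in input order."""
    n = len(values)
    # Indexes of the values in ascending order, like ORDER BY "avg_depth".
    order = sorted(range(n), key=lambda i: values[i])
    base, extra = divmod(n, buckets)
    result = [0] * n
    position = 0
    for bucket in range(1, buckets + 1):
        # The first `extra` buckets each take one additional row.
        size = base + (1 if bucket <= extra else 0)
        for i in order[position:position + size]:
            result[i] = bucket
        position += size
    return result


# Made-up average depths (km) for six UTM zones.
avg_depths = [300, 10, 140, 600, 35, 70]
print(ntile(avg_depths))  # prints [3, 1, 2, 4, 1, 2]
```

The zone with the largest average depth lands in bucket 4, which is the quartile highlighted on the map.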

The Recreate Output step materializes the dataset into an Amazon Redshift table, ready for Amazon QuickSight to visualize.

Amazon QuickSight visualization

The Amazon QuickSight “points on map” visualization is perfect for this 2D rendering. The values for the field wells come straight from the aggregated data in Amazon Redshift:

  • Geospatial — the average lat/long per UTM grid.
  • Size — the record count, in other words, the number of events in that UTM zone.
  • Color — the Depth Ntile, with the fourth quartile in pink.

The resulting map shows the global subduction zones highlighted clearly in pink. These are the areas where earthquake foci are deepest on average.

Recap and summary

In this post, I used seismological data as an example to explore challenges around the visualization of unstructured data and to provide best practices. I suggested a way to overcome these challenges with an architecture that is also applicable for datasets from a wide array of sources, beyond geology. I then explored how to orchestrate activities of different data processing tasks between S3, Athena, and Amazon Redshift using Matillion ETL for Amazon Redshift.

If you’re trying to solve similar analytics challenges and want to see how Amazon Redshift and Matillion can help, launch a 14-day free trial of Matillion ETL for Amazon Redshift on the AWS Marketplace or schedule a demo today. If you have questions or suggestions, please comment below.


Additional Reading

If you found this post helpful, be sure to check out Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift, and Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda.

 


About the Author

Ian Funnell, a lead solution architect at Matillion, is a seasoned cloud and data warehousing specialist. Ian is dedicated to supporting customers in driving their data transformation forward and solving their deepest technical challenges. Ian has 25+ years of experience in the tech industry.

 

 

 

Restrict access to your AWS Glue Data Catalog with resource-level IAM permissions and resource-based policies

Post Syndicated from Ben Snively original https://aws.amazon.com/blogs/big-data/restrict-access-to-your-aws-glue-data-catalog-with-resource-level-iam-permissions-and-resource-based-policies/

A data lake provides a centralized repository that you can use to store all your structured and unstructured data at any scale. A data lake can include both raw datasets and curated, query-optimized datasets. Raw datasets can be quickly ingested, in their original form, without having to force-fit them into a predefined schema. Using data lakes, you can run different types of analytics on both raw and curated datasets. By using Amazon S3 as the storage layer of your data lakes, you can have a set of rich controls at both the bucket and object level. You can use these to define access control policies for the datasets in your lake.

The AWS Glue Data Catalog is a persistent, fully managed metadata store for your data lake on AWS. Using the Glue Data Catalog, you can store, annotate, and share metadata in the AWS Cloud in the same way you do in an Apache Hive Metastore. The Glue Data Catalog also has seamless out-of-box integration with Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.

Using AWS Glue, you can also create policies to restrict access to different portions of the catalog. These policies can be attached to users or roles, or applied at the resource level. With these policies, you can provide granular control over which users can access the various metadata definitions in your data lake.

Important: The S3 and the AWS Glue Data Catalog policies define the access permissions to the data and metadata definitions respectively. In other words, the AWS Glue Data Catalog policies define the access to the metadata, and the S3 policies define the access to the content itself.

You can restrict which metadata operations can be performed, such as GetDatabases, GetTables, CreateTable, and others, using identity-based (IAM) policies. You can also restrict which data catalog objects those operations are performed on. Additionally, you can limit which catalog objects get returned in the resulting call. A Glue Data Catalog “object” here refers to a database, a table, a user-defined function, or a connection stored in the Glue Data Catalog.

Suppose that you have users that require read access to your production databases and tables in your data lake, and others have additional permissions to dev resources. Suppose also that you have a data lake storing both raw data feeds and curated datasets used by business intelligence, analytics, and machine learning applications. You can set these configurations easily, and many others, using the access control mechanisms in the AWS Glue Data Catalog.

Note: The following example shows how to set up a policy on the AWS Glue Data Catalog. It doesn’t set up the related S3 bucket or object-level policies. This means that restricted metadata isn’t discoverable through Athena, EMR, and other tools that integrate with the AWS Glue Data Catalog, but the policy does nothing to stop someone from accessing an S3 object directly. That is where S3 policy enforcement matters, so you should use Data Catalog policies and S3 bucket or object-level policies together.

Fine-grained access control

You can define the access to the metadata using both resource-based and identity-based policies, depending on your organization’s needs. Resource-based policies list the principals that are allowed or denied access to your resources, allowing you to set up policies such as cross-account access. Identity policies are specifically attached to users, groups, and roles within IAM.

The fine-grained access portion of the policy is defined within the Resource clause. This portion defines both the AWS Glue Data Catalog object that the action can be performed on, and what resulting objects get returned by that operation.

Let’s run through an example. Suppose that you want to define a policy that allows a set of users to access only the finegrainaccess database. The policy also allows users to return all the tables listed within the database. For the GetTables action, the resource definitions include these resource statements:

"arn:aws:glue:us-east-1:123456789012:catalog",
"arn:aws:glue:us-east-1:123456789012:database/finegrainaccess",
"arn:aws:glue:us-east-1:123456789012:table/finegrainaccess/*"

The resource statement at the database Amazon Resource Name (ARN) allows the user to call the operation on the finegrainaccess database. The table ARN allows all the tables within that database to be returned.

Now, what if we want to return only the tables that start with “dev_” from the “finegrainaccess” database? If so, this is how the policy changes:

"arn:aws:glue:us-east-1:123456789012:catalog",
"arn:aws:glue:us-east-1:123456789012:database/finegrainaccess",
"arn:aws:glue:us-east-1:123456789012:table/finegrainaccess/dev_*"

Now, we are specifying dev_ as part of the table’s ARN in the table resource definition. This approach also works with actions for getting the list of databases, partitions for a table, connections, and other operations in the catalog.
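To see how these resource entries fit into a complete statement, the following Python sketch assembles an identity-based policy document for the GetTables action and prints it as JSON. It is a minimal sketch rather than the policy shipped with the CloudFormation template: the helper glue_arn and the sample table names are assumptions, and the ARNs use the database/ and table/ resource types from the first example. The local wildcard check with fnmatchcase only approximates how IAM evaluates the trailing asterisk.

```python
import json
from fnmatch import fnmatchcase

REGION = "us-east-1"
ACCOUNT_ID = "123456789012"


def glue_arn(resource):
    """Build a Glue Data Catalog ARN for the example Region and account."""
    return f"arn:aws:glue:{REGION}:{ACCOUNT_ID}:{resource}"


table_pattern = glue_arn("table/finegrainaccess/dev_*")

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["glue:GetTables"],
            "Resource": [
                glue_arn("catalog"),
                glue_arn("database/finegrainaccess"),
                table_pattern,
            ],
        }
    ],
}
print(json.dumps(policy, indent=2))

# Rough local check of which (hypothetical) tables the wildcard would cover.
for table in ("dev_orders", "prod_orders"):
    arn = glue_arn(f"table/finegrainaccess/{table}")
    print(table, fnmatchcase(arn, table_pattern))
```

Only the table whose name starts with dev_ matches the pattern, which mirrors the filtering of the GetTables result described above.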

Taking it for a spin

Note: This post focuses on the policies for AWS Glue Data Catalog. If you look closely, all of these datasets are pointing to the same S3 locations, which are world-readable. In a full example, you should also set the necessary S3 bucket or object level permissions, or both.

Next, we show an example you can do yourself. The next example creates the following in a Data Catalog.

We set up two users in the example, as shown following.

In the AWS Management Console, launch the AWS CloudFormation template.

Choose Next.

Important: Enter a password for the IAM users to be created. These users will have permissions to run Athena queries, access your Athena S3 results bucket, and see the AWS Glue databases and tables that the CloudFormation script creates. The password must meet the minimum requirements of the IAM password policy on the account that you run this example from.

Choose Next, and then on the next page choose Next again.

Lastly, acknowledge that the template will create IAM users and policies.

Then choose Create.

When you refresh your CloudFormation page, you can see the script creating the example resources.

Wait until it’s complete.

This script creates the necessary IAM users and the policies attached to them, along with the necessary databases and tables listed previously.

After the CloudFormation script completes, you should see these tables if using an administrator user.

If you look on the Outputs tab, you can see the two IAM users that were created along with your IAM sign-in URL.

Note:

If you click the sign-in link in the same browser, the system logs you out. A nice trick is to right-click and open a private or incognito window.

If the provided IAM password doesn’t meet the minimum requirements, you see this message in the CloudFormation script event log:

The specified password is invalid … <why it was invalid>

Looking in the AWS Glue Data Catalog, you can see the tables that just got created by the script.

We can see that the script created the structure that we outlined earlier.

Let’s check the two user profiles. If you open the IAM console and choose Users, you can see that the permissions are attached as inline policies. You should see the following for each user.

For the AWS Glue dev user, this section gives us full access to anything in the dev databases:

This section gives us the ability to query and see the prod database:

Lastly, this section gives us access to get tables and partitions from the prod database. You can structure this section so that it explicitly lists the blog_prod database in the resource and allows only that. The following lets someone query for database/* and return only the blog_prod tables. This, in fact, is the default behavior of the console.

Without this, you could still query those two databases explicitly, but the policy would not allow a wildcard query such as the following.

In contrast, the QA user doesn’t have access to the dev database and can only see the tables that start with prod_ in the prod database. So the following is what the QA user’s policy looks like.

The query for the prod database is as follows:

Only GetTables and GetPartitions are available for the tables starting with prod_.

Notice the “prod_*” in the resource definition following.

Querying based on the different users

When you sign in as each of the two IAM users listed on the AWS CloudFormation Outputs tab, using the password you provided, you can see some differences.

Notice that the QA user can’t see any metadata definitions for the blog_dev database, or the staging_yellow table in the blog_prod database.

Next, sign in as blog_dev_user and go to the Athena console. Notice that this user sees only the databases and tables that the user is permitted to access.

The dev user can create a table under blog_dev, but not the blog_prod database.

Now let’s look under dev_qa_user. Notice that we only see the blog_prod and prod_* tables in Athena.

The QA user can query the datasets that user can see, but the policy doesn’t let that user create a database or tables.

If the QA user tries to query through Athena, and manually pull the metadata outside the console, that user can’t see any of the information. You can test this by running the following.

select * from blog_dev.yellow limit 10;

Conclusion

Data cataloging is an important part of many analytical systems. The AWS Glue Data Catalog provides integration with a wide range of tools. Using the Data Catalog, you can also specify a policy that grants permissions to objects in the Data Catalog. Data lakes require detailed access control at both the content level and the level of the metadata describing the content. In this example, we show how you can define the access policies for the metadata in the catalog.


Additional Reading

Learn how to harmonize, search, and analyze loosely coupled datasets on AWS.

 


About the Author

Ben Snively is a Public Sector Specialist Solutions Architect. He works with government, non-profit and education customers on big data and analytical projects, helping them build solutions using AWS. In his spare time, he adds IoT sensors throughout his house and runs analytics on it.

 

 

 

Store, Protect, Optimize Your Healthcare Data with AWS: Part 2

Post Syndicated from Stephen Jepsen original https://aws.amazon.com/blogs/architecture/store-protect-optimize-your-healthcare-data-with-aws-part-2/

Leveraging Analytics and Machine Learning Tools for Readmissions Prediction

This blog post was co-authored by Ujjwal Ratan, a senior AI/ML solutions architect on the global life sciences team.

In Part 1, we looked at various options to ingest and store sensitive healthcare data using AWS. The post described our shared responsibility model and provided a reference architecture that healthcare organizations could use as a foundation to build a robust platform on AWS to store and protect their sensitive data, including protected health information (PHI). In Part 2, we will dive deeper into how customers can optimize their healthcare datasets for analytics and machine learning (ML) to address clinical and operational challenges.

There are a number of factors creating pressures for healthcare organizations, both providers and payers, to adopt analytic tools to better understand their data: regulatory requirements, changing reimbursement models from volume- to value-based care, population health management for risk-bearing organizations, and movement toward personalized medicine. As organizations deploy new solutions to address these areas, the availability of large and complex datasets from electronic health records, genomics, images (for example, CAT, PET, MRI, ultrasound, X-ray), and IoT has been increasing. With these data assets growing in size, healthcare organizations want to leverage analytic and ML tools to derive new actionable insights across their departments.

One example of the use of ML in healthcare is diagnostic image analysis, including digital pathology. Pathology is extremely important in diagnosing and treating patients, but it is also extremely time-consuming and largely a manual process. While the complexity and quantity of workloads are increasing, the number of pathologists is decreasing. According to one study, the number of active pathologists could drop by 30 percent by 2030 compared to 2010 levels. (1) A cloud architecture and solution can automate part of the workflow, including sample management, analysis, storing, sharing, and comparison with previous samples to complement existing provider workflows effectively. A recent study using deep learning to analyze metastatic breast cancer tissue samples resulted in an approximately 85% reduction in human error rate. (2)

ML is also being used to assist radiologists in examining other diagnostic images such as X-rays, MRIs, and CAT scans. Having large quantities of images and metadata to train the algorithms that are the key to ML is one of the main challenges for ML adoption. To help address this problem, the National Institutes of Health recently released 90,000 X-ray plates tagged either with one of 14 diseases or tagged as being normal. Leading academic medical centers are using these images to build their neural networks and train their algorithms. With advanced analytics and ML, we can answer the hard questions such as “what is the next best action for my patient, the expected outcome, and the cost.”

The foundations for a great analytical layer

Let’s pick up from where we left off in Part 1. We have seen how providers can ingest data into AWS from their data centers and store it securely into different services depending on the type of data. For example:

  1. All object data is stored in Amazon S3, Amazon S3 Infrequent Access, or Amazon Glacier depending on how often it is used.
  2. Data from the provider’s database is either processed and stored as objects in Amazon S3 or aggregated into data marts on Amazon Redshift.
  3. Metadata for the objects on Amazon S3 is maintained in Amazon DynamoDB.
  4. Amazon Athena is used to query the objects directly stored on Amazon S3 to address ad hoc requirements.

We will now look at two best practices that are key to building a robust analytical layer using these datasets.

  1. Separating storage and compute: You should not be compelled to scale compute resources just to store more data. The scaling rules of the two layers should be separate.
  2. Leverage the vast array of AWS big data services when it comes to building the analytical platforms instead of concentrating on just a few of them. Remember, one size does not fit all.

Technical overview

In this overview, we will demonstrate how we can leverage AWS big data and ML services to build a scalable analytical layer for our healthcare data. We will use a single source of data stored in Amazon S3 for performing ad hoc analysis using Amazon Athena, integrate it with a data warehouse on Amazon Redshift, build a visual dashboard for some metrics using Amazon QuickSight, and finally build an ML model to predict readmissions using Amazon SageMaker. By not moving the data around and just connecting to it using different services, we avoid building redundant copies of the same data. There are multiple advantages to this approach:

  1. We optimize our storage. Not having redundant copies reduces the amount of storage required.
  2. We keep the data secure with only authorized services having access to it. Keeping multiple copies of the data can result in higher security risk.
  3. We are able to scale the storage and compute separately as needed.
  4. It becomes easier to manage the data and monitor usage metrics centrally such as how often the data has been accessed, who has been accessing it, and what has been the growth pattern of the data over a period of time. These metrics can be difficult to aggregate if the data is duplicated multiple times.

Let’s build out this architecture using the following steps:

  1. Create a database in AWS Glue Data Catalog

We will do this using a Glue crawler. First create a JSON file that contains the parameters for the Glue crawler.

{
  "Name": "readmissions",
  "Role": "arn of the role for Glue",
  "DatabaseName": "readmissions",
  "Description": "glue data catalog for storing readmission data",
  "Targets": {
    "S3Targets": [
      {
        "Path": "s3://<bucket>/<prefix>"
      },
      {
        "Path": "s3://<bucket>/<prefix>"
      }
    ]
  }
}

As you can see, the crawler will crawl two locations in Amazon S3 and save the resulting tables in a new database called “readmissions.” Replace the role ARN and Amazon S3 locations with your corresponding details. Save this in a file create_crawler.json. Then from the AWS CLI, call the following command to create the crawler:

aws glue create-crawler --cli-input-json file://create_crawler.json

Once the crawler is created, run it by calling the following command:

aws glue start-crawler --name readmissions
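If you would rather generate the parameter file than edit it by hand, a few lines of Python can produce the same structure. This is a minimal sketch: the function name build_crawler_config, the role ARN, and the bucket paths are placeholders invented for illustration, and the script only prints JSON that you could save as create_crawler.json. It does not call AWS.

```python
import json


def build_crawler_config(role_arn, s3_paths, name="readmissions"):
    """Return the parameter document passed to `aws glue create-crawler`."""
    return {
        "Name": name,
        "Role": role_arn,
        "DatabaseName": name,
        "Description": "glue data catalog for storing readmission data",
        "Targets": {"S3Targets": [{"Path": path} for path in s3_paths]},
    }


# Placeholder values; substitute your own role ARN and S3 locations.
config = build_crawler_config(
    "arn:aws:iam::123456789012:role/ExampleGlueRole",
    ["s3://example-bucket/phi/", "s3://example-bucket/nonphi/"],
)
print(json.dumps(config, indent=2))
```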

Log on to the AWS Glue console, navigate to the crawlers, and wait until the crawler completes running.

Running the crawler creates two tables, phi and nonphi, in a database named “readmissions” in the AWS Glue Data Catalog, as shown below.

  2. Query the data using Athena

The AWS Glue Data Catalog is seamlessly integrated with Amazon Athena. For details on how to enable this, see Integration with AWS Glue.

As a result of this integration, the tables created using the Glue crawler can now be queried using Amazon Athena. Amazon Athena allows you to do ad hoc analysis on the dataset. You can do exploratory analysis on the data and also determine its structure and quality. This type of upfront ad hoc analysis is invaluable for ensuring the data quality in your downstream data warehouse or your ML algorithms that will make use of this data for training models. In the next few sections, we will explore these aspects in greater detail.

To query the data using Amazon Athena, navigate to the Amazon Athena console.

NOTE: Make sure the region is the same as the region you chose in the previous step. If it’s not the same, switch the region by using the drop-down menu on the top right-hand corner of the screen.

Once you arrive in the Amazon Athena console, you should already see the tables and databases you created previously, and you should be able to see the data in the two tables by writing Amazon Athena queries. Here is a list of the top 10 rows from the table readmissions.nonphi:

Now that we are able to query the dataset, we can run some queries for exploratory analysis. Here are just a few examples:

Analysis: How many patients have been discharged to home?
Amazon Athena query: SELECT count(*) from nonphi where discharge_disposition = 'Discharged to home'

Analysis: What’s the minimum and the maximum number of procedures carried out on a patient?
Amazon Athena query: SELECT min(num_procedures), max(num_procedures) from nonphi

Analysis: How many patients were referred to this hospital by another physician?
Amazon Athena query: SELECT count(*) FROM nonphi group by admission_source having admission_source = 'Physician Referral'

Analysis: What were the top 5 specialties with positive readmissions?
Amazon Athena query:
SELECT count(readmission_result) as num_readmissions, medical_specialty from
(select readmission_result,medical_specialty from nonphi where readmission_result = 'Yes')
group by medical_specialty order by num_readmissions desc limit 5

Analysis: Which payer was responsible for paying for treatments that involved more than 5 procedures?
Amazon Athena query: SELECT distinct payer_code from nonphi where num_procedures > 5 and payer_code != '(null)'

While this information is valuable, you typically do not want to invest too much time and effort into building an ad hoc query platform like this because at this stage, you are not even sure if the data is of any value for your business-critical analytical applications. One benefit of using Amazon Athena for ad hoc analysis is that it requires little effort or time. It uses schema-on-read instead of schema-on-write, allowing you to work with various source data formats without worrying about the underlying structures. You can put the data on Amazon S3 and start querying immediately.
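To rehearse exploratory queries like the ones above before pointing them at real data, you can run them against a small in-memory table. The sketch below uses Python's built-in sqlite3 module as a stand-in for Athena, which is an assumption made purely for local experimentation: SQLite's SQL dialect is not identical to Athena's, and the rows are invented for illustration rather than taken from the readmissions dataset. The column names come from the queries in this post.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE nonphi (
        discharge_disposition TEXT,
        num_procedures INTEGER,
        admission_source TEXT,
        medical_specialty TEXT,
        readmission_result TEXT,
        payer_code TEXT
    )
    """
)

# Made-up rows for illustration only.
rows = [
    ("Discharged to home", 1, "Physician Referral", "Cardiology", "Yes", "MC"),
    ("Discharged to home", 6, "Emergency Room", "Cardiology", "Yes", "HM"),
    ("Expired", 3, "Physician Referral", "Nephrology", "No", "(null)"),
    ("Discharged to home", 0, "Emergency Room", "Nephrology", "Yes", "MC"),
]
conn.executemany("INSERT INTO nonphi VALUES (?, ?, ?, ?, ?, ?)", rows)

# How many patients have been discharged to home?
discharged_home = conn.execute(
    "SELECT count(*) FROM nonphi WHERE discharge_disposition = 'Discharged to home'"
).fetchone()[0]

# Minimum and maximum number of procedures carried out on a patient.
min_proc, max_proc = conn.execute(
    "SELECT min(num_procedures), max(num_procedures) FROM nonphi"
).fetchone()

# Top 5 specialties with positive readmissions.
top_specialties = conn.execute(
    """
    SELECT count(readmission_result) AS num_readmissions, medical_specialty
    FROM (SELECT readmission_result, medical_specialty
          FROM nonphi WHERE readmission_result = 'Yes')
    GROUP BY medical_specialty
    ORDER BY num_readmissions DESC
    LIMIT 5
    """
).fetchall()

# Payers for treatments that involved more than 5 procedures.
payers = conn.execute(
    "SELECT DISTINCT payer_code FROM nonphi "
    "WHERE num_procedures > 5 AND payer_code != '(null)'"
).fetchall()

print(discharged_home, min_proc, max_proc, top_specialties, payers)
```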

  3. Create an external table in Amazon Redshift Spectrum with the same data

Now that we are satisfied with the data quality and understand the structure of the data, we would like to integrate this with a data warehouse. We’ll use Amazon Redshift Spectrum to create external tables on the files in S3 and then integrate these external tables with a physical table in Amazon Redshift.

Amazon Redshift Spectrum allows you to run Amazon Redshift SQL queries against data on Amazon S3, extending the capabilities of your data warehouse beyond the physical Amazon Redshift clusters. You don’t need to do any elaborate ETL or move the data around. The data exists in one place in Amazon S3 and you interface with it using different services (Athena and Redshift Spectrum) to satisfy different requirements.

Before beginning, please look at this step by step guide to set up Redshift Spectrum.

After you have set up Amazon Redshift Spectrum, you can begin executing the steps below:

  1. Create an external schema called “readmissions.” Amazon Redshift Spectrum integrates with the AWS Glue Data Catalog and allows you to create Spectrum tables by referring to the catalog. This feature allows you to build the external table on the same data that you analyzed with Amazon Athena in the previous step without the need for ETL. This can be achieved by the following:
create external schema readmissions
from data catalog
database 'readmissions'
iam_role 'arn for your redshift spectrum role'
region 'region where the S3 data exists';

NOTE: Make sure you select the appropriate role arn and region.

  1. Once the command executes successfully, you can confirm the schema was created by running the following:
select * from svv_external_schemas;

You should see a row similar to the one above with your corresponding region and role.

You can also see the external tables that were created by running the following command:

select * from SVV_EXTERNAL_TABLES;

  1. Let’s confirm we can see all the rows in the external table by counting the number of rows:
select count(*) from readmissions.phi;
select count(*) from readmissions.nonphi;

You should see 101,766 rows in each table, confirming that your external tables contain all the records that you read using the AWS Glue data crawler and analyzed using Athena.

  1. Now that we have all the external tables created, let’s create an aggregate fact table in the physical Redshift data warehouse. We can use the “As Select” clause of the Redshift create table query to do this:
create table readmissions_aggregate_fact as
select
readmission_result,admission_type,discharge_disposition,diabetesmed,
avg(time_in_hospital) as avg_time_in_hospital,
min(num_procedures) as min_procedures,
max(num_procedures) as max_procedures,
avg(num_procedures) as avg_num_procedures,
avg(num_medications) as avg_num_medications,
avg(number_outpatient) as avg_number_outpatient,
avg(number_emergency) as avg_number_emergency,
avg(number_inpatient) as avg_number_inpatient,
avg(number_diagnoses) as avg_number_diagnoses
from readmissions.nonphi
group by readmission_result,admission_type,discharge_disposition,diabetesmed

Once this query executes successfully, you can see a new table created in the physical public schema of your Amazon Redshift cluster. You can confirm this by executing the following query:

select distinct(tablename) from pg_table_def where schemaname = 'public'
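
To try the "create table as select" pattern without a cluster, the following is a minimal sketch that runs the same kind of statement against an in-memory SQLite database from Python. SQLite only stands in for Amazon Redshift here to illustrate the SQL shape; the rows are made-up sample values, and only a few of the columns from the query above are included.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE nonphi (readmission_result TEXT, admission_type TEXT, "
    "time_in_hospital INTEGER, num_procedures INTEGER)"
)
# Made-up rows standing in for the external table readmissions.nonphi.
conn.executemany(
    "INSERT INTO nonphi VALUES (?, ?, ?, ?)",
    [
        ("Yes", "Emergency", 4, 1),
        ("Yes", "Emergency", 6, 3),
        ("No", "Elective", 2, 0),
    ],
)

# CREATE TABLE ... AS SELECT materializes the grouped result as a new table.
conn.execute(
    "CREATE TABLE readmissions_aggregate_fact AS "
    "SELECT readmission_result, admission_type, "
    "       avg(time_in_hospital) AS avg_time_in_hospital, "
    "       min(num_procedures) AS min_procedures, "
    "       max(num_procedures) AS max_procedures "
    "FROM nonphi GROUP BY readmission_result, admission_type"
)

fact_rows = conn.execute(
    "SELECT * FROM readmissions_aggregate_fact ORDER BY readmission_result"
).fetchall()
print(fact_rows)
```

Each distinct combination of the grouping columns becomes one row in the new table, which is what keeps the aggregate fact table small compared to the source data.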

  1. Build a QuickSight Dashboard from the aggregate fact

We can now create dashboards to visualize the data in our readmissions aggregate fact table using Amazon QuickSight. Here are some examples of reports you can generate using Amazon QuickSight on the readmission data.

For more details on Amazon QuickSight, refer to the service documentation.

  1. Build a ML model in Amazon SageMaker to predict readmissions

As a final step, we will create a ML model to predict the attribute readmission_result, which denotes if a patient was readmitted or not, using the non-PHI dataset.

  1. Create a notebook instance in Amazon SageMaker that is used to develop our code.
  2. The code reads non-PHI data from the Amazon S3 bucket as a data frame in Python. This is achieved using the pandas.read_csv function.
  3. Use the pandas.get_dummies function to encode categorical values into numeric values for use with the model.
  4. Split the data into two, 80% for training and 20% for testing, using the numpy.random.rand function.
  5. Form train_X, train_y and test_X, test_y corresponding to training features, training labels, testing features, and testing labels respectively.
  6. Use the Amazon SageMaker Linear learner algorithm to train our model. The implementation of the algorithm uses dense tensor format to optimize the training job. Use the function write_numpy_to_dense_tensor from the Amazon SageMaker library to convert the numpy array into the dense tensor format.
  7. Create the training job in Amazon SageMaker with appropriate configurations and run it.
  8. Once the training job completes, create an endpoint in Amazon SageMaker to host our model, using the linear.deploy function to deploy the endpoint.
  9. Finally, run a prediction by invoking the endpoint using the linear_predictor.predict function.
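
The encoding and splitting steps can be illustrated without any AWS resources. The following is a minimal sketch in plain Python: one_hot_encode mimics what pandas.get_dummies does for a categorical column, and the random mask mirrors a split driven by numpy.random.rand. The helper names, the sample rows, and the fixed seed are assumptions for illustration and are not taken from the notebook.

```python
import random

# Made-up rows; the real non-PHI dataset has many more columns.
records = [
    {"admission_type": "Emergency", "num_procedures": 1, "readmission_result": "Yes"},
    {"admission_type": "Elective", "num_procedures": 0, "readmission_result": "No"},
    {"admission_type": "Urgent", "num_procedures": 3, "readmission_result": "No"},
    {"admission_type": "Emergency", "num_procedures": 2, "readmission_result": "Yes"},
    {"admission_type": "Elective", "num_procedures": 5, "readmission_result": "No"},
]


def one_hot_encode(rows, column):
    """Replace a categorical column with one 0/1 indicator column per category."""
    categories = sorted({row[column] for row in rows})
    encoded_rows = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            new_row[f"{column}_{category}"] = 1 if row[column] == category else 0
        encoded_rows.append(new_row)
    return encoded_rows


def split_features_labels(rows, label="readmission_result"):
    """Separate numeric feature vectors from 0/1 labels."""
    feature_names = sorted(k for k in rows[0] if k != label) if rows else []
    X = [[row[name] for name in feature_names] for row in rows]
    y = [1 if row[label] == "Yes" else 0 for row in rows]
    return X, y


encoded = one_hot_encode(records, "admission_type")

# Draw one uniform random number per row; rows below 0.8 go to training.
rng = random.Random(42)  # fixed seed only so the sketch is repeatable
mask = [rng.random() < 0.8 for _ in encoded]
train_rows = [row for row, keep in zip(encoded, mask) if keep]
test_rows = [row for row, keep in zip(encoded, mask) if not keep]

train_X, train_y = split_features_labels(train_rows)
test_X, test_y = split_features_labels(test_rows)
print(len(train_X), len(test_X))
```

With a random mask the split is only approximately 80/20, and on a sample this small the test set can even come out empty; on a dataset of about 100,000 rows the proportions settle close to the target.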

You can view the complete notebook here.

Data, analytics, and ML are strategic assets to help you manage your patients, staff, equipment, and supplies more efficiently. These technologies can also help you be more proactive in treating and preventing disease. Industry luminaries share this opinion: “By leveraging big data and scientific advancements while maintaining the important doctor-patient bond, we believe we can create a health system that will go beyond curing disease after the fact to preventing disease before it strikes by focusing on health and wellness,” writes Lloyd B. Minor, MD, dean of the Stanford School of Medicine.

ML and analytics offer huge value in helping achieve the quadruple aim: improved patient satisfaction, improved population health, improved provider satisfaction, and reduced costs. Technology should never replace the clinician but instead become an extension of the clinician and allow them to be more efficient by removing some of the mundane, repetitive tasks involved in prevention, diagnostics, and treatment of patients.

(1) “The Digital Future of Pathology.” The Medical Futurist, 28 May 2018, medicalfuturist.com/digital-future-pathology.

(2) Wang, Dayong, et al. “Deep Learning for Identifying Metastatic Breast Cancer.” Deep Learning for Identifying Metastatic Breast Cancer, 18 June 2016, arxiv.org/abs/1606.05718.

About the Author

Stephen Jepsen is a Global HCLS Practice Manager in AWS Professional Services.

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 2

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-2/

In part 1 of this series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we discuss how to process and visualize the data by creating a serverless data lake that uses key analytics to create actionable data.

Create a serverless data lake and explore data using AWS Glue, Amazon Athena, and Amazon QuickSight

As we discussed in part 1, you can store heart rate data in an Amazon S3 bucket using Kinesis Data Streams. However, storing data in a repository is not enough. You also need to be able to catalog and store the associated metadata related to your repository so that you can extract the meaningful pieces for analytics.

For a serverless data lake, you can use AWS Glue, which is a fully managed data catalog and ETL (extract, transform, and load) service. AWS Glue simplifies and automates the difficult and time-consuming tasks of data discovery, conversion, and job scheduling. As you get your AWS Glue Data Catalog data partitioned and compressed for optimal performance, you can use Amazon Athena for the direct query to S3 data. You can then visualize the data using Amazon QuickSight.

The following diagram depicts the data lake that is created in this demonstration:

Amazon S3 now has the raw data stored from the Kinesis process. The first task is to prepare the Data Catalog and identify what data attributes are available to query and analyze. To do this task, you need to create a database in AWS Glue that will hold the table created by the AWS Glue crawler.

An AWS Glue crawler scans through the raw data available in an S3 bucket and creates a table in the Data Catalog. You can add a scheduler to the crawler to run periodically and scan new data as required. For specific steps to create a database and crawler in AWS Glue, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3.

The following figure shows the summary screen for a crawler configuration in AWS Glue:

After configuring the crawler, choose Finish, and then choose Crawler in the navigation bar. Select the crawler that you created, and choose Run crawler.

The crawler process can take 20–60 seconds to initiate. It depends on the Data Catalog, and it creates a table in your database as defined during the crawler configuration.

You can choose the table name and explore the Data Catalog and table:

In the demonstration table details, our data has three attributes: the time stamp as value_time, the person’s ID as id, and the heart rate as colvalue. These attributes are identified and listed by the AWS Glue crawler. You can see other information such as the data format (text) and the record count (approx. 15,000 with each record size of 61 bytes).

You can use Athena to query the raw data. To access Athena directly from the AWS Glue console, choose the table, and then choose View data on the Actions menu, as shown following:

As noted, the data is currently in a JSON format and we haven’t partitioned it. This means that Athena scans more data than a query needs, which increases the query cost. The best practice is to always partition data and to convert the data into a columnar format like Apache Parquet or Apache ORC. This reduces the amount of data scanned while running a query. Scanning less data means better query performance at a lower cost.

To accomplish this, AWS Glue generates an ETL script for you. You can schedule it to run periodically for your data processing, which removes the necessity for complex code writing. AWS Glue is a managed service that runs on top of a warm Apache Spark cluster that is managed by AWS. You can run your own script in AWS Glue or modify a script provided by AWS Glue that meets your requirements. For examples of how to build a custom script for your solution, see Providing Your Own Custom Scripts in the AWS Glue Developer Guide.

For detailed steps to create a job, see the blog post Build a Data Lake Foundation with AWS Glue and Amazon S3. The following figure shows the final AWS Glue job configuration summary for this demonstration:

In this example configuration, we enabled the job bookmark, which helps AWS Glue maintain state information and prevents the reprocessing of old data. You only want to process new data when rerunning on a scheduled interval.

When you choose Finish, AWS Glue generates a Python script. This script processes your data and stores it in a columnar format in the destination S3 bucket specified in the job configuration.

If you choose Run Job, it takes time to complete depending on the amount of data and data processing units (DPUs) configured. By default, a job is configured with 10 DPUs, which can be increased. A single DPU provides processing capacity that consists of 4 vCPUs of compute and 16 GB of memory.

After the job is complete, inspect your destination S3 bucket, and you will find that your data is now in columnar Parquet format.

Partitioning has emerged as an important technique for organizing datasets so that they can be queried efficiently by a variety of big data systems. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. For information about efficiently processing partitioned datasets using AWS Glue, see the blog post Work with partitioned data in AWS Glue.
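
As an illustration of that directory layout, here is a minimal sketch that derives a Hive-style partition prefix from a record's time stamp. The year/month/day partition columns, the heartrate prefix, the time stamp format, and the function name are assumptions chosen for this example; the post does not state which columns the demonstration partitions on.

```python
from datetime import datetime


def partition_prefix(base_prefix, value_time):
    """Build a key=value directory path, one level per partition column."""
    ts = datetime.strptime(value_time, "%Y-%m-%d %H:%M:%S")
    return (
        f"{base_prefix}/year={ts.year:04d}/month={ts.month:02d}/day={ts.day:02d}/"
    )


prefix = partition_prefix("heartrate", "2018-05-28 14:03:59")
print(prefix)  # heartrate/year=2018/month=05/day=28/
```

A query that filters on year, month, and day then only needs to read the objects stored under the matching prefix instead of the whole dataset.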

You can create triggers for your job that run the job periodically to process new data as it is transmitted to your S3 bucket. For detailed steps on how to configure a job trigger, see Triggering Jobs in AWS Glue.

The next step is to create a crawler for the Parquet data so that a table can be created. The following image shows the configuration for our Parquet crawler:

Choose Finish, and execute the crawler.

Explore your database, and you will notice that one more table was created in the Parquet format.

You can use this new table for direct queries to reduce costs and to increase the query performance of this demonstration.

Because AWS Glue is integrated with Athena, you will find in the Athena console an AWS Glue catalog already available with the table catalog. Fetch 10 rows from the new Parquet table in Athena, as you did for the JSON data table in the previous steps.

As the following image shows, we fetched the first 10 rows of heartbeat data from a Parquet format table. This same Athena query scanned only 4.99 KB of data compared to 205 KB of data that was scanned in a raw format. Also, there was a significant improvement in query performance in terms of run time.
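
To put those two figures in proportion, the short calculation below uses only the scan sizes quoted above and works out how much less data the Parquet query read. The variable names are illustrative.

```python
raw_scanned_kb = 205.0      # data scanned for the raw-format table (from the post)
parquet_scanned_kb = 4.99   # data scanned for the Parquet table (from the post)

# How many times smaller the Parquet scan is, and the relative saving.
reduction_factor = raw_scanned_kb / parquet_scanned_kb
percent_less = (1 - parquet_scanned_kb / raw_scanned_kb) * 100

print(f"{reduction_factor:.1f}x less data scanned ({percent_less:.1f}% reduction)")
```

Because Athena charges by the amount of data scanned, a reduction of this order on larger tables translates directly into lower query cost.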

Visualize data in Amazon QuickSight

Amazon QuickSight is a data visualization service that you can use to analyze data that has been combined. For more detailed instructions, see the Amazon QuickSight User Guide.

The first step in Amazon QuickSight is to create a new Amazon Athena data source. Choose the heartbeat database created in AWS Glue, and then choose the table that was created by the AWS Glue crawler.

Choose Import to SPICE for quicker analytics. This option creates a data cache and improves graph loading. All non-database datasets must use SPICE. To learn more about SPICE, see Managing SPICE Capacity.

Choose Visualize, and wait for SPICE to import the data to the cache. You can also schedule a periodic refresh so that new data is loaded to SPICE as the data is pipelined to the S3 bucket.

When the SPICE import is complete, you can create a visual dashboard easily. The following figure shows graphs displaying the occurrence of heart rate records per device. The first graph is a horizontally stacked bar chart, which shows the percentage of heart rate occurrence per device. In the second graph, you can visualize the heart rate count grouped by device.

Conclusion

Processing streaming data at scale is relevant in every industry. Whether you process data from wearables to tackle human health issues or address predictive maintenance in manufacturing centers, AWS can help you simplify your data ingestion and analysis while keeping your overall IT expenditure manageable.

In this two-part series, you learned how to ingest streaming data from a heart rate sensor and visualize it in such a way to create actionable insights. The current state of the art available in the big data and machine learning space makes it possible to ingest terabytes and petabytes of data and extract useful and actionable information from that process.


Additional Reading

If you found this post useful, be sure to check out Work with partitioned data in AWS Glue, and 10 visualizations to try in Amazon QuickSight with sample data.

 


About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them architectural guidance for building scalable architecture in hybrid and AWS environments.

 

 

 

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

 

 

 

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

 

 

David Cowden is partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them architectural guidance for building scalable architecture in IoT space.

 

 

 

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

 

 

 

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry to optimize R&D applications for large scale HPC systems and enable the potential of machine learning for the upstream. He and his family crave to live in Singapore again for the human, cultural experience and eat fresh durians.

 

 

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.

 

How to build a front-line concussion monitoring system using AWS IoT and serverless data lakes – Part 1

Post Syndicated from Saurabh Shrivastava original https://aws.amazon.com/blogs/big-data/how-to-build-a-front-line-concussion-monitoring-system-using-aws-iot-and-serverless-data-lakes-part-1/

Sports-related minor traumatic brain injuries (mTBI) continue to incite concern among different groups in the medical, sports, and parenting community. At the recreational level, approximately 1.6–3.8 million related mTBI incidents occur in the United States every year, and in most cases, are not treated at the hospital. (See “The epidemiology and impact of traumatic brain injury: a brief overview” in Additional resources.) The estimated medical and indirect costs of minor traumatic brain injury are reaching $60 billion annually.

Although emergency facilities in North America collect data on admitted traumatic brain injuries (TBI) cases, there isn’t meaningful data on the number of unreported mTBIs among athletes. Recent studies indicate a significant rate of under-reporting of sports-related mTBI due to many factors. These factors include the simple inability of team staff to either recognize the signs and symptoms or to actually witness the impact. (See “A prospective study of physician-observed concussions during junior ice hockey: implications for incidence rates” in Additional resources.)

The majority of players involved in hockey and football are not college or professional athletes. There are over 3 million youth hockey players and approximately 5 million registered participants in football. (See “Head Impact Exposure in Youth Football” in Additional resources.) These recreational athletes don’t have basic access to medical staff trained in concussion recognition and sideline injury assessment. A user-friendly measurement and a smartphone-based assessment tool would facilitate the process between identifying potential head injuries, assessment, and return to play (RTP) criteria.

Recently, the use of instrumented sports helmets, including the Head Impact Telemetry System (HITS), has allowed for detailed recording of impacts to the head in many research trials. This practice has led to recommendations to alter contact in practices and certain helmet design parameters. (See “Head impact severity measures for evaluating mild traumatic brain injury risk exposure” in Additional resources.) However, due to the higher costs of the HITS system and complexity of the equipment, it is not a practical impact alert device for the general recreational population.

A simple, practical, and affordable system for measuring head trauma within the sports environment, subject to the absence of trained medical personnel, is required.

Given the proliferation of smartphones, we felt that this was a practical device to investigate to provide this type of monitoring.  All smartphone devices have an embedded Bluetooth communication system to receive and transmit data at various ranges.  For the purposes of this demonstration, we chose a class 1 Bluetooth device as the hardware communication method. We chose it because of its simplicity, widely accepted standard, and compatibility to interface with existing smartphones and IoT devices.

Remote monitoring typically involves collecting information from devices (for example, wearables) at the edge, integrating that information into a data lake, and generating inferences that can then be served back to the relevant stakeholders. Additionally, in some cases, compute and inference must also be done at the edge to shorten the feedback loop between data collection and response.

This use case can be extended to many other use cases in myriad verticals. In this two-part series, we show you how to build a data pipeline in support of a data lake. We use key AWS services such as Amazon Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and AWS Lambda. In part 2, we focus on generating simple inferences from that data that can support RTP parameters.

Architectural overview

Here is the AWS architecture that we cover in this two-part series:

Note: For the purposes of our demonstration, we chose to use heart rate monitoring sensors rather than helmet sensors because they are significantly easier to acquire. Both types of sensors are very similar in how they transmit data. They are also very similar in terms of how they are integrated into a data lake solution.

The resulting demonstration transfers the heartbeat data using the following components:

  • AWS Greengrass set up with a Raspberry Pi 3 to stream heart rate data into the cloud.
  • Data is ingested via Amazon Kinesis Data Streams, and raw data is stored in an Amazon S3 bucket using Kinesis Data Firehose. Find more details about writing to Kinesis Data Firehose using Kinesis Data Streams.
  • Kinesis Data Analytics averages out the heartbeat-per-minute data during stream data ingestion and passes the average to an AWS Lambda function.
  • AWS Lambda enriches the heartbeat data by comparing the real-time data with baseline information stored in Amazon DynamoDB.
  • AWS Lambda sends SMS/email alerts via an Amazon SNS topic if the heartbeat rate is greater than 120 BPM, for example.
  • AWS Glue runs an extract, transform, and load (ETL) job. This job transforms the data stored in JSON format into a compressed Apache Parquet columnar format and partitions the transformed data for faster query processing. AWS Glue is a fully managed ETL service for crawling data stored in an Amazon S3 bucket and building a metadata catalog.
  • Amazon Athena is used for ad hoc query analysis on the data that is processed by AWS Glue. This data is also available for machine learning processing using predictive analysis to reduce heart disease risk.
  • Amazon QuickSight is a fully managed visualization tool. It uses Amazon Athena as a data source and depicts visual line and pie charts to show the heart rate data in a visual dashboard.

All data pipelines are serverless and are refreshed periodically to provide up-to-date data.

You can use Kinesis Data Firehose to transform the data in the pipeline to a compressed Parquet format without needing to use AWS Glue. For the purposes of this post, we are using AWS Glue to highlight its capabilities, including a centralized AWS Glue Data Catalog. This Data Catalog can be used by Athena for ad hoc queries and by Apache Spark on Amazon EMR to run complex machine learning processes. AWS Glue also lets you edit generated ETL scripts and supports “bring your own ETL” to process data for more complex use cases.

Configuring key processes to support the pipeline

The following sections describe how to set up and configure the devices and services used in the demonstration to build a data pipeline in support of a data lake.

Remote sensors and IoT devices

You can use commercially available heart rate monitors to collect electrocardiography (ECG) information such as heart rate. The monitor is strapped around the chest area with the sensor placed over the sternum for better accuracy. The monitor measures the heart rate and sends the data over Bluetooth Low Energy (BLE) to a Raspberry Pi 3. The following figure depicts the device-side architecture for our demonstration.

The Raspberry Pi 3 is host to both the IoT device and the AWS Greengrass core. The IoT device is responsible for connecting to the heart rate monitor over BLE and collecting the heart rate data. The collected data is then sent locally to the AWS Greengrass core, where it can be processed and routed to the cloud through a secure connection. The AWS Greengrass core serves as the “edge” gateway for the heart rate monitor.

Set up AWS Greengrass core software on Raspberry Pi 3

To prepare your Raspberry Pi for running AWS Greengrass software, follow the instructions in Environment Setup for Greengrass in the AWS Greengrass Developer Guide.

After setting up your Raspberry Pi, you are ready to install AWS Greengrass and create your first Greengrass group. Create a Greengrass group by following the steps in Configure AWS Greengrass on AWS IoT. Then install the appropriate certificates to the Raspberry Pi by following the steps to start AWS Greengrass on a core device.

The preceding steps deploy a Greengrass group that consists of three discrete configurable items: a device, a subscription list, and the connectivity information.

The core device is a set of code that is responsible for collecting the heart rate information from the sensor and sending it to the AWS Greengrass core. This device is using the AWS IoT Device SDK for Python including the Greengrass Discovery API.

Use the following AWS CLI command to create a Greengrass group:

aws greengrass create-group --name heartRateGroup

To complete the setup, follow the steps in Create AWS IoT Devices in an AWS Greengrass Group.

After you complete the setup, the heart rate data is routed from the device to the AWS IoT Core service using AWS Greengrass. As such, you need to add a single subscription in the Greengrass group to facilitate this message route:

Here, your device is named Heartrate_Sensor, and the target is the IoT Cloud on the topic iot/heartrate. That means that when your device publishes to the iot/heartrate topic, AWS Greengrass also sends this message to the AWS IoT Core service on the same topic. Then you can use the breadth of AWS services to process the data.

The connectivity information is configured to use the local host because the IoT device resides on the Raspberry Pi 3 along with the AWS Greengrass core software. The IoT device uses the Discovery API, which is responsible for retrieving the connectivity information of the AWS Greengrass core that the IoT device is associated with.

The IoT device then uses the endpoint and port information to open a secure TLS connection to AWS Greengrass core, where the heart rate data is sent. The AWS Greengrass core connectivity information should be depicted as follows:

The power of AWS Greengrass core is that you can deploy AWS Lambda functions and new subscriptions to process the heart rate information locally on the Raspberry Pi 3. For example, you can deploy an AWS Lambda function that can trigger a reaction if the detected heart rate is reaching a set threshold. In this scenario, different individuals might require different thresholds and responses, so you could theoretically deploy unique Lambda functions on a per-individual basis if needed.
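
As a rough illustration of per-individual thresholds, the following sketch keeps the limits in a lookup table inside a single function rather than deploying one function per person, which is a simpler alternative to the approach described above. The message shape (a dict with deviceid and heartrate), the threshold values, and all names are assumptions for this example and do not come from the demonstration code.

```python
# Hypothetical per-device thresholds in beats per minute; values are illustrative.
THRESHOLDS = {
    "1": {"low": 40, "high": 120},
    "2": {"low": 45, "high": 150},
}
DEFAULT_THRESHOLD = {"low": 40, "high": 120}


def check_heart_rate(device_id, heart_rate):
    """Return an alert dict if the reading is outside the device's limits, else None."""
    limits = THRESHOLDS.get(device_id, DEFAULT_THRESHOLD)
    if heart_rate > limits["high"]:
        return {"deviceid": device_id, "heartrate": heart_rate, "alert": "high"}
    if heart_rate < limits["low"]:
        return {"deviceid": device_id, "heartrate": heart_rate, "alert": "low"}
    return None


def function_handler(event, context):
    """Lambda entry point; assumes the message is a dict with deviceid and heartrate."""
    return check_heart_rate(str(event["deviceid"]), int(event["heartrate"]))


print(function_handler({"deviceid": 1, "heartrate": 130}, None))
print(function_handler({"deviceid": 2, "heartrate": 130}, None))
```

The same reading of 130 BPM raises an alert for device 1 but not for device 2, because device 2 has a higher upper limit in the table.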

Configure AWS Greengrass and AWS IoT Core

To enable further processing and storage of the heart rate data messages published from AWS Greengrass core to AWS IoT Core, create an AWS IoT rule. The AWS IoT rule retrieves messages published to the iot/heartrate topic and sends them to the Kinesis data stream through an AWS IoT rule action for Kinesis.
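
For orientation, a rule of this kind might be described by a payload like the one below. This is a sketch under assumptions: the role ARN, the stream name placeholder, and the choice of a random UUID as the partition key are hypothetical and are not taken from the demonstration. The snippet only builds and prints the JSON; it does not call AWS.

```python
import json

# Hypothetical values: replace the role ARN and stream name with your own.
topic_rule_payload = {
    # Select every message published to the heart rate topic.
    "sql": "SELECT * FROM 'iot/heartrate'",
    "ruleDisabled": False,
    "actions": [
        {
            "kinesis": {
                "roleArn": "arn:aws:iam::123456789012:role/example-iot-kinesis-role",
                "streamName": "<your_stream_name>",
                # Assumption: spread records across shards with a random key.
                "partitionKey": "${newuuid()}",
            }
        }
    ],
}

print(json.dumps(topic_rule_payload, indent=2))
```

A payload in this shape can be saved to a file and supplied when you create the rule, for example through the AWS IoT console or the create-topic-rule command of the AWS CLI.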

Simulate heart rate data

You might not have access to an IoT device, but you still want to run a proof of concept (PoC) around heart rate use cases. You can simulate data by creating a shell script and deploying that data simulation script on an Amazon EC2 instance. Refer to the EC2 user guide to get started with Amazon EC2 Linux instances.

On the Amazon EC2 instance, create a shell script kinesis_client_HeartRate.sh, and copy the provided code to start writing some records into the Kinesis data stream. Be sure to create your Kinesis data stream and replace the variable <your_stream_name> in the following script.

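#!/bin/bash
# RANDOM and $'\n' are bash features, so run this script with bash rather than sh.
while true
do
  # Pick a device ID from 1 to 10 and a heart rate from 60 to 140 BPM.
  deviceID=$(( (RANDOM % 10) + 1 ))
  heartRate=$(( (RANDOM % 81) + 60 ))
  echo "$deviceID,$heartRate"
  # Send one newline-terminated CSV record to the stream, keyed by device ID.
  aws kinesis put-record --stream-name <your_stream_name> --data "$deviceID,$heartRate"$'\n' --partition-key "$deviceID" --region us-east-1
done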

You can also use the Kinesis Data Generator to create data and then stream it to your solution or demonstration. For details on its use, see the blog post Test Your Streaming Data Solution with the New Amazon Kinesis Data Generator.

Ingest data using Kinesis and manage alerts with Lambda, DynamoDB, and Amazon SNS

Now you need to ingest data from the IoT device, which can be processed for real-time notifications when abnormal heart rates are detected.

Streaming data from the heart rate monitoring device is ingested to Kinesis Data Streams. Amazon Kinesis makes it easy to collect, process, and analyze real-time, streaming data. For this project, the data stream was configured with one open shard and a data retention period of 24 hours. This lets you send 1 MB of data or 1,000 events per second and read 2 MB of data per second. If you need to support more devices, you can scale up and add more shards using the UpdateShardCount API or the Amazon Kinesis scaling utility.
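
To get a feel for when one shard stops being enough, the following sketch estimates a shard count from the write limits quoted above. It assumes records are spread evenly across shards, treats 1 MB as 1,000,000 bytes, and borrows the 61-byte record size from the crawler output as an illustrative payload size; the function name and the device counts are hypothetical.

```python
import math

# Per-shard write limits quoted above: 1 MB or 1,000 records per second.
MAX_WRITE_BYTES_PER_SEC = 1_000_000
MAX_WRITE_RECORDS_PER_SEC = 1_000


def shards_needed(devices, records_per_device_per_sec, record_bytes):
    """Estimate the shard count from write throughput alone."""
    records_per_sec = devices * records_per_device_per_sec
    bytes_per_sec = records_per_sec * record_bytes
    by_records = math.ceil(records_per_sec / MAX_WRITE_RECORDS_PER_SEC)
    by_bytes = math.ceil(bytes_per_sec / MAX_WRITE_BYTES_PER_SEC)
    # Whichever limit is hit first determines the shard count.
    return max(1, by_records, by_bytes)


print(shards_needed(devices=10, records_per_device_per_sec=1, record_bytes=61))    # 1
print(shards_needed(devices=5000, records_per_device_per_sec=1, record_bytes=61))  # 5
```

With records this small, the records-per-second limit is reached long before the bytes-per-second limit, so the number of devices drives the shard count.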

You can configure your data stream by using the following AWS CLI command. To turn on server-side encryption, run the start-stream-encryption command after the stream is created.

aws kinesis create-stream --stream-name hearrate_stream --shard-count 1

You can use an AWS CloudFormation template to create the entire stack depicted in the following architecture diagram.

When launching an AWS CloudFormation template, be sure to enter your email address or mobile phone number with the appropriate endpoint protocol (“Email” or “SMS”) as parameters:

Alternatively, you can follow the manual steps in the documentation links that are provided in this post.

Streaming data in Kinesis can be processed and analyzed in real time by Kinesis clients. Refer to the Kinesis Data Streams Developer Guide to learn how to create a Kinesis data stream.

To identify abnormal heart rate information, you must use real-time analytics to detect abnormal behavior. You can use Kinesis Data Analytics to perform analytics on streaming data in real time. Kinesis Data Analytics consists of three configurable components: source, real-time analytics, and destination. Refer to the AWS documentation to learn the detailed steps to configure Kinesis Data Analytics.

Kinesis Data Analytics uses Kinesis Data Streams as the source stream for the data. In the source configuration process, if there are scenarios where filtering or masking records is required, you can preprocess records using AWS Lambda. The data in this particular case is relatively simple, so you don’t need preprocessing of records on the data.

The Kinesis Data Analytics schema editor lets you edit and transform the schema if required. In the following example, we transformed the second column to Value instead of COL_Value.

The SQL code to perform the real-time analysis of the data has to be copied to the SQL Editor for real-time analytics. The following is the sample code that was used for this demonstration.

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    VALUEROWTIME TIMESTAMP,
    ID INTEGER,
    COLVALUE INTEGER);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS
  INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM ROWTIME,
                  ID,
                  AVG("Value") AS HEARTRATE
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY ID,
             STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND)
    HAVING AVG("Value") > 120 OR AVG("Value") < 40;

This code creates DESTINATION_SQL_STREAM and a pump that inserts a row into it only when the average heart rate received from SOURCE_SQL_STREAM_001 over a 60-second window is greater than 120 or less than 40.
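For readers who want to check the windowing logic outside of Kinesis Data Analytics, here is a small Python sketch of the same idea applied to a batch of readings. It is an illustration, not the service's implementation: the function name and the tuple layout (epoch seconds, device ID, heart rate) are assumptions, and a real stream is processed incrementally rather than all at once.

```python
from collections import defaultdict


def abnormal_window_averages(readings, window_seconds=60, low=40, high=120):
    """Group readings into tumbling windows per device and keep abnormal averages.

    readings: iterable of (epoch_seconds, device_id, heart_rate) tuples.
    Returns a sorted list of (window_start, device_id, average) tuples.
    """
    buckets = defaultdict(list)
    for ts, device_id, value in readings:
        # Round the timestamp down to the window boundary, the role STEP(...) plays in the SQL.
        window_start = ts - (ts % window_seconds)
        buckets[(window_start, device_id)].append(value)

    alerts = []
    for (window_start, device_id), values in sorted(buckets.items()):
        average = sum(values) / len(values)
        # Same condition as the HAVING clause.
        if average > high or average < low:
            alerts.append((window_start, device_id, average))
    return alerts


if __name__ == "__main__":
    sample = [(0, 7, 130), (30, 7, 140), (61, 7, 80), (70, 8, 30), (100, 8, 36)]
    for row in abnormal_window_averages(sample):
        print(row)
```

Because each reading belongs to exactly one window, a device produces at most one output row per 60 seconds, which is what keeps the downstream Lambda function from being invoked for every normal reading.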

For more information about the tumbling window concept, see Tumbling Windows (Aggregations Using GROUP BY).

Next, add an AWS Lambda function as one of your destinations, and configure it as follows:

In the destination editor, make sure that the selected stream name is DESTINATION_SQL_STREAM, because you only want to trigger the Lambda function when anomalies in the heart rate are detected. The output format can be JSON or CSV. In this example, our Lambda function expects the data in JSON format, so we chose JSON.

Athlete and athletic trainer registration information is stored in the heartrate Registrations DynamoDB table. Amazon DynamoDB offers fully managed encryption at rest using an AWS Key Management Service (AWS KMS) managed encryption key for DynamoDB. You need to create a table with encryption at rest enabled. Follow the detailed steps in Amazon DynamoDB Encryption at Rest.

Each record in the table should include deviceid, customerid, firstname, lastname, and mobile. The following is an example table record for reference.

{
  "customerid": {
    "S": "3"
  },
  "deviceid": {
    "S": "7"
  },
  "email": {
    "S": "[email protected]"
  },
  "firstname": {
    "S": "John"
  },
  "lastname": {
    "S": "Smith"
  },
  "mobile": {
    "S": "19999999999"
  }
}

Refer to the DynamoDB Developer Guide for complete instructions for creating and populating a DynamoDB table.
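The example record above is written in the DynamoDB attribute-value notation, where each value is wrapped in a type tag such as "S" for string. As a rough sketch of how code might work with such a record, the following Python helpers flatten it into a plain dictionary and check for the fields listed earlier. The helper names are hypothetical, and only the "S" and "N" type tags are handled.

```python
# Fields this post says every registration record should include.
REQUIRED_FIELDS = ("deviceid", "customerid", "firstname", "lastname", "mobile")


def unmarshal(item):
    """Flatten DynamoDB attribute-value JSON, e.g. {"deviceid": {"S": "7"}} -> {"deviceid": "7"}."""
    plain = {}
    for name, typed_value in item.items():
        (type_tag, raw), = typed_value.items()  # each attribute carries exactly one type tag
        if type_tag == "S":
            plain[name] = raw
        elif type_tag == "N":
            # DynamoDB transmits numbers as strings.
            plain[name] = float(raw) if "." in raw else int(raw)
        else:
            raise ValueError(f"unsupported type tag {type_tag!r} for attribute {name!r}")
    return plain


def missing_fields(record):
    """Return the required fields that are absent from a flattened record."""
    return [field for field in REQUIRED_FIELDS if field not in record]


if __name__ == "__main__":
    record = unmarshal({"deviceid": {"S": "7"}, "mobile": {"S": "19999999999"}})
    print(record, missing_fields(record))
```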

The Lambda function processes the record passed from the Kinesis Data Analytics application. This Node.js function retrieves the athlete and athletic trainer information from the DynamoDB registrations table. It then alerts the athletic trainer to the event by sending a text message through Amazon Simple Notification Service (Amazon SNS).

Note: The default AWS account spending limit for mobile text messages sent through Amazon SNS is $1.00 per month. You can request an increase through an SNS Limit Increase case, as described in AWS Service Limits.

Now create a new Lambda function with a runtime of Node.js 6.10, and choose the Create a custom role option for IAM permissions. If you are new to deploying Lambda functions, see Create a Simple Lambda Function.

You must configure the new Lambda function with a specific IAM role, providing privileges to Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon SNS as provided in the supplied AWS CloudFormation template.

The provided AWS Lambda function retrieves the HR Monitor Device ID and HR Average from the base64-encoded JSON message that is passed from Kinesis Data Analytics.  After retrieving the HR Monitor Device ID, the function then queries the DynamoDB Athlete registration table to retrieve the athlete and athletic trainer information.
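The function supplied with this post is written in Node.js. Purely as an illustration of the decoding step, here is a Python sketch of a handler for the same kind of event. It assumes the event carries a "records" list whose entries have a "recordId" and a base64-encoded "data" field, and that the handler reports an "Ok" or "DeliveryFailed" result per record; verify this against the Kinesis Data Analytics documentation. The JSON field names ID and COLVALUE are taken from the DESTINATION_SQL_STREAM definition and are likewise an assumption about the delivered payload.

```python
import base64
import json


def decode_record(record):
    """Return (device_id, average_heart_rate) from one base64-encoded JSON record."""
    payload = json.loads(base64.b64decode(record["data"]))
    # Assumed field names, matching the columns of DESTINATION_SQL_STREAM.
    return str(payload["ID"]), payload["COLVALUE"]


def handler(event, context=None):
    """Decode every record and report a per-record delivery result."""
    results = []
    for record in event["records"]:
        try:
            device_id, average = decode_record(record)
            # The DynamoDB lookup and the SNS notification would happen here.
            result = "Ok"
        except (KeyError, ValueError):
            # Covers missing fields, invalid base64, and invalid JSON.
            result = "DeliveryFailed"
        results.append({"recordId": record["recordId"], "result": result})
    return {"records": results}


if __name__ == "__main__":
    data = base64.b64encode(json.dumps({"ID": 7, "COLVALUE": 135}).encode()).decode()
    print(handler({"records": [{"recordId": "1", "data": data}]}))
```

The device ID is converted to a string because the example registration record stores deviceid as a string attribute.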

Finally, the AWS Lambda function sends a mobile text notification (which does not contain any sensitive information) to the athletic trainer’s mobile number retrieved from the athlete data by using the Amazon SNS service.
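The notification step can be sketched in Python as follows. This is not the code shipped with the post: the helper names and the message wording are hypothetical, and the example assumes that the stored mobile value already includes the country code, as in the sample record. The client is passed in as a parameter; with boto3 it would be the object returned by boto3.client("sns"), whose publish call accepts PhoneNumber and Message arguments.

```python
def to_e164(mobile):
    """Format a stored number such as '19999999999' as '+19999999999'."""
    digits = "".join(ch for ch in mobile if ch.isdigit())
    if not digits:
        raise ValueError("mobile number contains no digits")
    return "+" + digits


def build_alert(athlete):
    """Build publish parameters; the text deliberately omits names and readings."""
    return {
        "PhoneNumber": to_e164(athlete["mobile"]),
        "Message": "Heart rate alert: a monitored athlete needs attention.",
    }


def send_alert(sns_client, athlete):
    """Send the alert through any client object that exposes publish(**kwargs)."""
    return sns_client.publish(**build_alert(athlete))


if __name__ == "__main__":
    print(build_alert({"firstname": "John", "mobile": "19999999999"}))
```

Keeping the message generic means that nothing identifying the athlete travels over SMS, which matches the intent described above.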

To store the streaming data in an S3 bucket for further analysis and visualization using other tools, you can use Kinesis Data Firehose to connect the pipeline to Amazon S3 storage. To learn more, see Create a Kinesis Data Firehose Delivery Stream.

Kinesis Data Firehose delivers the streaming data in intervals to the destination S3 bucket. The intervals can be defined using an S3 buffer size, an S3 buffer interval, or both, in which case delivery is triggered by whichever threshold is reached first. The data in the Data Firehose delivery stream can be transformed, and you can back up the source records before applying any transformation. The data can be encrypted and compressed in GZIP, ZIP, or Snappy format. It can also be converted to a columnar format such as Apache Parquet or Apache ORC, which improves query performance and reduces the storage footprint. You should enable error logging for operational and production troubleshooting.
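The interaction between the two buffering thresholds can be illustrated with a simplified Python model. This is only a sketch of the "whichever comes first" rule, not how Kinesis Data Firehose is implemented: the function name is hypothetical, and the time-based flush is evaluated only when the next record arrives.

```python
def batch_records(records, buffer_bytes, buffer_seconds):
    """Split records into delivery batches using size and time thresholds.

    records: list of (arrival_second, size_bytes) tuples, sorted by arrival time.
    """
    batches = []
    current = []
    current_bytes = 0
    window_start = None

    for arrival, size in records:
        # Time threshold: the buffer interval elapsed before this record arrived.
        if current and arrival - window_start >= buffer_seconds:
            batches.append(current)
            current, current_bytes = [], 0
        if not current:
            window_start = arrival
        current.append((arrival, size))
        current_bytes += size
        # Size threshold: the buffer is full, so deliver immediately.
        if current_bytes >= buffer_bytes:
            batches.append(current)
            current, current_bytes = [], 0

    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    sample = [(0, 400), (10, 400), (20, 400), (100, 100), (170, 100)]
    for batch in batch_records(sample, buffer_bytes=1000, buffer_seconds=60):
        print(batch)
```

With a low-volume source such as a handful of heart rate monitors, the time threshold is usually the one that triggers delivery, so the buffer interval effectively sets how fresh the data in Amazon S3 is.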

Conclusion

In part 1 of this blog series, we demonstrated how to build a data pipeline in support of a data lake. We used key AWS services such as Kinesis Data Streams, Kinesis Data Analytics, Kinesis Data Firehose, and Lambda. In part 2, we’ll discuss how to deploy a serverless data lake and use key analytics to create actionable insights from the data lake.

About the Authors

Saurabh Shrivastava is a partner solutions architect and big data specialist working with global systems integrators. He works with AWS partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments.

Abhinav Krishna Vadlapatla is a Solutions Architect with Amazon Web Services. He supports startups and small businesses with their cloud adoption to build scalable and secure solutions using AWS. During his free time, he likes to cook and travel.

John Cupit is a partner solutions architect for AWS’ Global Telecom Alliance Team. His passion is leveraging the cloud to transform the carrier industry. He has a son and daughter who have both graduated from college. His daughter is gainfully employed, while his son is in his first year of law school at Tulane University. As such, he has no spare money and no spare time to work a second job.

David Cowden is a partner solutions architect and IoT specialist working with AWS emerging partners. He works with customers to provide them with architectural guidance for building scalable architecture in the IoT space.

Josh Ragsdale is an enterprise solutions architect at AWS. His focus is on adapting to a cloud operating model at very large scale. He enjoys cycling and spending time with his family outdoors.

Pierre-Yves Aquilanti, Ph.D., is a senior specialized HPC solutions architect at AWS. He spent several years in the oil & gas industry optimizing R&D applications for large-scale HPC systems and enabling the potential of machine learning for the upstream. He and his family long to live in Singapore again for the human and cultural experience, and to eat fresh durians.

Manuel Puron is an enterprise solutions architect at AWS. He has been working in cloud security and IT service management for over 10 years. He is focused on the telecommunications industry. He enjoys video games and traveling to new destinations to discover new cultures.