Tag Archives: Amazon Athena

How ironSource built a multi-purpose data lake with Upsolver, Amazon S3, and Amazon Athena

Post Syndicated from Seva Feldman original https://aws.amazon.com/blogs/big-data/how-ironsource-built-a-multi-purpose-data-lake-with-upsolver-amazon-s3-and-amazon-athena/

ironSource, in their own words, is the leading in-app monetization and video advertising platform, making free-to-play and free-to-use possible for over 1.5B people around the world. ironSource helps app developers take their apps to the next level, including the industry’s largest in-app video network. Over 80,000 apps use ironSource technologies to grow their businesses.

The massive scale at which ironSource operates across its various monetization platforms—including apps, video, and mediation—means millions of end devices generate massive amounts of streaming data. The company needs to collect, store, and prepare this data to support multiple use cases while minimizing infrastructure and engineering overhead.

This post discusses the following:

  • Why ironSource opted for a data lake architecture based on Amazon S3.
  • How ironSource built the data lake using Upsolver.
  • How to create outputs to analytic services such as Amazon Athena, Amazon ES, and Tableau.
  • The benefits of this solution.

Advantages of a data lake architecture

After working for several years in a database-focused approach, the rapid growth in ironSource’s data made their previous system unviable from a cost and maintenance perspective. Instead, they adopted a data lake architecture, storing raw event data on object storage, and creating customized output streams that power multiple applications and analytic flows.

Why ironSource chose an AWS data lake

A data lake was the right solution for ironSource for the following reasons:

  • Scale – ironSource processes 500K events per second and over 20 billion events daily. The ability to store near-infinite amounts of data in S3 without preprocessing the data is crucial.
  • Flexibility – ironSource uses data to support multiple business processes. Because they need to feed the same data into multiple services to provide for different use cases, the company needed to bypass the rigidity and schema limitations entailed by a database approach. Instead, they store all the original data on S3 and create ad-hoc outputs and transformations as needed.
  • Resilience – Because all historical data is on S3, recovery from failure is easier, and errors further down the pipeline are less likely to affect production environments.

Why ironSource chose Upsolver

Upsolver’s streaming data platform automates the coding-intensive processes associated with building and managing a cloud data lake. Upsolver enables ironSource to support a broad range of data consumers and minimize the time DevOps engineers spend on data plumbing by providing a GUI-based, self-service tool for ingesting data, preparing it for analysis, and outputting structured tables to various query services.

Key benefits include the following:

  • Self-sufficiency for data consumers – As a self-service platform, Upsolver allows BI developers, Ops, and software teams to transform data streams into tabular data without writing code.
  • Improved performance – Because Upsolver stores files in optimized Parquet storage on S3, ironSource benefits from high query performance without manual performance tuning.
  • Elastic scaling – ironSource is in hyper-growth, so it needs elastic scaling to handle increases in inbound data volume and peaks throughout the week, reprocessing of events from S3, and isolation between different groups that use the data.
  • Data privacy – Because Upsolver is deployed inside ironSource’s VPC with no access from the outside, there is no risk to sensitive data.

This post shows how ironSource uses Upsolver to build, manage, and orchestrate its data lake with minimal coding and maintenance.

Solution Architecture

The following diagram shows the architecture ironSource uses:

Architecture diagram: Apache Kafka feeds Upsolver, which performs stream ingestion, schemaless data management, and stateful data processing and writes two copies of the data to Amazon S3—raw data and Parquet files. Upsolver feeds query engines (Athena, Amazon Redshift, and Elasticsearch), which in turn power use cases such as product analytics, campaign performance, and customer dashboards.

Streaming data from Kafka to Upsolver and storing on S3

Apache Kafka streams data from ironSource’s mobile SDK at a rate of up to 500K events per second. Upsolver pulls the data from Kafka and stores it in S3 within a data lake architecture. It keeps a copy of the raw event data, ensures each event is written exactly once, and stores the same data as Parquet files optimized for consumption.

Building the input stream in Upsolver:

Using the Upsolver GUI, ironSource connects directly to the relevant Kafka topics and writes them to S3 precisely one time. See the following screenshot.

Image of the Upsolver UI showing the "Data Sources" tab is open to the "Create a Kafka Data Source" page with "Mobile SDK Cluster" highlighted under the "Compute Cluster" section.

After the data is stored in S3, ironSource can proceed to operationalize the data using a wide variety of databases and analytic tools. The next steps cover the most prominent tools.

Output to Athena

To understand production issues, developers and product teams need access to data. These teams can work with the data directly and answer their own questions by using Upsolver and Athena.

Upsolver simplifies and automates the process of preparing data for consumption in Athena, including compaction, compression, partitioning, and creating and managing tables in the AWS Glue Data Catalog. ironSource’s DevOps teams save hundreds of hours on pipeline engineering. Upsolver’s GUI creates each table one time, and from that point onwards, data consumers are entirely self-sufficient. To ensure queries in Athena run fast and at minimal cost, Upsolver also enforces performance-tuning best practices as data is ingested and stored on S3. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

Athena’s serverless architecture further complements this independence: there’s no infrastructure to manage, and analysts don’t need DevOps to spin up Amazon Redshift or query clusters for each new question. Instead, analysts can indicate the data they need and get answers.

Sending tables to Athena in Upsolver

In Upsolver, you can declare tables with associated schema using SQL or the built-in GUI. You can expose these tables to Athena through the AWS Glue Data Catalog. Upsolver stores Parquet files in S3 and creates the appropriate table and partition information in the AWS Glue Data Catalog by using Create and Alter DDL statements. You can also edit these tables with Upsolver Output to add, remove, or change columns. Upsolver automates the process of recreating table data on S3 and altering the metadata in the AWS Glue Data Catalog.
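
To make this concrete, the following is a minimal sketch of the kind of DDL that ends up in the AWS Glue Data Catalog for an Upsolver-managed output. The table name, columns, and S3 path are hypothetical illustrations, not ironSource’s actual schema.

-- Hypothetical partitioned Parquet table registered in the AWS Glue Data Catalog;
-- Upsolver generates and maintains DDL of this kind automatically.
CREATE EXTERNAL TABLE mobile_sdk_events (
  event_id   string,
  app_key    string,
  event_name string,
  device_os  string
)
PARTITIONED BY (event_date date)
STORED AS PARQUET
LOCATION 's3://example-bucket/upsolver-output/mobile_sdk_events/';

-- Registering a new partition as data lands (Upsolver automates this step).
ALTER TABLE mobile_sdk_events
  ADD IF NOT EXISTS PARTITION (event_date = '2019-06-01')
  LOCATION 's3://example-bucket/upsolver-output/mobile_sdk_events/2019-06-01/';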

Creating the table

Image of the Upsolver UI showing the "Outputs" tab is open to the "Mobile SDK Data" page.

Sending the table to Amazon Athena

Image of the Upsolver UI showing the "Run Parameters" dialog box is open, having arrived there from the "Mobile SDK Data" page noted in the previous image.

Editing the table option for Outputs

Image on the "Mobile SDK Data" page showing the drop down menu from the 3 dots in the upper left with "Edit" highlighted.

Modifying an existing table in the Upsolver Output

Image showing "Alter Existing Table" with a radio button selected, along with a blurb that states "The changes will affect the existing rable from the time specific. Any data already written after that time with be deleted. The previous output will stop once it finishes processing all the data up to the specified time." Below that is a box showing an example data and time. The other option with a radio button not selected is "Create New Table" with the blurb "A new table will be created. The existing table and output will not be affected in any way by this operation. The buttons at the bottom are "Next" and "Cancel," with "Next" selected.

Output to BI platforms

ironSource’s BI analysts use Tableau to query and visualize data using SQL. However, performing this type of analysis on streaming data may require extensive ETL and data preparation, which can limit the scope of analysis and create reporting bottlenecks.

ironSource’s cloud data lake architecture enables BI teams to work with big data in Tableau. They use Upsolver to enrich and filter data and write it to Amazon Redshift to build reporting dashboards, or send tables to Athena for ad hoc analytic queries. Tableau connects natively to both Amazon Redshift and Athena, so analysts can query the data using regular SQL and familiar tools rather than relying on manual ETL processes.

Creating a reduced stream for Amazon ES

Engineering teams at ironSource use Amazon ES to monitor and analyze application logs. However, as with any database, storing raw data in Amazon ES is expensive and can lead to production issues.

Because a large share of these logs are duplicates, Upsolver deduplicates the data, which reduces Amazon ES costs and improves performance. By aggregating identical records, Upsolver cuts the size of the data stored in Amazon ES by 70%, which makes Amazon ES viable and cost-effective despite the high volume of logs generated.

To do this, Upsolver adds a calculated field to the event stream, which indicates whether a particular log is a duplicate. If so, it filters the log out of the stream that it sends to Amazon ES.
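
Expressed as SQL rather than through the Upsolver GUI, the logic looks roughly like the following sketch. The table name, field names, and the definition of a duplicate are assumptions for illustration, not ironSource’s actual configuration.

-- Rough SQL equivalent of the calculated field plus filter: keep only the first
-- occurrence of each identical log line per host and day before shipping to Amazon ES.
WITH ranked_logs AS (
  SELECT
    log_message,
    host,
    event_time,
    ROW_NUMBER() OVER (
      PARTITION BY host, log_message, date_trunc('day', event_time)
      ORDER BY event_time
    ) AS occurrence  -- the "calculated field": 1 for the first copy, >1 for duplicates
  FROM application_logs
)
SELECT log_message, host, event_time
FROM ranked_logs
WHERE occurrence = 1;  -- the filter drops the duplicates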

Creating the calculated field

Image showing the Upsolver UI with the "Outputs" tab selected, showing the "Create Calculated Field" page.

Filtering using the calculated field

Upsolver UI showing the "Outputs" tab selected, on the "Create Filter" page.

Conclusion

Self-sufficiency is a big part of ironSource’s development ethos. In revamping its data infrastructure, the company sought to create a self-service environment for dev and BI teams to work with data, without becoming overly reliant on DevOps and data engineering. Data engineers can now focus on features rather than building and maintaining code-driven ETL flows.

ironSource successfully built an agile and versatile architecture with Upsolver and AWS data lake tools. This solution enables data consumers to work independently with data, while significantly improving data freshness, which helps power both the company’s internal decision-making and external reporting.

Some of the results in numbers include:

  • Thousands of engineering hours saved – ironSource’s DevOps and data engineers save thousands of hours that they would otherwise spend on infrastructure by replacing manual, coding-intensive processes with self-service tools and managed infrastructure.
  • Reduced fees – Factoring in infrastructure, workforce, and licensing costs, Upsolver significantly reduces ironSource’s total infrastructure costs.
  • 15-minute latency from Kafka to end-user – Data consumers can respond and take action with near real-time data.
  • 9X increase in scale – Currently at 0.5M incoming events/sec and 3.5M outgoing events/sec.

“It’s important for every engineering project to generate tangible value for the business,” says Seva Feldman, Vice President of Research and Development at ironSource Mobile. “We want to minimize the time our engineering teams, including DevOps, spend on infrastructure and maximize the time spent developing features. Upsolver has saved thousands of engineering hours and significantly reduced total cost of ownership, which enables us to invest these resources in continuing our hypergrowth rather than data pipelines.”

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

About the Authors


Seva Feldman is Vice President of R&D at ironSource Mobile. With over two decades of experience in senior architecture, DevOps, and engineering roles, Seva is an expert in turning operational challenges into opportunities for improvement.

Eran Levy is the Director of Marketing at Upsolver.

Roy Hasson is the Global Business Development Lead of Analytics and Data Lakes at AWS. He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

Analyze Google Analytics data using Upsolver, Amazon Athena, and Amazon QuickSight

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyze-google-analytics-data-using-upsolver-amazon-athena-and-amazon-quicksight/

In this post, we present a solution for analyzing Google Analytics data using Amazon Athena. We’re including a reference architecture built on moving hit-level data from Google Analytics to Amazon S3, performing joins and enrichments, and visualizing the data using Amazon Athena and Amazon QuickSight. Upsolver is used for data lake automation and orchestration, enabling customers to get started quickly.

Google Analytics is a popular solution for organizations that want to understand the performance of their web properties and applications. Google Analytics data is collected and aggregated to help users extract insights quickly. This works well for simple analytics. It’s less than ideal, however, when you need to enrich Google Analytics data with other datasets to produce a comprehensive view of the customer journey.

Why analyze Google Analytics data on AWS?

Google Analytics has become the de facto standard web analytics tool. It is offered for free at lower data volumes and provides tracking, analytics, and reporting. It enables non-technical users to understand website performance by answering questions such as: Where are users coming from? Which pages have the highest conversion rates? Where are users experiencing friction and abandoning their shopping cart?

While these questions can be answered within the Google Analytics UI, there are some limitations, such as:

  • Data sampling: Google Analytics standard edition displays sampled data when running ad hoc queries on time periods that contain more than 500,000 sessions. Large websites can easily exceed this number on a weekly or even daily basis. This can create reliability issues between different reports, as each query can be fed by a different sample of the data.
  • Difficulty integrating with existing AWS stack: Many customers have built or are in the process of building their data and analytics platform on AWS. Customers want to use the AWS analytics and machine learning capabilities with their Google Analytics data to enable new and innovative use cases.
  • Joining with external data sources: Seeing the full picture of a business’ online activity might require combining web traffic data with other sources. Google Analytics does not offer a simple way to either move raw data in or out of the system. Custom dimensions in Google Analytics can be used, but they are limited to 20 for the standard edition and are difficult to use.
  • Multi-dimensional analysis: Google Analytics custom reports and APIs are limited to seven dimensions per query. This limits the depth of analysis and requires various workarounds for more granular slicing and dicing.
  • Lack of alternatives: Google Analytics 360, which allows users to export raw data to Google BigQuery, carries a hefty annual fee. This can be prohibitive for organizations. And even with this upgrade, the native integration is only with BigQuery, which means users still can’t use their existing AWS stack.

Building or buying a new web analytics solution (including cookie-based tracking) is also cost-prohibitive, and can interrupt existing workflows that rely on Google Analytics data.

Customers are looking for a solution to enable their analysts and business users to incorporate Google Analytics data into their existing workflows using familiar AWS tools.

Moving Google Analytics data to AWS: Defining the requirements

To provide an analytics solution with the same or better level of reporting as Google Analytics, we designed our solution around the following tenets:

  1. Analytics with a low technical barrier to entry: Google Analytics is built for business users, and our solution is designed to provide a similar experience. This means that beyond ingesting the data, we want to automate the data engineering work that goes into making the data ready for analysis.  This includes data retention, partitioning, and compression. All of this work must be done under the hood and remain invisible to the user querying the data.
  2. Hit-level data: Google Analytics tracks clickstream activity based on Hits – the lowest level of interaction between a user and a webpage. These hits are then grouped into Sessions – hits within a given time period, and Users – groups of sessions (more details here). The standard Google Analytics API is limited to session and user-based queries, and does not offer any simple way of extracting hit-level data. Our solution, however, does provide access to this granular data.
  3. Unsampled data: By extracting the data from Google Analytics and storing it on Amazon S3, we are able to bypass the 500K sessions limitation. We also have access to unsampled data for any query at any scale.
  4. Data privacy: If sensitive data is stored in Google Analytics, relying on third-party ETL tools can create risks around data privacy, especially in the era of GDPR. Therefore, our solution encrypts data in transit and relies exclusively on processing within the customer’s VPC.

Solution overview

The solution is built on extracting hit-level data and storing it in a data lake architecture on Amazon S3. We then use Amazon Athena and Amazon QuickSight for analytics and reporting. Upsolver, an AWS premier solution provider, is used to automate ingestion, ETL, and data management on S3. Upsolver also orchestrates the entire solution with a simple-to-use graphical user interface. The following diagram shows the high-level architecture of our solution.

Reference architecture showing the flow of data across Google Analytics, Amazon Athena, and Amazon QuickSight

Using Upsolver’s GA connector, we extract unsampled, hit-level data from Google Analytics. This data is then automatically ingested according to accepted data lake best practices and stored in an optimized form on Amazon S3. The following best practices are applied to the data (a sketch of the first two appears after the list):

  • Store data in Apache Parquet columnar file format to improve read performance and reduce the amount of data scanned per query.
  • Partition data by event (hit) time rather than by API query time.
  • Perform periodic compaction by which small files are merged into larger ones improving performance and optimizing compression.
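
As a rough illustration of the first two practices, the following Athena CREATE TABLE AS SELECT (CTAS) statement writes partitioned Parquet output from a raw table. The table and column names are hypothetical; Upsolver applies these optimizations automatically on ingest rather than through hand-written CTAS statements.

-- Hypothetical CTAS that converts raw hits to Parquet, partitioned by event (hit) date.
CREATE TABLE ga_hits_optimized
WITH (
  format = 'PARQUET',
  external_location = 's3://example-bucket/ga/hits_optimized/',
  partitioned_by = ARRAY['hit_date']
) AS
SELECT
  user_id,
  session_id,
  page_title,
  date(from_unixtime(hit_timestamp)) AS hit_date  -- partition column must be last in the SELECT
FROM ga_hits_raw;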

Once data is stored on S3, we use Upsolver’s GUI to create structured fact tables from the Google Analytics data. Users can query them using Amazon Athena and Amazon Redshift. Upsolver provides simple to use templates to help users quickly create tables from their Google Analytics data.  Finally, we use Amazon QuickSight to create interactive dashboards to visualize the data.

The result is a complete view of our Google Analytics data. This view provides the level of self-service analytics that users have grown accustomed to, at any scale, and without the limitations outlined earlier.

Building the solution: Step by step guide

In this section, we walk through the steps to set up the environment, configure Upsolver’s Google Analytics plugin, extract the data, and begin exploring.

Step 1: Installation and permissions

  1. Sign up for Upsolver (can also be done via the AWS Marketplace).
  2. Allow Upsolver access to read data from Google Analytics and add new custom dimensions. Custom dimensions enable Upsolver to read non-sampled, hit-level data directly from Google Analytics instead of creating parallel tracking mechanisms that aren’t as trustworthy.
  3. To populate the custom dimensions that were added to Google Analytics, allow Upsolver to run a small JavaScript snippet on your website. If you’re using GA360, this is not required.

Step 2: Review and clean the raw data

For supported data sources, Upsolver automatically discovers the schema and collects key statistics for every field in the table. Doing so gives users a glimpse into their data.

In the following screenshot, you can see schema-on-read information on the left side, stats per field and value distribution on the right side.

Screen shot of the Upsolver UI showing schema-on-read information on the left side, stats per field and value distribution on the right side

Step 3: Publishing to Amazon Athena

Upsolver comes with four templates for creating tables in your AWS-based data lake, according to the Google Analytics entity being analyzed:

  • Pageviews – used to analyze user flow and behavior on specific sections of the web property using metrics such as time on page and exit rate.
  • Events – user-defined interactions such as scroll depth and link clicks.
  • Sessions – monitor a specific journey in the web property (all pageviews and events).
  • Users – understand a user’s interaction with the web property or app over time.

All tables are partitioned by event time, which helps improve query performance.
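
For example, a query that filters on the event-time partition column scans only the relevant partitions. The following sketch assumes the Pageviews template produced a table named ga_hits_pageviews (as in the queries later in this post) with a date-typed partition column named partition_date; the actual partition column name may differ.

-- Partition pruning: only seven days of data are scanned instead of the whole table.
SELECT page_title,
       count(*) AS pageviews
FROM ga_hits_pageviews
WHERE partition_date BETWEEN DATE '2019-06-01' AND DATE '2019-06-07'
GROUP BY page_title
ORDER BY pageviews DESC
LIMIT 20;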

Upsolver users can choose to run the templates as-is, modify them first or create new tables unique to their needs.

The following screenshot shows the schema produced by the Pageviews template:

Screen shot of the Upsolver UI showing the schema produced by the Pageviews template:

The following screenshot shows the Pageviews and Events tables as well as the Amazon Athena views for Sessions and Users generated by the Upsolver templates.

Screenshot showing the Pageviews and Events tables as well as the Athena views for Sessions and Users generated from the Upsolver templates.

The following are a couple of example queries you may want to run to extract specific insights:

-- Popular page titles
SELECT page_title,
       Count(*) AS num_hits
FROM   ga_hits_pageviews
GROUP  BY page_title
ORDER  BY 2 DESC;

-- User aggregations from hit data
SELECT user_id,
       Count(*)                   AS num_hits,
       Count(DISTINCT session_id) AS num_of_sessions,
       Sum(session_duration)      AS total_sessions_time
FROM   ga_hits_pageviews
GROUP  BY user_id;

Step 4: Visualization in Amazon QuickSight

Now that the data has been ingested, cleansed, and written to S3 in a structured manner, we are ready to visualize it with Amazon QuickSight. Start by creating a dashboard to mimic the one provided by Google Analytics. But we don’t need to stop there. We can use QuickSight ML Insights to extract deeper insights from our data. We can also embed Amazon QuickSight visualizations into existing web portals and applications, making insights available to everyone.

Screenshot of a QuickSight visualization showing several sections, one with a graph, several others with various statistics

Screen shot of QuickSight showing a global map with usage concentrations marked by bubbles, alongside a pie graph.

Screenshot of QuickSight showing a bar graph, alongside a table with various data values.

Conclusion

With minimal setup, we were able to extract raw hit-level Google Analytics data, prepare it, and store it in a data lake on Amazon S3. Using Upsolver, combined with Amazon Athena and Amazon QuickSight, we built a feature-complete solution for analyzing web traffic collected by Google Analytics on AWS.

Key technical benefits:

  • Schema-on-read means data consumers don’t need to model the data into a table structure, and can instantly understand what their top dimensions are. For example, 85% of my users navigate my website using the Google Chrome browser.
  • Graphical user interface that enables self-service consumption of Google Analytics data.
  • Fast implementation using pre-defined templates that map raw data from Google Analytics to tables in the data lake.
  • Ability to replay historical Google Analytics data stored on Amazon S3.
  • Ability to partition the data on Amazon S3 by hit time, reducing the complexity of handling late-arriving events.
  • Automatic optimization of data on Amazon S3 for improved query performance.
  • Automatic management of tables and partitions in the AWS Glue Data Catalog.
  • Full integration with a suite of AWS native services – Amazon S3, Amazon Athena, Amazon Redshift, and Amazon QuickSight.

Now that we have feature parity, we can begin to explore integrating other data sources such as CRM, sales, and customer profile data to build a true 360-degree view of the customer. Furthermore, you can now begin using AWS machine learning services to optimize traffic to your websites, forecast demand, and personalize the user experience.

We’d love to hear what you think. Please feel free to leave a comment with any feedback or questions you may have.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

About the Authors


Roy Hasson is the global business development lead of analytics and data lakes at AWS.
He works with customers around the globe to design solutions to meet their data processing, analytics, and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

Eran Levy is the director of marketing at Upsolver.

Extract Oracle OLTP data in real time with GoldenGate and query from Amazon Athena

Post Syndicated from Sreekanth Krishnavajjala original https://aws.amazon.com/blogs/big-data/extract-oracle-oltp-data-in-real-time-with-goldengate-and-query-from-amazon-athena/

This post describes how you can improve performance and reduce costs by offloading reporting workloads from an online transaction processing (OLTP) database to Amazon Athena and Amazon S3. The architecture described here lets you implement a reporting system and understand the data you receive by being able to query it on arrival. In this solution:

  • Oracle GoldenGate generates a new row on the target for every change on the source to create Slowly Changing Dimension Type 2 (SCD Type 2) data.
  • Athena allows you to run ad hoc queries on the SCD Type 2 data.

Principles of a modern reporting solution

Advanced database solutions use a set of principles to help them build cost-effective reporting solutions. Some of these principles are:

  • Separate the reporting activity from the OLTP. This approach provides resource isolation and enables databases to scale for their respective workloads.
  • Use query engines running on top of distributed file systems like Hadoop Distributed File System (HDFS) and cloud object stores, such as Amazon S3. The advent of query engines that can run on top of open-source HDFS and cloud object stores further reduces the cost of implementing dedicated reporting systems.

Furthermore, you can use these principles when building reporting solutions:

  • To reduce licensing costs of the commercial databases, move the reporting activity to an open-source database.
  • Use a log-based, real-time, change data capture (CDC), data-integration solution, which can replicate OLTP data from source systems, preferably in real-time mode, and provide a current view of the data. You can enable the data replication between the source and the target reporting systems using database CDC solutions. The transaction log-based CDC solutions capture database changes noninvasively from the source database and replicate them to the target datastore or file systems.

Prerequisites

If you use GoldenGate with Kafka and are considering cloud migration, you can benefit from this post. This post also assumes prior knowledge of GoldenGate and does not detail steps to install and configure GoldenGate. Knowledge of Java and Maven is also assumed. Ensure that a VPC with three subnets is available for manual deployment.

Understanding the architecture of this solution

The following workflow diagram (Figure 1) illustrates the solution that this post describes:

  1. Amazon RDS for Oracle acts as the source.
  2. A GoldenGate CDC solution produces data for Amazon Managed Streaming for Apache Kafka (Amazon MSK). GoldenGate streams the database CDC data to Kafka topics in the MSK cluster.
  3. The Apache Flink application running on Amazon EMR consumes the data and sinks it into an S3 bucket.
  4. Athena analyzes the data through queries. You can optionally run queries from Amazon Redshift Spectrum.

Data Pipeline

Figure 1

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with a few clicks, without the need to manually provision servers, configure storage, or set up Apache ZooKeeper. Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Amazon RDS for Oracle is a fully managed database that frees up your time to focus on application development. It manages time-consuming database administration tasks, including provisioning, backups, software patching, monitoring, and hardware scaling.

GoldenGate is a real-time, log-based, heterogeneous database CDC solution. GoldenGate supports data replication from any supported database to various target databases or big data platforms like Kafka. GoldenGate’s ability to write the transactional data captured from the source in different formats, including delimited text, JSON, and Avro, enables seamless integration with a variety of BI tools. Each row has additional metadata columns including database operation type (Insert/Update/Delete).

Flink is an open-source, stream-processing framework with a distributed streaming dataflow engine for stateful computations over unbounded and bounded data streams. EMR supports Flink, letting you create managed clusters from the AWS Management Console. Flink also supports exactly-once semantics with the checkpointing feature, which is vital to ensure data accuracy when processing database CDC data. You can also use Flink to transform the streaming data row by row or in batches using windowing capabilities.

S3 is an object storage service with high scalability, data availability, security, and performance. You can run big data analytics across your S3 objects with AWS query-in-place services like Athena.

Athena is a serverless query service that makes it easy to query and analyze data in S3. With Athena and S3 as a data source, you define the schema and start querying using standard SQL. There’s no need for complex ETL jobs to prepare your data for analysis, which makes it easy for anyone with SQL skills to analyze large-scale datasets quickly.

The following diagram shows a more detailed view of the data pipeline:

  1. RDS for Oracle runs in a Single-AZ.
  2. GoldenGate runs on an Amazon EC2 instance.
  3. The MSK cluster spans across three Availability Zones.
  4. A Kafka topic is set up in MSK.
  5. Flink runs on an EMR cluster.
  6. A producer security group for the Oracle database and the GoldenGate instance.
  7. A consumer security group for the EMR cluster running Flink.
  8. A gateway endpoint for private access to S3.
  9. A NAT gateway for downloading software components onto the GoldenGate instance.
  10. An S3 bucket and Athena.

For simplicity, this setup uses a single VPC with multiple subnets to deploy resources.

Figure 2

Configuring single-click deployment using AWS CloudFormation

The AWS CloudFormation template included in this post automates the deployment of the end-to-end solution that this blog post describes. The template provisions all required resources, including RDS for Oracle, MSK, EMR, and an S3 bucket, and also adds an EMR step with a JAR file to consume messages from the Kafka topic on MSK. Here’s the list of steps to launch the template and test the solution:

  1. Launch the AWS CloudFormation template in the us-east-1 Region.
  2. After successful stack creation, obtain the GoldenGate hub server public IP address from the Outputs tab of the AWS CloudFormation stack.
  3. Log in to the GoldenGate hub server using the IP address from step 2 as ec2-user, and then switch to the oracle user: sudo su - oracle
  4. Connect to the source RDS for Oracle database using the sqlplus client and provide the password (source). [[email protected] ~]$ sqlplus [email protected]
  5. Generate database transactions using SQL statements available in oracle user’s home directory.
    SQL> @s
    
     SQL> @s1
    
     SQL> @s2

  6. Query the STOCK_TRADES table from the Amazon Athena console. After committing transactions on the source database, it takes a few seconds for the changes to become available for querying in Athena. A sample query follows this list.
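
The following sample query is one way to confirm that changes are flowing end to end. It assumes the stock_trades table created by the template has the same columns as the Athena table definition shown later in this post; run it in the database in which the template registered the table.

-- Most recent change records captured from the source database.
SELECT trade_id,
       ticker_symbol,
       units,
       unit_price,
       op_type,
       trade_date
FROM stock_trades
ORDER BY trade_date DESC
LIMIT 10;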

Manually deploying components

The following steps describe the configurations required to stream Oracle-changed data to MSK and sink it to an S3 bucket using Flink running on EMR. You can then query the S3 bucket using Athena. If you deployed the solution using AWS CloudFormation as described in the previous step, skip to the Testing the solution section.

  1. Prepare an RDS source database for CDC using GoldenGate. The RDS source database version is Enterprise Edition 12.1.0.2.14. For instructions on configuring the RDS database, see Using Oracle GoldenGate with Amazon RDS. This post does not consider capturing data definition language (DDL).
  2. Configure an EC2 instance for the GoldenGate hub server. Configure the GoldenGate hub server using the Oracle Linux server 7.6 (ami-b9c38ad3) image in the us-east-1 Region. The GoldenGate hub server runs the GoldenGate extract process that extracts changes in real time from the database transaction log files. The server also runs a replicat process that publishes database changes to MSK. The GoldenGate hub server requires the following software components:
  • Java JDK 1.8.0 (required for GoldenGate big data adapter).
  • GoldenGate for Oracle (12.3.0.1.4) and GoldenGate for big data adapter (12.3.0.1).
  • Kafka 1.1.1 binaries (required for GoldenGate big data adapter classpath).
  • An IAM role attached to the GoldenGate hub server to allow GoldenGate processes running on the hub server to access the MSK cluster. Use the GoldenGate (12.3.0) documentation to install and configure GoldenGate for the Oracle database. The GoldenGate Integrated Extract parameter file is eora2msk.prm:
    EXTRACT eora2msk
    SETENV (NLSLANG=AL32UTF8)
    
    USERID [email protected], password ggadmin
    TRANLOGOPTIONS INTEGRATEDPARAMS (max_sga_size 256)
    EXTTRAIL /u01/app/oracle/product/ogg/dirdat/or
    LOGALLSUPCOLS
    
    TABLE SOURCE.STOCK_TRADES;

    The logallsupcols extract parameter ensures that a full database table row is generated for every DML operation on the source, including updates and deletes.

  3. Create a Kafka cluster using MSK and configure a Kafka topic. You can create the MSK cluster from the AWS Management Console, using the AWS CLI, or through an AWS CloudFormation template.
  • Use the list-clusters command to obtain a ClusterArn and a Zookeeper connection string after creating the cluster. You need this information to configure the GoldenGate big data adapter and Flink consumer. The following code illustrates the commands to run:
    $aws kafka list-clusters --region us-east-1
    {
        "ClusterInfoList": [
            {
                "EncryptionInfo": {
                    "EncryptionAtRest": {
                        "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:xxxxxxxxxxxx:key/717d53d8-9d08-4bbb-832e-de97fadcaf00"
                    }
                }, 
                "BrokerNodeGroupInfo": {
                    "BrokerAZDistribution": "DEFAULT", 
                    "ClientSubnets": [
                        "subnet-098210ac85a046999", 
                        "subnet-0c4b5ee5ff5ef70f2", 
                        "subnet-076c99d28d4ee87b4"
                    ], 
                    "StorageInfo": {
                        "EbsStorageInfo": {
                            "VolumeSize": 1000
                        }
                    }, 
                    "InstanceType": "kafka.m5.large"
                }, 
                "ClusterName": "mskcluster", 
                "CurrentBrokerSoftwareInfo": {
                    "KafkaVersion": "1.1.1"
                }, 
                "CreationTime": "2019-01-24T04:41:56.493Z", 
                "NumberOfBrokerNodes": 3, 
                "ZookeeperConnectString": "10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181", 
                "State": "ACTIVE", 
                "CurrentVersion": "K13V1IB3VIYZZH", 
                "ClusterArn": "arn:aws:kafka:us-east-1:xxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3", 
                "EnhancedMonitoring": "DEFAULT"
            }
        ]
    }

  • Obtain the IP addresses of the Kafka broker nodes by using the ClusterArn.
    $aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3
    {
        "BootstrapBrokerString": "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092"
    }

  • Create a Kafka topic. The solution in this post uses the same name for the Kafka topic as for the table.
    ./kafka-topics.sh --create --zookeeper 10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181 --replication-factor 3 --partitions 1 --topic STOCK_TRADES

  4. Provision an EMR cluster with Flink. Create an EMR 5.25 cluster with Flink 1.8.0 (an advanced option of the EMR cluster), and enable SSH access to the master node. Create and attach a role to the EMR master node so that Flink consumers can access the Kafka topic in the MSK cluster.
  5. Configure the Oracle GoldenGate big data adapter for Kafka on the GoldenGate hub server. Download and install the Oracle GoldenGate big data adapter (12.3.0.1.0) using the Oracle GoldenGate download link. For more information, see the Oracle GoldenGate 12c (12.3.0.1) installation documentation. The following is the GoldenGate producer property file for Kafka (custom_kafka_producer.properties):
    #Bootstrap broker string obtained from Step 3
    bootstrap.servers= 10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092
    #bootstrap.servers=localhost:9092
    acks=1
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    # 16 KB batch size per partition (batch.size is in bytes)
    batch.size=16384
    linger.ms=0

    The following is the GoldenGate properties file for Kafka (Kafka.props):

    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    #The following resolves the topic name using the short table name
    #gg.handler.kafkahandler.topicName=SOURCE
    gg.handler.kafkahandler.topicMappingTemplate=${tableName}
    #The following selects the message key using the concatenated primary keys
    gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
    gg.handler.kafkahandler.format=json_row
    #gg.handler.kafkahandler.format=delimitedtext
    #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
    #gg.handler.kafkahandler.SchemaTopicName=oratopic
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    gg.handler.kafkahandler.mode=op
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    
    gg.log=log4j
    #gg.log.level=INFO
    gg.log.level=DEBUG
    gg.report.time=30sec
    gg.classpath=dirprm/:/home/oracle/kafka/kafka_2.11-1.1.1/libs/*
    
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    The following is the GoldenGate replicat parameter file (rkafka.prm):

    REPLICAT rkafka
    -- Trail file for this example is located in "AdapterExamples/trail" directory
    -- Command to add REPLICAT
    -- add replicat rkafka, exttrail AdapterExamples/trail/tr
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP SOURCE.STOCK_TRADES, TARGET SOURCE.STOCK_TRADES;

  6. Create an S3 bucket and a directory with the table name underneath for Flink to store (sink) the Oracle CDC data.
  7. Configure a Flink consumer to read from the Kafka topic and write the CDC data to an S3 bucket. For instructions on setting up a Flink project using the Maven archetype, see Flink Project Build Setup. The following code example is the pom.xml file used with the Maven project. For more information, see Getting Started with Maven.
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-quickstart-java</artifactId>
      <version>1.8.0</version>
      <packaging>jar</packaging>
    
      <name>flink-quickstart-java</name>
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <slf4j.version>@slf4j.version@</slf4j.version>
        <log4j.version>@log4j.version@</log4j.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-filesystem_2.11</artifactId>
         <version>1.8.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-presto</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
       <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>2.4.20</version>
        </dependency>
        <dependency>
           <groupId>com.typesafe.akka</groupId>
           <artifactId>akka-protobuf_2.11</artifactId>
           <version>2.4.20</version>
        </dependency>
    </dependencies>

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-shade-plugin</artifactId>
          <version>3.2.1</version>
          <executions>
            <execution>
              <phase>package</phase>
              <goals>
                <goal>shade</goal>
              </goals>
              <configuration>
                <artifactSet>
                  <excludes>
                    <!-- Excludes here -->
                  </excludes>
                </artifactSet>
                <filters>
                  <filter>
                    <artifact>org.apache.flink:*</artifact>
                  </filter>
                </filters>
                <transformers>
                  <!-- add Main-Class to manifest file -->
                  <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                    <mainClass>flinkconsumer.flinkconsumer</mainClass>
                  </transformer>
                  <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                    <resource>reference.conf</resource>
                  </transformer>
                </transformers>
                <relocations>
                  <relocation>
                    <pattern>org.codehaus.plexus.util</pattern>
                    <shadedPattern>org.shaded.plexus.util</shadedPattern>
                    <excludes>
                      <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                      <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                    </excludes>
                  </relocation>
                </relocations>
                <createDependencyReducedPom>false</createDependencyReducedPom>
              </configuration>
            </execution>
          </executions>
        </plugin>
        <!-- Add the main class as a manifest entry -->
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-jar-plugin</artifactId>
          <version>2.5</version>
          <configuration>
            <archive>
              <manifestEntries>
                <Main-Class>flinkconsumer.flinkconsumer</Main-Class>
              </manifestEntries>
            </archive>
          </configuration>
        </plugin>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.1</version>
          <configuration>
            <source>1.7</source>
            <target>1.7</target>
          </configuration>
        </plugin>
      </plugins>
    </build>

    <profiles>
      <profile>
        <id>build-jar</id>
        <activation>
          <activeByDefault>false</activeByDefault>
        </activation>
      </profile>
    </profiles>

    </project>

    Compile the following Java program using mvn clean install and generate the JAR file:

    package flinkconsumer;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.util.serialization.DeserializationSchema;
    import org.apache.flink.streaming.util.serialization.SerializationSchema;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.slf4j.LoggerFactory;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import akka.actor.ActorSystem;
    import akka.stream.ActorMaterializer;
    import akka.stream.Materializer;
    import com.typesafe.config.Config;
    import org.apache.flink.streaming.connectors.fs.*;
    import org.apache.flink.streaming.api.datastream.*;
    import org.apache.flink.runtime.fs.hdfs.HadoopFileSystem;
    import java.util.stream.Collectors;
    import java.util.Arrays;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    import java.util.regex.Pattern;
    import java.io.*;
    import java.net.BindException;
    import java.util.*;
    import java.util.Map.*;
    
    public class flinkconsumer{
    
        public static void main(String[] args) throws Exception {
            // create Streaming execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setBufferTimeout(1000);
            env.enableCheckpointing(5000);
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092");
            properties.setProperty("group.id", "flink");
            properties.setProperty("client.id", "demo1");
    
            DataStream<String> message = env.addSource(new FlinkKafkaConsumer<>("STOCK_TRADES", new SimpleStringSchema(),properties));
        // Checkpointing is already enabled above (5,000 ms); store checkpoint state in HDFS on the EMR cluster
        env.setStateBackend(new FsStateBackend("hdfs://ip-10-0-3-12.ec2.internal:8020/flink/checkpoints"));
    
            RollingSink<String> sink= new RollingSink<String>("s3://flink-stream-demo/STOCK_TRADES");
           // sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"));
           // The bucket part file size in bytes.
               sink.setBatchSize(400);
             message.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;
                @Override
                public String map(String value) throws Exception {
                    //return " Value: " + value;
                    return value;
                }
            }).addSink(sink).setParallelism(1);
            env.execute();
        }
    }

    Log in to the EMR master node as the hadoop user, start Flink, and execute the JAR file:

    $ /usr/bin/flink run ./flink-quickstart-java-1.8.0.jar

  8. Create the stock_trades table from the Athena console. Each JSON document must be on a new line.
    CREATE EXTERNAL TABLE `stock_trades`(
      `trade_id` string COMMENT 'from deserializer', 
      `ticker_symbol` string COMMENT 'from deserializer', 
      `units` int COMMENT 'from deserializer', 
      `unit_price` float COMMENT 'from deserializer', 
      `trade_date` timestamp COMMENT 'from deserializer', 
      `op_type` string COMMENT 'from deserializer')
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://flink-cdc-demo/STOCK_TRADES'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'transient_lastDdlTime'='1561051196')

    For more information, see Hive JSON SerDe.

Testing the solution

To test that the solution works, complete the following steps:

  1. Log in to the source RDS instance from the GoldenGate hub server and perform insert, update, and delete operations on the stock_trades table:
    $sqlplus [email protected]
    SQL> insert into stock_trades values(6,'NEW',29,75,sysdate);
    SQL> update stock_trades set units=999 where trade_id=6;
    SQL> insert into stock_trades values(7,'TEST',30,80,SYSDATE);
    SQL> insert into stock_trades values (8,'XYZC', 20, 1800,sysdate);
    SQL> commit;

  2. Monitor the GoldenGate capture from the source database using the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u02/app/oracle/product/ogg/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate Command Interpreter for Oracle
    Version 12.3.0.1.4 OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO
    Linux, x64, 64bit (optimized), Oracle 12c on Apr 16 2018 00:53:30
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats eora2msk

  3. Monitor the GoldenGate replicat to a Kafka topic with the following:
    [[email protected] 12.3.0]$ pwd
    /u03/app/oracle/product/ogg/bdata/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate for Big Data
    Version 12.3.2.1.1 (Build 005)
    
    Oracle GoldenGate Command Interpreter
    Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
    Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats rkafka

  4. Query the stock_trades table using the Athena console. An example query follows this list.
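
Because GoldenGate writes a new row for every change (the SCD Type 2 pattern described at the start of this post), an ad hoc query can reconstruct the latest state of each trade. The following sketch is not part of the original walkthrough; it assumes trade_date reflects the order of changes and that op_type marks deletes with 'D', which depends on your GoldenGate format settings.

-- Latest version of each trade, dropping trades whose most recent operation was a delete.
WITH changes AS (
  SELECT *,
         ROW_NUMBER() OVER (
           PARTITION BY trade_id
           ORDER BY trade_date DESC
         ) AS rn
  FROM stock_trades
)
SELECT trade_id, ticker_symbol, units, unit_price, trade_date
FROM changes
WHERE rn = 1
  AND op_type <> 'D';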

Summary

This post illustrates how you can offload reporting activity to Athena with S3 to reduce reporting costs and improve OLTP performance on the source database. This post serves as a guide for setting up the solution in a staging environment.

Deploying this solution in a production environment may require additional considerations, for example, high availability of GoldenGate hub servers, different file encoding formats for optimal query performance, and security considerations. Additionally, you can achieve similar outcomes using technologies like AWS Database Migration Service instead of GoldenGate for database CDC and Kafka Connect for the S3 sink.

About the Authors

Sreekanth Krishnavajjala is a solutions architect at Amazon Web Services.

Vinod Kataria is a senior partner solutions architect at Amazon Web Services.

Perform biomedical informatics without a database using MIMIC-III data and Amazon Athena

Post Syndicated from James Wiggins original https://aws.amazon.com/blogs/big-data/perform-biomedical-informatics-without-a-database-using-mimic-iii-data-and-amazon-athena/

Biomedical researchers require access to accurate, detailed data. The MIT MIMIC-III dataset is a popular resource. Using Amazon Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Your analyses always reference the most recent version of the MIMIC-III dataset.

This post describes how to make the MIMIC-III dataset available in Athena and provide automated access to an analysis environment for MIMIC-III on AWS. We also compare a MIMIC-III reference bioinformatics study using a traditional database to that same study using Athena.

Overview

A dataset capturing a variety of measures longitudinally over time, across many patients, can drive analytics and machine learning toward research discovery and improved clinical decision-making. These features describe the MIT Laboratory of Computational Physiology (LCP) MIMIC-III dataset. In the words of LCP researchers:

“MIMIC-III is a large, publicly available database comprising de-identified health-related data associated with approximately 60K admissions of patients who stayed in critical care units of the Beth Israel Deaconess Medical Center between 2001 and 2012. …MIMIC supports a diverse range of analytic studies spanning epidemiology, clinical decision-rule improvement, and electronic tool development. It is notable for three factors: it is publicly and freely available, it encompasses a diverse and large population of ICU patients, and it contains high temporal resolution data including lab results, electronic documentation, and bedside monitor trends and waveforms.”

Recently, the Registry of Open Data on AWS (RODA) program made the MIMIC-III dataset available through the AWS Cloud. You can now use the MIMIC-III dataset without having to download, copy, or pay to store it. Instead, you can analyze the MIMIC-III dataset in the AWS Cloud using AWS services like Amazon EC2, Athena, AWS Lambda, or Amazon EMR. AWS Cloud availability enables quicker and cheaper research into the dataset.

Services like Athena also offer you new analytical approaches to the MIMIC-III dataset. Using Athena, you can execute standard SQL queries against MIMIC-III without first loading the data into a database. Because you can reference the MIMIC-III dataset hosted in the RODA program, your analyses always reference the most recent version of the MIMIC-III dataset. Live hosting reduces upfront time and effort, eliminates data synchronization issues, improves data analysis, and reduces overall study costs.

Transforming MIMIC-III data

Historically, the MIMIC team distributed the MIMIC-III dataset in compressed (gzipped) CSV format. The choice of CSV format reflects the loading of MIMIC-III data into a traditional relational database for analysis.

In contrast, the MIMIC team provides the MIMIC-III dataset to the RODA program in the following formats:

  • The traditional CSV format
  • An Apache Parquet format optimized for modern data processing technologies such as Athena

Apache Parquet stores data by column, so queries that fetch specific columns can run without reading the whole table. This optimization helps improve the performance and lower the cost of many query types common to biomedical informatics.

To convert the original MIMIC-III CSV dataset to Apache Parquet, we created a data transformation job using AWS Glue. The MIMIC team schedules the AWS Glue job to run as needed, updating the Parquet files in the RODA program with any changes to the CSV dataset. These scheduled updates keep the Parquet files current without interfering with the MIMIC team’s CSV dataset creation procedures. Find the code for this AWS Glue job in the mimic-code GitHub repo. Use the same code as a basis to develop other CSV-to-Parquet conversions.

The code that converts the MIMIC-III NOTEEVENTS table offers a valuable resource. This table stores long medical notes made up of multiple lines, with commas, double-quotes, and other characters that can be challenging to transform. For example, you can use the following Spark read statement to interpret that CSV file:

df = spark.read.csv('s3://' + mimiccsvinputbucket + '/NOTEEVENTS.csv.gz',
                    header=True,
                    schema=schema,
                    multiLine=True,
                    quote='"',
                    escape='"')
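
After parsing, the same job can write the DataFrame back out as Parquet. The following write statement is a minimal sketch; the output bucket variable and prefix are hypothetical, and the authoritative version lives in the mimic-code repository:

# Write the parsed NOTEEVENTS records as Snappy-compressed Parquet
# (mimicparquetoutputbucket is a hypothetical variable naming the output bucket)
df.write \
    .mode('overwrite') \
    .option('compression', 'snappy') \
    .parquet('s3://' + mimicparquetoutputbucket + '/NOTEEVENTS/')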

Executing the indwelling arterial catheter study using MIMIC-III

For example, the MIMIC team provides code for the MIMIC indwelling arterial catheter (IAC) aline study. The study uses the MIMIC-III dataset to reproduce findings from the published study, The Association Between Indwelling Arterial Catheters and Mortality in Hemodynamically Stable Patients With Respiratory Failure.

You can obtain the aline study code, as well as many other analytic examples, in the MIT Laboratory of Computational Physiology MIMIC Code Repository in GitHub. Learn more about the provided code in the JAMA paper, The MIMIC Code Repository: enabling reproducibility in critical care research.

The aline study code comprises about 1400 lines of SQL. It was developed for a PostgreSQL database, and is executed and further analyzed by Python and R code. We ran the aline study against the Parquet MIMIC-III dataset using Athena instead of PostgreSQL to answer the following questions:

  • What modifications would be required, if any, to run the study using Athena, instead of PostgreSQL?
  • How would performance differ?
  • How would cost differ?

Some SQL features used in the study work differently in Athena than they do in PostgreSQL. As a result, about 5% of the SQL statements needed modification. Specifically, the SQL statements require the following types of modification for use with Athena:

  • Instead of using materialized views, in Athena, create a new table. For instance, the CREATE MATERIALIZED VIEW statement can be written as CREATE TABLE.

  • Instead of using schema context statements like SET SEARCH_PATH, in Athena, explicitly declare the schema of referenced tables. For instance, instead of having a SET SEARCH_PATH statement before your query and referencing tables without a schema as follows:

SET SEARCH_PATH TO mimiciii;

left join chartevents ce

You declare the schema as a part of every table reference:

left join mimiciii.chartevents ce

  • Epoch time and timestamp arithmetic work differently in Athena than in PostgreSQL. Instead of the arithmetic, use the date_diff() function and specify seconds as the unit. For instance, consider the following expression:

extract(epoch from endtime-starttime)/24.0/60.0/60.0 

The preceding statement can be written as:

date_diff('second',starttime, endtime)/24.0/60.0/60.0

  • Athena uses the “double” data type instead of the “numeric” data type.

ROUND( cast(f.height_first as numeric), 2) AS height_first,

The preceding statement can be written as:

ROUND( cast(f.height_first as double), 2) AS height_first,

  • Empty VARCHAR fields are not detected as NULL in Athena. Instead, compare the VARCHAR to an empty string. For instance, consider the following condition:

where ce.value IS NOT NULL

If ce.value is a VARCHAR, it can be written as the following:

where ce.value <> ''

After making these minor edits to the aline study SQL statements, the study executes successfully using Athena instead of PostgreSQL. The output produced from both methods is identical.

Next, consider the speed of Athena compared to PostgreSQL in analyzing the aline study. As shown in the following graph, the aline study using Athena queries of the Parquet-formatted MIMIC-III dataset run 10 times faster than queries of the same data in an Amazon RDS PostgreSQL database.

Compare the costs of running the aline study in Athena to running it in PostgreSQL. RDS PostgreSQL databases bill in one-second increments for the time the database runs, while Athena queries bill per gigabyte of data each query scans. Because of this fundamental pricing difference, you must make some assumptions to compare costs.

You could assume that running the aline study in RDS PostgreSQL involves the following steps, taking up an eight-hour working day:

  1. Deploy a new RDS PostgreSQL database.
  2. Load the MIMIC-III dataset.
  3. Connect a Jupyter notebook.
  4. Run the aline study.
  5. Terminate the RDS database.

You’d then compare the cost of running an RDS PostgreSQL database for eight hours (at $0.37 per hour for a db.m5.xlarge with 100-GB EBS storage) to the cost of running the aline study against the same dataset using Athena (9.93 GB of data scanned at $0.005/GB). As the following graph shows, this results in an almost 60x cost savings.
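
The back-of-the-envelope arithmetic behind that comparison, using only the prices quoted above, looks like this:

# Cost figures quoted above
rds_hours = 8                    # working day with the db.m5.xlarge instance running
rds_hourly_rate = 0.37           # USD per hour, including 100-GB EBS storage
athena_gb_scanned = 9.93         # data scanned by the aline study queries
athena_rate_per_gb = 0.005       # USD per GB scanned ($5 per TB)

rds_cost = rds_hours * rds_hourly_rate                 # 2.96 USD
athena_cost = athena_gb_scanned * athena_rate_per_gb   # about 0.05 USD
print(f"RDS: ${rds_cost:.2f}, Athena: ${athena_cost:.2f}, ratio: {rds_cost / athena_cost:.0f}x")
# Prints a ratio of roughly 60x in Athena's favor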

Working with MIMIC-III in AWS

MIMIC-III is a publicly available dataset containing detailed information about the clinical care of patients. For this reason, the MIMIC team requires that you complete a training course and submit a formal request before gaining access. For more information, see Requesting access.

After you obtain access to the MIMIC-III dataset, log in to the PhysioNet website and, under your profile settings, provide your AWS account ID. Then click the request access link, under Files, on the MIMIC-III Clinical Database page.  You then have access to the MIMIC-III dataset in AWS.

Choose Launch Stack below to begin working with the MIMIC-III dataset in your AWS account. This launches an AWS CloudFormation template, which deploys an index of the MIMIC-III Parquet-formatted dataset into your AWS Glue Data Catalog. It also deploys an Amazon SageMaker notebook instance pre-loaded with the aline study code and many other MIMIC-provided analytic examples.

After the AWS CloudFormation template deploys, choose Outputs, and follow the link to access Jupyter Notebooks. From there, you can open and execute the aline study code by browsing the filesystem to the path ./mimic-code/notebooks/aline-aws/aline-awsathena.ipynb.

After you’ve completed your analysis, you can stop your Jupyter Notebook instance to suspend charges for compute.  Any time that you want to continue working, just start the instance and continue where you left off.

Conclusion

The RODA program provides quick and inexpensive access to the global biomedical research resources of the MIMIC-III dataset. Cloud-native analytic tools like AWS Glue and Athena accelerate research, and groups like the MIT Laboratory of Computational Physiology are pioneering data availability in modern data formats like Apache Parquet.

The analytic approaches and code demonstrated in this post can be applied generally to help increase agility and decrease cost. Consider using them as you enhance existing or perform new biomedical informatics research.

About the Author


James Wiggins is a senior healthcare solutions architect at AWS. He is passionate about using technology to help organizations positively impact world health. He also loves spending time with his wife and three children.

Alistair Johnson is a research scientist at the Massachusetts Institute of Technology. Alistair has extensive experience and expertise in working with ICU data, having published the MIMIC-III and eICU-CRD datasets. His current research focuses on clinical epidemiology and machine learning for decision support.

AWS Lake Formation – Now Generally Available

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/aws-lake-formation-now-generally-available/

As soon as companies started to have data in digital format, it was possible for them to build a data warehouse, collecting data from their operational systems, such as Customer relationship management (CRM) and Enterprise resource planning (ERP) systems, and use this information to support their business decisions.

The reduction in costs of storage, together with an even greater reduction in complexity for managing large quantities of data, made possible by services such as Amazon S3, has allowed companies to retain more information, including raw data that is not structured, such as logs, images, video, and scanned documents.

This is the idea of a data lake: to store all your data in one, centralized repository, at any scale. We are seeing this approach with customers like Netflix, Zillow, NASDAQ, Yelp, iRobot, FINRA, and Lyft. They can run their analytics on this larger dataset, from simple aggregations to complex machine learning algorithms, to better discover patterns in their data and understand their business.

Last year at re:Invent we introduced in preview AWS Lake Formation, a service that makes it easy to ingest, clean, catalog, transform, and secure your data and make it available for analytics and machine learning. I am happy to share that Lake Formation is generally available today!

With Lake Formation you have a central console to manage your data lake, for example to configure the jobs that move data from multiple sources, such as databases and logs, to your data lake. Having such a large and diversified amount of data also makes configuring the right access permissions critical. You can secure access to metadata in the Glue Data Catalog and data stored in S3 using a single set of granular data access policies defined in Lake Formation. These policies allow you to define table- and column-level data access.

One of the things I like most about Lake Formation is that it works with your data already in S3! You can easily register your existing data with Lake Formation, and you don’t need to change the existing processes that load your data to S3. Since data remains in your account, you have full control.

You can also use Glue ML Transforms to easily deduplicate your data. Deduplication is important to reduce the amount of storage you need, but also to make analyzing your data more efficient, because you have neither the overhead nor the possible confusion of looking at the same data twice. This problem is trivial if duplicate records can be identified by a unique key, but becomes very challenging when you have to do a “fuzzy match”. A similar approach can be used for record linkage, that is, when you are looking for similar items in different tables, for example to do a “fuzzy join” of two databases that do not share a unique key.

In this way, implementing a data lake from scratch is much faster, and managing a data lake is much easier, making these technologies available to more customers.

Creating a Data Lake
Let’s build a data lake using the Lake Formation console. First I register the S3 buckets that are going to be part of my data lake. Then I create a database and grant permission to the IAM users and roles that I am going to use to manage my data lake. The database is registered in the Glue Data Catalog and holds the metadata required to analyze the raw data, such as the structure of the tables that are going to be automatically generated during data ingestion.

Managing permissions is one of the most complex tasks for a data lake. Consider for example the huge amount of data that can be part of it, the sensitive, mission-critical nature of some of the data, and the different structured, semi-structured, and unstructured formats in which data can reside. Lake Formation makes it easier with a central location where you can give IAM users, roles, groups, and Active Directory users (via federation) access to databases, tables, optionally allowing or denying access to specific columns within a table.

To simplify data ingestion, I can use blueprints that create the necessary workflows, crawlers and jobs on AWS Glue for common use cases. Workflows enable orchestration of your data loading workloads by building dependencies between Glue entities, such as triggers, crawlers and jobs, and allow you to track visually the status of the different nodes in the workflows on the console, making it easier to monitor progress and troubleshoot issues.

Database blueprints help load data from operational databases. For example, if you have an e-commerce website, you can ingest all your orders in your data lake. You can load a full snapshot from an existing database, or incrementally load new data. In case of an incremental load, you can select a table and one or more of its columns as bookmark keys (for example, a timestamp in your orders) to determine previously imported data.

Log file blueprints simplify ingesting logging formats used by Application Load Balancers, Elastic Load Balancers, and AWS CloudTrail. Let’s take a deeper look at how that works.

Security is always a top priority, and I want a forensic log of all management operations across my account, so I choose the CloudTrail blueprint. As the source, I select a trail collecting my CloudTrail logs from all Regions into an S3 bucket. In this way, I’ll be able to query account activity across all my AWS infrastructure. This works similarly for a larger organization with multiple AWS accounts: when configuring the trail in the CloudTrail console, they just need to apply the trail to their whole organization.

I then select the target database, and the S3 location for the data lake. As data format I use Parquet, a columnar storage format that will make querying the data faster and cheaper. The import frequency can be hourly to monthly, with the option to choose the day of the week and the time. For now, I want to run the workflow on demand. I can do that from the console or programmatically, for example using any AWS SDK or the AWS Command Line Interface (CLI).

Finally, I give the workflow a name, the IAM role to use during execution, and a prefix for the tables that will be automatically created by this workflow.

I start the workflow from the Lake Formation console and select to view the workflow graph. This opens the AWS Glue console, where I can visually see the steps of the workflow and monitor the progress of this run.
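
Because blueprints are implemented as AWS Glue workflows, I could also start the same run programmatically. A minimal boto3 sketch, assuming a hypothetical workflow name:

import boto3

glue = boto3.client('glue')

# The workflow name is whatever you entered when creating the blueprint
run = glue.start_workflow_run(Name='cloudtrail-ingest-workflow')
print('Started workflow run:', run['RunId'])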

When the workflow is completed, a new table is available in my data lake database. The source data remains as logs in the S3 bucket that CloudTrail writes to, but now I also have it consolidated, in Parquet format and partitioned by date, in my data lake S3 location. To optimize costs, I can set up an S3 lifecycle policy that automatically expires data in the source S3 bucket after a safe amount of time has passed.
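
A minimal boto3 sketch of such a lifecycle rule; the bucket name, prefix, and retention period are hypothetical placeholders:

import boto3

s3 = boto3.client('s3')

# Expire raw CloudTrail objects 90 days after delivery
s3.put_bucket_lifecycle_configuration(
    Bucket='my-cloudtrail-source-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'expire-raw-cloudtrail-logs',
            'Filter': {'Prefix': 'AWSLogs/'},
            'Status': 'Enabled',
            'Expiration': {'Days': 90},
        }]
    }
)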

Securing Access to the Data Lake
Lake Formation provides secure and granular access to data stores in the data lake, via a new grant/revoke permissions model that augments IAM policies. It is simple to set up these permissions, for example using the console:

I simply select the IAM user or role I want to grant access to. Then I select the database and optionally the tables and the columns I want to provide access to. It is also possible to select which type of access to provide. For this demo, simple select permissions are sufficient.
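
The same kind of grant can also be made with the AWS SDK. The following boto3 sketch grants column-level SELECT on a table; the principal ARN and column names are hypothetical, and the database and table names are only illustrative:

import boto3

lf = boto3.client('lakeformation')

lf.grant_permissions(
    Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:user/limitedview'},
    Resource={
        'TableWithColumns': {
            'DatabaseName': 'mylake',
            'Name': 'my_trail_cloudtrail',
            'ColumnNames': ['eventname', 'eventsource', 'sourceipaddress'],
        }
    },
    Permissions=['SELECT'],
)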

Accessing the Data Lake
Now I can query the data using tools like Amazon Athena or Amazon Redshift. For example, I open the query editor in the Athena console. First, I want to use my new data lake to look into which source IP addresses are most common in my AWS Account activity:

SELECT sourceipaddress, count(*)
FROM my_trail_cloudtrail
GROUP BY  sourceipaddress
ORDER BY  2 DESC;

Looking at the result of the query, you can see which AWS API endpoints I use the most. Then, I’d like to check which user identity types are used. That information is stored in JSON format inside one of the columns. I can use some of the JSON functions available with Amazon Athena to get that information in my SQL statements:

SELECT json_extract_scalar(useridentity, '$.type'), count(*)
FROM "mylake"."my_trail_cloudtrail"
GROUP BY  json_extract_scalar(useridentity, '$.type')
ORDER BY  2 DESC;

Most of the time, AWS services are the ones generating activity in my trail. These queries are just an example, but they quickly give me deeper insight into what is happening in my AWS account.

Think of what a similar impact could mean for your business! Using database and log file blueprints, you can quickly create workflows to ingest data from multiple sources within your organization, set the right column-level permissions for who can access the information collected, clean and prepare your data using machine learning transforms, and correlate and visualize the information using tools like Amazon Athena, Amazon Redshift, and Amazon QuickSight.

Customizing Data Access with Column-Level Permissions
To follow data privacy guidelines and compliance requirements, the mission-critical data stored in a data lake requires custom views for different stakeholders inside the company. Let’s compare the visibility of two IAM users in my AWS account: one that has full permissions on a table, and one that has select access to only a subset of the columns of the same table.

I already have a user with full access to the table containing my CloudTrail data; it’s called danilop. I create a new limitedview IAM user and give it access to the Athena console. In the Lake Formation console, I give this new user select permissions on only three of the columns.

To verify the different access to the data in the table, I log in with one user at a time and go to the Athena console. On the left I can explore which tables and columns the logged-in user can see in the Glue Data Catalog. Here’s a comparison for the two users, side-by-side:

The limited user has access only to the three columns that I explicitly configured, and to the four columns used for partitioning the table, whose access is required to see any data. When I query the table in the Athena console with a select * SQL statement, logged in as the limitedview user, I only see data from those seven columns:

Available Now
There is no additional cost for using AWS Lake Formation; you pay for the use of the underlying services such as Amazon S3 and AWS Glue. One of the core benefits of Lake Formation is the security policies it introduces. Previously, you had to use separate policies to secure data and metadata access, and these policies only allowed table-level access. Now you can give each user access, from a central location, to only the columns they need to use.

AWS Lake Formation is now available in US East (N. Virginia), US East (Ohio), US West (Oregon), Europe (Ireland), and Asia Pacific (Tokyo). Redshift integration with Lake Formation requires Redshift cluster version 1.0.8610 or higher, your clusters should have been automatically updated by the time you read this. Support for Apache Spark with Amazon EMR will follow over the next few months.

I only scratched the surface of what you can do with Lake Formation. Building and managing a data lake for your business is now much easier; let me know how you are using these new capabilities!

Danilo

Analyzing AWS WAF logs with Amazon ES, Amazon Athena, and Amazon QuickSight

Post Syndicated from Aaron Franco original https://aws.amazon.com/blogs/big-data/analyzing-aws-waf-logs-with-amazon-es-amazon-athena-and-amazon-quicksight/

AWS WAF now includes the ability to log all web requests inspected by the service. AWS WAF can store these logs in an Amazon S3 bucket in the same Region, but most customers deploy AWS WAF across multiple Regions—wherever they also deploy applications. When analyzing web application security, organizations need the ability to gain a holistic view across all their deployed AWS WAF Regions.

This post presents a simple approach to aggregating AWS WAF logs into a central data lake repository, which lets teams better analyze and understand their organization’s security posture. I walk through the steps to aggregate regional AWS WAF logs into a dedicated S3 bucket. I follow that up by demonstrating how you can use Amazon ES to visualize the log data. I also present an option to offload and process historical data using AWS Glue ETL. With the data collected in one place, I finally show you how you can use Amazon Athena and Amazon QuickSight to query historical data and extract business insights.

Architecture overview

The case I highlight in this post is the forensic use of the AWS WAF access logs to identify distributed denial of service (DDoS) attacks by a client IP address. This solution provides your security teams with a view of all incoming requests hitting every AWS WAF in your infrastructure.

I investigate what the IP access patterns look like over time and assess which IP addresses access the site multiple times in a short period of time. This pattern suggests that the IP address could be an attacker. With this solution, you can identify DDoS attackers for a single application, and detect DDoS patterns across your entire global IT infrastructure.

Walkthrough

This solution requires separate tasks for architecture setup, which allows you to begin receiving log files in a centralized repository, and analytics, which processes your log data into useful results.

Prerequisites

To follow along, you must have the following resources:

  • Two AWS accounts. Following AWS multi-account best practices, create two accounts:
    • A logging account
    • A resource account that hosts the web applications using AWS WAF. For more information about multi-account setup, see AWS Landing Zone. Using multiple accounts isolates your logs from your resource environments. This helps maintain the integrity of your log files and provides a central access point for auditing all application, network, and security logs.
  • The ability to launch new resources into your account. The resources might not be eligible for Free Tier usage and so might incur costs.
  • An application running with an Application Load Balancer, preferably in multiple Regions. If you do not already have one, you can launch any AWS web application reference architecture to test and implement this solution.

For this walkthrough, you can launch an Amazon ECS example from the ecs-refarch-cloudformation GitHub repo. This is a “one click to deploy” example that automatically sets up a web application with an Application Load Balancer. Launch this in two different Regions to simulate a global infrastructure. You ultimately set up a centralized bucket that both Regions log into, which your forensic analysis tools then draw from. Choose Launch Stack to launch the sample application in your Region of choice.

Setup

Architecture setup allows you to begin receiving log files in a centralized repository.

Step 1: Provide permissions

Begin this process by providing appropriate permissions for one account to access resources in another. Your resource account needs cross-account permission to access the bucket in the logging account.

  1. Create your central logging S3 bucket in the logging account and attach the following bucket policy to it under the Permissions tab. Make a note of the bucket’s ARN. You need this information for future steps.
  2. Change RESOURCE-ACCOUNT-ID and CENTRAL-LOGGING-BUCKET-ARN to the correct values based on the actual values in your accounts:
     // JSON Document
     {
       "Version": "2012-10-17",
       "Statement": [
          {
             "Sid": "Cross Account AWS WAF Account 1",
             "Effect": "Allow",
             "Principal": {
                "AWS": "arn:aws:iam::RESOURCE-ACCOUNT-ID:root"
             },
             "Action": [
                "s3:GetObject",
                "s3:PutObject"
             ],
             "Resource": [
                "CENTRAL-LOGGING-BUCKET-ARN/*"
             ]
          }
       ]
    }

Step 2: Manage Lambda permissions

Next, the Lambda function that you create in your resource account needs permissions to access the S3 bucket in your central logging account so it can write files to that location. You already provided basic cross-account access in the previous step, but Lambda still needs the granular permissions at the resources level. Remember to grant these permissions in both Regions where you launched the application that you intend to monitor with AWS WAF.

  1. Log in to your resource account.
  2. To create an IAM role for the Lambda function, in the IAM console, choose Policies, Create Policy.
  3. Choose JSON, and enter the following policy document. Replace YOUR-SOURCE-BUCKET and YOUR-DESTINATION-BUCKET with the ARNs of the buckets that you are using for this walkthrough.
    // JSON document
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "ListSourceAndDestinationBuckets",
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket",
                    "s3:ListBucketVersions"
                ],
                "Resource": [
                    "YOUR-SOURCE-BUCKET",
                    "YOUR-DESTINATION-BUCKET"
                ]
            },
            {
                "Sid": "SourceBucketGetObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:GetObjectVersion"
                ],
                "Resource": "YOUR-SOURCE-BUCKET/*"
            },
            {
                "Sid": "DestinationBucketPutObjectAccess",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject"
                ],
                "Resource": "YOUR-DESTINATION-BUCKET/*"
            }
        ]
     }

  4. Choose Review policy, enter your policy name, and save it.
  5. With the policy created, create a new role for your Lambda function and attach the custom policy to that role. To do this, navigate back to the IAM dashboard.
  6. Select Create role and choose Lambda as the service that uses the role. Select the custom policy that you created earlier in this step and choose Next. You can add tags if required, and then name and create this new role.
  7. You must also add S3 as a trusted entity in the Trust Relationship section of the role. Choose Edit trust relationship and add s3.amazonaws.com to the policy, as shown in the following example.

Lambda and S3 now appear as trusted entities under the Trust relationships tab, as shown in the following screenshot.

Step 3: Create a Lambda function and copy log files

Create a Lambda function in the same Region as your resource account’s S3 bucket. This function reads log files from the resource account bucket and then copies that content to the logging account’s bucket. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

  1. Log in to your resource account.
  2. Navigate to Lambda in your console and choose Create Function.
  3. Choose the Author from scratch function and name it. Choose the IAM role you created in the previous step and attach it to the Lambda function.
  4. Choose Create function.
  5. This Lambda function receives an event from S3 that contains nested JSON string data. To handle this data, you must extract the JSON from this string to retrieve the key names of both the object and the bucket. Your function then uses this information to copy the data to your central logging account bucket. To create this function, copy and paste the following code into the Lambda function that you created, and replace the bucket names with the names of the buckets that you created earlier. You modify this script later, after you decide on a partitioning strategy.
    // Load the AWS SDK
    const aws = require('aws-sdk');
    
    // Construct the AWS S3 Object 
    const s3 = new aws.S3();
    
    // Destination bucket in the central logging account (replace with your bucket name)
    const destBucket = "YOUR-DESTINATION-BUCKET";
    
    //Main function
    exports.handler = (event, context, callback) => {
        console.log("Got WAF Item Event")
        // Name of the source bucket and key of the object that triggered the event
        var _srcBucket = event.Records[0].s3.bucket.name;
        let _key = event.Records[0].s3.object.key;
        let _keySplit = _key.split("/")
        let _objName = _keySplit[ (_keySplit.length - 1) ];
        // Build the destination key; adjust this once you decide on a partitioning strategy
        let _destPath = _keySplit[0]+"/"+_keySplit[1]+"/YOUR-DESTINATION-BUCKET/"+_objName;
        let _sourcePath = _srcBucket + "/" + _key;
        console.log(_destPath)
        let params = { Bucket: destBucket, ACL: "bucket-owner-full-control", CopySource: _sourcePath, Key: _destPath };
        // Copy the object into the central logging bucket
        s3.copyObject(params, function(err, data) {
            if (err) {
                console.log(err, err.stack);
            } else {
                console.log("SUCCESS!");
            }
        });
        callback(null, 'All done!');
    };

Step 4: Set S3 to Lambda event triggers

This step sets up event triggers in your resource account’s S3 buckets. These triggers send the file name and location logged by AWS WAF logs to the Lambda function. The triggers also notify the Lambda function that it must move the newly arrived file into your central logging bucket. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

  1. Go to the S3 dashboard, choose your S3 bucket, and then choose the Properties tab. Under Advanced settings, choose Events.
  2. Give your event a name and select PUT from the Events check boxes.
  3. Choose Lambda from the Send To option and select your Lambda function as the destination for the event. (A programmatic equivalent of this trigger configuration is sketched after these steps.)
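
If you manage this configuration as code rather than through the console, the same PUT trigger can be attached with boto3. This is a minimal sketch with placeholder bucket and function names, and it assumes you have already granted S3 permission to invoke the function:

import boto3

s3 = boto3.client('s3')

# Placeholders -- replace with your regional WAF log bucket and the ARN of
# the copy function created in the previous step
s3.put_bucket_notification_configuration(
    Bucket='your-regional-waf-log-bucket',
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'Id': 'waf-log-put-trigger',
            'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:copy-waf-logs',
            'Events': ['s3:ObjectCreated:Put'],
        }]
    }
)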

Step 5: Add AWS WAF to the Application Load Balancer

Add an AWS WAF to the Application Load Balancer so that you can start logging events. You can optionally delete the original log file after Lambda copies it. This reduces costs, but your business and security needs might err on the side of retaining that data.

Create a separate prefix for each Region in your central logging account bucket waf-central-logs so that AWS Glue can properly partition them. For best practices of partitioning with AWS Glue, see Working with partitioned data in AWS Glue. AWS Glue ingests your data and stores it in a columnar format optimized for querying in Amazon Athena. This helps you visualize the data and investigate the potential attacks.

Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF. The procedure assumes that you already have an AWS WAF enabled that you can use for this exercise. To move forward with the next step, you need AWS WAF enabled and connected to Amazon Kinesis Data Firehose for log delivery.

Setting up and configuring AWS WAF

If you don’t already have a web ACL in place, set up and configure AWS WAF at this point. This solution handles logging data from multiple AWS WAF logs in multiple Regions from more than one account.

To do this efficiently, you should consider your partitioning strategy for the data. You can grant your security teams a comprehensive view of the network. Create each partition based on the Kinesis Data Firehose delivery stream for the specific AWS WAF associated with the Application Load Balancer. This partitioning strategy also allows the security team to view the logs by Region and by account. As a result, your S3 bucket name and prefix look similar to the following example:

s3://central-waf-logs/<account_id>/<region_name>/<kinesis_firehose_name>/...filename...

Step 6: Copying logs with Lambda code

This step updates the Lambda function to start copying log files. Keep your partitioning strategy in mind as you update the Lambda function. Repeat this step for every Region where you launched the application that you intend to monitor with AWS WAF.

To accommodate the partitioning, modify your Lambda code to match the examples in the GitHub repo.

Replace <kinesis_firehose_name> in the example code with the name of the Kinesis Data Firehose delivery stream attached to the AWS WAF. Replace <central logging bucket name> with the S3 bucket name from your central logging account.

Kinesis Data Firehose should now begin writing files to your central S3 logging bucket with the correct partitioning. To generate logs, access your web application.

Analytics

Now that Kinesis Data Firehose can write collected files into your logging account’s S3 bucket, create an Elasticsearch cluster in your logging account in the same Region as the central logging bucket. You also must create a Lambda function to handle S3 events as the central logging bucket receives new log files. This creates a connection between your central log files and your search engine. Amazon ES gives you the ability to query your logs quickly to look for potential security threats. The Lambda function loads the data into your Amazon ES cluster. Amazon ES also includes a tool named Kibana, which helps with managing data and creating visualizations.

Step 7: Create an Elasticsearch cluster

  1. In your central Logging Account, navigate to the Elasticsearch Service in the AWS Console.
  2. Select Create Cluster, enter a domain name for your cluster, and choose your Elasticsearch version from the dropdown. Choose Next. In this example, don't implement any security policies for your cluster and only use one instance. For any real-world production tasks, keep your Elasticsearch cluster inside your VPC.
  3. For network configuration, choose Public access and choose Next.
  4. For the access policy in this tutorial, only allow access to the domain from a specified account ID or IAM ARN. In this case, use your account ID to gain access.
  5. Choose Next, review the settings on the final screen, and confirm. You generally want to create strict access policies for your domain and not allow public access. This example uses these settings only to quickly demonstrate the capabilities of AWS services; I would never recommend them in a production environment.

AWS takes a few minutes to finish and activate your Amazon ES. Once it goes live, you can see two endpoints. The Endpoint URL is the URL you use to send data to the cluster.

Step 8: Create a Lambda function to copy log files

Add an event trigger to your central logs bucket. This trigger tells your Lambda function to write the data from the log file to Amazon ES. Before you create the S3 trigger, create a Lambda function in your logging account to handle the events.

For this Lambda function, we use code from the aws-samples GitHub repository that streams data from an S3 file line by line into Amazon ES. This example uses code taken from amazon-elasticsearch-lambda-samples. Name your new Lambda function myS3toES.

  1. Copy and paste the following code into a text file named index.js:
    // Load the AWS SDK and construct the S3 client
    const aws = require('aws-sdk');
    const s3 = new aws.S3();
    
    // Destination bucket (replace with your central logging bucket name)
    const destBucket = "<central logging bucket name>";
    
    exports.handler = (event, context, callback) => {
        // get the source bucket name
        var _srcBucket = event.Records[0].s3.bucket.name;
        // get the object key of the file that landed on S3
        let _key = event.Records[0].s3.object.key;
        
        // split the key by "/"
        let _keySplit = _key.split("/")
        // get the object name
        let _objName = _keySplit[ (_keySplit.length - 1) ];
        // reset the destination path
        let _destPath = _keySplit[0]+"/"+_keySplit[1]+"/<kinesis_firehose_name>/"+_objName;
        // setup the source path
        let _sourcePath = _srcBucket + "/" + _key;
        // build the params for the copyObject request to S3
        let params = { Bucket: destBucket, ACL: "bucket-owner-full-control", CopySource: _sourcePath, Key: _destPath };
        // execute the copyObject request
        s3.copyObject(params, function(err, data) {
            if (err) {
                console.log(err, err.stack);
            } else {
                console.log("SUCCESS!");
            }
        });
        callback(null, 'All done!');
    };

  2. Copy and paste this code into a text file and name it package.json:
    //JSON Document
    {
      "name": "s3toesfunction",
      "version": "1.0.0",
      "description": "",
      "main": "index.js",
      "scripts": {},
      "author": "",
      "dependencies": {
        "byline": "^5.0.0",
        "clf-parser": "0.0.2",
        "path": "^0.12.7",
        "stream": "0.0.2"
      }
    }

  3. In the folder containing these files, execute the following command: npm install
  4. After the installation completes, create a .zip file that includes the index.js file and the node_modules folder.
  5. Log in to your logging account.
  6. Upload your .zip file to the Lambda function. For Code entry type, choose Upload a .zip file.
  7. This Lambda function needs an appropriate service role with a trust relationship to S3. Choose Edit trust relationships and add s3.amazonaws.com and lambda.amazonaws.com as trusted entities.
  8. Set up your IAM role with the following permissions: S3 Read Only permissions and Lambda Basic Execution. To grant the role the appropriate access, assign it to the Lambda function from the Lambda Execution Role section in the console.
  9. Set environment variables for your Lambda function so it knows where to send the data. Add an endpoint variable and use the endpoint URL you created in Step 7. Add an index variable and enter your index name. Add a region variable and specify the Region where you deployed your application.

Step 9: Create an S3 trigger

After creating the Lambda function, create the event triggers on your S3 bucket to execute that function. This completes your log delivery pipeline to Amazon ES. This is a common pipeline architecture for streaming data from S3 into Amazon ES.

  1. Log in to your central logging account.
  2. Navigate to the S3 console, select your bucket, then open the Properties pane and scroll down to Events.
  3. Choose Add notification and name your new event s3toLambdaToEs.
  4. Under Events, select the check box for PUT. Leave Prefix and Suffix blank.
  5. Under Send to, select Lambda Function, and enter the name of the Lambda function that you created in the previous step—in this example, myS3toES.
  6. Choose Save.

With this complete, Lambda should start sending data to your Elasticsearch index whenever you access your web application.

Step 10: Configure Amazon ES

Your pipeline now automatically adds data to your Elasticsearch cluster. Next, use Kibana to visualize the AWS WAF logs in the central logging account’s S3 bucket. This is the final step in assembling your forensic investigation architecture.

Kibana provides tools to create visualizations and dashboards that help your security teams view log data. Using the log data, you can filter by IP address to see how many times an IP address has hit your firewall each month. This helps you track usage anomalies and isolate potentially malicious IP addresses. You can use this information to add web ACL rules to your firewall that adds extra protection against those IP addresses.

Kibana produces visualizations like the following screenshot.

In addition to the Number of IPs over Time visualization, you can also correlate the IP address to its country of origin. Correlation provides even more precise filtering for potential web ACL rules to protect against attackers. The visualization for that data looks like the following image.

Elasticsearch setup

To set up and visualize your AWS WAF data, follow this How to analyze AWS WAF logs using Amazon Elasticsearch Service post. With this solution, you can investigate your global dataset instead of isolated Regions.

An alternative to Amazon ES

Amazon ES is an excellent tool for forensic work because it provides high-performance search capability for large datasets. However, Amazon ES requires cluster management and complex capacity planning for future growth. To get top-notch performance from Amazon ES, you must adequately scale it. With the more straightforward data of these investigations, you could instead work with more traditional SQL queries.

Forensic data grows quickly, so using a relational database means you might quickly outgrow your capacity. Instead, take advantage of AWS serverless technologies like AWS Glue, Athena, and Amazon QuickSight. These technologies enable forensic analysis without the operational overhead you would experience with Elasticsearch or a relational database. To learn more about this option, consult posts like How to extract, transform, and load data from analytic processing using AWS Glue and Work with partitioned data in AWS Glue.

Athena query

With your forensic tools now in place, you can use Athena to query your data and analyze the results. This lets you refine the data for your Kibana visualizations, or directly load it into Amazon QuickSight for additional visualization. Use the Athena console to experiment until you have the best query for your visual needs. Having the database in your AWS Glue Catalog means you can make ad hoc queries in Athena to inspect your data.

In the Athena console, create a new Query tab and enter the following query:

# SQL Query
SELECT date_format(from_unixtime("timestamp"/1000), '%Y-%m-%d %h:%i:%s') AS event_date,
       client_ip,
       country,
       account_id,
       waf_name,
       region
FROM "paritionedlogdata"."waf_logs_transformed"
WHERE year = '2018'
  AND month = '12';

Replace the database and table names ("paritionedlogdata"."waf_logs_transformed" in this example) with the appropriate values for your environment. This query converts the numerical timestamp to a readable date format using the SQL date functions described in the Presto 0.176 documentation. It should return the following results.

You can see which IP addresses hit your environment the most over any period of time. In a production environment, you would run an ETL job to re-partition this data and transform it into a columnar format optimized for queries. If you would like more information about doing that, see the Best Practices When Using Athena with AWS Glue post.
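
One lightweight way to do that re-partitioning is an Athena CTAS statement, shown here submitted through boto3. The output location and results bucket are hypothetical placeholders, and the column list simply mirrors the sample query above:

import boto3

athena = boto3.client('athena')

# Rewrite the logs as Parquet, partitioned by year and month
ctas = """
CREATE TABLE paritionedlogdata.waf_logs_parquet
WITH (
  format = 'PARQUET',
  external_location = 's3://central-waf-logs/parquet/',
  partitioned_by = ARRAY['year', 'month']
) AS
SELECT "timestamp", client_ip, country, account_id, waf_name, region, year, month
FROM paritionedlogdata.waf_logs_transformed
"""

athena.start_query_execution(
    QueryString=ctas,
    ResultConfiguration={'OutputLocation': 's3://aws-athena-query-results-example/'}
)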

Amazon QuickSight visualization

Now that you can query your data in Athena, you can visualize the results using Amazon QuickSight. First, grant Amazon QuickSight access to the S3 bucket where your Athena query results live.

  1. In the Amazon QuickSight console, log in.
  2. Choose Admin/username, Manage QuickSight.
  3. Choose Account settings, Security & permissions.
  4. Under QuickSight access to AWS services, choose Add or remove.
  5. Choose Amazon S3, then choose Select S3 buckets.
  6. Choose the output bucket for your central AWS WAF logs. Also, choose your Athena query results bucket. The query results bucket begins with aws-athena-query-results-*.

Amazon QuickSight can now access the data sources. To set up your visualizations, follow these steps:

  1. In the QuickSight console, choose Manage data, New data set.
  2. For Source, choose Athena.
  3. Give your new dataset a name and choose Validate connection.
  4. After you validate the connection, choose Create data source.
  5. Select Use custom SQL and give your SQL query a name.
  6. Input the same query that you used earlier in Athena, and choose Confirm query.
  7. Choose Import to SPICE for quicker analytics, and then choose Visualize.

Allow Amazon QuickSight a few minutes to import the data. It alerts you when the import is complete.

Now that you have imported your data into your analysis, you can apply a visualization:

  1. In Amazon QuickSight, New analysis.
  2. Select the last dataset that you created earlier and choose Create analysis.
  3. At the bottom left of the screen, choose Line Chart.
  4. Drag and drop event_date to the X-Axis field well.
  5. Drag and drop client_ip to the Value field well. This should create a visualization similar to the following image.
  6. Choose the right arrow at the top left of the visualization and choose Hide "other" categories. This should modify your visualization to look like the following image.

You can also map the countries from which the requests originate, allowing you to track global access anomalies. You can do this in QuickSight by selecting the “Points on map” visualization type and choosing the country as the data point to visualize.

You can also add a count of IP addresses to see if you have any unusual access patterns originating from specific IP addresses.

Conclusion

Although Amazon ES and Amazon QuickSight offer similar final results, there are trade-offs to the technical approaches that I highlighted. If your use case requires the analysis of data in real time, then Amazon ES is more suitable for your needs. If you prefer a serverless approach that doesn’t require capacity planning or cluster management, then the solution with AWS Glue, Athena, and Amazon QuickSight is more suitable.

In this post, I described an easy way to build operational dashboards that track key metrics over time. Doing this with AWS Glue, Athena, and Amazon QuickSight relieves the heavy lifting of managing servers and infrastructure. To monitor metrics in real time instead, the Amazon ES solution provides a way to do this with little operational overhead. The key here is the adaptability of the solution: putting different services together can provide different solutions to your problems to fit your exact needs.

Hopefully, you have found this post informative and the proposed solutions intriguing. As always, AWS welcomes all feedback or comment.

About the Authors

Aaron Franco is a solutions architect at Amazon Web Services .

Query your data created on-premises using Amazon Athena and AWS Storage Gateway

Post Syndicated from James Forrester original https://aws.amazon.com/blogs/big-data/query-your-data-created-on-premises-using-amazon-athena-and-aws-storage-gateway/

Enterprise customers have to maintain, protect, and provide access to the petabytes of data they produce in their data centers every day. Traditionally, this involves a set of complex, interrelated systems to store the raw data on Network Attached Storage (NAS), Storage Area Networks (SAN), or Direct Attached Storage (DAS), and to transform it and to load it into relational databases to support querying and analysis activities. This is commonly known as Extract Transform and Load or ETL.

Each of these systems must be separately maintained, often by separate teams: DBAs for the databases, systems engineers for the underlying physical infrastructure, and others. At AWS, we’re constantly looking at ways to invent and simplify on behalf of our customers. This post looks at using a combination of AWS technology that can be deployed in customers’ data centers (AWS Storage Gateway) and serverless, cloud-native technology (Amazon Athena) to simplify the process of querying critical data generated on-premises.

Customers using popular enterprise analysis tools, such as Tableau, to analyze their data rely on ODBC or JDBC to connect to and run queries against their data. Conversely, file systems use protocols like SMB or NFS to read and write files. Until now, it’s often been necessary to translate data from its raw format (often text files) into a relational database in order to allow analysis on it. Enter: AWS Storage Gateway and Amazon Athena.

In this blog post, I use this architecture to demonstrate the combined capabilities of Storage Gateway and Athena. AWS Storage Gateway is a hybrid storage service that enables your on-premises applications to seamlessly use AWS cloud storage. The File Gateway configuration of the AWS Storage Gateway offers you a seamless way to connect to the cloud in order to store application data files and backup images as durable objects on Amazon S3 cloud storage. File gateway offers SMB or NFS-based access to data in Amazon S3 with local caching, and files are stored and billed as S3 objects. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Let’s walk through an example with ACME Corp. ACME is a fictitious, but representative enterprise that wants to store, protect, and analyze the data it receives from millions of IoT sensors around the world.

The figure below gives a high-level view of how data flows between each step in ACME’s workflow in the proposed solution. Once this solution is configured, the flow of data into the hands of ACME’s analysts is fully automated with no manual intervention required.

Today, ACME receives a daily file from each sensor via FTP in text (comma-separated) format. These files share a common set of columns, and the files are stored on an enterprise NAS device behind the FTP server. The NAS device is replicated to a secondary facility for disaster recovery purposes on a daily basis.

At the end of each day, an ETL process runs, reads each text file, and loads it to a relational database table with a similar column structure. ACME analysts receive an email in their in-boxes when the load process is complete, allowing them to begin their analysis of the previous day’s activities provided there were no issues with the load. In the event of a load issue, operations staff are paged, which can delay the start of the analysts’ day while the problem is resolved.

In the event of a NAS failure, the prior day’s data must be replayed into the FTP server — a costly and time-consuming process. ACME’s hypothetical Recovery Time Objective for the analysis activities in the event of a database failure is four hours; their Recovery Point Objective for the data is up to one day. Operations personnel must maintain FTP servers, the NAS environment and Database servers.

Without making changes to ACME’s FTP process, which they wish to maintain in its current state, our first step is to deploy a File Gateway on their VMware infrastructure to replace ACME’s existing NAS. Let me quickly demonstrate how you can setup File Gateway for testing purposes in your own Amazon EC2 environment.

Step 1: From the AWS Management Console, select “Storage Gateway,” then select “Create Gateway:”

Step 2: Select the “File Gateway” gateway type and hit “Next:”

Step 3: Under “Select host platform,” choose “Amazon EC2” and follow the on-screen instructions to launch a Gateway instance:

After configuring and testing the gateway, it is mounted to the FTP server in place of the existing NAS. Here’s ACME’s S3 bucket, where ACME can see the data from the IoT sensors is now appearing in Amazon S3:

Here we can see the contents of the configured S3 bucket with the object keys presented as files to the Windows machine, and hence accessible in Windows Explorer:

Here’s what the File Gateway configuration looks like in ACME’s account. We can see that the gateway we created, AthenaGateway, is up and running, up to date, and mapped to the file share storage resource:

More information on configuring a File Gateway is available here: Creating a File Gateway.

The next step is to configure Amazon Athena. Using the AWS Console, we create a new Athena database and table pointing to ACME’s S3 bucket to which File Gateway is writing, with a table definition representing the columns in the data.

ACME’s policies call for the data to be encrypted at rest, and File Gateway supports encryption via KMS when writing data to the S3 bucket. Athena supports a range of Amazon S3 encryption options, both for encrypted datasets in Amazon S3 and for encrypted query results.

These options encrypt data at rest in Amazon S3. Regardless of whether you use these options, transport layer security (TLS) encrypts objects in-transit between Athena resources and between Athena and Amazon S3. Query results that stream to JDBC clients are encrypted using TLS. We then run a test query in the Athena console to verify that data is being returned correctly. As new data is received by the File Gateway, it is automatically added to S3 and automatically included in Athena's query scope. Now, we are going to create an Athena database using AWS Glue, to make ACME's IoT device data, written to S3 via the File Gateway, accessible for querying in Athena.

Glue is a fully managed ETL (extract, transform, and load) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue consists of a central metadata repository known as the AWS Glue Data Catalog, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. AWS Glue is serverless, so there’s no infrastructure to set up or manage.

First, we open the Glue home page in the AWS Management Console, then select “Add tables using a crawler,” and follow the steps described, referencing your S3 bucket and prefix when asked. For details on configuring a Glue crawler, see the AWS Glue documentation.

Once the crawler is configured, run it. It crawls your data in S3 and indicates when the crawl is complete:
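
For repeatable setups, the same crawler can be created and started with boto3. The crawler name, IAM role, database, and S3 path below are hypothetical placeholders:

import boto3

glue = boto3.client('glue')

# Point the S3 target at the bucket your File Gateway writes to
glue.create_crawler(
    Name='acme-sensor-crawler',
    Role='AWSGlueServiceRole-acme',
    DatabaseName='acme_sensor_db',
    Targets={'S3Targets': [{'Path': 's3://acme-sensor-data/'}]},
)
glue.start_crawler(Name='acme-sensor-crawler')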

Next, open the Athena home page in the AWS Management Console:

In the Athena home page, you’ll now see the database and tables created by Glue. Here is Athena, configured to point to the sensor data in S3 and running a test query against it. The test query we will use is as follows:

SELECT col1, count(col1)
FROM acmesensordata
WHERE col3 > 50
  AND col3 < 60
GROUP BY col1
ORDER BY count(col1) DESC
LIMIT 100

This sample query scans all ACME’s data to count the top 100 cities with sensors that have emitted values in the range between 50 and 60, and reports how many such data points have been emitted.

The final step is to redirect ACME’s Tableau environment to point to Athena’s ODBC endpoint. Tableau’s ODBC configuration is managed centrally by ACME, and the necessary details are swapped to point to Athena in place of the existing on-premise relational database.

When you start Tableau, under “Connect,” you can see the file and database types that are supported by Tableau Desktop. Select “More” to see the complete list. Tableau considers ODBC (Open Database Connectivity) as a standard way to connect to a database. You can connect Tableau to your data using the ODBC driver for Amazon Athena and the Tableau Other Databases (ODBC) connector. Tableau’s complete documentation for connecting to ODBC is available here.

Let’s recap what we’ve changed, and the technology and end-user impact.

  • We have replaced ACME’s on-premises NAS with AWS Storage Gateway backed by an S3 bucket, and configured their FTP server to use the File Gateway’s file share in place of their existing one.
  • We have configured Storage Gateway as a File Gateway to provide access to the customer S3 bucket as a NAS. Their data is now in S3.
  • We have configured a serverless Amazon Athena database to mimic the previous relational database, and exposed an ODBC endpoint to this database.
  • We have re-configured ACME’s Tableau environment to point to this ODBC endpoint. Since the relational database in this scenario was only used to service ad-hoc SQL queries, it is no longer needed.

If there are no other dependencies, ACME can now decommission the on-premises ETL, relational database, and NAS infrastructure that were dedicated to supporting this scenario. Aside from the FTP servers and the hosts running the Storage Gateway virtual machine, there are no remaining servers to manage for this scenario.

End-user analysts working with this data no longer need to wait until the start of the day to begin their analysis. New sensor data arrives in the Athena S3 location shortly after FTP delivery from the sensors and is available for query immediately. Removing the ETL and relational database infrastructure reduces the potential points of failure in the architecture, and in the event of a disaster, an Athena endpoint in a second AWS Region (backed by S3 Cross-Region Replication) makes the data available to Tableau as soon as replication completes. Because S3 can trigger events when new data arrives, analysts can now be notified when data from particular groups of sensors becomes available, allowing them to begin their work at the earliest possible moment.

Data remains cached on the local gateway, allowing extremely rapid access by other on-premises high-performance computing, big data, or other applications. For high availability, ACME can rapidly launch a second Storage Gateway instance on their existing VMware infrastructure should the primary fail. A further refinement would be to use the NotifyWhenUploaded functionality in AWS Storage Gateway to emit CloudWatch Events when groups of data are uploaded, enabling batch processing.
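For illustration, here is a minimal sketch of requesting that notification with the AWS SDK for Python (boto3); the file share ARN is a placeholder, and the CloudWatch Events rule that would react to the notification is not shown.

import boto3

# Placeholder ARN; replace with the ARN of your File Gateway file share.
FILE_SHARE_ARN = 'arn:aws:storagegateway:us-east-1:111122223333:share/share-EXAMPLE01'

storagegateway = boto3.client('storagegateway', region_name='us-east-1')

# Ask the gateway to emit a CloudWatch event once every file written to the
# share up to this point has been uploaded to Amazon S3.
response = storagegateway.notify_when_uploaded(FileShareARN=FILE_SHARE_ARN)
print('Notification ID:', response['NotificationId'])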

And that’s it!

Summary

For our many enterprise customers who deal with complex architectures for these types of hybrid cloud scenarios, the combination of AWS Storage Gateway and Amazon Athena can simplify operations and lower costs while enabling on-premises, cloud-native, and hybrid scenarios across their application portfolios.

If you have any feedback or questions, please feel free to leave a comment.

 


About the Author

James Forrester is Head of Technology for AWS Global Accounts. He works with customers around the world to provide thought leadership on the transformative value, applicability and usage of the full breadth of AWS services.


Separating queries and managing costs using Amazon Athena workgroups

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/separating-queries-and-managing-costs-using-amazon-athena-workgroups/

Amazon Athena is a serverless query engine for data on Amazon S3. Many customers use Athena to query application and service logs, schedule automated reports, and integrate with their applications, enabling new analytics-based capabilities.

Different types of users rely on Athena, including business analysts, data scientists, security, and operations engineers. But how do you separate and manage these workloads so that users get the best experience while minimizing costs?

In this post, I show you how to use workgroups to do the following:

  • Separate workloads.
  • Control user access.
  • Manage query usage and costs.

Separate workloads

By default, all Athena queries execute in the primary workgroup.  As an administrator, you can create new workgroups to separate different types of workloads.  Administrators commonly turn to workgroups to separate analysts running ad hoc queries from automated reports.  Here’s how to build out that separation.

First create two workgroups, one for ad hoc users (ad-hoc-users) and another for automated reports (reporting).

Next, select a specific output location. All queries executed inside this workgroup save their results to this output location. Routing results to a single secure location helps make sure users only access data they are permitted to see. You can also enforce encryption of query results in S3 by selecting the appropriate encryption configuration.

Workgroups also help you simplify the onboarding of new users to Athena. By selecting override client-side settings, you enforce a predefined configuration on all queries within a workgroup. Users no longer have to configure a query results output location or S3 encryption keys. These settings default to the parameters defined for the workgroup where those queries execute. Additionally, each workgroup maintains a unique query history and saved query inventory, making queries easier for you to track down.

Finally, when creating a workgroup, you can add up to 50 key-value pair tags to help identify your workgroup resources. Tags are also useful when attempting to allocate Athena costs between groups of users. Create Name and Dept tags for the ad-hoc-users and reporting workgroups with their name and department association.
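If you prefer to script the workgroup creation, a boto3 sketch along these lines could be used; the output location matches the S3 path referenced later in this post, and the Dept tag value shown is an assumption.

import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Create the ad-hoc-users workgroup with an enforced output location,
# S3-managed encryption for results, and cost allocation tags.
athena.create_work_group(
    Name='ad-hoc-users',
    Description='Analysts running ad hoc queries',
    Configuration={
        'ResultConfiguration': {
            'OutputLocation': 's3://demo/workgroups/adhocusers/',
            'EncryptionConfiguration': {'EncryptionOption': 'SSE_S3'},
        },
        # Override client-side settings so all queries use these defaults.
        'EnforceWorkGroupConfiguration': True,
        'PublishCloudWatchMetricsEnabled': True,
    },
    Tags=[
        {'Key': 'Name', 'Value': 'ad-hoc-users'},
        {'Key': 'Dept', 'Value': 'analytics'},  # assumed department value
    ],
)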

Control user access to workgroups

Now that you have two workgroups defined, ad-hoc-users and reporting, you must control who can use and update them. Remember that workgroups have ARNs, which you can reference in the IAM policies that you associate with your users. In this example, create a single IAM user representing the ad hoc users team and add that user to an IAM group. The group contains a policy that enforces what actions these users can perform.

Start by reviewing IAM Policies for Accessing Workgroups and Workgroup Example Policies to familiarize yourself with policy options. Use the following IAM policy to set up permissions for your analyst user. Grant this user only the permissions required for working in the ad-hoc-users workgroup. Make sure that you tweak this policy to match your exact needs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "athena:ListWorkGroups"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "athena:StartQueryExecution",
                "athena:GetQueryResults",
                "athena:DeleteNamedQuery",
                "athena:GetNamedQuery",
                "athena:ListQueryExecutions",
                "athena:StopQueryExecution",
                "athena:GetQueryResultsStream",
                "athena:GetQueryExecutions",
                "athena:ListNamedQueries",
                "athena:CreateNamedQuery",
                "athena:GetQueryExecution",
                "athena:BatchGetNamedQuery",
                "athena:BatchGetQueryExecution",
                "athena:GetWorkGroup",
                "athena:ListTagsForResource"
            ],
            "Resource": "arn:aws:athena:us-east-1:112233445566:workgroup/ad-hoc-users"
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObjectAcl",
                "s3:GetObject",
                "s3:ListBucket",
                "s3:ListMultipartUploadParts"
            ],
            "Resource": "arn:aws:s3:::demo/workgroups/adhocusers/*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "glue:Get*"
            ],
            "Resource": [
                "arn:aws:glue:us-east-1:112233445566:catalog",
                "arn:aws:glue:us-east-1:112233445566:database/amazon",
                "arn:aws:glue:us-east-1:112233445566:table/amazon/*"
            ]
        }
    ]
}

Now your analyst user can execute queries only in the ad-hoc-users workgroup. The analyst user can switch to other workgroups, but loses access when trying to perform any action. They are further restricted to list and query only those tables that belong to the amazon database. For more information about controlling access to AWS Glue resources such as databases and tables, see AWS Glue Resource Policies for Access Control.

The following screenshot shows what the analyst user sees in the Athena console:

I’ve created a simple Node.js tool that executes SQL queries stored as files in a given directory. You can find my Athena test runner code in the athena_test_runner GitHub repo. You can use this code to simulate a reporting tool, after configuring it to use a workgroup. To do that, create an IAM role with permissions like those previously defined for the analyst user. This time, restrict access to the reporting workgroup.

The following JavaScript code example shows how to select a workgroup programmatically when executing queries:

// Athena client setup; the region should match the Region of your workgroup.
const AWS = require('aws-sdk')
const athena = new AWS.Athena({region: 'us-east-1'})

function executeQueries(files) {
    const params =
    {
      "QueryString": "",
      // Left empty on purpose: the reporting workgroup enforces its own
      // output location and encryption settings.
      "ResultConfiguration": {
        "OutputLocation": ""
      },
      "QueryExecutionContext": {
        "Database": "default"
      },
      "WorkGroup": "reporting"
    }

    params.QueryString = "SELECT * FROM amazon.final_parquet LIMIT 10"
    return new Promise((resolve, reject) => {
        athena.startQueryExecution(params, (err, results) => {
            if (err) {
                reject(err.message)
            } else {
                resolve(results)
            }
        })
    })
}

Run sample automated reports under the reporting workgroup, with the following command:

node index.js testsuite/

Query histories remain isolated between workgroups. A user logging into the Athena console as an analyst using the ad-hoc-users workgroup doesn’t see any automated reports that you ran under the reporting workgroup.

Managing query usage and cost

You have two workgroups configured: one for ad hoc users and another for automated reports. Now, you must safeguard against bad queries. In this use case, two potential situations for query usage should be monitored and controlled:

  • Make sure that users don’t run queries that scan more data than allowed by their budget.
  • Safeguard against automated script bugs that could cause queries to be retried indefinitely.

First, configure data usage controls for your ad-hoc-users workgroup. There are two types of data usage controls: per-query and per-workgroup.

Set the per-query control for analysts to be 1 GB. This control cancels any query run in the ad-hoc-users workgroup that tries to scan more than 1 GB.

To observe this limit in action, choose Update, return to the query editor, and run a query that would scan more than 1 GB. This query triggers the error message, “Query cancelled! : Bytes scanned limit was exceeded”. Remember that you incur charges for data the query scanned up to the point of cancellation. In this case, you incur charges for 1 GB of data.
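The same per-query control can also be set programmatically; a minimal boto3 sketch, assuming the ad-hoc-users workgroup already exists, follows.

import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Cancel any query in ad-hoc-users that scans more than 1 GB.
athena.update_work_group(
    WorkGroup='ad-hoc-users',
    ConfigurationUpdates={'BytesScannedCutoffPerQuery': 1024 ** 3},  # 1 GiB in bytes
)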

Now, switch to your reporting workgroup. For this workload, you’re not worried about individual queries scanning too much data. However, you want to control the aggregate amount of data scanned of all queries in this workgroup.

Create a per-workgroup data usage control for the reporting workgroup. You can configure the maximum amount of data scanned by all queries in the workgroup during a specific period.

For the automated reporting workload, you probably have a good idea of how long the process should take and the total amount of data that queries scan during this time. You only have a few reports to run, so you can expect them to run in a few minutes, only scanning a few megabytes of data. Begin by setting up a low watermark alarm to notify you when your queries have scanned more data than you would expect in five minutes. The following example is for demo purposes only. In most cases, this period would be longer. I configured the alarm to send a notification to an Amazon SNS topic that I created.

To validate the alarm, I made a minor change to my test queries, causing them to scan more data. This change triggered the SNS alarm, shown in the following Amazon CloudWatch dashboard:

Next, create a high watermark alarm that triggers when the queries in your reporting workgroup exceed 1 GB of data scanned over 15 minutes. In this case, the alarm triggers an AWS Lambda function that disables the workgroup, making sure that no additional queries execute in it. This alarm protects you from the costs of faulty automation code or runaway queries.

Before creating the data usage control, create a Node.js Lambda function to disable the workgroup. Paste in the following code:

exports.handler = async (event) => {
    const AWS = require('aws-sdk')
    let athena = new AWS.Athena({region: 'us-east-1'})

    // The SNS message body contains the CloudWatch alarm that fired
    let msg = JSON.parse(event.Records[0].Sns.Message)
    let wgname = msg.Trigger.Dimensions.filter((i) => i.name == 'WorkGroup')[0].value

    // Wait for the call to complete so the workgroup is actually disabled
    // before the handler returns
    await athena.updateWorkGroup({WorkGroup: wgname, State: 'DISABLED'}).promise()

    const response = {
        statusCode: 200,
        body: JSON.stringify(`Workgroup ${wgname} has been disabled`),
    };
    return response;
}

This code grabs the workgroup name from the SNS message body and calls the UpdateWorkGroup API action with the name and the state of DISABLED. The Athena API requires the most recent version of the AWS SDK. When you create the Lambda bundle, include the latest AWS SDK version in that bundle.

Next, create a new SNS topic and a subscription. For Protocol, select AWS Lambda. Then, select the Lambda function that you created in the previous step.
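If you prefer to script this wiring instead of using the console, a boto3 sketch follows; the topic name and Lambda function name are placeholders.

import boto3

sns = boto3.client('sns', region_name='us-east-1')
lambda_client = boto3.client('lambda', region_name='us-east-1')

# Placeholder names; replace with your own.
FUNCTION_NAME = 'disable-reporting-workgroup'
FUNCTION_ARN = 'arn:aws:lambda:us-east-1:112233445566:function:' + FUNCTION_NAME

# Create the topic and subscribe the Lambda function to it.
topic_arn = sns.create_topic(Name='athena-reporting-usage-alarm')['TopicArn']
sns.subscribe(TopicArn=topic_arn, Protocol='lambda', Endpoint=FUNCTION_ARN)

# Allow SNS to invoke the function.
lambda_client.add_permission(
    FunctionName=FUNCTION_NAME,
    StatementId='AllowSNSInvoke',
    Action='lambda:InvokeFunction',
    Principal='sns.amazonaws.com',
    SourceArn=topic_arn,
)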

In the Athena console, create the second alarm, 1 GB for 15 min., and point it to the SNS topic that you created earlier. When triggered, this SNS topic calls the Lambda function that disables the reporting workgroup. No more queries can execute in this workgroup. You see this error message in the console when a workgroup is disabled:

Athena exposes other aggregated metrics per workgroup under the AWS/Athena namespace in CloudWatch, such as the query status and the query type (DDL or DML) per workgroup. To learn more, see Monitoring Athena Queries with CloudWatch Metrics.

Cost allocation tags

When you created your ad-hoc-users and reporting workgroups, you added Name and Dept tags. These tags can be used in your Billing and Cost Management console to determine the usage per workgroup.

Summary

In this post, you learned how to use workgroups in Athena to isolate different query workloads, manage access, and define data usage controls to protect yourself from runaway queries. Metrics exposed to CloudWatch help you monitor query performance and make sure that your users are getting the best experience possible. For more details, see Using Workgroups to Control Query Access.

About the Author

Roy Hasson is a Global Business Development Manager for AWS Analytics. He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.


Extracting Salesforce.com data using AWS Glue and analyzing with Amazon Athena

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/extracting-salesforce-com-data-using-aws-glue-and-analyzing-with-amazon-athena/

Salesforce is a popular and widely used customer relationship management (CRM) platform. It lets you store and manage prospect and customer information—like contact info, accounts, leads, and sales opportunities—in one central location. You can derive a lot of useful information by combining the prospect information stored in Salesforce with other structured and unstructured data in your data lake.

In this post, I show you how to use AWS Glue to extract data from a Salesforce.com account object and save it to Amazon S3. You then use Amazon Athena to generate a report by joining the account object data from Salesforce.com with the orders data from a separate order management system.

Preparing your data

I signed up for a free Salesforce.com account, which comes with a handful of sample records populated across many of the Salesforce.com objects. You can use your organization’s development Salesforce.com account and pull data from multiple objects at the same time by modifying the SOQL query in your AWS Glue code. To keep the query simple, this example extracts data from the Account object only.

To demonstrate joining Salesforce.com data with data from another system using Amazon Athena, you create a sample data file showing orders coming from an order management system.

Setting up an AWS Glue job

Use the open source springml library to connect Apache Spark with Salesforce.com. The library comes with plenty of handy features that allow you to read, write, and update Salesforce.com objects using the Apache Spark framework.

You can compile the JAR files from the springml GitHub repo or download them, with dependencies, from the Maven repository. Upload these JAR files to your S3 bucket and make a note of the full path for each.

force-partner-api-40.0.0.jar
force-wsc-40.0.0.jar
salesforce-wave-api-1.0.9.jar
spark-salesforce_2.11-1.1.1.jar 

In the AWS Management Console, choose AWS Glue in the Region where you want to run the service. Choose Jobs, Add Job. Follow the wizard by filling in the necessary details.

Under the Security configuration, script libraries, and job parameters (optional) section, for Dependent jars path, list the paths for the four JAR files listed previously, separated by commas.

For this job, I allocated Maximum capacity as “2.” This field defines the number of AWS Glue data processing units (DPUs) that the system can allocate when this job runs. A DPU is a relative measure of processing power that consists of four vCPUs of compute capacity and 16 GB of memory. When you specify an Apache Spark ETL job, you can allocate 2–100 DPUs. The default is 10 DPUs.

Execute the AWS Glue job to extract data from the Salesforce.com object

The following Scala code extracts a few fields from the Account object in Salesforce.com and writes them as a table to S3 in Apache Parquet file format.

import com.amazonaws.services.glue.util.GlueArgParser  
import com.amazonaws.services.glue.util.Job  
import com.amazonaws.services.glue.util.JsonOptions  
import com.amazonaws.services.glue.{DynamicFrame, GlueContext}  
import org.apache.spark.SparkContext  
import scala.collection.JavaConverters.mapAsJavaMapConverter  
  
object SfdcExtractData {  
  def main(sysArgs: Array[String]) {  
      
    val sparkContext: SparkContext = new SparkContext()  
    val glueContext: GlueContext = new GlueContext(sparkContext)  
    val sparkSession = glueContext.getSparkSession  
      
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)  
    Job.init(args("JOB_NAME"), glueContext, args.asJava)  
      
    val soql = "select name, accountnumber, industry, type, billingaddress, sic from account"  
    val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()
     
    val datasource0 = DynamicFrame(df, glueContext).withName("datasource0").withTransformationContext("datasource0")  
        
    val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)  
  
    Job.commit()  
  }  
}

This code relies on a few key components:

val df = sparkSession.read.format("com.springml.spark.salesforce").option("soql",soql).option("username", "username").option("password","password+securitytoken").load()

This code example establishes a Salesforce.com connection, submits a SOQL query for the Account object, and loads the returned records into a Spark DataFrame. Don’t forget to replace username with your Salesforce.com username, and password with a combination of your password and your profile’s security token.

Best practices suggest storing and retrieving the password using AWS Secrets Manager instead of hardcoding it. For simplicity, I left it hardcoded in this example.
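As an illustration of that practice, the following Python sketch shows one way to read the credentials from Secrets Manager; the secret name and JSON keys are assumptions, and in the Scala job above you would make the equivalent calls with the AWS SDK for Java.

import json
import boto3

def get_sfdc_credentials(secret_name='salesforce/glue-extract', region='us-east-1'):
    """Fetch Salesforce credentials stored as a JSON secret in Secrets Manager."""
    client = boto3.client('secretsmanager', region_name=region)
    secret = json.loads(client.get_secret_value(SecretId=secret_name)['SecretString'])
    # Expected keys are assumptions: 'username', 'password', 'security_token'
    return secret['username'], secret['password'] + secret['security_token']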

Keep in mind that this query is simple and returns only a handful of records. For large volumes of data, you might want to limit the results returned by your query or use other techniques like bulk query and chunking. Check the springml page to learn more about the functionality that Salesforce.com supports.

val datasink1 = glueContext.getSinkWithFormat(connectionType = "s3", options = JsonOptions(Map("path" -> "s3://replace-with-your-s3-bucket/sfdc-output", "partitionKeys" -> Seq("Industry"))), format = "parquet", transformationContext = "datasink1").writeDynamicFrame(datasource0)  

This code does all the writing to your S3 bucket. In this example, you want to aggregate data by Industry segments. Because of that, you should partition the data by the Industry field.

Also, the code writes in Parquet format. Athena charges you by the amount of data scanned per query. You can save on costs and get better performance when you partition the data, compress data, or convert it to columnar formats like Parquet.

After you run this code in AWS Glue, you can go to your S3 bucket where the sink points and find something like the following structure:

Query the data with Athena

After the code drops your Salesforce.com data into your S3 bucket with the correct partition and format, AWS Glue can crawl the dataset. It creates the appropriate schema in the AWS Glue Data Catalog. Wait for AWS Glue to create the table. Then, Athena can query the table and join with other tables in the catalog.

First, use the AWS Glue crawler to discover the Salesforce.com Account data that you previously stored in the S3 bucket. For details about how to use the crawler, see populating the AWS Glue Data Catalog.

In this example, point the crawler to the S3 output prefix where you stored your Salesforce.com Account data, and run it. The crawler creates a new catalog table before it finally stops.

The AWS Glue Data Catalog table automatically captures the column names, types, and the partition column used for the data stored in your S3 bucket in Parquet format. You can now query this table with Athena. A simple SELECT query on that table shows the results of scanning the data from the S3 bucket.

Now your Salesforce.com data is ready for Athena to query. For this example, join this data with the sample orders from the order management system that you stored in S3. After your AWS Glue crawler finishes cataloging the sample orders data, Athena can query it.

Finally, use Athena to join both tables in an aggregation query.
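A sketch of such an aggregation, submitted through boto3, might look like the following; the database, table, and column names for the orders data are assumptions, so replace them with the names your crawlers actually created.

import boto3

athena = boto3.client('athena', region_name='us-east-1')

# Hypothetical table and column names; adjust to match your Data Catalog.
QUERY = """
SELECT a.industry,
       count(o.order_id)   AS order_count,
       sum(o.order_amount) AS total_order_amount
FROM sfdc_output a
JOIN orders o ON o.account_number = a.accountnumber
GROUP BY a.industry
ORDER BY total_order_amount DESC
"""

response = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://replace-with-your-s3-bucket/athena-results/'},
)
print('Query execution ID:', response['QueryExecutionId'])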

Conclusion

In this post, I showed a simple example of extracting Salesforce.com object data using AWS Glue and Apache Spark, and saving it to S3. You can then catalog your S3 data in the AWS Glue Data Catalog, allowing Athena to query it. With this mechanism in place, you can easily incorporate Salesforce data into your AWS-based data lake.

If you have comments or feedback, please leave them below.

 


About the Author

 

Behram Irani is a Data Architect at Amazon Web Services.


Detect fraudulent calls using Amazon QuickSight ML insights

Post Syndicated from Guy Ben Baruch original https://aws.amazon.com/blogs/big-data/detect-fraudulent-calls-using-amazon-quicksight-ml-insights/

The financial impact of fraud in any industry is massive. According to the Financial Times article Fraud Costs Telecoms Industry $17bn a Year (paid subscription required), fraud costs the telecommunications industry $17 billion in lost revenues every year.

Fraudsters constantly look for new technologies and devise new techniques. This changes fraud patterns and makes detection difficult. Companies commonly combat this with a rules-based fraud detection system. However, once the fraudsters realize their current techniques or tools are being identified, they quickly find a way around it. Also, rules-based detection systems tend to struggle and slow down with a lot of data. This makes it difficult to detect fraud and act quickly, resulting in loss of revenue.

Overview

There are several AWS services that implement anomaly detection and could be used to combat fraud, but let’s focus on the following three:

When trying to detect fraud, there are two high-level challenges:

  • Scale – The amount of data to be analyzed. For example, each call generates a call detail record (CDR) event. These CDRs include many pieces of information such as originating and terminating phone numbers, and duration of call. Multiply these CDR events times the number of telephone calls placed each day and you can get an idea of the scale that operators must manage.
  • Machine learning knowledge and skill – The right set of skills to help solve business problems with machine learning. Developing these skills or hiring qualified data scientists with adequate domain knowledge is not simple.

Introducing Amazon QuickSight ML Insights

Amazon QuickSight is a fast, cloud-powered BI service that makes it easy for everyone in an organization to get business insights from their data through rich, interactive dashboards. With pay-per-session pricing and a dashboard that can be embedded into your applications, BI is now even more cost-effective and accessible to everyone.

However, as the volume of data that customers generate grows daily, it’s becoming more challenging to harness their data for business insights. This is where machine learning comes in. Amazon is a pioneer in using machine learning to automate and scale various aspects of business analytics in the supply chain, marketing, retail, and finance.

ML Insights integrates proven Amazon technologies into Amazon QuickSight to provide customers with ML-powered insights beyond visualizations.

  • ML-powered anomaly detection to help customers uncover hidden insights by continuously analyzing across billions of data points.
  • ML-powered forecasting and what-if analysis to predict key business metrics with point-and-click simplicity.
  • Auto-narratives to help customers tell the story of their dashboard in a plain-language narrative.

In this post, I demonstrate how a Telecom provider with little to no ML expertise can use Amazon QuickSight ML capabilities to detect fraudulent calls.

Prerequisites

To implement this solution, you need the following resources:

  • Amazon S3 to stage a Ribbon Communications call detail record (CDR) sample in CSV format.
  • AWS Glue running an ETL job in PySpark.
  • AWS Glue crawlers to discover the schema of the tables and update the AWS Glue Data Catalog.
  • Amazon Athena to query the Amazon QuickSight dataset.
  • Amazon QuickSight to build visualizations and perform anomaly detection using ML Insights.

Diagram of fraudulent call-detecting architecture, using a PySpark script to prepare the data and transform it into Parquet and an AWS Glue crawler to build the AWS Glue Data Catalog.

The dataset

For this post, I use a synthetic dataset, thanks to Ribbon Communications. The data was generated by call test generators, and is not customer or sensitive data.

Inspecting the data

The example below is a typical CDR. This STOP CDR is generated after a call has been terminated.


As you can see, there are a lot of values here. Most of them are not relevant for fraud identification or prevention.

Revenue share fraud

Revenue share fraud is one of the most common fraud schemes threatening the telecom industry today. It involves using fraudulent or stolen numbers to repeatedly call a premium-rate B-number, whose owner then shares the revenue generated with the fraudster.

Say that you’d like to detect national and international revenue share fraud using Amazon QuickSight ML. Consider the typical traits of a revenue share fraud phone call. The pattern for revenue share fraud is multiple A-numbers calling the same B-number or a range of B-numbers with the same prefix. The call duration is usually higher than average and could be up to two hours, which is the maximum length of time international switches allow. Generally, the calls originate from one cell or a group of cells.

One SIM may make short test calls to a variety of B-numbers as a precursor to the fraud itself, which most often happens when the risk of detection is lowest, for example, Friday night, weekends, or holidays. Conference calling may be used to make several concurrent calls from one A-number.

Often, SIMs used for this type of fraud are sold or activated in bulk from the same distributor or group of distributors. SIMs could be topped up using fraudulent online or IVR payments, such as using stolen credit card numbers. Both PAYG credit and bundles may be used.

Based on the above use case, the following pieces of information are most relevant to detecting fraud:

  • Call duration
  • Calling number (A number)
  • Called number (B number)
  • Start time of the call
  • Accounting ID

You can use this reference to help identify those fields in a CDR.

Figure 2: Decoded CDR data, highlighting the relevant fields.

I identified the columns that I need out of 235 columns in the CDR.

Inspecting the raw sample data, I quickly see that it’s missing a header.

To make life easier, I added the column names to the raw CSV data and converted it to Parquet.

Discovering the data

In the AWS Glue console, set up a crawler and name it CDR_CRAWLER.

Point the crawler to s3://telco-dest-bucket/blog where the Parquet CDR data resides.

Next, create a new IAM role to be used by the AWS Glue crawler.

For Frequency, leave the default definition of Run on Demand.

Next, choose Add database and define the name of the database. This database contains the table discovered by the AWS Glue crawler.

Choose Next and review the crawler settings. When you’re satisfied, choose Finish.

Next, choose Crawlers, select the crawler that you just created (CDR_CRAWLER), and choose Run crawler.

The AWS Glue crawler starts crawling the database. This can take one minute or more to complete.

When it’s complete, under Data catalog, choose Databases.  You should be able to see the new database created by the AWS Glue crawler. In this case, the name of the database is blog.

To view the tables created under this database, select the relevant database and choose Tables. The crawler’s table also points to the location of the Parquet format CDRs.

To see the table’s schema, select the table created by the crawler.

Data preparation

You have defined the relevant dimensions to use in the ML model to detect fraud. Now, you can use a PySpark script that I built earlier using an Amazon SageMaker notebook and an AWS Glue endpoint. The script covers the following tasks:

  • Reduce the dataset and focus only on the relevant columns.
  • Create a timestamp column, which you need for creating an analysis using Amazon QuickSight.
  • Transform files from CSV to Parquet for improved performance.

You can run the PySpark script on the raw CSV format of the CDRs that you are using. Here is the location of the raw CSV format:

s3://telco-source-bucket/machine-learning-for-all/v1.0.0/data/cdr-stop/cdr_stop.csv

Here is the PySpark script that I created.

import sys    
from awsglue.transforms import *    
from awsglue.utils import getResolvedOptions    
from pyspark.context import SparkContext    
from awsglue.context import GlueContext    
from awsglue.job import Job    
import pyspark.sql.functions as fn    
from awsglue.dynamicframe import DynamicFrame    
    
    
sc = SparkContext.getOrCreate()    
glueContext = GlueContext(sc)    
spark = glueContext.spark_session    
    
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "datasource0")    
#apply mapping from source table to destination , we pick only the relevant columns     
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col2", "string", "Accounting_ID", "string"), ("col13", "long", "Call_service_duration", "long"), ("col5", "string", "Start_Time_(MM/DD/YYYY)", "string"), ("col6", "string", "Start_Time_(HH/MM/SSs)", "string"), ("col19", "long", "Calling number", "string"), ("col20", "long", "Called number", "string")], transformation_ctx = "applymapping1")    
    
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_struct", transformation_ctx = "resolvechoice2")    
resolvechoice2.printSchema()    
    
resolvechoice3 = ResolveChoice.apply(frame = resolvechoice2, choice = "MATCH_CATALOG", database = "demo_ml", table_name = "cdr_stop_csv", transformation_ctx = "resolvechoice3")    
resolvechoice3.printSchema()    
    
customDF = resolvechoice3.toDF()    
#create timestamp column    
customDF = customDF.withColumn('timestamp', fn.concat(fn.col("Start_Time_(MM/DD/YYYY)"),fn.lit(" "),fn.col("Start_Time_(HH/MM/SSs)")))    
    
#create timestamp2 column which is a substring of timestamp column    
customDF = customDF.withColumn('timestamp2',fn.substring(fn.col("timestamp"),1,19))    
#create Date column     
customDF =customDF.withColumn("Date",fn.unix_timestamp(fn.col("timestamp2"),"MM/dd/yyyy HH:mm:ss").cast("timestamp"))    
    
#remove temporary fields     
customDF = customDF.drop('timestamp','timestamp2')    
    
customDynamicFrame = DynamicFrame.fromDF(customDF, glueContext, "customDF_df")    
#transform to parquet format and land in S3 path    
datasink4 = glueContext.write_dynamic_frame.from_options(frame = customDynamicFrame, connection_type = "s3", connection_options = {"path": "s3://telco-dest-bucket/blog/"}, format = "parquet", transformation_ctx = "datasink4")    

The dataset has been cataloged in AWS Glue Data Catalog and is queryable using Athena.

Amazon QuickSight and anomaly detection

Next, build out anomaly detection using Amazon QuickSight. To get started, follow these steps.

  1. In the Amazon QuickSight console, choose New analysis.
  2. Choose Create new data set.
  3. Select Athena.
  4. Enter a data source name.
  5. Choose Create data source.
  6. From the drop-down list, select the relevant database and table created by the AWS Glue crawlers, and choose Select.
  7. Select Directly query your data and choose Visualize.

Visualizing the data using Amazon QuickSight

  1. Under visual types, choose Line chart.
  2. Drag call_service_duration to the Value field well.
  3. Drag timestamp_new to the X axis field well.

Amazon QuickSight generates a dashboard, as in the following screenshot.

The x-axis is the timestamp. By default, it’s based on the aggregates of one day. This can be changed by choosing a different value.

Because the timestamp is currently aggregated by day, the call duration shown is the sum of all call durations from all call records within a day. I can begin the search by looking for days where the total call duration is high.

Anomaly detection

Now look at how to start using the ML insights anomaly detection feature.

  1. On the top of the Insights panel, choose Add anomaly to sheet. This creates an insights visual for anomaly detection.
  2. On the top of the screen, choose Field Wells and add at least one field to the Categories, as in the following example. I added the calling/called number, as those become relevant for fraud use cases; for example, one A-number calling multiple B-numbers or multiple A-numbers calling B-numbers.
    The categories represent the dimensional values by which Amazon QuickSight splits the metric. For example, you can analyze anomalies on sales across all product categories and product SKUs—assuming there are 10 product categories, each with 10 product SKUs. Amazon QuickSight splits the metric by the 100 unique combinations and runs anomaly detection on each of the split metrics.
  3. To configure the anomaly detection job, choose Get Started.
  4. On the anomaly detection configuration screen, set up the following options:
  • Analyze all combinations of these categories—By default, if you have selected three categories, Amazon QuickSight runs anomaly detection on the following combinations hierarchically: A, AB, ABC. If you select this option, QuickSight analyzes all combinations: A, AB, ABC, BC, AC. If your data is not hierarchical, check this option.
  • Schedule—Set this option to run anomaly detection on your data hourly, daily, weekly, or monthly, depending on your data and needs. For Start schedule on and Timezone, enter values and choose OK. Important: The schedule does not take effect until you publish the analysis as a dashboard. Within the analysis, you have the option to run the anomaly detection manually (without the schedule).
  • Contribution analysis on anomaly – You can select up to four additional dimensions for Amazon QuickSight to analyze the top contributors when an anomaly is detected. For example, Amazon QuickSight can show you the top customers that contributed to a spike in sales. In my current example, I added one additional dimension: the accounting ID. If you think about a telecom fraud case, you can also consider fields like charging time or cell ID as additional dimensions.
  5. After setting the configuration, choose Run Now to execute the job manually. The insights visual displays the message “Detecting anomalies… This may take a while…” Depending on the size of your dataset, this may take a few minutes or up to an hour.
  6. When the anomaly detection job is complete, any anomalies are called out in the insights visual. By default, only the top anomalies for the latest time period in the data are shown in the insights visuals.

    Anomaly detection reveals several B numbers being called from multiple A numbers with a high call service duration on August 29, 2018. That looks interesting!
  7. To explore all anomalies for this insight, select the menu on the top-right corner of the visual and choose Explore Anomalies.
  8. On the Anomalies detailed page, you can see all the anomalies for the latest period.
    In the view, you can see that two anomalies were detected, showing two time series. The title of each visual represents the metric that is run on the unique combination of the categorical fields. In this case:
  • [All] | 9645000024
  • 3512000024 | [ALL]
    So the system detected anomalies for multiple A-numbers calling 9645000024, and for 3512000024 calling multiple B-numbers. In both cases, it observed a high call duration. The labeled data point on the chart represents the most recent anomaly that is detected for that time series.
  9. To expose a date picker, choose Show anomalies by date at the top-right corner. This chart shows the number of anomalies that were detected for each day (or hour, depending on your anomaly detection configuration). You can select a particular day to see the anomalies detected for that day. For example, selecting August 10, 2018 on the top chart shows the anomalies for that day:

    Important:
    The first 32 points in the dataset are used for training and are not scored by the anomaly detection algorithm, so you may not see any anomalies on the first 32 data points. You can expand the filter controls at the top of the screen. With the filter controls, you can change the anomaly threshold to show high, medium, or low significance anomalies. You can choose to show only anomalies that are higher than expected or lower than expected. You can also filter by the categorical values that are present in your dataset to look at anomalies only for those categories.
  10. Look at the contributors columns. When you configured the anomaly detection, you defined the accounting ID as another dimension. If this were real call traffic instead of practice data, you would be able to single out specific accounting IDs that contribute to the anomaly.
  11. When you’re done, choose Back to analysis.

Summary

In this post, I explored a common fraud pattern called revenue share fraud. I looked at how to extract the relevant data for training the anomaly detection model in Amazon QuickSight. I then used this data to detect anomalies based on call duration, calling party, and called party, looking at additional contributors like the accounting ID. The entire process used serverless technologies and required little to no machine learning experience.

For more information about options and strategies, see Amazon QuickSight Announces General Availability of ML Insights.

If you have questions or suggestions, please comment below.

 


About the Author

Guy Ben Baruch is a solutions architect with Amazon Web Services.


Trigger cross-region replication of pre-existing objects using Amazon S3 inventory, Amazon EMR, and Amazon Athena

Post Syndicated from Michael Sambol original https://aws.amazon.com/blogs/big-data/trigger-cross-region-replication-of-pre-existing-objects-using-amazon-s3-inventory-amazon-emr-and-amazon-athena/

In Amazon Simple Storage Service (Amazon S3), you can use cross-region replication (CRR) to copy objects automatically and asynchronously across buckets in different AWS Regions. CRR is a bucket-level configuration, and it can help you meet compliance requirements and minimize latency by keeping copies of your data in different Regions. CRR replicates all objects in the source bucket, or optionally a subset, controlled by prefix and tags.

Objects that exist before you enable CRR (pre-existing objects) are not replicated. Similarly, objects might fail to replicate (failed objects) if permissions aren’t in place, either on the IAM role used for replication or the bucket policy (if the buckets are in different AWS accounts).

In our work with customers, we have seen situations where large numbers of objects aren’t replicated for the previously mentioned reasons. In this post, we show you how to trigger cross-region replication for pre-existing and failed objects.

Methodology

At a high level, our strategy is to perform a copy-in-place operation on pre-existing and failed objects. This operation uses the Amazon S3 API to copy the objects over the top of themselves, preserving tags, access control lists (ACLs), metadata, and encryption keys. The operation also resets the Replication_Status flag on the objects. This triggers cross-region replication, which then copies the objects to the destination bucket.

To accomplish this, we use the following:

  • Amazon S3 inventory to identify objects to copy in place. These objects don’t have a replication status, or they have a status of FAILED.
  • Amazon Athena and AWS Glue to expose the S3 inventory files as a table.
  • Amazon EMR to execute an Apache Spark job that queries the AWS Glue table and performs the copy-in-place operation.

Object filtering

To reduce the size of the problem (we’ve seen buckets with billions of objects!) and eliminate S3 List operations, we use Amazon S3 inventory. S3 inventory is enabled at the bucket level, and it provides a report of S3 objects. The inventory files contain the objects’ replication status: PENDING, COMPLETED, FAILED, or REPLICA. Pre-existing objects do not have a replication status in the inventory.

Interactive analysis

To simplify working with the files that are created by S3 inventory, we create a table in the AWS Glue Data Catalog. You can query this table using Amazon Athena and analyze the objects.  You can also use this table in the Spark job running on Amazon EMR to identify the objects to copy in place.

Copy-in-place execution

We use a Spark job running on Amazon EMR to perform concurrent copy-in-place operations of the S3 objects. This step allows the number of simultaneous copy operations to be scaled up. This improves performance on a large number of objects compared to doing the copy operations consecutively with a single-threaded application.

Account setup

For the purpose of this example, we created three S3 buckets. The buckets are specific to our demonstration. If you’re following along, you need to create your own buckets (with different names).

We’re using a source bucket named crr-preexisting-demo-source and a destination bucket named crr-preexisting-demo-destination. The source bucket contains the pre-existing objects and the objects with the replication status of FAILED. We store the S3 inventory files in a third bucket named crr-preexisting-demo-inventory.

The following diagram illustrates the basic setup.

You can use any bucket to store the inventory, but the bucket policy must include the following statement (change Resource and aws:SourceAccount to match yours).

{
    "Version": "2012-10-17",
    "Id": "S3InventoryPolicy",
    "Statement": [
        {
            "Sid": "S3InventoryStatement",
            "Effect": "Allow",
            "Principal": {
                "Service": "s3.amazonaws.com"
            },
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::crr-preexisting-demo-inventory/*",
            "Condition": {
                "StringEquals": {
                    "s3:x-amz-acl": "bucket-owner-full-control",
                    "aws:SourceAccount": "111111111111"
                }
            }
        }
    ]
}
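If you manage the inventory bucket with code, a minimal sketch of applying that statement with boto3 follows; it assumes the policy above has been saved to a local JSON file.

import boto3

s3 = boto3.client('s3')

# The file contains the policy statement shown above, with Resource and
# aws:SourceAccount adjusted to your inventory bucket and account ID.
with open('inventory-bucket-policy.json') as policy_file:
    policy_document = policy_file.read()

s3.put_bucket_policy(Bucket='crr-preexisting-demo-inventory', Policy=policy_document)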

In our example, we uploaded six objects to crr-preexisting-demo-source. We added three objects (preexisting-*.txt) before CRR was enabled. We also added three objects (failed-*.txt) after permissions were removed from the CRR IAM role, causing CRR to fail.

Enable S3 inventory

You need to enable S3 inventory on the source bucket. You can do this on the Amazon S3 console as follows:

On the Management tab for the source bucket, choose Inventory.

Choose Add new, and complete the settings as shown, choosing the CSV format and selecting the Replication status check box. For detailed instructions for creating an inventory, see How Do I Configure Amazon S3 Inventory? in the Amazon S3 Console User Guide.
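The same inventory configuration can be created programmatically; the following boto3 sketch mirrors the console settings described above and the inventory destination path used later in this post.

import boto3

s3 = boto3.client('s3')

# Daily CSV inventory of the source bucket, delivered to the inventory bucket,
# including the Replication status field.
s3.put_bucket_inventory_configuration(
    Bucket='crr-preexisting-demo-source',
    Id='crr-preexisting-demo',
    InventoryConfiguration={
        'Id': 'crr-preexisting-demo',
        'IsEnabled': True,
        'IncludedObjectVersions': 'Current',
        'Destination': {
            'S3BucketDestination': {
                'AccountId': '111111111111',
                'Bucket': 'arn:aws:s3:::crr-preexisting-demo-inventory',
                'Format': 'CSV',
            }
        },
        'Schedule': {'Frequency': 'Daily'},
        'OptionalFields': ['ReplicationStatus'],
    },
)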

After enabling S3 inventory, you need to wait for the inventory files to be delivered. It can take up to 48 hours to deliver the first report. If you’re following the demo, ensure that the inventory report is delivered before proceeding.

Here’s what our example inventory file looks like:

You can also look on the S3 console on the objects’ Overview tab. The pre-existing objects do not have a replication status, but the failed objects show the following:

Register the table in the AWS Glue Data Catalog using Amazon Athena

To be able to query the inventory files using SQL, first you need to create an external table in the AWS Glue Data Catalog. Open the Amazon Athena console at https://console.aws.amazon.com/athena/home.

On the Query Editor tab, run the following SQL statement. This statement registers the external table in the AWS Glue Data Catalog.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo (
    `bucket` string,
    key string,
    replication_status string
)
PARTITIONED BY (dt string)
ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    ESCAPED BY '\\'
    LINES TERMINATED BY '\n'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
LOCATION 's3://crr-preexisting-demo-inventory/crr-preexisting-demo-source/crr-preexisting-demo/hive';

After creating the table, you need to make the AWS Glue Data Catalog aware of any existing data and partitions by adding partition metadata to the table. To do this, you use the Metastore Consistency Check utility to scan for and add partition metadata to the AWS Glue Data Catalog.

MSCK REPAIR TABLE crr_preexisting_demo;

To learn more about why this is required, see the documentation on MSCK REPAIR TABLE and data partitioning in the Amazon Athena User Guide.

Now that the table and partitions are registered in the Data Catalog, you can query the inventory files with Amazon Athena.

SELECT * FROM crr_preexisting_demo where dt='2019-02-24-04-00';

The results of the query are as follows.

The query returns all rows in the S3 inventory for a specific delivery date. You’re now ready to launch an EMR cluster to copy in place the pre-existing and failed objects.

Note: If your goal is to fix FAILED objects, make sure that you correct what caused the failure (IAM permissions or S3 bucket policies) before proceeding to the next step.

Create an EMR cluster to copy objects

To parallelize the copy-in-place operations, run a Spark job on Amazon EMR. To facilitate EMR cluster creation and EMR step submission, we wrote a bash script (available in this GitHub repository).

To run the script, clone the GitHub repo. Then launch the EMR cluster as follows:

$ git clone https://github.com/aws-samples/amazon-s3-crr-preexisting-objects
$ ./launch_emr.sh

Note: Running the bash script results in AWS charges. By default, it creates two Amazon EC2 instances, one m4.xlarge and one m4.2xlarge. Auto-termination is enabled so when the cluster is finished with the in-place copies, it terminates.

The script performs the following tasks:

  1. Creates the default EMR roles (EMR_EC2_DefaultRole and EMR_DefaultRole).
  2. Uploads the files used for bootstrap actions and steps to Amazon S3 (we use crr-preexisting-demo-inventory to store these files).
  3. Creates an EMR cluster with Apache Spark installed, using the create-cluster command.

After the cluster is provisioned:

  1. A bootstrap action installs boto3 and awscli.
  2. Two steps execute, copying the Spark application to the master node and then running the application.

The following are highlights from the Spark application. You can find the complete code for this example in the amazon-s3-crr-preexisting-objects repo on GitHub.

Here we select records from the table registered with the AWS Glue Data Catalog, filtering for objects with a replication_status of "FAILED" or “”.

query = """
        SELECT bucket, key
        FROM {}
        WHERE dt = '{}'
        AND (replication_status = '""'
        OR replication_status = '"FAILED"')
        """.format(inventory_table, inventory_date)

print('Query: {}'.format(query))

crr_failed = spark.sql(query)

We call the copy_object function for each key returned by the previous query.

def copy_object(self, bucket, key, copy_acls):
        dest_bucket = self._s3.Bucket(bucket)
        dest_obj = dest_bucket.Object(key)

        src_bucket = self._s3.Bucket(bucket)
        src_obj = src_bucket.Object(key)

        # Get the S3 Object's Storage Class, Metadata, 
        # and Server Side Encryption
        storage_class, metadata, sse_type, last_modified = \
            self._get_object_attributes(src_obj)

        # Update the Metadata so the copy will work
        metadata['forcedreplication'] = runtime

        # Get and copy the current ACL
        if copy_acls:
            src_acl = src_obj.Acl()
            src_acl.load()
            dest_acl = {
                'Grants': src_acl.grants,
                'Owner': src_acl.owner
            }

        params = {
            'CopySource': {
                'Bucket': bucket,
                'Key': key
            },
            'MetadataDirective': 'REPLACE',
            'TaggingDirective': 'COPY',
            'Metadata': metadata,
            'StorageClass': storage_class
        }

        # Set Server Side Encryption
        if sse_type == 'AES256':
            params['ServerSideEncryption'] = 'AES256'
        elif sse_type == 'aws:kms':
            kms_key = src_obj.ssekms_key_id
            params['ServerSideEncryption'] = 'aws:kms'
            params['SSEKMSKeyId'] = kms_key

        # Copy the S3 Object over the top of itself, 
        # with the Storage Class, updated Metadata, 
        # and Server Side Encryption
        result = dest_obj.copy_from(**params)

        # Put the ACL back on the Object
        if copy_acls:
            dest_obj.Acl().put(AccessControlPolicy=dest_acl)

        return {
            'CopyInPlace': 'TRUE',
            'LastModified': str(result['CopyObjectResult']['LastModified'])
        }

Note: The Spark application adds a forcedreplication key to the objects’ metadata. It does this because Amazon S3 doesn’t allow you to copy in place without changing the object or its metadata.

Verify the success of the EMR job by running a query in Amazon Athena

The Spark application outputs its results to S3. You can create another external table with Amazon Athena and register it with the AWS Glue Data Catalog. You can then query the table with Athena to ensure that the copy-in-place operation was successful.

CREATE EXTERNAL TABLE IF NOT EXISTS
crr_preexisting_demo_results (
  `bucket` string,
  key string,
  replication_status string,
  last_modified string
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
  STORED AS TEXTFILE
LOCATION 's3://crr-preexisting-demo-inventory/results';

SELECT * FROM crr_preexisting_demo_results;

The results appear as follows on the console.

Although this shows that the copy-in-place operation was successful, CRR still needs to replicate the objects. Subsequent inventory files show the objects’ replication status as COMPLETED. You can also verify on the console that preexisting-*.txt and failed-*.txt are COMPLETED.

It is worth noting that because CRR requires versioned buckets, the copy-in-place operation produces another version of the objects. You can use S3 lifecycle policies to manage noncurrent versions.
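As a sketch of that idea, the following boto3 call expires noncurrent versions after 30 days; the retention period is an assumption, so set it to match your own requirements.

import boto3

s3 = boto3.client('s3')

# Expire the noncurrent versions produced by the copy-in-place operation
# after 30 days (adjust the bucket name and retention to your needs).
s3.put_bucket_lifecycle_configuration(
    Bucket='crr-preexisting-demo-source',
    LifecycleConfiguration={
        'Rules': [
            {
                'ID': 'expire-noncurrent-versions',
                'Status': 'Enabled',
                'Filter': {'Prefix': ''},  # apply to the whole bucket
                'NoncurrentVersionExpiration': {'NoncurrentDays': 30},
            }
        ]
    },
)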

Conclusion

In this post, we showed how to use Amazon S3 inventory, Amazon Athena, the AWS Glue Data Catalog, and Amazon EMR to perform copy-in-place operations on pre-existing and failed objects at scale.

Note: Amazon S3 batch operations is an alternative for copying objects. The difference is that S3 batch operations will not check each object’s existing properties and set object ACLs, storage class, and encryption on an object-by-object basis. For more information, see Introduction to Amazon S3 Batch Operations in the Amazon S3 Console User Guide.

 


About the Authors

Michael Sambol is a senior consultant at AWS. He holds an MS in computer science from Georgia Tech. Michael enjoys working out, playing tennis, traveling, and watching Western movies.


Chauncy McCaughey is a senior data architect at AWS. His current side project is using statistical analysis of driving habits and traffic patterns to understand how he always ends up in the slow lane.


Easily query AWS service logs using Amazon Athena

Post Syndicated from Damon Cortesi original https://aws.amazon.com/blogs/big-data/easily-query-aws-service-logs-using-amazon-athena/

Many organizations use Amazon Simple Storage Service (Amazon S3) as a primary storage destination for a wide variety of logs including AWS service logs. One of the benefits of storing log data in Amazon S3 is that you can access it in any number of ways. One popular option is to query it using Amazon Athena, a serverless query engine for data on S3. Common use cases for querying logs are service and application troubleshooting, performance analysis, and security audits. To get the best performance and reduce query costs in Athena, we recommend following common best practices, as outlined in Top 10 Performance Tuning Tips for Amazon Athena on the AWS Big Data Blog. These best practices include converting the data to a columnar format like Apache Parquet and partitioning the resulting data in S3.

In this post, we’re open-sourcing a Python library known as Athena Glue Service Logs (AGSlogger). This library has predefined templates for parsing and optimizing the most popular log formats. The library provides a mechanism for defining schemas, managing partitions, and transforming data within an extract, transform, load (ETL) job in AWS Glue. AWS Glue is a serverless data transformation and cataloging service. You can use this library in conjunction with AWS Glue ETL jobs to enable a common framework for processing log data.

Using Python libraries with AWS Glue ETL

One of the features of AWS Glue ETL is the ability to import Python libraries into a job (as described in the documentation). We take advantage of this feature in our approach. With this capability, you first provide a link to a .zip file in Amazon S3 containing selected Python modules to AWS Glue. Then AWS Glue imports them at runtime.

We want our AWS Glue jobs to be as simple as possible while enabling the ability to easily roll out new versions of the library. To accomplish this, all of the setup, configuration, and transformation logic is contained in the library and AWS Glue simply executes the job. As new log formats are added or updated in the library, a new version of the .zip file can be deployed to S3. It’s then automatically imported by the relevant AWS Glue job. Here is an example ETL script:

from athena_glue_service_logs.job import JobRunner
 
job_run = JobRunner(service_name='s3_access')
job_run.convert_and_partition()

About the AGSlogger library

The library is available on GitHub in the athena-glue-service-logs repository. It’s designed to do an initial conversion of AWS Service logs and also perform ongoing conversion as new logs are delivered to S3. The following log types are supported:

  • Application Load Balancer
  • Classic Load Balancer
  • AWS CloudTrail
  • Amazon CloudFront
  • S3 Access
  • Amazon VPC Flow

To convert additional log types, update the service_name variable in the script, along with the job parameters that point to your desired table names and Amazon S3 locations.

There are some limitations of the script:

  • The script has not been tested with large volumes of log data (greater than 100 GiB).
  • If you have a large number of log files, you might need to increase your Apache Spark executor settings. Edit the AWS Glue job and add the following job parameter:

key: --conf
value: spark.yarn.executor.memoryOverhead=1G

  • If you do not have any recent logs (less than 30 days old) for certain log types like S3 Access, the script may not be able to properly populate the optimized table.
  • Several CloudTrail fields, such as requestParameters and responseElements, are left as JSON strings; you can use Athena to extract data from this JSON at query time, as shown in the example after this list.
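
For example, a query along the following lines pulls a value out of the requestParameters JSON. This is only a sketch: the aws_service_logs.cloudtrail table and the eventname, eventsource, and requestparameters column names are assumptions about how the converted CloudTrail table is laid out, so adjust them to match the tables that your job actually creates.

SELECT eventname,
       json_extract_scalar(requestparameters, '$.bucketName') AS bucket_name,
       COUNT(*) AS event_count
FROM "aws_service_logs"."cloudtrail"
WHERE eventsource = 's3.amazonaws.com'
GROUP BY 1, 2
ORDER BY 3 DESC
LIMIT 20;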

Before you begin

There are a few prerequisites before you get started:

  1. Create an IAM role to use with AWS Glue. For more information, see Create an IAM Role for AWS Glue in the AWS Glue documentation.
  2. Ensure that you have access to Athena from your account.
  3. We use Amazon S3 server access logs as our example for this script, so enable access logging on an Amazon S3 bucket. For more information, see How to Enable Server Access Logging in the S3 documentation.
  4. Download and store the Python library in an Amazon S3 bucket in the same AWS Region in which you run the AWS Glue ETL job. Download the latest release from https://github.com/awslabs/athena-glue-service-logs/releases. Then, copy the .zip file to your Amazon S3 bucket, as follows:

aws s3 cp athena_glue_converter_v5.3.0.zip s3://<bucket>/glue_scripts/

Now, you are ready to create the AWS Glue ETL job.

Create an AWS Glue ETL job using the library

For this post, we focus on Amazon S3 server access logs (described in the documentation). By default, these logs are delivered to a single location in Amazon S3. Converting these logs to Parquet and partitioning them can significantly improve query performance and decrease query costs.

If you’ve cloned the repository associated with this release, you can use a “make” command to automate the job creation. We also walk through the job creation process in the AWS Glue console. There are a few specific settings on the Job properties page we need to set.

To create the AWS Glue ETL job

  1. In the AWS Glue console, choose Jobs under ETL on the navigation pane, and then choose Add Job. Follow the job creation wizard. Ensure that "A new script to be authored by you" is selected; we provide the code for it later. Our ETL language is Python. Under advanced properties, enable the Job bookmark. Job metrics can also be useful when monitoring your job, but they are not required.
  2. Under Script libraries in the Python library path section, put the full path to the .zip file that you uploaded to your Amazon S3 bucket as shown previously:
    s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip

    You can adjust the DPUs if you think you need more or less processing power. For our purposes, you can leave it at 10.
  3. Specify a few different types of parameters, described in detail following:
  • The source of your Amazon S3 server access logs.
  • The destination where the converted logs are saved.

AWS service logs can be stored in a number of different locations, as discussed in Service Log Specifics. For storing Amazon S3 server access logs, specify the bucket and prefix matching those that you configured on the S3 bucket where you enabled access logging.

  • The names of the databases and tables that are created in the AWS Glue Data Catalog.

By default, the converted logs are partitioned by date. The script creates the necessary tables and keeps the partitions up-to-date on subsequent runs of the job. You don’t need to use AWS Glue crawlers, although they can provide similar functionality. Here are the different properties you need to configure:

  • --raw_database_name: source_logs
  • --raw_table_name: s3_access
  • --converted_database_name: aws_service_logs
  • --converted_table_name: s3_access
  • --s3_converted_target: s3://<bucket>/converted/s3_access
  • --s3_source_location: s3://<bucket>/s3_access

  4. Continue with the rest of the wizard, finishing the job creation flow. The script editor opens. Replace all the code in the script editor, including the import lines, with these lines:
    from athena_glue_service_logs.job import JobRunner
     
    job_run = JobRunner(service_name='s3_access')
    job_run.convert_and_partition()

  5. Save the script and choose Run job. When the job begins, you see log output from the job scrolling under the script.

The script you just created is saved to S3 in a standard bucket. You can also use the AWS Command Line Interface (AWS CLI) to create the AWS Glue ETL job. Copy the preceding script to S3 first and provide that path as the ScriptLocation parameter.

aws glue create-job --name S3AccessLogConvertor \
--description "Convert and partition S3 Access logs" \
--role AWSGlueServiceRoleDefault \
--command Name=glueetl,ScriptLocation=s3://<bucket>/glue_scripts/s3_access_job.py \
--default-arguments '{
  "--extra-py-files":"s3://<bucket>/glue_scripts/athena_glue_converter_v5.3.0.zip",
  "--job-bookmark-option":"job-bookmark-enable",
  "--raw_database_name":"source_logs",
  "--raw_table_name":"s3_access",
  "--converted_database_name":"aws_service_logs",
  "--converted_table_name":"s3_access",
  "--TempDir":"s3://<bucket>/tmp",
  "--s3_converted_target":"s3://<bucket>/converted/s3_access",
  "--s3_source_location":"s3://<bucket>/s3_access/"
}'

Scheduling future runs

By default, this job is configured to run on a manual basis. To run it on a regular basis, set up a new schedule trigger in AWS Glue to run the job at your desired frequency. We recommend scheduling it hourly so that recently delivered logs quickly become available to your optimized queries.

On every run of the job, the script looks for new log data and converts it to Parquet format. The script then adds any new partitions that might have been added as a result of the conversion. The script uses AWS Glue job bookmarks to ensure that it processes only newly delivered data. For more information about bookmarks, see Tracking Processed Data Using Job Bookmarks in the AWS Glue documentation.

Querying your optimized data in Athena: examples

Now that you’ve converted your data from row-based log files to columnar-based Parquet, you can write queries against this data using Athena. After the first run of the script, the tables specified in the AWS Glue ETL job properties are created for you. Here are several sample queries to get you started.

Example 1: Most requested S3 keys

SELECT key, COUNT(*) AS count
FROM "aws_service_logs"."s3_access"
WHERE operation IN ('REST.GET.OBJECT', 'REST.COPY.OBJECT', 'REST.COPY.OBJECT_GET')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

Example 2: Top IP addresses that accessed the bucket yesterday

SELECT remote_ip, COUNT(*) FROM "aws_service_logs"."s3_access"
WHERE year=date_format(current_date, '%Y') AND month=date_format(current_date, '%m') AND day=date_format(current_date + interval '-1' day, '%d')
GROUP BY 1
ORDER BY 2 DESC
LIMIT 100;

Note the use of column positions instead of column names in the GROUP BY and ORDER BY clauses. This is one of the optimizations for Athena queries. For other optimizations, be sure to check out the Top 10 Performance Tuning Tips blog post.

In addition, we use the year, month, and day partition columns to limit the amount of data scanned and decrease the cost of the query.
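
As a third example, limited to columns already used above, the following query returns daily request volume for the current month while scanning only that month's partitions. It is a sketch against the same converted table; adapt the filter to your own naming and retention.

SELECT day, COUNT(*) AS requests
FROM "aws_service_logs"."s3_access"
WHERE year=date_format(current_date, '%Y') AND month=date_format(current_date, '%m')
GROUP BY 1
ORDER BY 1;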

Summary

This post introduces a new open-source library that you can use to efficiently process various types of AWS service logs using AWS Glue. The library automates the application of common best practices to allow high-performing and cost-effective querying of the data using Amazon Athena and Amazon Redshift. We hope this library comes in handy, and we’re open to pull requests. If you want to add a new log type, check out the code in the AWS Labs athena-glue-service-logs repository!

 


About the Author

Damon Cortesi is a big data architect with Amazon Web Services.

 

 

 

Enabling serverless security analytics using AWS WAF full logs, Amazon Athena, and Amazon QuickSight

Post Syndicated from Umesh Ramesh original https://aws.amazon.com/blogs/security/enabling-serverless-security-analytics-using-aws-waf-full-logs/

Traditionally, analyzing data logs required you to extract, transform, and load your data before using a number of data warehouse and business intelligence tools to derive business intelligence from that data—on top of maintaining the servers that ran behind these tools.

This blog post will show you how to analyze AWS Web Application Firewall (AWS WAF) logs and quickly build multiple dashboards, without booting up any servers. With the new AWS WAF full logs feature, you can now log all traffic inspected by AWS WAF into Amazon Simple Storage Service (Amazon S3) buckets by configuring Amazon Kinesis Data Firehose. In this walkthrough, you’ll create an Amazon Kinesis Data Firehose delivery stream to which AWS WAF full logs can be sent, and you’ll enable AWS WAF logging for a specific web ACL. Then you’ll set up an AWS Glue crawler job and an Amazon Athena table. Finally, you’ll set up Amazon QuickSight dashboards to help you visualize your web application security. You can use these same steps to build additional visualizations to draw insights from AWS WAF rules and the web traffic traversing the AWS WAF layer. Security and operations teams can monitor these dashboards directly, without needing to depend on other teams to analyze the logs.

The following architecture diagram highlights the AWS services used in the solution:

Figure 1: Architecture diagram


AWS WAF is a web application firewall that lets you monitor HTTP and HTTPS requests that are forwarded to an Amazon API Gateway API, to Amazon CloudFront or to an Application Load Balancer. AWS WAF also lets you control access to your content. Based on conditions that you specify—such as the IP addresses from which requests originate, or the values of query strings—API Gateway, CloudFront, or the Application Load Balancer responds to requests either with the requested content or with an HTTP 403 status code (Forbidden). You can also configure CloudFront to return a custom error page when a request is blocked.

Amazon Kinesis Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service, and Splunk. With Kinesis Data Firehose, you don’t need to write applications or manage resources. You configure your data producers to send data to Kinesis Data Firehose, and it automatically delivers the data to the destination that you specified. You can also configure Kinesis Data Firehose to transform your data before delivering it.

AWS Glue can be used to run serverless queries against your Amazon S3 data lake. AWS Glue can catalog your S3 data, making it available for querying with Amazon Athena and Amazon Redshift Spectrum. With crawlers, your metadata stays in sync with the underlying data (more details about crawlers later in this post). Amazon Athena and Amazon Redshift Spectrum can directly query your Amazon S3 data lake by using the AWS Glue Data Catalog. With AWS Glue, you access and analyze data through one unified interface without loading it into multiple data silos.

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon QuickSight is a business analytics service you can use to build visualizations, perform one-off analysis, and get business insights from your data. It can automatically discover AWS data sources and also works with your data sources. Amazon QuickSight enables organizations to scale to hundreds of thousands of users and delivers responsive performance by using a robust in-memory engine called SPICE.

SPICE stands for Super-fast, Parallel, In-memory Calculation Engine. SPICE supports rich calculations to help you derive insights from your analysis without worrying about provisioning or managing infrastructure. Data in SPICE is persisted until it is explicitly deleted by the user. SPICE also automatically replicates data for high availability and enables Amazon QuickSight to scale to hundreds of thousands of users who can all simultaneously perform fast interactive analysis across a wide variety of AWS data sources.

Step one: Set up a new Amazon Kinesis Data Firehose delivery stream

  1. In the AWS Management Console, open the Amazon Kinesis Data Firehose service and choose the button to create a new stream.
    1. In the Delivery stream name field, enter a name for your new stream that starts with aws-waf-logs- as shown in the screenshot below. AWS WAF filters all streams starting with the keyword aws-waf-logs when it displays the delivery streams. Note the name of your stream since you’ll need it again later in the walkthrough.
    2. For Source, choose Direct PUT, since AWS WAF logs will be the source in this walkthrough.

      Figure 2: Select the delivery stream name and source


  2. Next, you have the option to enable AWS Lambda if you need to transform your data before transferring it to your destination. (You can learn more about data transformation in the Amazon Kinesis Data Firehose documentation.) In this walkthrough, there are no transformations that need to be performed, so for Record transformation, choose Disabled.
    Figure 3: Select "Disabled" for record transformations


    1. You’ll have the option to convert the JSON object to Apache Parquet or Apache ORC format for better query performance. In this example, you’ll be reading the AWS WAF logs in JSON format, so for Record format conversion, choose Disabled.

      Figure 4: Choose "Disabled" to not convert the JSON object


  3. On the Select destination screen, for Destination, choose Amazon S3.
    Figure 5: Choose the destination


    1. For the S3 destination, you can either enter the name of an existing S3 bucket or create a new S3 bucket. Note the name of the S3 bucket since you’ll need the bucket name in a later step in this walkthrough.
    2. For Source record S3 backup, choose Disabled, because the destination in this walkthrough is an S3 bucket.

      Figure 6: Enter the S3 bucket name, and select "Disabled" for the Source record S3 backup


  4. On the next screen, leave the default conditions for Buffer size, Buffer interval, S3 compression and S3 encryption as they are. However, we recommend that you set Error logging to Enabled initially, for troubleshooting purposes.
    1. For IAM role, select Create new or choose. This opens up a new window that will prompt you to create firehose_delivery_role, as shown in the following screenshot. Choose Allow in this window to accept the role creation. This grants the Kinesis Data Firehose service access to the S3 bucket.


      Figure 7: Select “Allow” to create the IAM role “firehose_delivery_role”

  5. On the last step of configuration, review all the options you’ve chosen, and then select Create delivery stream. This will cause the delivery stream to display as “Creating” under Status. In a couple of minutes, the status will change to “Active,” as shown in the below screenshot.

    Figure 8: Review the options you selected


Step two: Enable AWS WAF logging for a specific Web ACL

  1. From the AWS Management Console, open the AWS WAF service and choose Web ACLs. Open your Web ACL resource, which can either be deployed on a CloudFront distribution or on an Application Load Balancer.
    1. Choose the Web ACL for which you want to enable logging. (In the below screenshot, we’ve selected a Web ACL in the US East Region.)
    2. On the Logging tab, choose Enable Logging.

      Figure 9: Choose "Enable Logging"


  2. The next page displays all the delivery streams that start with aws-waf-logs. Choose the Amazon Kinesis Data Firehose delivery stream that you created for AWS WAF logs at the start of this walkthrough. (In the following screenshot, our example stream name is “aws-waf-logs-us-east-1.”)
    1. You can also choose to redact certain fields that you wish to exclude from being captured in the logs. In this walkthrough, you don’t need to choose any fields to redact.
    2. Select Create.

      Figure 10: Choose your delivery stream, and select "Create"


After a couple of minutes, you’ll be able to inspect the S3 bucket that you defined in the Kinesis Data Firehose delivery stream. The log files are created in directories by year, month, day, and hour.

Step three: Set up an AWS Glue crawler job and Amazon Athena table

The purpose of a crawler within your Data Catalog is to traverse your data stores (such as S3) and extract the metadata fields of the files. The output of the crawler consists of one or more metadata tables that are defined in your Data Catalog. When the crawler runs, the first classifier in your list to successfully recognize your data store is used to create a schema for your table. AWS Glue provides built-in classifiers to infer schemas from common files with formats that include JSON, CSV, and Apache Avro.

  1. In the AWS Management Console, open the AWS Glue service and choose Crawlers to set up a crawler job.
  2. Choose Add crawler to launch a wizard that sets up the crawler job. For Crawler name, enter a relevant name. Then select Next.

    Figure 11: Enter "Crawler name," and select "Next"


  3. For Choose a data store, select S3 and include the path of the S3 bucket that stores your AWS WAF logs, which you made note of in step 1.3. Then choose Next.

    Figure 12: Choose a data store


  4. When you’re given the option to add another data store, choose No.
  5. Then, choose Create an IAM role and enter a name. The role grants access to the S3 bucket for the AWS Glue service to access the log files.

    Figure 13: Choose "Create an IAM role," and enter a name


  6. Next, set the frequency to Run on demand. You can also schedule the crawler to run periodically to make sure any changes in the file structure are reflected in your data catalog.

    Figure 14: Set the "Frequency" to "Run on demand"


  7. For output, choose the database in which the Athena table is to be created and add a prefix to identify your table name easily. Select Next.

    Figure 15: Choose the database, and enter a prefix


  8. Review all the options you’ve selected for the crawler job and complete the wizard by selecting the Finish button.
  9. Now that the crawler job parameters are set up, on the left panel of the console, choose Crawlers to select your job and then choose Run crawler. The job creates an Amazon Athena table. The duration depends on the size of the log files.

    Figure 16: Choose "Run crawler" to create an Amazon Athena table


  10. To see the Amazon Athena table created by the AWS Glue crawler job, from the AWS Management Console, open the Amazon Athena service. You can filter by your table name prefix.
      1. To view the data, choose Preview table. This displays the table data with certain fields showing data in JSON object structure.
    Figure 17: Choose "Preview table" to view the data

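Before building the full custom SQL in the next step, you can sanity-check the crawled table with a simple aggregate such as the following sketch. The table name sampledb.jsonwaflogs_useast1 matches the example used in the custom SQL later in this post; substitute the database and table that your crawler created.

```
SELECT action, COUNT(*) AS request_count
FROM "sampledb"."jsonwaflogs_useast1"
GROUP BY action
ORDER BY request_count DESC;
```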

Step four: Create visualizations using Amazon QuickSight

  1. From the AWS Management Console, open Amazon QuickSight.
  2. In the Amazon QuickSight window, in the top left, choose New Analysis. Choose New Data set, and for the data source choose Athena. Enter an appropriate name for the data source name and choose Create data source.

    Figure 18: Enter the "Data source name," and choose "Create data source"


  3. Next, choose Use custom SQL to extract all the fields in the JSON object using the following SQL query:
    
        ```
        with d as (select
            waf.timestamp,
            waf.formatversion,
            waf.webaclid,
            waf.terminatingruleid,
            waf.terminatingruletype,
            waf.action,
            waf.httpsourcename,
            waf.httpsourceid,
            waf.HTTPREQUEST.clientip as clientip,
            waf.HTTPREQUEST.country as country,
            waf.HTTPREQUEST.httpMethod as httpMethod,
            map_agg(f.name,f.value) as kv
        from sampledb.jsonwaflogs_useast1 waf,
        UNNEST(waf.httprequest.headers) as t(f)
        group by 1,2,3,4,5,6,7,8,9,10,11)
        select d.timestamp,
            d.formatversion,
            d.webaclid,
            d.terminatingruleid,
            d.terminatingruletype,
            d.action,
            d.httpsourcename,
            d.httpsourceid,
            d.clientip,
            d.country,
            d.httpMethod,
            d.kv['Host'] as host,
            d.kv['User-Agent'] as UA,
            d.kv['Accept'] as Acc,
            d.kv['Accept-Language'] as AccL,
            d.kv['Accept-Encoding'] as AccE,
            d.kv['Upgrade-Insecure-Requests'] as UIR,
            d.kv['Cookie'] as Cookie,
            d.kv['X-IMForwards'] as XIMF,
            d.kv['Referer'] as Referer
        from d;
        ```        
        

  4. To extract individual fields, copy the previous SQL query and paste it in the New custom SQL box, then choose Edit/Preview data.
    Figure 19: Paste the SQL query in "New custom SQL query"


    1. In the Edit/Preview data view, for Data source, choose SPICE, then choose Finish.

      Figure 20: Choose "Spice" and then "Finish"


  5. Back in the Amazon QuickSight console, under the Fields section, select the drop-down menu next to the timestamp field and change the data type to Date.

    Figure 21: In the Amazon Quicksight console, change the data type to "Date"


  6. After you see the Date column appear, enter an appropriate name for the visualizations at the top of the page, then choose Save.

    Figure 22: Enter the name for the visualizations, and choose "Save"


  7. You can now create various visualization dashboards with multiple visual types by using the drag-and-drop feature. You can drag and drop combinations of fields such as Action, Client IP, Country, Httpmethod, and User Agents. You can also add filters on Date to view dashboards for a specific timeline. Here are some sample screenshots:
    Figure 23a–23d: Visualization dashboard samples

Conclusion

You can enable AWS WAF logs to Amazon S3 buckets and analyze the logs while they are being streamed by configuring Amazon Kinesis Data Firehose. You can further enhance this solution by automating the streaming of data and using AWS Lambda for any data transformations based on your specific requirements. Using Amazon Athena and Amazon QuickSight makes it easy to analyze logs and build visualizations and dashboards for executive leadership teams. Using these solutions, you can go serverless and let AWS do the heavy lifting for you.


Umesh Kumar Ramesh

Umesh is a Cloud Infrastructure Architect with Amazon Web Services. He delivers proof-of-concept projects, topical workshops, and lead implementation projects to various AWS customers. He holds a Bachelor’s degree in Computer Science & Engineering from National Institute of Technology, Jamshedpur (India). Outside of work, Umesh enjoys watching documentaries, biking, and practicing meditation.


Muralidhar Ramarao

Muralidhar is a Data Engineer with the Amazon Payment Products Machine Learning Team. He has a Bachelor’s degree in Industrial and Production Engineering from the National Institute of Engineering, Mysore, India. Outside of work, he loves to hike. You will find him with his camera or snapping pictures with his phone, and always looking for his next travel destination.

This Is My Architecture: Mobile Cryptocurrency Mining

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/this-is-my-architecture-mobile-cryptocurrency-mining/

In North America, approximately 95% of adults over the age of 25 have a bank account. In the developing world, that number is only about 52%. Cryptocurrencies can provide a platform for millions of unbanked people in the world to achieve financial freedom on a more level financial playing field.

Electroneum, a cryptocurrency company located in England, built its cryptocurrency mobile back end on AWS and is using the power of blockchain to unlock the global digital economy for millions of people in the developing world.

Electroneum’s cryptocurrency mobile app allows Electroneum customers in developing countries to transfer ETN (Electroneum’s cryptocurrency) and pay for goods using their smartphones. Listen in to the discussion between AWS Solutions Architect Toby Knight and Electroneum CTO Barry Last as they explain how the company built its solution. Electroneum’s app is a web application that uses a feedback loop between its web servers and AWS WAF (a web application firewall) to automatically block malicious actors. The system then uses Athena, with a gamified approach, to provide an additional layer of blocking to prevent DDoS attacks. Finally, Electroneum built a serverless, instant payments system using Amazon API Gateway, AWS Lambda, and Amazon DynamoDB to help its customers avoid the usual delays in confirming cryptocurrency transactions.

 

Visualize over 200 years of global climate data using Amazon Athena and Amazon QuickSight

Post Syndicated from Joe Flasher original https://aws.amazon.com/blogs/big-data/visualize-over-200-years-of-global-climate-data-using-amazon-athena-and-amazon-quicksight/

Climate change continues to have a profound effect on our quality of life. As a result, investigation into sustainability is growing. Researchers in both the public and private sectors are planning for the future by studying recorded climate history and using climate forecast models.

To help explain these concepts, this post introduces the Global Historical Climatology Network Daily (GHCN-D). This registry is used by the global climate change research community.

This post also provides a step-by-step demonstration of how Amazon Web Services (AWS) services improve access to this data for climate change research. Data scientists and engineers previously had to access hundreds of nodes on high-performance computers to query this data. Now they can get the same data by using a few steps on AWS.

Background

Global climate analysis is essential for researchers to assess the implications of climate change on the Earth’s natural capital and ecosystem resources. This activity requires high-quality climate datasets, which can be challenging to work with because of their scale and complexity. To have confidence in their findings, researchers must be confident about the provenance of the climate datasets that they work with. For example, researchers may be trying to answer questions like: has the climate of a particular food producing area changed in a way that impacts food security? They must be able to easily query authoritative and curated datasets.

The National Centers for Environmental Information (NCEI) in the U.S. maintains a dataset of climate data that is based on observations from weather stations around the globe: the Global Historical Climatology Network Daily (GHCN-D), a central repository for daily weather summaries from ground-based stations. It comprises millions of quality-assured observations that are updated daily.

The most common parameters recorded are daily temperatures, rainfall, and snowfall. These are useful parameters for assessing risks for drought, flooding, and extreme weather.

The challenge

The NCEI makes the GHCN_D data available in CSV format through an FTP server, organized by year. Organizing the data by year means that a complete copy of the archive requires over 255 files (the first year in the archive is 1763). Traditionally, if a researcher wants to work on this dataset, they must download it and work on it locally. To be sure of using the latest data for their analysis, they must repeat this download every day.

For researchers, deriving insight from this data can be a challenge. Fully engaging with the data requires technical skill, computing resources, and subject matter expertise.

A new efficient approach

Through AWS’s collaboration with the NOAA Big Data Project, a daily snapshot of the GHCN_D dataset is now available on AWS. The data is publicly accessible through an Amazon S3 bucket. For more information, see the Registry of Open Data on AWS.

Having the data available in this way offers several advantages:

  • The data is globally available to a community of users. Users no longer have to download data to work on it. Everyone can work with the same, authoritative copy.
  • Time to insight is reduced. By taking advantage of AWS services, researchers can immediately start to perform analysis.
  • The cost of research is reduced. Researchers can switch off resources as soon as their analysis is finished.

This blog post illustrates a workflow using Amazon S3, Amazon Athena, AWS Glue, and Amazon QuickSight that demonstrates how quickly one can derive insights from this dataset.

The workflow presented in this post follows these general steps:

  • Extract data files from the NOAA bucket and make the data available as tables.
  • Use SQL to query the data contained in the tables.
  • Show how to speed up analysis by creating tables from queries and storing those tables in a private Amazon S3 bucket.
  • Visualize the data to gain insight.

Overview of the GHCN_D dataset

The GHCN-D is a quality-assured dataset that contains daily weather summaries from weather stations across global land areas. It has the following properties:

  • Data is integrated from approximately 30 sources that provide weather observations from various national and international networks.
  • A comprehensive dataset for the US and good coverage for many parts of the world.
  • There are many types of daily weather observations in this dataset, but the majority are maximum temperature, minimum temperature, precipitation, snow fall, and snow depth. These observations include:
    • Over 35,000 temperature stations.
    • Over 100,000 precipitation stations.
    • Over 50,000 snowfall or snow depth stations.
  • The source of each datum (the term used for a single record) is contained in the dataset. Each datum has a quality control flag associated with it.
  • The dataset is updated daily. The historic sources are reprocessed weekly.

You can see in the graphic below how the data volume has grown in recent decades.

Figure 1: 1763 to 2018. For 1763 there are fewer than a thousand observations. For 2017 there are over 34 million observations.

Organization of the data on Amazon S3

As previously mentioned, the GHCN-D dataset is accessible through an Amazon S3 bucket. The details of the dataset are on the Registry of Open Data on AWS (RODA). The landing page for the dataset on RODA contains a link to a comprehensive readme file for the dataset. This readme contains all of the lookup tables and variable definitions.

This section shows the pertinent information required to start working with the dataset.

The data is in a text, or comma-separated values (CSV), format and is contained in the Amazon S3 bucket called noaa-ghcn-pds.

The noaa-ghcn-pds bucket contains virtual folders, and is structured like this:

  • noaa-ghcn-pds. This is the root of the bucket with two subdirectories and a number of useful files. For the purposes of this exercise, we use only the ghcnd-stations.txt file. This file contains information about the observation stations that produced the data for the GHCN_D dataset. You must download the ghcnd-stations.txt file.
  • noaa-ghcn-pds/csv/. This virtual folder contains all of the observations from 1763 to the present organized in .csv files, one file for every year. For this exercise, we’ll collate this data into a single table.

Also for the purpose of this exercise, the data from ‘ghcnd-stations.txt’ and the data contained in noaa-ghcn-pds/csv/ are extracted and added to two separate tables. These tables are the basis of the analysis.

The tables are labeled as:

  • tblallyears. This table contains all the records stored in the yearly .csv files from 1763 to present.
  • tblghcnd_stations. This table contains information for over 106,430 weather stations.

Point of interest: the .csv file from the year 1763 contains the data for one weather station. That station was located in the center of Milan, Italy.

The tools

To implement the general workflow in this exercise, we’re using the following tools:

  • Amazon Simple Storage Service (Amazon S3) to stage the data for analysis. The GHCN_D dataset is stored in a bucket on Amazon S3. We also use a private bucket to store new tables created from queries.
  • Amazon Athena to query data stored on Amazon S3 using standard SQL.
  • AWS Glue to extract and load data into Athena from the Amazon S3 buckets in which it is stored. AWS Glue is a fully managed extract, transform, and load (ETL) service.
  • AWS Glue Data Catalog to catalog the data that we query with Athena.
  • Amazon QuickSight to build visualizations, perform ad hoc analyses, and get insights from the dataset. Queries and tables from Athena can be read directly from Amazon QuickSight. Amazon QuickSight can also run queries against tables in Athena.

To implement the processes outlined in this post, you need an AWS Account. For more information about creating an AWS account, see Getting Started with AWS. You also must create a private Amazon S3 bucket located in the N. Virginia AWS Region. For more information, see Create a Bucket.

When you create the bucket, it must contain the following empty directories:

  1. [your_bucket_name]/stations_raw/
  2. [your_bucket_name]/ghcnblog/
  3. [your_bucket_name]/ghcnblog/stations/
  4. [your_bucket_name]/ghcnblog/allyears/
  5. [your_bucket_name]/ghcnblog/1836usa/

The following is an overview of how the various AWS services interact in this workflow.

Note

The AWS services are in the same AWS Region. One of the Amazon S3 buckets is the existing one that stores the GHCN_D data. The other Amazon S3 bucket is the bucket that you use for storing tables.

Figure 2: How the AWS services work together to compose this workflow.

The workflow

Now that we have the tools and the data, we are ready to:

  1. Extract the yearly .csv files and add them to a table in Amazon Athena.
  2. Extract the stations text file and add it to a separate table in Amazon Athena.

Extract the yearly .csv files and add it to a table in Amazon Athena

The complete set of daily weather observations is organized by year in one folder of the Amazon S3 bucket in .csv format. The path to the data is s3://noaa-ghcn-pds/csv/.

Each file is named by year beginning with 1763.csv and progressing one year at a time up to the present.

From the AWS Management Console, open Amazon Athena. This takes you to the main dashboard for Athena. From here, choose AWS Glue Data Catalog. This brings you to AWS Glue.

In AWS Glue, choose the Tables section on the left side. Then, in the Add table drop-down menu, choose Add table manually. A series of forms displays for you to add the following information:

  • Set up your table’s properties:
    • Give the new table a name, for example, tblallyears.
    • Create a database and name it ghcnblog.

The database then appears in the Athena dashboard.

  • Add a data store:
    • Choose the Specified path in another account option, and enter the following path in the text box: s3://noaa-ghcn-pds/csv/
  • Choose a data format:
    • Select CSV, then select Comma as the delimiter.
  • Define a schema:
    • Add the following columns as string variables:
      • id
      • year_date
      • element
      • data_value
      • m_flag
      • q_flag
      • s_flag
      • obs_time

For a full description of the variables and data structures, see the readme file.

  • Choose OK, then Finish.
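
For reference, the table created through the console above corresponds roughly to the following Athena DDL. This is a sketch rather than the exact statement that AWS Glue generates; the column names and types match the schema defined in the wizard, and the location is the public GHCN_D CSV folder.

CREATE EXTERNAL TABLE IF NOT EXISTS ghcnblog.tblallyears (
  id string,
  year_date string,
  element string,
  data_value string,
  m_flag string,
  q_flag string,
  s_flag string,
  obs_time string
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://noaa-ghcn-pds/csv/';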

Now return to the Athena dashboard, and choose the database that you created. The table will appear in the list of tables on the left. You can now preview the data by choosing the ‘Preview table’ option to the right of the table.

Use CTAS to speed up queries

As a final step, create a table using the SQL statement called CREATE TABLE AS SELECT (CTAS). Store the table in a private Amazon S3 bucket.

This step dramatically speeds up queries. The reason is that we extract the data once and store it in a columnar format (Parquet) in the private Amazon S3 bucket.

To illustrate the improvement in speed, here are two examples:

  • A query that counts all of the distinct IDs, meaning unique weather stations, takes around 55 seconds and scans around 88 GB of data.
  • The same query on the converted data takes around 13 seconds and scans about 5 GB of data.

To create the table for this final step:

  1. Open the Athena console.
  2. In the dashboard, select New query, then enter the query as shown in the following example. Make sure to enter the information that’s applicable to your particular situation, such as your bucket name.
    Figure 3: Query to create a table, converting the data into Parquet and storing it in your S3 bucket.
  3. Make sure that the data format is Parquet.
  4. Name your table tblallyears_qa.
  5. Add this path to this folder in the private Amazon S3 bucket: [your_bucket_name]/ghcnblog/allyearsqa/. Replace your_bucket_name with your specific bucket name.
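
The CTAS statement shown in Figure 3 looks roughly like the following sketch. The bucket placeholder and output folder follow the directory layout created earlier; treat the exact statement as an assumption and adjust it to your environment.

CREATE TABLE ghcnblog.tblallyears_qa
WITH (
  format = 'PARQUET',
  external_location = 's3://[your_bucket_name]/ghcnblog/allyearsqa/'
) AS
SELECT *
FROM ghcnblog.tblallyears;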

The new table appears in your database, listed on the left side of the Athena dashboard. This is the table that we work with going forward.

Extract the stations text file and add it to a separate table in Amazon Athena

The stations text file contains information about the weather stations, such as location, nationality, and ID. This data is kept in a separate file from the yearly observations. We need to import this data to look at the geographical spread of weather observations. While dealing with this file is a bit more complicated, the steps for importing this data into Athena are similar to what we have already done.

To import this data into Athena:

  1. Download the ghcnd-stations text file.
  2. Open the file in a spreadsheet program and use the fixed-width delimited data import function. The fixed widths of the columns are described in the readme file in the section called FORMAT OF “ghcnd-stations.txt” file.
  3. After you successfully import the data, save the spreadsheet as a .csv text file.
  4. Copy the new .csv file to [your_bucket_name]/stations_raw/. Replace your_bucket_name with your specific bucket name.
  5. Using this new .csv file, follow the Add table process steps in AWS Glue, as described earlier in this post.
    • Use the following field names:
      • id
      • latitude
      • longitude
      • elevation
      • state
      • name
      • gsn_flag
      • hcn_flag
      • wmo_id
    • Name the table tblghcnd_stations.
  6. After the table is created, follow the CREATE TABLE AS SELECT (CTAS) steps for this table as described earlier in this post.
    • Name the new table tblghcnd_stations_qa.
    • Store the new table in [your_bucket_name]/ghcnblog/stations/. Replace your_bucket_name with your specific bucket name.

The two most important datasets of GHCN_D are now in Athena.

In the next section, we run queries against these tables and visualize the results using Amazon QuickSight.

Exploratory data analysis and visualization

With our two tables created, we are now ready to query and visualize to gain insight.

Exploratory data analysis

In the Athena query window, run the following queries to get an idea of the size of the dataset.

Query #1: the total number of observations since 1763:

Figure 4: Query for total number of observations. This was run in autumn 2018. The dataset continues to grow over time.

Query #2: the number of stations since 1763:

Figure 5: Query for total number of stations that have made observations since 1763. Deactivated stations are included.
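
The two count queries behind Figures 4 and 5 can be sketched as follows, assuming the Parquet-backed table created above:

-- Total number of observations since 1763 (Figure 4)
SELECT count(*) AS number_of_observations
FROM ghcnblog.tblallyears_qa;

-- Number of distinct stations that have reported observations (Figure 5)
SELECT count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tblallyears_qa;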

Average weather parameters for the Earth

The following figure shows a query that calculates the average maximum temperature (Celsius), average minimum temperature (Celsius), and average rainfall (mm) for the Earth since 1763.

In the query, we must convert the data_value from a String variable to a Real variable. We also must divide by 10, because the temperature and precipitation measurements are in tenths of their respective units. For more information about these details and the element codes (TMIN, TMAX, and PRCP), see the readme file.

Figure 6. Querying for global averages to get to Earth’s pulse.
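
A sketch of the query in Figure 6 follows. The CAST and the division by 10 match the notes above; grouping by element is one reasonable way to return the three averages in a single query, and is an assumption about how the original figure was written.

SELECT element,
       avg(CAST(data_value AS real) / 10) AS average_value
FROM ghcnblog.tblallyears_qa
WHERE element IN ('TMAX', 'TMIN', 'PRCP')
GROUP BY element;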

It would be convenient if we could just run simple queries, such as this one, on this dataset and accept that the results are correct.

The previous query assumes an even and equal spread of weather stations around the world since 1763. In fact, the number and spread of weather stations have varied over time.

Visualizing the growth in numbers of weather stations over time

The following procedure visualizes the number of weather stations for each year since 1763 by using Amazon QuickSight.

Note: You must be signed up for Amazon QuickSight to complete these steps. During the sign-up process, you are prompted to manage your Amazon QuickSight data source permissions. At this time, use step 3 in the following procedure to grant access to the Amazon S3 buckets and to Athena.

The steps are as follows:

  1. Open the Amazon QuickSight console.
  2. On the far right of the dashboard, choose Manage QuickSight.
  3. Choose Account Settings, then Manage QuickSight permissions. Give Amazon QuickSight permission to access Athena, and to read the Amazon S3 bucket that contains the new tables.
  4. Return to Amazon QuickSight by choosing the logo on the top left side of the screen.
  5. From the Amazon QuickSight dashboard, choose New analysis, then New data set.
  6. From the Create a Data Set tiles, choose Athena. Name the data source, for example ghcnblog, then choose Create data source.
  7. Choose the option to add a custom SQL, then add the SQL, as shown in the following example:

Figure 7: Location to add a custom SQL query.

  8. Choose Confirm query.
  9. Choose Directly query your data.
  10. Choose Visualize.
  11. To make the graph, choose the line chart graph. Add year to the X-axis and number_of_stations to the Value field wells. The options appear to the left of the visualization dashboard.

Figure 8. The number of global weather stations used by GHCN_D over time.

The resulting graph shows that the number and spread of stations around the world have varied over time.
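
The custom SQL behind this chart can be sketched as follows. It assumes that year_date is stored as YYYYMMDD, as in the GHCN_D daily files, and that the field names match the Value field wells used above.

SELECT substr(year_date, 1, 4) AS year,
       count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tblallyears_qa
GROUP BY 1
ORDER BY 1;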

A look at the development of observations in the US

1836 is the year of the first US observation station in the dataset. To get insight into the development of observations in the US, we extracted a subset of US data from the main data source (tblallyears_qa). This dataset features annual data every 30th year from 1836 to 2016.

This query generates a large dataset. To improve performance, save the query as a table stored in an Amazon S3 bucket using the previously described procedure.

The query to do this in one step is shown in the following figure.

Figure 9: The SQL to create a table from a query and store it in Parquet format in a user-specified Amazon S3 bucket.
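
A sketch of the CTAS statement in Figure 9 follows. The join to the stations table, the 'US' prefix filter, and the explicit list of 30th years are assumptions about how the original query selects the subset; the output folder matches the 1836usa directory created earlier.

CREATE TABLE ghcnblog.tbl1836every30thyear
WITH (
  format = 'PARQUET',
  external_location = 's3://[your_bucket_name]/ghcnblog/1836usa/'
) AS
SELECT a.id, a.year_date, a.element, a.data_value,
       s.state, s.name, s.latitude, s.longitude
FROM ghcnblog.tblallyears_qa a
JOIN ghcnblog.tblghcnd_stations_qa s ON a.id = s.id
WHERE substr(a.id, 1, 2) = 'US'
  AND CAST(substr(a.year_date, 1, 4) AS integer)
      IN (1836, 1866, 1896, 1926, 1956, 1986, 2016);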

The table appears in the Amazon Athena dashboard as tbl1836every30thyear and it forms the basis for our analysis.

In the Amazon QuickSight console, use the following SQL to generate a new dataset.

Figure 10: The SQL to create a dataset for viewing USA data in Amazon QuickSight.
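
The custom SQL in Figure 10 might look like the following sketch, which produces the year, state, and number_of_stations fields used in the map; the exact query in the figure may differ.

SELECT CAST(substr(year_date, 1, 4) AS integer) AS year,
       state,
       count(DISTINCT id) AS number_of_stations
FROM ghcnblog.tbl1836every30thyear
GROUP BY 1, 2;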

  1. Choose Confirm query.
  2. Choose Directly query your data.
  3. Choose Visualize.

This brings you back to the visualization dashboard. From this dashboard, chose the Points on a map visualization, and set up the fields as follows:

  • Geospatial: state
  • Size: number_of_stations, aggregate by count.
  • Color: year

The results should be the following map of the US showing the growth of weather stations used by GHCN_D from 1836 to 2016 at 30-year increments. In 1836, there was one station. By 2016, there were thousands.

Figure 11: The growth of the number of observations stations in the US.

Interestingly, some states had more stations in 1956 than they did in 1986. This is also illustrated in the following figure. The data for the figure was derived from the previous dataset.

Figure 12: This heat map illustrates the number of stations per state over time. This is a 30th year snapshot.

A look at more data

We now have a data lake of GHCN_D data. By using the tools that we have assembled, we are free to experiment with the data. It is now possible to construct queries and visualizations on this huge dataset to gain insights.

The following figure shows the heat map that we created. It shows the average minimum temperature in US states over time. As before, we are looking at 30-year snapshots; that is to say, every 30th year we take a yearly average.

Figure 13: This heat map illustrates the minimum temperature for each state over time. A yearly average every 30th year starting at 1836.
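
A query of the following shape could feed this heat map; the element code, the division by 10, and the table name are the same assumptions used in the earlier sketches.

SELECT CAST(substr(year_date, 1, 4) AS integer) AS year,
       state,
       avg(CAST(data_value AS real) / 10) AS avg_min_temperature_c
FROM ghcnblog.tbl1836every30thyear
WHERE element = 'TMIN'
GROUP BY 1, 2
ORDER BY 1, 2;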

Summary

Our headlines are full of climate change and sustainability stories, and research and analysis have become more crucial than ever.

We showed researchers, analysts, and scientists how AWS services have reduced the level of technical skills required to fully use the GHCN_D dataset.

The GHCN-D dataset is available on AWS. The details can be found on the Registry of Open Data on AWS. This data is available to researchers studying climate change and weather impacts.

This blog post demonstrated a typical workflow that a researcher could use to engage with and analyze this important data by using Amazon Athena, AWS Glue, and Amazon S3, and how they can visualize insights by using Amazon QuickSight.

By making this data available, Amazon has removed the heavy lifting that was traditionally required to work with the GHCN_D, thus expanding the opportunity for new research and new discoveries.

 


About the Authors

Joe Flasher is the Open Geospatial Data Lead at Amazon Web Services, helping organizations most effectively make data available for analysis in the cloud. Joe has been working with geospatial data and open source projects for the past decade, both as a contributor and maintainer. He has been a member of the Landsat Advisory Group, and has worked on projects ranging from building GIS software to making the space shuttle fly. Joe’s background is in astrophysics, but he kindly requests you don’t ask him any questions about constellations.

 

 

Conor Delaney, PhD, is an environmental data scientist.

Create real-time clickstream sessions and run analytics with Amazon Kinesis Data Analytics, AWS Glue, and Amazon Athena

Post Syndicated from Hugo Rozestraten original https://aws.amazon.com/blogs/big-data/create-real-time-clickstream-sessions-and-run-analytics-with-amazon-kinesis-data-analytics-aws-glue-and-amazon-athena/

Clickstream events are small pieces of data that are generated continuously with high speed and volume. Often, clickstream events are generated by user actions, and it is useful to analyze them.

For example, you can detect user behavior in a website or application by analyzing the sequence of clicks a user makes, the amount of time the user spends, where they usually begin the navigation, and how it ends. By tracking this user behavior in real time, you can update recommendations, perform advanced A/B testing, push notifications based on session length, and much more. To track and analyze these events, you need to identify and create sessions from them. The process of identifying events in the data and creating sessions is known as sessionization.

Capturing and processing clickstream events in real time can be difficult. As the number of users and web and mobile assets you have increases, so does the volume of data. Amazon Kinesis provides you with the capabilities necessary to ingest this data in real time and generate useful statistics immediately so that you can take action.

When you run sessionization on clickstream data, you identify events and assign them to a session with a specified key and lag period. After each event has a key, you can perform analytics on them. The use cases for sessionization vary widely, and have different requirements. For example, you might need to identify and create sessions from events in web analytics to track user actions. Sessionization is also broadly used across many different areas, such as log data and IoT.

This blog post demonstrates how to identify and create sessions from real-time clickstream events and then analyze them using Amazon Kinesis Data Analytics.

Why did we choose Kinesis Data Analytics?

Clickstream data arrives continuously, often as thousands of new events per second. When you analyze the effectiveness of new application features, site layout, or marketing campaigns, it is important to analyze the data in real time so that you can take action faster.

To perform the sessionization in batch jobs, you could use a tool such as AWS Glue or Amazon EMR. But with daily schedules, queries, and aggregation, this approach can take more resources and time because each aggregation involves working with large amounts of data. Performing sessionization in Kinesis Data Analytics takes less time and gives you lower latency between the generation of sessions. You can trigger real-time alerts with AWS Lambda functions based on conditions, such as a session that is shorter than 20 seconds, or send the results to a machine learning endpoint.

Identifying a session among thousands of clicks

A session is a short-lived and interactive exchange between two or more devices and/or users. For example, it can be a user browsing and then exiting your website, or an IoT device waking up to perform a job and then going back to sleep. These interactions result in a series of events that occur in sequence and have a start and an end: a session. The start and end of a session can be difficult to determine, and are often defined by a time period without a relevant event associated with a user or device. A session starts when a new event arrives after a specified "lag" time period has passed without an event arriving. A session ends in a similar manner, when a new event does not arrive within the specified lag period.

This blog post relies on several other posts about performing batch analytics on SQL data with sessions. My two favorite posts on this subject are Sessionization in SQL, Hive, Pig and Python from Dataiku and Finding User Session with SQL by Benn Stancil at Mode. Both posts take advantage of SQL window functions to identify and build sessions from clickstream events.

ANSI added SQL window functions to the SQL standard in 2003 and has since expanded them. Window functions work naturally with streaming data and enable you to easily translate batch SQL examples to Kinesis Data Analytics.

In this use case, I group the events of a specific user as described in the following simplified example. In this example, I use distinct navigation patterns from three users to analyze user behavior. To begin, I group events by user ID to obtain some statistics from data, as shown following:

In this example, for “User ID 20,” the minimum timestamp is 2018-11-29 23:35:10 and the maximum timestamp is 2018-11-29 23:35:44. This provides a 34-second-long session, starting with action “B_10” and ending with action “A_02.” In this example, these “actions” identify the application’s buttons.

Suppose that after several minutes, new “User ID 20” actions arrive. Would you consider them part of the same session? A user can abort a navigation or start a new one. Also, applications often have timeouts. You have to decide the maximum session length beyond which to treat new activity as a new session. A session can run anywhere from 20 to 50 seconds, or from 1 to 5 minutes.

There are other elements that you might want to consider, such as a client IP or a machine ID. These elements allow you to separate sessions that occur on different devices.

High-level solution overview

The end-to-end scenario described in this post uses Amazon Kinesis Data Streams to capture the clickstream data and Kinesis Data Analytics to build and analyze the sessions. The aggregated analytics are used to trigger real-time events on Lambda and then send them to Kinesis Data Firehose. Kinesis Data Firehose sends data to an Amazon S3 bucket, where it is ingested to a table by an AWS Glue crawler and made available for running queries with Amazon Athena. You can use this table for ad hoc analysis.

The following diagram shows an end-to-end sessionization solution.

  • Data ingestion: You can use Kinesis Data Streams to build custom applications that process or analyze streaming data for specialized needs. Kinesis Data Streams can continuously capture and store terabytes of data per hour from hundreds of thousands of sources, such as website clickstreams, financial transactions, social media feeds, IT logs, and location-tracking events.
  • Data sessionization: Kinesis Data Analytics is the easiest way to process streaming data in real time with standard SQL without having to learn new programming languages or processing frameworks. With Kinesis Data Analytics, you can query streaming data or build entire streaming applications using SQL, so that you can gain actionable insights and respond to your business and customer needs promptly.
  • Data processing and storage: The sessionization stream is read from Kinesis Data Analytics using an AWS Lambda function. The function triggers two events: one real-time dashboard in Amazon CloudWatch and a second one to persist data with Kinesis Data Firehose.
  • Data analysis: AWS Glue is used to crawl Amazon S3 and build or update metadata definition for Amazon Athena tables.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena provides connectivity to any application using JDBC or ODBC drivers.

  • Data visualization: Amazon QuickSight is a visualization tool that is natively used to build dashboards over Amazon Athena data.
  • Monitoring: Amazon CloudWatch is a tool that lets you monitor the streaming activities, such as the number of bytes processed or delivered per second, or the number of failures.

After you finish the sessionization stage in Kinesis Data Analytics, you can output data into different tools. For example, you can use a Lambda function to process the data on the fly and take actions such as send SMS alerts or roll back a deployment. To learn how to implement such workflows based on AWS Lambda output, see my other blog post Implement Log Analytics using Amazon Kinesis Data Analytics. In this post, we send data to Amazon CloudWatch, and build a real-time dashboard.

Lambda clickstream generator

To generate the workload, you can use a Python Lambda function with random values, simulating a beer-selling application.

The same user ID can have sessions on different devices, such as a tablet, a browser, or a phone application. This information is captured by the device ID. As a result, the data for the Lambda function payload has these parameters: a user ID, a device ID, a client event, and a client timestamp, as shown in the following example.

The following is the code for the Lambda function payload generator, which is scheduled using CloudWatch Events scheduled events:

import datetime
import json
import random

import boto3

kinesis = boto3.client('kinesis')

def getReferrer():
    x = random.randint(1,5)
    x = x*50
    y = x+30
    data = {}
    data['user_id'] = random.randint(x,y)
    data['device_id'] = random.choice(['mobile','computer', 'tablet', 'mobile','computer'])
    data['client_event'] = random.choice(['beer_vitrine_nav','beer_checkout','beer_product_detail',
    'beer_products','beer_selection','beer_cart'])
    now = datetime.datetime.now()
    str_now = now.isoformat()
    data['client_timestamp'] = str_now
    return data

def lambda_handler(event, context):
    # Send a batch of simulated click events on each scheduled invocation
    # (the batch size here is illustrative).
    for _ in range(50):
        data = json.dumps(getReferrer())
        kinesis.put_record(
                StreamName='sessionsclicks',
                Data=data,
                PartitionKey='partitionkey')

As a result, the following payloads are sent to Kinesis Data Analytics:

Using window SQL functions in Kinesis Data Analytics

Grouping sessions lets us combine all the events from a given user ID or a device ID that occurred during a specific time period. Amazon Kinesis Data Analytics SQL queries in your application code execute continuously over in-application streams. You need to specify bounded queries using a window defined in terms of time or rows. These queries are called window SQL functions.

I had three available options for windowed query functions in Kinesis Data Analytics: sliding windows, tumbling windows, and stagger windows. I chose stagger window because it has some good features for the sessionization use case, as follows:

  • Stagger windows open when the first event that matches a partition key condition arrives. So for each key, a separate window is evaluated, as opposed to the other window types, which evaluate one single window for all matched partition keys.
  • When dealing with clickstreams, you cannot rely on the order in which events arrive in the stream, but rather on when they were generated. Stagger windows handle out-of-order events well. The times at which a window opens and closes are based on the age you specify, measured from the moment the window opened.

To partition by the timestamp, I chose to write two distinct SQL functions.

In Kinesis Data Analytics, SOURCE_SQL_STREAM_001 is by default the main stream from the source. In this case, it’s receiving the source payload from Kinesis Data Streams.

Kinesis Data Analytics SQL – Create a stream

The following function creates a stream to receive the query aggregation result:

-- CREATE a Stream to receive the query aggregation result
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(
  session_id VARCHAR(60),
  user_id INTEGER,
  device_id VARCHAR(10),
  timeagg timestamp,
  events INTEGER,
  beginnavigation VARCHAR(32),
  endnavigation VARCHAR(32),
  beginsession VARCHAR(25),
  endsession VARCHAR(25),
  duration_sec INTEGER
);

Kinesis Data Analytics SQL – Using a SECOND interval “STEP” function

The following function creates the PUMP and inserts it as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_SEC" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
    SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
    UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
    ||cast( UNIX_TIMESTAMP(STEP("client_timestamp" by interval '30' second))/1000 as VARCHAR(20))) as session_id,
    "user_id" , "device_id",
-- create a common rounded STEP timestamp for this session
    STEP("client_timestamp" by interval '30' second),
-- Count the number of client events , clicks on this session
    COUNT("client_event") events,
-- What was the first navigation action
    first_value("client_event") as beginnavigation,
-- what was the last navigation action    
    last_value("client_event") as endnavigation,
-- beginning minute and second
    SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second      
    SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration    
    TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream    
    FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with STEP to Seconds, for Seconds intervals    
    WINDOWED BY STAGGER (
                PARTITION BY "user_id", "device_id", STEP("client_timestamp" by interval '30' second) 
                RANGE INTERVAL '30' SECOND );

Kinesis Data Analytics SQL – Using a MINUTE interval “FLOOR” function

The following code creates the PUMP and inserts as SELECT to STREAM:

-- Create the PUMP
CREATE OR REPLACE PUMP "WINDOW_PUMP_MIN" AS INSERT INTO "DESTINATION_SQL_STREAM"
-- Insert as Select 
SELECT  STREAM
-- Make the Session ID using user_ID+device_ID and Timestamp
UPPER(cast("user_id" as VARCHAR(3))|| '_' ||SUBSTRING("device_id",1,3)
||cast(UNIX_TIMESTAMP(FLOOR("client_timestamp" TO MINUTE))/1000 as VARCHAR(20))) as session_id,
"user_id" , "device_id",
-- create a common rounded timestamp for this session
FLOOR("client_timestamp" TO MINUTE),
-- Count the number of client events , clicks on this session
COUNT("client_event") events,
-- What was the first navigation action
first_value("client_event") as beginnavigation,
-- what was the last navigation action
last_value("client_event") as endnavigation,
-- beginning minute and second
SUBSTRING(cast(min("client_timestamp") AS VARCHAR(25)),15,19) as beginsession,
-- ending minute and second
SUBSTRING(cast(max("client_timestamp") AS VARCHAR(25)),15,19) as endsession,
-- session duration
TSDIFF(max("client_timestamp"),min("client_timestamp"))/1000 as duration_sec
-- from the source stream
FROM "SOURCE_SQL_STREAM_001"
-- using stagger window , with floor to Minute, for Minute intervals
WINDOWED BY STAGGER (
            PARTITION BY "user_id", "device_id", FLOOR("client_timestamp" TO MINUTE) 
            RANGE INTERVAL '1' MINUTE);

Sessions

In Kinesis Data Analytics, you can view the resulting data transformed by the SQL, with the session identification and information. The session_id is built from the user_id, the first three characters of the device_id, and the Unix timestamp rounded to the window interval, without milliseconds.
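
For clarity, the following is a small Python sketch (not part of the deployed solution) that mirrors how the SQL builds the session ID for the 30-second STEP variant; the exact epoch value depends on how the timestamp's time zone is interpreted.

from datetime import datetime, timezone

def make_session_id(user_id, device_id, client_timestamp, step_seconds=30):
    # Parse the ISO-8601 timestamp and treat it as UTC for this illustration
    ts = datetime.fromisoformat(client_timestamp).replace(tzinfo=timezone.utc).timestamp()
    rounded = int(ts // step_seconds) * step_seconds  # floor to the step boundary
    return '{}_{}{}'.format(user_id, device_id[:3], rounded).upper()

print(make_session_id(20, 'mobile', '2018-11-29T23:35:10'))  # -> '20_MOB1543534500' (assuming UTC)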

Automated deployment with AWS CloudFormation

All the steps of this end-to-end solution are included in an AWS CloudFormation template. Fire up the template, add the code to your web server, and voilà, you get real-time sessionization.

This AWS CloudFormation template is intended to be deployed only in the us-east-1 Region.

Create the stack

Step 1: To get started, sign into the AWS Management Console, and then open the stagger window template.

Step 2: On the AWS CloudFormation console, choose Next, and complete the AWS CloudFormation parameters:

  • Stack name: The name of the stack (blog-sessionization or sessions-blog)
  • StreamName: sessionsblog
  • Stream Shard Count: 1 or 2 shards (1 MB/s per shard).
  • Bucket Name:  Change to a unique name, for example session-n-bucket-hhug123121.
  • Buffer Interval: 60–900 seconds; the buffering hint for how long Kinesis Data Firehose waits before sending the data to Amazon S3.
  • Buffer Size: 1–128 MB per file, used if the buffer interval is not reached first.
  • Destination Prefix: Aggregated (internal folder of the bucket to save aggregated data).
  • Base sessions on seconds or minutes: Choose which you want (minutes will start with 1 minute, seconds will start with 30 seconds).

Step 3: Check if the launch has completed, and if it has not, check for errors.

The most common error is when you point to an Amazon S3 bucket that already exists.

Process the data

Step 1: After the deployment, navigate to the solution on the Amazon Kinesis console.

Step 2: Go to the Kinesis Analytics applications page, and choose AnalyticsApp-blog-sessionizationXXXXX, as follows.

Step 3: Choose Run application to start the application.

Step 4: Wait a few seconds for the application to be available, and then choose Application details.

Step 5: On the Application details page, choose Go to SQL results.

Step 6: Examine the SQL code and SOURCE_SQL_STREAM, and change the INTERVAL if you’d like.

Step 7: Choose the Real-time analytics tab to check the DESTINATION_SQL_STREAM results.

Step 8: Check the Destination tab to view the AWS Lambda function as the destination for your aggregation.

Step 9: Check the CloudWatch real-time dashboard.

Open the Sessionization-<your cloudformation stack name> dashboard.

Check the number of “events” during the sessions, and the “session duration” behavior over a given timeframe. Then you can make decisions, such as whether you need to roll back a new site layout or new features of your application.

Step 10: Open the AWS Glue console and run the crawler that the AWS CloudFormation template created for you.

Choose the crawler job, and then choose Run crawler.

Analyze the data

Step 1: After the job finishes, open the Amazon Athena console and explore the data.

On the Athena console, choose the sessionization database in the list. You should see two tables created based on the data in Amazon S3: rawdata and aggregated.

Step 2: Choose the vertical ellipsis (three dots) on the right side to explore each of the tables, as shown in the following screenshots.

Step 3: Create a view on the Athena console to query only today’s data from your aggregated table, as follows:

CREATE OR REPLACE VIEW clicks_today AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) and
cast(partition_2 as integer)=day(current_date) ;

The successful query appears on the console as follows:

Step 4: Create a view to query only the current month data from your aggregated table, as in the following example:

CREATE OR REPLACE VIEW clicks_month AS
SELECT 
*
FROM "aggregated" 
WHERE
cast(partition_0 as integer)=year(current_date) and
cast(partition_1 as integer)=month(current_date) ;

The successful query appears as follows:

Step 5: Query data with the sessions grouped by the session duration ordered by sessions, as follows:

SELECT duration_sec, count(1) sessions 
FROM "clicks_today"
where duration_sec>0
group by duration_sec
order by sessions desc;

The query results appear as follows:

Visualize the data

Step 1: Open the Amazon QuickSight console.

If you have never used Amazon QuickSight, perform this setup first.

Step 2: Set up Amazon QuickSight account settings to access Athena and your S3 bucket.

First, select the Amazon Athena check box. Select the Amazon S3 check box to edit Amazon QuickSight access to your S3 buckets.

Choose the buckets that you want to make available, and then choose Select buckets.

Step 3: Choose Manage data.

Step 4: Choose NEW DATASET.

In the list of data sources, choose Athena.

Step 5: Enter daily_session as your data source name.

Step 6: Choose the view that you created for daily sessions, and choose Select.

Step 7: Then you can choose to use either SPICE (cache) or direct query access.

Step 8: Choose beginnavigation and duration_sec as metrics.

Step 9: Choose +Add to add a new visualization.

Step 10: In Visual types, choose the Tree map graph type.

Step 11: For Group by, choose device_id; for Size, choose duration_sec (Sum); and for Color, choose events (Sum).

Summary

In this post, I described how to perform sessionization of clickstream events and analyze them in a serverless architecture. The use of a Kinesis Data Analytics stagger window makes the SQL code short and easy to write and understand. The integration between the services enables a complete data flow with minimal coding.

You also learned about ways to explore and visualize this data using Amazon Athena, AWS Glue, and Amazon QuickSight.

To learn more about the Amazon Kinesis family of use cases, check the Amazon Kinesis Big Data Blog page.

If you have questions or suggestions, please leave a comment below.

Do more with Amazon Kinesis Data Analytics

To explore other ways to gain insights using Kinesis Data Analytics, see Real-time Clickstream Anomaly Detection with Amazon Kinesis Analytics.

About the Author

Hugo is an analytics and database specialist solutions architect at Amazon Web Services out of São Paulo (Brazil). He is currently engaged with several Data Lake and Analytics projects for customers in Latin America. He loves family time, dogs and mountain biking.

Podcast 293: Diving into Data with Amazon Athena

Post Syndicated from Simon Elisha original https://aws.amazon.com/blogs/aws/podcast-293-diving-into-data-with-amazon-athena/

Do you have lots of data to analyze? Is writing SQL a skill you have? Would you like to analyze massive amounts of data at low cost without capacity planning? In this episode, Simon shares how Amazon Athena can give you options you may not have considered before.

Additional Resources

About the AWS Podcast

The AWS Podcast is a cloud platform podcast for developers, dev ops, and cloud professionals seeking the latest news and trends in storage, security, infrastructure, serverless, and more. Join Simon Elisha and Jeff Barr for regular updates, deep dives and interviews. Whether you’re building machine learning and AI models, open source projects, or hybrid cloud solutions, the AWS Podcast has something for you. Subscribe with one of the following:

Like the Podcast?

Rate us on iTunes and send your suggestions, show ideas, and comments to [email protected]. We want to hear from you!

Our data lake story: How Woot.com built a serverless data lake on AWS

Post Syndicated from Karthik Kumar Odapally original https://aws.amazon.com/blogs/big-data/our-data-lake-story-how-woot-com-built-a-serverless-data-lake-on-aws/

In this post, we talk about designing a cloud-native data warehouse as a replacement for our legacy data warehouse built on a relational database.

At the beginning of the design process, the simplest solution appeared to be a straightforward lift-and-shift migration from one relational database to another. However, we decided to step back and focus first on what we really needed out of a data warehouse. We started looking at how we could decouple our legacy Oracle database into smaller microservices, using the right tool for the right job. Our process wasn’t just about using the AWS tools. More, it was about having a mind shift to use cloud-native technologies to get us to our final state.

This migration required developing new extract, transform, load (ETL) pipelines to get new data flowing in while also migrating existing data. Because of this migration, we were able to deprecate multiple servers and move to a fully serverless data warehouse orchestrated by AWS Glue.

In this blog post, we are going to show you:

  • Why we chose a serverless data lake for our data warehouse.
  • An architectural diagram of Woot’s systems.
  • An overview of the migration project.
  • Our migration results.

Architectural and design concerns

Here are some of the design points that we considered:

  • Customer experience. We always start with what our customer needs, and then work backwards from there. Our data warehouse is used across the business by people with varying levels of technical expertise. We focused on the ability for different types of users to gain insights into their operations and to provide better feedback mechanisms to improve the overall customer experience.
  • Minimal infrastructure maintenance. The “Woot data warehouse team” is really just one person—Chaya! Because of this, it’s important for us to focus on AWS services that enable us to use cloud-native technologies. These remove the undifferentiated heavy lifting of managing infrastructure as demand changes and technologies evolve.
  • Responsiveness to data source changes. Our data warehouse gets data from a range of internal services. In our existing data warehouse, any updates to those services required manual updates to ETL jobs and tables. The response times for these data sources are critical to our key stakeholders. This requires us to take a data-driven approach to selecting a high-performance architecture.
  • Separation from production systems. Access to our production systems is tightly coupled. To allow multiple users, we needed to decouple it from our production systems and minimize the complexities of navigating resources in multiple VPCs.

Based on these requirements, we decided to change the data warehouse both operationally and architecturally. From an operational standpoint, we designed a new shared responsibility model for data ingestion. Architecturally, we chose a serverless model over a traditional relational database. These two decisions ended up driving every design and implementation decision that we made in our migration.

As we moved to a shared responsibility model, several important points came up. First, our new way of data ingestion was a major cultural shift for Woot’s technical organization. In the past, data ingestion had been exclusively the responsibility of the data warehouse team and required customized pipelines to pull data from services. We decided to shift to “push, not pull”: Services should send data to the data warehouse.

This is where shared responsibility came in. For the first time, our development teams had ownership over their services’ data in the data warehouse. However, we didn’t want our developers to have to become mini data engineers. Instead, we had to give them an easy way to push data that fit with the existing skill set of a developer. The data also needed to be accessible by the range of technologies used by our website.

These considerations led us to select the following AWS services for our serverless data warehouse:

The following diagram shows at a high level how we use these services.

Tradeoffs

These components together met all of our requirements and enabled our shared responsibility model. However, we made a few tradeoffs compared to a lift-and-shift migration to another relational database:

  • The biggest tradeoff was upfront effort vs. ongoing maintenance. We effectively had to start from scratch with all of our data pipelines and introduce a new technology into all of our website services, which required a concerted effort across multiple teams. Minimal ongoing maintenance was a core requirement. We were willing to make this tradeoff to take advantage of the managed infrastructure of the serverless components that we use.
  • Another tradeoff was balancing usability for nontechnical users vs. taking advantage of big data technologies. Making customer experience a core requirement helped us navigate the decision-making when considering these tradeoffs. Ultimately, merely switching to another relational database would mean that our customers would have the same experience, not a better one.

Building data pipelines with Kinesis Data Firehose and Lambda

Because our site already runs on AWS, using an AWS SDK to send data to Kinesis Data Firehose was an easy sell to developers. The following were key considerations (a brief sketch of a direct PUT follows the list):

  • Direct PUT ingestion for Kinesis Data Firehose is natural for developers to implement, works in all languages used across our services, and delivers data to Amazon S3.
  • Using S3 for data storage means that we automatically get high availability, scalability, and durability. And because S3 is a global resource, it enables us to manage the data warehouse in a separate AWS account and avoid the complexity of navigating multiple VPCs.
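
As an illustration only (the delivery stream name and the payload are assumptions, not Woot's actual code), a direct PUT from a Python service might look like the following:

import json
import boto3

firehose = boto3.client('firehose')

def send_event(event):
    """Push a single application event to Kinesis Data Firehose, which buffers it to S3."""
    firehose.put_record(
        DeliveryStreamName='website-events',       # assumed delivery stream name
        Record={'Data': json.dumps(event) + '\n'}  # newline-delimited JSON simplifies later parsing
    )

send_event({'event_type': 'order_placed', 'order_id': 12345})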

We also consume data stored in Amazon DynamoDB tables. Kinesis Data Firehose again provided the core of the solution, this time combined with DynamoDB Streams and Lambda. For each DynamoDB table, we enabled DynamoDB Streams and then used the stream to trigger a Lambda function.

The Lambda function cleans the DynamoDB stream output and writes the cleaned JSON to Kinesis Data Firehose using boto3. After doing this, it converges with the other process and outputs the data to S3. For more information, see How to Stream Data from Amazon DynamoDB to Amazon Aurora using AWS Lambda and Amazon Kinesis Firehose on the AWS Database Blog.
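
A condensed sketch of that Lambda function follows; the delivery stream name is an assumption, and production code would add error handling and batching.

import json
import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
deserializer = TypeDeserializer()

def lambda_handler(event, context):
    for record in event['Records']:
        new_image = record['dynamodb'].get('NewImage')
        if not new_image:
            continue  # skip deletes
        # Convert the DynamoDB attribute-value format into plain Python types
        clean = {k: deserializer.deserialize(v) for k, v in new_image.items()}
        firehose.put_record(
            DeliveryStreamName='dynamodb-table-events',            # assumed name
            Record={'Data': json.dumps(clean, default=str) + '\n'}
        )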

Lambda gave us more fine-grained control and enabled us to move files between accounts:

  • We enabled S3 event notifications on the S3 bucket and created an Amazon SNS topic to receive notifications whenever Kinesis Data Firehose put an object in the bucket.
  • The SNS topic triggered a Lambda function, which took the Kinesis output and moved it to the data warehouse account in our chosen partition structure.

S3 event notifications can trigger Lambda functions, but we chose SNS as an intermediary because the S3 bucket and Lambda function were in separate accounts.
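
The following sketch shows the shape of that move function; the destination bucket and the partitioned prefix are assumptions, and the cross-account permissions (bucket policy and IAM role) are omitted.

import json
import urllib.parse
import boto3

s3 = boto3.client('s3')
DEST_BUCKET = 'data-warehouse-account-bucket'  # assumed destination bucket

def lambda_handler(event, context):
    for sns_record in event['Records']:
        s3_event = json.loads(sns_record['Sns']['Message'])
        for entry in s3_event.get('Records', []):
            src_bucket = entry['s3']['bucket']['name']
            src_key = urllib.parse.unquote_plus(entry['s3']['object']['key'])
            dest_key = 'raw/' + src_key  # illustrative partition structure
            s3.copy_object(
                Bucket=DEST_BUCKET,
                Key=dest_key,
                CopySource={'Bucket': src_bucket, 'Key': src_key},
                ACL='bucket-owner-full-control'
            )
            s3.delete_object(Bucket=src_bucket, Key=src_key)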

Migrating existing data with AWS DMS and AWS Glue

We needed to migrate data from our existing RDS database to S3, which we accomplished with AWS DMS. DMS natively supports S3 as a target, as described in the DMS documentation.

Setting this up was relatively straightforward. We exported data directly from our production VPC to the separate data warehouse account by tweaking the connection attributes in DMS. The string that we used was this:

"cannedAclForObjects=BUCKET_OWNER_FULL_CONTROL;compressionType=GZIP;addColumnName=true;"

This code gives ownership to the bucket owner (the destination data warehouse account), compresses the files to save on storage costs, and includes all column names. After the data was in S3, we used an AWS Glue crawler to infer the schemas of all exported tables and then compared against the source data.

With AWS Glue, some of the challenges we overcame were these:

  • Unstructured text data, such as forum and blog posts. DMS exports these to CSV, which conflicts with the commas present in the text data. We opted to use AWS Glue to export data from RDS to S3 in Parquet format, which is unaffected by commas because it encodes columns directly.
  • Cross-account exports. We resolved this by including the code

glueContext._jsc.hadoopConfiguration().set("fs.s3.canned.acl", "BucketOwnerFullControl")

at the top of each AWS Glue job to grant bucket owner access to all S3 files produced by AWS Glue.

Overall, AWS DMS was quicker to set up and great for exporting large amounts of data with rule-based transformations. AWS Glue required more upfront effort to set up jobs, but provided better results for cases where we needed more control over the output.

If you’re looking to convert existing raw data (CSV or JSON) into Parquet, you can set up an AWS Glue job to do that. The process is described in the AWS Big Data Blog post Build a data lake foundation with AWS Glue and Amazon S3.

Bringing it all together with AWS Glue, Amazon Athena, and Amazon QuickSight

After data landed in S3, it was time for the real fun to start: actually working with the data! Can you tell I’m a data engineer? For me, a big part of the fun was exploring AWS Glue:

  • AWS Glue handles our ETL job scheduling.
  • AWS Glue crawlers manage the metadata in the AWS Glue Data Catalog.

Crawlers are the “secret sauce” that enables us to be responsive to schema changes. Throughout the pipeline, we chose to make each step as schema-agnostic as possible, which allows any schema changes to flow through until they reach AWS Glue.

However, raw data is not ideal for most of our business users, because it often has duplicates or incorrect data types. Most importantly, the data out of Firehose is in JSON format, but we quickly observed significant query performance gains from using Parquet format. Here, we used one of the performance tips in the Big Data Blog post Top 10 performance tuning tips for Amazon Athena.

With our shared responsibility model, the data warehouse and BI teams are responsible for the final processing of data into curated datasets ready for reporting. Using Lambda and AWS Glue enables these teams to work in Python and SQL (the core languages for Amazon data engineering and BI roles). It also enables them to deploy code with minimal infrastructure setup or maintenance.

Our ETL process is as follows:

  • Scheduled triggers.
  • A series of conditional triggers that control the flow of subsequent jobs that depend on previous jobs.
  • A similar pattern across many jobs of reading in the raw data, deduplicating the data, and then writing to Parquet. We centralized this logic by creating a Python library of functions and uploading it to S3. We then included that library in the AWS Glue job as an additional Python library. For more information on how to do this, see Using Python Libraries with AWS Glue in the AWS Glue documentation. A simplified sketch of such a helper follows this list.
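
The following is a simplified sketch of the kind of shared helper described above; the S3 paths and the deduplication key are assumptions, and the real library contains more functions and error handling.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

def dedupe_to_parquet(source_path, target_path, key_columns):
    """Read raw JSON from S3, keep one row per key, and write the result as Parquet."""
    df = spark.read.json(source_path)
    df = df.dropDuplicates(key_columns)
    df.write.mode('overwrite').parquet(target_path)

dedupe_to_parquet('s3://raw-bucket/orders/', 's3://curated-bucket/orders/', ['order_id'])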

We also migrated complex jobs used to create reporting tables with business metrics:

  • AWS Glue’s use of PySpark simplified the migration of these queries, because you can embed SparkSQL queries directly in the job.
  • Converting to SparkSQL took some trial and error, but ultimately required less work than translating SQL queries into Spark methods. However, for people on our BI team who had previously worked with Pandas or Spark, working with Spark dataframes was a natural transition. As someone who used SQL for several years before learning Python, I appreciate that PySpark lets me quickly switch back and forth between SQL and an object-oriented framework.

Another hidden benefit of using AWS Glue jobs is that the AWS Glue version of Python (like Lambda) already has boto3 installed. Thus, ETL jobs can directly use AWS API operations without additional configuration.

For example, some of our longer-running jobs created read inconsistency if a user happened to query that table while AWS Glue was writing data to S3. We modified the AWS Glue jobs to write to a temporary directory with Spark and then used boto3 to move the files into place. Doing this reduced read inconsistency by up to 90 percent. It was great to have this functionality readily available, which may not have been the case if we managed our own Spark cluster.
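
The following is a condensed sketch of that pattern (bucket and prefix names are assumptions): Spark writes to a temporary prefix, and boto3 then moves the finished files into the location that users query, shortening the window in which readers can see partial data.

import boto3

s3 = boto3.client('s3')
BUCKET = 'curated-bucket'  # assumed bucket name

def publish(temp_prefix, final_prefix):
    """Move all objects written under temp_prefix into final_prefix."""
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=BUCKET, Prefix=temp_prefix):
        for obj in page.get('Contents', []):
            new_key = obj['Key'].replace(temp_prefix, final_prefix, 1)
            s3.copy_object(Bucket=BUCKET, Key=new_key,
                           CopySource={'Bucket': BUCKET, 'Key': obj['Key']})
            s3.delete_object(Bucket=BUCKET, Key=obj['Key'])

# After the Glue job writes to s3://curated-bucket/tmp/orders/, move it into place:
publish('tmp/orders/', 'reporting/orders/')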

Comparing previous state and current state

After we had all the datasets in place, it was time for our customers to come on board and start querying. This is where we really leveled up the customer experience.

Previously, users had to download a SQL client, request a user name and password, set it up, and learn SQL to get data out. Now, users just sign in to the AWS Management Console through automatically provisioned IAM roles and run queries in their browser with Athena. Or if they want to skip SQL altogether, they can use our Amazon QuickSight account with accounts managed through our pre-existing Active Directory server.

Integration with Active Directory was a big win for us. We wanted to enable users to get up and running without having to wait for an account to be created or managing separate credentials. We already use Active Directory across the company for access to multiple resources. Upgrading to Amazon QuickSight Enterprise Edition enabled us to manage access with our existing AD groups and credentials.

Migration results

Our legacy data warehouse was developed over the course of five years. We recreated it as a serverless data lake using AWS Glue in about three months.

In the end, it took more upfront effort than simply migrating to another relational database. We also dealt with more uncertainty because we used many products that were relatively new to us (especially AWS Glue).

However, in the months since the migration was completed, we’ve gotten great feedback from data warehouse users about the new tools. Our users have been amazed by these things:

  • How fast Athena is.
  • How intuitive and beautiful Amazon QuickSight is. They love that no setup is required—it’s easy enough that even our CEO has started using it!
  • That Athena plus the AWS Glue Data Catalog have given us the performance gains of a true big data platform, but for end users it retains the look and feel of a relational database.

Summary

From an operational perspective, the investment has already started to pay off. Literally: Our operating costs have fallen by almost 90 percent.

Personally, I was thrilled that recently I was able to take a three-week vacation and didn’t get paged once, thanks to the serverless infrastructure. And for our BI engineers in addition to myself, the S3-centric architecture is enabling us to experiment with new technologies by integrating seamlessly with other services, such as Amazon EMR, Amazon SageMaker, Amazon Redshift Spectrum, and Lambda. It’s been exciting to see how these services have grown in the time since we’ve adopted them (for example, the recent AWS Glue launch of Amazon CloudWatch metrics and Athena’s launch of views).

We are thrilled that we’ve invested in technologies that continue to grow as we do. We are incredibly proud of our team for accomplishing this ambitious migration. We hope our experience can inspire other engineers to dive in to building a data lake of their own.

For additional information, see these similar AWS Big Data blog posts:


About the authors

Chaya Carey is a data engineer at Woot.com. At Woot, she’s responsible for managing the data warehouse and other scalable data solutions. Outside of work, she’s passionate about Seattle’s bar and restaurant scene, books, and video games.

Karthik Odapally is a senior solutions architect at AWS. His passion is to build cost-effective and highly scalable solutions on the cloud. In his spare time, he bakes cookies and cupcakes for family and friends here in the PNW. He loves vintage racing cars.

Analyze and visualize nested JSON data with Amazon Athena and Amazon QuickSight

Post Syndicated from Mariano Kamp original https://aws.amazon.com/blogs/big-data/analyze-and-visualize-nested-json-data-with-amazon-athena-and-amazon-quicksight/

Although structured data remains the backbone for many data platforms, increasingly unstructured or semistructured data is used to enrich existing information or to create new insights. Amazon Athena enables you to analyze a wide variety of data. This includes tabular data in comma-separated value (CSV) or Apache Parquet files, data extracted from log files using regular expressions, and JSON-formatted data. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

In this blog post, I show you how to use JSON-formatted data and translate a nested data structure into a tabular view. For data engineers, using this type of data is becoming increasingly important. For example, you can use API-powered data feeds from operational systems to create data products. Such data can also help to add more finely grained facets to your understanding of customers and interactions. Understanding the fuller picture helps you better understand your customers and tailor experiences or predict outcomes.

To illustrate, I use an end-to-end example. It processes financial data retrieved from an API operation that is formatted as JSON. We analyze the data in Amazon Athena and visualize the results in Amazon QuickSight. Along the way, we compare and contrast alternative options.

The result looks similar to what you see below.

Analyzing JSON-formatted data

For our end-to-end example, we use financial data as provided by IEX. The financials API call pulls income statement, balance sheet, and cash flow data from four reported years of a stock.

Following, you can see example output. On the top level is an attribute called symbol, which identifies the stock described here: Apple. On the same level is an attribute called financials. This is a data container. The actual information is one level below, including such attributes as reportDate, cashFlow, and researchAndDevelopment.

The data container is an array. In the example following, financial data for only one year is shown. However, the {...} indicates that there might be more. In our case, data for four years is returned when making the actual API call.

{
  "symbol": "AAPL",
  "financials": [
    {
      "reportDate": "2017-03-31",
      "grossProfit": 20591000000,
      "costOfRevenue": 32305000000,
      "operatingRevenue": 52896000000,
      "totalRevenue": 52896000000,
      "operatingIncome": 14097000000,
      "netIncome": 11029000000,
      "researchAndDevelopment": 2776000000,
      "operatingExpense": 6494000000,
      "currentAssets": 101990000000,
      "totalAssets": 334532000000,
      "totalLiabilities": 200450000000,
      "currentCash": 15157000000,
      "currentDebt": 13991000000,
      "totalCash": 67101000000,
      "totalDebt": 98522000000,
      "shareholderEquity": 134082000000,
      "cashChange": -1214000000,
      "cashFlow": 12523000000,
      "operatingGainsLosses": null
    } // , { ... }
  ]
}

Data is provided for free by IEX (see the IEX Terms of Use).

It has become commonplace to use external data from API operations as feeds into Amazon S3. Although this is usually done in an automated fashion, in our case we manually acquire the API call’s results.

To download the data, you can use a script, described following.

Alternatively, you can click the following three links: 1, 2, and 3. You can then save the resulting JSON files to your local disk, then upload the JSON to an Amazon S3 bucket. In my case, the location of the data is s3://athena-json/financials, but you should create your own bucket. The result looks similar to this:

You can also use a Unix-like shell on your local computer or on an Amazon EC2 instance to populate an S3 location with the API data:

$ curl -s "https://api.iextrading.com/1.0/stock/aapl/financials?period=annual" > aapl.json 
$ curl -s "https://api.iextrading.com/1.0/stock/nvda/financials?period=annual" > nvda.json 
$ curl -s "https://api.iextrading.com/1.0/stock/fb/financials?period=annual" > fb.json 

$ ls -ltrh *.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.2K Nov 21 16:57 aapl.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.1K Nov 21 16:57 nvda.json
-rw-r--r--  1 mkamp  ANT\Domain Users   2.1K Nov 21 16:57 fb.json 

$ aws s3 sync . s3://athena-json/financials/ --exclude "*" --include "*.json"
upload: ./aapl.json to s3://athena-json/financials/aapl.json   
upload: ./nvda.json to s3://athena-json/financials/nvda.json   
upload: ./fb.json to s3://athena-json/financials/fb.json       

$ aws s3 ls s3://athena-json/financials/
2018-11-21 16:58:30       2245 aapl.json
2018-11-21 16:58:30       2162 fb.json
2018-11-21 16:58:30       2150 nvda.json

Mapping JSON structures to table structures

Now we have the data in S3. Let’s make it accessible to Athena. This is a simple two-step process:

  1. Create metadata. Doing so is analogous to traditional databases, where we use DDL to describe a table structure. This step maps the structure of the JSON formatted data to columns.
  2. Specify where to find the JSON files.

We can use all information of the JSON file at this time, or we can concentrate on mapping the information that we need today. The new data structure in Athena overlays the files in S3 only virtually. Therefore, even though we just map a subset of the contained information at this time, all information is retained in the files and can be used later on as needed. This is a powerful concept and enables an iterative approach to data modeling.

You can use the following SQL statement to create the table. The table is then named financials_raw—see (1) following. We use that name to access the data from this point on. We map the symbol and the list of financials as an array, along with a few of the financial figures. We define that the underlying files are to be interpreted as JSON in (2), and that the data lives under s3://athena-json/financials/ in (3).

CREATE EXTERNAL TABLE financials_raw ( -- (1)
    symbol string,
    financials array<
        struct<reportdate: string,
             grossprofit: bigint,
             totalrevenue: bigint,
             totalcash: bigint,
             totaldebt: bigint,
             researchanddevelopment: bigint>>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' -- (2)
LOCATION 's3://athena-json/financials/' -- (3)

You can run this statement using the Athena console as depicted following:

After you run the SQL statement on the left, the just-created table financials_raw is listed under the heading Tables. Now let’s have a look what’s in this table. Choose the three vertical dots to the right of the table name and choose Preview table. Athena creates a SELECT statement to show 10 rows of the table:

Looking at the output, you can see that Athena was able to understand the underlying data in the JSON files. Specifically, we can see two columns:

  • symbol, which contains flat data, the symbol of the stock
  • financials, which now contains an array of financials reports

If you look closely and observe the reportdate attribute, you find that the row contains more than one financial report.

Even though the data is nested—in our case financials is an array—you can access the elements directly from your column projections:

SELECT
  symbol, 
  financials[1].reportdate one_report_date, -- indexes start with 1
  financials[1].totalrevenue one_total_revenue,
  financials[2].reportdate another_report_date,
  financials[2].totalrevenue another_total_revenue
FROM
  financials_raw
ORDER BY
  1 -- the 1 indicates to order by the first column

As you can see preceding, all data is accessible. From this point on, it is structured, nested data, but not JSON anymore.

It’s still not tabular, though. We come back to this in a minute. First let’s have a look at a different way that would also have brought us to this point.

Alternative approach: Deferring the JSON extraction to query time

There are many different ways to use JSON formatted data in Athena. In the previous section, we use a simple, explicit, and rigid approach. In contrast, we now see a rather generic, dynamic approach.

In this case, we defer the final decisions about the data structures from table design to query design. To do that, we leave the data untouched in its JSON form as long as possible. As a consequence, the CREATE TABLE statement is much simpler than in the previous section:

CREATE EXTERNAL TABLE financials_raw_json (
  -- Using a mixed approach, extracting the symbol for 
  -- convenience directly from the JSON data
  symbol string,
  -- But otherwise storing the RAW JSON Data as string
  financials string 
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://athena-json/financials/' 

Executing the following statement shows that the data is accessible:

SELECT * FROM financials_raw_json

Even though the data is now accessible, it is only treated as a single string or varchar. This type is generic and doesn’t reflect the rich structure and the attributes of the underlying data.

But before diving into the richness of the data, I want to acknowledge that it’s hard to see from the query results which data type a column is. When using your queries, the focus is on the actual data, so seeing the data types all the time can be distracting. However in this case, when creating your queries and data structures, it is useful to use typeof. For example, use the following SQL statement:

SELECT typeof(financials) FROM financials_raw_json

Using this SQL statement, you can verify for yourself that the column is treated as a varchar.

To now introduce the data structure during query design, Athena provides specific functionality covered in the documentation to work with JSON formatted data.

The following table shows how to extract the data, starting at the root of the record in the first example. The table then shows additional examples on how to navigate further down the document tree. The first column shows the expression that can be used in a SQL statement like SELECT <expr> FROM financials_raw_json, where <expr> is to be replaced by the expression in the first column. The remaining columns explain the results.

Expression | Result | Type | Description
json_extract(financials, '$') | [{.., "reportdate":"2017-12-31", ..}, {..}, {..}, {.., "reportdate":"2014-12-31", ..}] | json | Selecting the root of the document (financials).
json_extract(financials, '$[0]') | {.., "reportdate":"2017-12-31", "totalcash":"41711000000", ..} | json | Selecting the first element of the financials array. The indexing starts at 0, as opposed to 1, which is customary in SQL.
json_extract(financials, '$[0].reportdate') | "2017-12-31" | json | Selecting the reportdate attribute of the first element of the financials array.
json_extract_scalar(financials, '$[0].reportdate') | 2017-12-31 | varchar | As preceding, but now the type became a varchar because we are now using json_extract_scalar.
json_size(financials, '$') | 4 | bigint | The size of the financials array; 4 represents the four years contained in each JSON.

To implement our example, these functions give us more than enough, so we can leave it at that.
However, there are more functions to go back and forth between JSON and Athena. You can find more information in the Apache Presto documentation. Athena is our managed service based on Apache Presto. Thus, when looking for information it is also helpful to consult Presto documentation.

Let’s put the JSON functions introduced preceding to use:

SELECT 
  symbol,
  -- indexes start with 0, as is customary with JavaScript/JSON
  json_extract_scalar(financials, '$[0].reportdate') one_report_date,  
  json_extract_scalar(financials, '$[0].totalrevenue') one_total_revenue,
  json_extract_scalar(financials, '$[1].reportdate') another_report_date,
  json_extract_scalar(financials, '$[1].totalrevenue') another_total_revenue
FROM
  financials_raw_json
ORDER BY 
  1

As with the first approach, we still have to deal with the nested data inside the rows. By doing so, we can get rid of the explicit indexing of the financial reports as used preceding.

Comparing approaches

If you go back and compare our latest SQL query with our earlier SQL query, you can see that they produce the same output. On the surface, they even look alike because they project the same attributes. But a closer look reveals that the first statement uses a structure that has already been created during CREATE TABLE. In contrast, the second approach interprets the JSON document for each column projection as part of the query.

Interpreting the data structures during the query design enables you to change the structures across different SQL queries or even within the same SQL query. Different column projections in the same query can interpret the same data, even the same column, differently. This can be extremely powerful, if such a dynamic and differentiated interpretation of the data is valuable. On the other hand, it takes more discipline to make sure that during maintenance different interpretations are not introduced by accident.

In both approaches, the underlying data is not touched. Athena only overlays the physical data, which makes changing the structure of your interpretation fast. Which approach better suits you depends on the intended use.

To determine this, you can ask the following questions. As a rule of thumb, are your intended users data engineers or data scientists? Do they want to experiment and change their mind frequently? Maybe they even want to have different use case–specific interpretations of the same data. Then they would fare better with the latter approach of leaving the JSON data untouched until query design. They would also then likely be willing to invest in learning the JSON extensions to gain access to this dynamic approach.

If on the other hand your users have established data sources with stable structures, the former approach fits better. It enables your users to query the data with SQL only, with no need for information about the underlying JSON data structures.

Use the following side-by-side comparison to choose the appropriate approach for your case at hand.

Data structure interpretation happens at table creation time | Data structure interpretation happens at query creation time
The interpretation of data structures is scoped to the whole table. All subsequent queries use the same structures. | The data interpretation is scoped to an individual query. Each query can potentially interpret the data differently.
The interpretation of data structures evolves centrally. | The interpretation of data structures can be changed on a per-query basis, so different queries can evolve at different speeds and in different directions.
It is easy to provide a single version of the truth, because there is just a single interpretation of the underlying data structures. | A single version of the truth is hard to maintain and needs coordination across the different queries using the same data. Rapidly evolving data interpretations can easily go hand-in-hand with an evolving understanding of use cases.
Applicable to well-understood data structures that are slowly and consciously evolving. A single interpretation of the underlying data structures is valued more than change velocity. | Applicable to experimental, rapidly evolving interpretations of data structures and use cases. Change velocity is more important than a single, stable interpretation of data structures.
Production data pipelines benefit from this approach. | Exploratory data analysis benefits from this approach.

Both approaches can serve well at different times in the development lifecycle, and each approach can be migrated to the other.

In any case, this is not a black and white decision. In our example, we keep the tables financials_raw and financials_raw_json, both accessing the same underlying data. The data structures are just metadata, so keeping both around doesn’t store the actual data redundantly.

For example, financials_raw might be used by data engineers as the source of productive pipelines where the attributes and their meaning are well-understood and stable across use cases. At the same time, data scientists might use financials_raw_json for exploratory data analysis where they refine their interpretation of the data rapidly and on a per-query basis.

Working with nested data

At this point, we can access data that is JSON formatted through Athena. However, the underlying structure is still hierarchical, and the data is still nested. For many use cases, especially for analytical uses, expressing data in a tabular fashion—as rows—is more natural. This is also the standard way when using SQL and business intelligence tools. To unnest the hierarchical data into flattened rows, we need to reconcile these two approaches.

To simplify, we can set the financial reports example aside for the moment. Instead, let’s experiment with a narrower example. Reconciling different ways of thinking can sometimes be hard to follow. The narrow example and hands-on experimentation should make this easier. Copy the code we discuss into the Athena console to play along.

The following code is self-contained and uses synthetic data. This lends itself particular well to experimentation:

SELECT 
  parent, children
FROM (
  VALUES
    ('Parent 1', ARRAY['Child 1.1', 'Child 1.2']),
    ('Parent 2', ARRAY['Child 2.1', 'Child 2.2', 'Child 2.3'])
) AS t(parent, children)

Looking at the data, this is similar to our situation with the financial reports. There we had multiple financial reports for one stock symbol, multiple children for each parent. To flatten the data, we first unnest the individual children for each parent. Then we cross-join each child with its parent, which creates an individual row for each child that contains the child and its parent.

In the following SQL statement, UNNEST takes the children column from the original table as a parameter. It creates a new dataset with the new column child, which is later cross-joined. The enclosing SELECT statement can then reference the new child column directly.

SELECT 
  parent, child
FROM (
  VALUES
    ('Parent 1', ARRAY['Child 1.1', 'Child 1.2']),
    ('Parent 2', ARRAY['Child 2.1', 'Child 2.2', 'Child 2.3'])
) AS t(parent, children)
CROSS JOIN UNNEST(children) AS t(child)

If you played along with the simplified example, it should be easy now to see how this method can be applied to our financial reports:

SELECT 
    symbol,
    report
FROM 
    financials_raw
CROSS JOIN UNNEST(financials) AS t(report)

Bam! Now that was easy, wasn’t it?

Using this as a basis, let’s select the data that we want to provide to our business users and turn the query into a view. The underlying data has still not been touched, is still formatted as JSON, and is still expressed using nested hierarchies. The new view makes all of this transparent and provides a tabular view.

Let’s create the view:

CREATE OR REPLACE VIEW financial_reports_view AS
SELECT 
  symbol,
  CAST(report.reportdate AS DATE) reportdate,
  report.totalrevenue,
  report.researchanddevelopment
FROM 
  financials_raw
CROSS JOIN UNNEST(financials) AS t(report)
ORDER BY 1 ASC, 2 DESC

and then check our work:

SELECT
  *
FROM
  financial_reports_view

This is a good basis and acts as an interface for our business users.

The previous steps were based on the initial approach of mapping the JSON structures directly to columns. Let’s also explore the alternative path that we discussed before. What does this look like when we keep the data JSON formatted for longer, as we did in our alternative approach?

For variety, this approach also shows json_parse, which is used here to parse the whole JSON document and convert the list of financial reports and their contained key-value pairs into an ARRAY(MAP(VARCHAR, VARCHAR)). This array in turn is used in the unnesting, and its children eventually in the column projections. With element_at, you can access an element’s value in the JSON by name. You can also see the use of WITH to define subqueries, helping to structure the SQL statement.

If you run the following query, it returns the same result as the approach preceding. You can also turn this query into a view.

WITH financial_reports_parsed AS (
  SELECT 
    symbol,   
    CAST(json_parse(financials) AS ARRAY(MAP(VARCHAR, VARCHAR))) financial_reports
  FROM         
    financials_raw_json)
SELECT 
  symbol,
  CAST(element_at(report, 'reportdate') AS DATE) reportdate,  
  element_at(report, 'totalrevenue') totalrevenue,
  element_at(report, 'researchanddevelopment') researchanddevelopment
FROM
  financial_reports_parsed
CROSS JOIN UNNEST(financial_reports) AS t(report)
ORDER BY 1 ASC, 2 DESC

Visualizing the data

Let’s get back to our example. We created the financial_reports_view that acts as our interface to other business intelligence tools. In this blog post, we use it to provide data for visualization using Amazon QuickSight. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first need to select our view to create a new data source in Athena and then we use this data source to populate the visualization.

We are creating the visual that is displayed at the top of this post. If you want just the data and you’re not interested in condensing data to a visual story, you can skip ahead to the post conclusion section.

Creating an Athena data source in Amazon QuickSight

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

On the Amazon QuickSight home page, choose Manage data from the upper-right corner, then choose New data set and pick Athena as data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

Choose the default database and our view financial_reports_view, then choose Select to confirm. If you used multiple schemas in Athena, you could pick them here as your database.

In the next dialog box, you can choose if you want to import the data into SPICE for quicker analytics or to directly query the data.

SPICE is the super-fast, parallel, in-memory calculation engine in Amazon QuickSight. For our example, you can go either way. Using SPICE results in the data being loaded from Athena only once, until it is either manually refreshed or automatically refreshed (using a schedule). Using direct query means that all queries are run on Athena.

Our view now is a data source for Amazon QuickSight and we can turn to visualizing the data.

Creating a visual in Amazon QuickSight

You can see the data fields on the left. Notice that reportdate is shown with a calendar symbol and researchanddevelopment as a number. Amazon QuickSight picks up the data types that we defined in Athena.

The canvas on the right is still empty. Before we populate it with data, let’s select Line Chart from the available visual types.

To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put reportdate onto the X axis well. We put our metric researchanddevelopment onto the Value well, so that it’s displayed on the y-axis. We put symbol onto the Color well, helping us tell the different stocks apart.

An initial version of our visualization is now shown on the canvas. Drag the handle at the lower-right corner to adjust the size to your liking. Also, pick Format visual from the drop-down menu in the upper right corner. Doing this opens a dialog with more options to enhance the visualization.

Expand the Data labels section and choose Show data labels. Your changes are immediately reflected in the visualization.

You can also interact with the data directly. Given that Amazon QuickSight picked up on the reportdate being a DATE, it provides a date slider at the bottom of the visual. You can use this slider to adjust the time frame shown.

You can add further customizations. These can include changing the title of the visual or the axis, adjusting the size of the visual, and adding additional visualizations. Other possible customizations are adding data filters and capturing the combination of visuals into a dashboard. You might even turn the dashboard into a scheduled report that gets sent out once a day by email.

Conclusion

We have seen how to use JSON formatted data that is stored in S3. We contrasted two approaches to map the JSON formatted data to data structures in Athena:

  • Mapping the JSON structures at table creation time to columns.
  • Leaving the JSON structures untouched and instead mapping the contents as a whole to a string, so that the JSON contents remains intact. The JSON contents can later be interpreted and the structures at query creation time mapped to columns.

The approaches are not mutually exclusive, but can be used in parallel for the same underlying data.

Furthermore, JSON data can be hierarchical, which must be unnested and cross-joined to provide the data in a flattened, tabular fashion.

For our example, we provided the data in a tabular fashion and created a view that encapsulates the transformations, hiding the complexity from its users. We used the view as an interface to Amazon QuickSight. Amazon QuickSight directly accesses the Athena view and visualizes the data.

More on using JSON

JSON features blend nicely into the existing SQL-oriented functions in Athena, but they are not ANSI SQL compatible. Also, the JSON file is expected to carry each record on a separate line (see the JSON Lines website).

In the documentation for the JSON SerDe Libraries, you can find how to use the property ignore.malformed.json to indicate if malformed JSON records should be turned into nulls or an error. Further information about the two possible JSON SerDe implementations is linked in the documentation. If necessary, you can dig deeper and find out how to take explicit control of how column names are parsed, for example to avoid clashing with reserved keywords.

How to efficiently store data

During our excursions, we never touched the actual data. We only defined different ways to interpret the data. This approach works well for us here, because we are only dealing with a small amount of data. If you want to use these concepts at scale, consider how to apply partitioning of data and possibly how to consolidate data into larger files.

Depending on the data, also consider whether storing it in a columnar format, for example Apache Parquet, might be beneficial. You can find additional practical suggestions in the AWS Big Data Blog post Top 10 Performance Tuning Tips for Amazon Athena.
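One way to apply both ideas, sketched here with an assumed target location and partition column, is an Athena CTAS statement that rewrites the JSON data into partitioned Parquet files:

CREATE TABLE financials_parquet
WITH (
  format = 'PARQUET',
  external_location = 's3://your-bucket/financials-parquet/',
  partitioned_by = ARRAY['report_year']
) AS
SELECT symbol,
       financials,
       substr(json_extract_scalar(financials, '$[0].reportdate'), 1, 4) AS report_year
FROM financials_raw;

Note that partition columns must come last in the SELECT list, and the target location must be empty.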

These options don't replace what you learned in this article; they build on your ability to compare and contrast JSON-formatted and nested data, and they can be used in a complementary fashion.

Further, this AWS Big Data Blog post walks you through a real-world scenario showing how to store and query data efficiently.


About the Author

Mariano Kamp is a principal solutions architect with Amazon Web Services. He works with financial services customers in Germany and has more than 25 years of industry experience covering a wide range of technologies. His area of depth is Analytics.

In his spare time, Mariano enjoys hiking with his wife.
Chasing earthquakes: How to prepare an unstructured dataset for visualization via ETL processing with Amazon Redshift

Post Syndicated from Ian Funnell original https://aws.amazon.com/blogs/big-data/chasing-earthquakes-how-to-prepare-an-unstructured-dataset-for-visualization-via-etl-processing-with-amazon-redshift/

As organizations expand analytics practices and hire data scientists and other specialized roles, big data pipelines are growing increasingly complex. Sophisticated models are being built using the troves of data being collected every second.

The bottleneck today is often not the know-how of analytical techniques. Rather, it’s the difficulty of building and maintaining ETL (extract, transform, and load) jobs using tools that might be unsuitable for the cloud.

In this post, I demonstrate a solution to this challenge. I start with a noisy semistructured dataset of seismic events, spanning several years and recorded at different locations across the globe. I set out to obtain broad insights about the nature of the rocks forming the Earth’s surface itself—the tectonic plate structure—to be visualized using the mapping capability in Amazon QuickSight.

To accomplish this, I use several AWS services, including Amazon S3, Amazon Athena, Amazon Redshift, and Amazon QuickSight, orchestrated together using Matillion ETL for Amazon Redshift.

Tectonic plate structure context

An earthquake is caused by a buildup of pressure that gets suddenly released. Earthquakes tend to be more severe at destructive tectonic plate boundaries. These boundaries are formed when a heavier and denser oceanic plate collides with a lighter continental plate, or when two oceanic plates collide. Due to the difference in density, the oceanic lithosphere is pushed underneath the continental plate, forming what is called a subduction zone. (See the following diagram.) In subduction zones, earthquakes can occur at depths as great as 700 kilometers.

Photography by KDS4444 [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)], from Wikimedia Commons

For our analysis, we ultimately want to visualize the depth of an earthquake focus to locate subduction zones, and therefore find places on earth with the most severe earthquakes.

Seismographic data source

The data source is from the International Federation of Digital Seismograph Networks (FDSN). The event data is in JSON format (from the European Mediterranean Seismological Centre, or EMSC). An external process accumulates files daily into an Amazon S3 bucket, as shown following.

Each individual file contains all the seismic events for one day—usually several hundred—in an embedded array named “features,” as shown in the following example:

{
  "type": "FeatureCollection",
  "metadata": {
    "totalCount": 103
  },
  "features": [
    {
      "geometry": {
        "type": "Point",
        "coordinates": [26.76, 45.77, -140]
      },
      "type": "Feature",
      "id": "20180302_0000103",
      "properties": {
        "lastupdate": "2018-03-02T23:27:00.0Z",
        "lon": 26.76, "lat": 45.77, "depth": 140,
        "mag": 3.7,
        "time": "2018-03-02T23:22:52.1Z",
        "flynn_region": "ROMANIA"
      }
    },
    {
      "geometry": {
        "type": "Point",

Architecture overview

Athena reads and flattens the S3 data and makes it available for Matillion ETL to load into Amazon Redshift via JDBC. Matillion orchestrates this data movement, and it also provides a graphical framework to design and build the more complex data enrichment and aggregation steps to be performed by Amazon Redshift. Finally, the prepared data is queried by Amazon QuickSight for visualization.

Amazon Athena setup

You can use Athena to query data in S3 using standard SQL, via a serverless infrastructure that is managed entirely by AWS on your behalf. Before you can query your data, start by creating an external table. By doing this, you are defining the schema to apply to the data when it is being queried.

You can choose to use an AWS Glue crawler to assist in automatically discovering the schema and format of your source data files.

The following is the CREATE TABLE statement that you can copy and paste into the Athena console to create the schema needed to query the seismic data. Make sure that you substitute the correct S3 path to your seismic data in the LOCATION field of the statement.

CREATE EXTERNAL TABLE `sp_events`(
  `type` string COMMENT 'from deserializer', 
  `metadata` struct<totalcount:int> COMMENT 'from deserializer', 
  `features` array<struct<geometry:struct<type:string,coordinates:array<double>>,type:string,id:string,properties:struct<lastupdate:string,magtype:string,evtype:string,lon:double,auth:string,lat:double,depth:double,unid:string,mag:double,time:string,source_id:string,source_catalog:string,flynn_region:string>>> COMMENT 'from deserializer')
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://your-bucket/SeismicPortal'

After the table is created, you are ready to query it. Athena uses Presto, an in-memory, distributed SQL engine, which provides the ability to unnest arrays. You use that capability next.

Transfer to Amazon Redshift

The embedded array in every source record gets flattened out and converted into individual records during the JDBC export into Amazon Redshift (download the .jar file). You use a Matillion ETL Database Query component to assist with the data transfer during this step, as shown in the following image.

This component simplifies ETL by automating the following steps:

  1. Runs the SQL SELECT statement (shown in the following example).
  2. Streams the query results across the network from Athena into temporary storage in S3.
  3. Performs a bulk data load into Amazon Redshift.

Athena executes the following SQL statement:

SELECT f.id,
       f.properties.time AS event_time,
       f.properties.lastupdate,
       f.properties.lon,
       f.properties.lat,
       f.properties.depth,
       f.properties.mag,
       f.properties.flynn_region
FROM "seismic"."sp_events"
CROSS JOIN UNNEST (features) AS t(f)

The CROSS JOIN UNNEST syntax flattens the embedded array, generating hundreds of individual event records per day.

Now that the data has been copied and flattened into individual event records (shown in the following image), it’s ready for enrichment and aggregation.

Data enrichment

Earthquakes occur along a continuous range of spatial coordinates. In order to aggregate them, as we do shortly, it's necessary to first group them together. A convenient method is to assign every event to a Universal Transverse Mercator (UTM) zone. These zones are six-degree bands of longitude in which the spherical latitude/longitude coordinates are projected onto a 2D representation. Performing this conversion provides good granularity for visualization later.

The calculation to convert a spherical longitude/latitude coordinate into a two-dimensional UTM coordinate is complex, and it is ideally performed using an Amazon Redshift user-defined function (UDF). I chose a UDF for the ability to invoke it, via a Matillion component, in the next step.

CREATE OR REPLACE FUNCTION f_ll_utm (Lat float, Long float)
      RETURNS VARCHAR
STABLE
AS $$
from math import pi, sin, cos, tan, sqrt

_deg2rad = pi / 180.0
_rad2deg = 180.0 / pi

# Indices into the _ellipsoid list below (1 = equatorial radius in meters, 2 = eccentricity squared)
_EquatorialRadius = 1
_eccentricitySquared = 2
_ellipsoid = ["WGS-84", 6378137, 0.00669438]

The UDF has to return three pieces of information:

  • UTM Zone code
  • Easting (x-axis measurement in meters)
  • Northing (ditto, for the y-axis)

A scalar UDF can only return a single value, so the three results are returned as a single pipe-delimited string.

To bring the values out into individual fields, the UDF is first invoked using a Matillion ETL Calculator component, followed by a field splitter and another Calculator component that performs data type conversion and rounding.
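Outside of the Matillion components, an equivalent split, type conversion, and rounding could be sketched directly in Amazon Redshift SQL using SPLIT_PART; the source table name, flattened_events, is an assumption for illustration:

SELECT id,
       SPLIT_PART(f_ll_utm(lat, lon), '|', 1) AS utm_zone,
       ROUND(CAST(SPLIT_PART(f_ll_utm(lat, lon), '|', 2) AS FLOAT)) AS easting,
       ROUND(CAST(SPLIT_PART(f_ll_utm(lat, lon), '|', 3) AS FLOAT)) AS northing
FROM flattened_events;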

Data aggregation

To reiterate, we’re interested in the depth of earthquake focus specifically on destructive plate boundaries. Knowing the depth helps us estimate the potential severity of earthquakes.

We need to find the average event depth within each UTM zone, in the expectation that a spatial pattern will appear that will highlight the subduction zones.

The last three steps in the Matillion transformation (shown in the following diagram) perform the necessary aggregation, add a depth quartile, and create an output table from the resulting data.

The "Aggregate to UTM ref" step gets Amazon Redshift to perform a GROUP BY in SQL, which assigns every event to its UTM zone. While doing this aggregation, you simultaneously do the following:

  • Count the events (which determines the size of the visual representation).
  • Find the average depth (which determines the color of the visual representation).
  • Determine the average latitude and longitude (which approximates to the center of the UTM zone, and determines the position of the visual representation).

The following image shows the aggregation type for each column:
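In plain SQL, the aggregation that Matillion pushes down to Amazon Redshift looks roughly like the following sketch; the table and column names are assumptions that mirror the fields described above:

SELECT utm_zone,
       COUNT(*)   AS event_count, -- size of the visual representation
       AVG(depth) AS avg_depth,   -- color of the visual representation
       AVG(lat)   AS avg_lat,     -- position: approximate center of the UTM zone
       AVG(lon)   AS avg_lon
FROM flattened_events_utm
GROUP BY utm_zone;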

Average depth is a useful measure, but to maximize the visual impact of the final presentation, we also take the opportunity to rank the results into quartiles. This allows the zones in the deepest quartile to stand out consistently on the map.

NTILE(4) OVER (ORDER BY "avg_depth")

Amazon Redshift is great at performing this type of analytics, which is delivered inside another Matillion ETL Calculator component.

The Recreate Output step materializes the dataset into an Amazon Redshift table, ready for Amazon QuickSight to visualize.

Amazon QuickSight visualization

The Amazon QuickSight “points on map” visualization is perfect for this 2D rendering. The values for the field wells come straight from the aggregated data in Amazon Redshift:

  • Geospatial — the average lat/long per UTM zone.
  • Size — the record count, in other words, the number of events in that UTM zone.
  • Color — the Depth Ntile, with the fourth quartile in pink.

The resulting map shows the global subduction zones clearly highlighted in pink, these being the areas with the deepest earthquake foci on average.

Recap and summary

In this post, I used seismological data as an example to explore challenges around the visualization of unstructured data and to provide best practices. I suggested a way to overcome these challenges with an architecture that is also applicable for datasets from a wide array of sources, beyond geology. I then explored how to orchestrate activities of different data processing tasks between S3, Athena, and Amazon Redshift using Matillion ETL for Amazon Redshift.

If you’re trying to solve similar analytics challenges and want to see how Amazon Redshift and Matillion can help, launch a 14-day free trial of Matillion ETL for Amazon Redshift on the AWS Marketplace or schedule a demo today. If you have questions or suggestions, please comment below.


Additional Reading

If you found this post helpful, be sure to check out Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift, and Orchestrate multiple ETL jobs using AWS Step Functions and AWS Lambda.


About the Author

Ian Funnell, a lead solution architect at Matillion, is a seasoned cloud and data warehousing specialist. Ian is dedicated to supporting customers in driving their data transformation forward and solving their deepest technical challenges. Ian has more than 25 years of experience in the tech industry.