Tag Archives: Analytics

Monitoring and management with Amazon QuickSight and Athena in your CI/CD pipeline

Post Syndicated from Umair Nawaz original https://aws.amazon.com/blogs/devops/monitoring-and-management-with-amazon-quicksight-and-athena-in-your-ci-cd-pipeline/

One of the many ways to monitor and manage required CI/CD metrics is to use Amazon QuickSight to build customized visualizations. Additionally, by applying Lean management to software delivery processes, organizations can deliver features faster, pivot when needed, respond to compliance and security changes, and take advantage of instant feedback to improve the customer delivery experience. This blog post demonstrates how AWS resources and tools can provide monitoring of, and insights into, your CI/CD pipelines.

There are three principles in Lean management that this solution enables and to which it contributes:

  • Limiting work in progress by establishing constraints that drive process improvement and increase throughput.
  • Creating and maintaining dashboards displaying key quality information, productivity metrics, and current status of work (including defects).
  • Using data from development performance and operations monitoring tools to enable business decisions more frequently.

Overview

The following architectural diagram shows how to use AWS services to collect metrics from a CI/CD pipeline and deliver insights through Amazon QuickSight dashboards.

Architecture diagram showing an overview of how CI/CD metrics are extracted and transformed to create a dynamic QuickSight dashboard

In this example, the orchestrator for the CI/CD pipeline is AWS CodePipeline with the entry point as an AWS CodeCommit Git repository for source control. When a developer pushes a code change into the CodeCommit repository, the change goes through a series of phases in CodePipeline. AWS CodeBuild is responsible for performing build actions and, upon successful completion of this phase, AWS CodeDeploy kicks off the actions to execute the deployment.

For each action in CodePipeline, the following series of events occurs:

  • CodePipeline emits a CloudWatch event containing the action’s metadata.
  • An Amazon CloudWatch Events rule matches the event and triggers an AWS Lambda function.
  • The Lambda function extracts relevant reporting data and writes it to a CSV file in an Amazon S3 bucket.
  • Amazon Athena queries the Amazon S3 bucket and loads the query results into SPICE (an in-memory engine for Amazon QuickSight).
  • Amazon QuickSight obtains data from SPICE to build dashboard displays for the management team.
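
For reference, each event delivered to the rule carries the pipeline name, timestamp, state, execution ID, and, for action-level events, the stage and action name. The following payload is illustrative only: the field names mirror what the Lambda function later in this post reads, while a real CodePipeline event contains additional metadata (account, region, detail-type) and may name the execution ID field differently.

# Illustrative payload only; real events carry more fields.
sample_event = {
    "time": "2020-03-01T12:34:56Z",
    "detail": {
        "pipeline": "demo-pipeline",
        "state": "SUCCEEDED",
        "execution": "0f1a2b3c-example",
        "stage": "Build",
        "action": "BuildAction",
    },
}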

Note: This solution requires an AWS account with at least one existing pipeline in AWS CodePipeline. If you do not have a pipeline, no metrics are collected.

Getting started

To get started, follow these steps:

  • Create a Lambda function and copy the following code snippet. Be sure to replace the bucket name with the one used to store your event data. This Lambda function takes the payload from a CloudWatch event, extracts the pipeline, time, state, execution, stage, and action fields, and transforms them into a CSV file.

Note: Athena’s performance can be improved by compressing, partitioning, or converting data into columnar formats such as Apache Parquet. In this use case, the dataset size is negligible; therefore, a transformation from CSV to Parquet is not required.


import boto3
import csv
import datetime
import os

# Analyze payload from CloudWatch Event
def pipeline_execution(data):
    print(data)
    # Specify data fields to deliver to S3 (the first entry is the CSV header row)
    row = ['pipeline,time,state,execution,stage,action']

    if "stage" in data['detail'].keys():
        stage = data['detail']['stage']
    else:
        stage = 'NA'

    if "action" in data['detail'].keys():
        action = data['detail']['action']
    else:
        action = 'NA'

    row.append(data['detail']['pipeline'] + ',' + data['time'] + ',' + data['detail']['state'] + ',' + data['detail']['execution'] + ',' + stage + ',' + action)
    values = '\n'.join(str(v) for v in row)
    return values

# Upload CSV file to S3 bucket
def upload_data_to_s3(data):
    s3 = boto3.client('s3')
    runDate = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M:%S:%f")
    csv_key = runDate + '.csv'
    response = s3.put_object(
        Body=data,
        Bucket='<example-bucket>',  # replace with the bucket that stores your event data
        Key=csv_key
    )

def lambda_handler(event, context):
    upload_data_to_s3(pipeline_execution(event))

  • Create an Athena table to query the data stored in the Amazon S3 bucket. Execute the following SQL in the Athena query console and provide the bucket name that will hold the data.
CREATE EXTERNAL TABLE `devops`(
   `pipeline` string, 
   `time` string, 
   `state` string, 
   `execution` string, 
   `stage` string, 
   `action` string)
 ROW FORMAT DELIMITED 
   FIELDS TERMINATED BY ',' 
 STORED AS INPUTFORMAT 
   'org.apache.hadoop.mapred.TextInputFormat' 
 OUTPUTFORMAT 
   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
 LOCATION
   's3://<example-bucket>/'
 TBLPROPERTIES (
   'areColumnsQuoted'='false', 
   'classification'='csv', 
   'columnsOrdered'='true', 
   'compressionType'='none', 
   'delimiter'=',', 
   'skip.header.line.count'='1',  
   'typeOfData'='file')  
  • Create a CloudWatch event rule that passes events to the Lambda function created in Step 1. In the event rule configuration, set the Service Name as CodePipeline and, for Event Type, select All Events.
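
If you prefer to script this last step, the following boto3 sketch creates an equivalent rule and wires it to the Lambda function. The rule name, function ARN, and account/region values are placeholders, not values from this post; the event pattern mirrors the console configuration of Service Name = CodePipeline with All Events.

import json
import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Placeholder values; substitute your own account, region, and function name.
rule_name = 'codepipeline-metrics-rule'
function_arn = 'arn:aws:lambda:us-east-1:111122223333:function:<your-lambda-function>'

# Match all CodePipeline events.
rule_arn = events.put_rule(
    Name=rule_name,
    EventPattern=json.dumps({'source': ['aws.codepipeline']})
)['RuleArn']

# Send matched events to the Lambda function.
events.put_targets(
    Rule=rule_name,
    Targets=[{'Id': 'metrics-lambda', 'Arn': function_arn}]
)

# Allow CloudWatch Events to invoke the function.
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='allow-cloudwatch-events',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule_arn
)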

Sample Dataset view from Athena.

Sample Athena query and the results
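
If you want to run a similar query programmatically, the following boto3 sketch issues an aggregation against the devops table defined above. The database name and query-results location are placeholder assumptions, not values from this post.

import time
import boto3

athena = boto3.client('athena')

# Placeholders: adjust the database and the query-results location for your account.
query = """
SELECT pipeline, state, COUNT(*) AS executions
FROM devops
GROUP BY pipeline, state
ORDER BY pipeline, state
"""

execution_id = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://<athena-query-results-bucket>/'}
)['QueryExecutionId']

# Poll until the query finishes, then print the result rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=execution_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    for row in athena.get_query_results(QueryExecutionId=execution_id)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])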

Amazon QuickSight visuals

After the initial setup is done, you are ready to create your QuickSight dashboard. Be sure to check that the Athena permissions are properly set before creating an analysis to be published as an Amazon QuickSight dashboard.

Below are diagrams and figures from Amazon QuickSight that can be generated using the event data queried from Athena. In this example, you can see how many executions happened in the account and how many were successful.

The following screenshot shows that most pipeline executions are failing. A manager might read this as a significant issue and prompt an investigation, allocating resources to improve delivery and efficiency.

QuickSight Dashboard showing total execution successes and failures

The visuals in this solution are dynamic: if the pipeline gains or loses actions, the visual adjusts automatically to reflect all of them. After looking at the success and failure rates for each CodePipeline action in Amazon QuickSight, as shown in the following screenshot, users can take targeted actions quickly. For example, if the team sees a lot of failures due to vulnerability scanning, they can work on improving that problem area to drive value for future code releases.

QuickSight Dashboard showing the successes and failures of pipeline actions

Day-over-day visuals reflect date-specific activity and enable teams to see their progress over a period of time.

QuickSight Dashboard showing day over day results of successful CI/CD executions and failures

Amazon QuickSight offers controls that can be configured to apply filters to visuals. For example, the following screenshot demonstrates how users can toggle between visuals for different applications.

QuickSight's control function to switch between different visualization options

Cleanup (optional)

In order to avoid unintended charges, delete the following resources:

  • Amazon CloudWatch event rule
  • Lambda function
  • Amazon S3 Bucket (the location in which CSV files generated by the Lambda function are stored)
  • Athena external table
  • Amazon QuickSight data sets
  • Analysis and dashboard

Conclusion

In this blog, we showed how metrics can be derived from a CI/CD pipeline. Utilizing Amazon QuickSight to create visuals from these metrics allows teams to continuously deliver updates on the deployment process to management. The aggregation of the captured data over time allows individual developers and teams to improve their processes. That is the goal of creating a Lean DevOps process: to oversee the meta-delivery pipeline and optimize all future releases by identifying weak spots and points of risk during the entire release process.

___________________________________________________________

About the Authors

Umair Nawaz is a DevOps Engineer at Amazon Web Services in New York City. He works on building secure architectures and advises enterprises on agile software delivery. He is motivated to solve problems strategically by utilizing modern technologies.
Christopher Flores is an Engagement Manager at Amazon Web Services in New York City. He leads AWS developers, partners, and client teams in using the customer engagement accelerator framework. Christopher expedites stakeholder alignment, enterprise cohesion and risk mitigation while ensuring feedback loops to close the engagement lifecycle.
Carol Liao is a Cloud Infrastructure Architect at Amazon Web Services in New York City. She enjoys designing and developing modern IT solutions in the cloud where there is always more to learn, more problems to solve, and more to build.

 

Apache Hive is 2x faster with Hive LLAP on EMR 6.0.0

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/apache-hive-is-2x-faster-with-hive-llap-on-emr-6-0-0/

Customers use Apache Hive with Amazon EMR to provide SQL-based access to petabytes of data stored on Amazon S3. Amazon EMR 6.0.0 adds support for Hive LLAP, providing an average performance speedup of 2x over EMR 5.29, with up to 10x improvement on individual Hive TPC-DS queries. This post shows you how to enable Hive LLAP, and outlines the performance gains we’ve observed using queries from the TPC-DS benchmark.

Twice as fast compared to Amazon EMR 5.29.0

To evaluate the performance benefits of running Hive with Amazon EMR release 6.0.0, we’re using 70 TPC-DS queries with a 3 TB Apache Parquet dataset on a six-node c4.8xlarge EMR cluster to compare the total runtime and geometric mean with results from EMR release 5.29.0.

The results show that the TPC-DS queries run twice as fast in Amazon EMR 6.0.0 (Hive 3.1.2) compared to Amazon EMR 5.29.0 (Hive 2.3.6) with the default Amazon EMR Hive configuration.

The following graph shows performance improvements measured as total runtime for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 70 TPC-DS queries. Amazon EMR 6.0.0 has the better (lower) geometric mean.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Hive Live Long and Process (LLAP)

Hive LLAP enhances the execution model of Hive, using persistent daemons with dynamic in-memory caching for low-latency queries. These daemons run on the core and task nodes in EMR clusters, caching data and metadata, and, because they are long-lived processes, they avoid the container startup overhead of traditional Hive queries.

By default, LLAP daemons do not start as part of EMR cluster start-up. You can enable LLAP in Amazon EMR 6.0.0 with the following hive configuration:

[
  {
    "Classification": "hive",
    "Properties": {
      "hive.llap.enabled": "true"
    }
  }
]

Since Hive LLAP uses persistent daemons that run on YARN, a percentage of the EMR cluster’s YARN resources will be reserved for Hive LLAP daemons when LLAP is enabled. You can override the following properties, which are predefined/calculated by EMR, using the hive configuration when launching an EMR cluster.

hive.llap.num-instances – Defines the number of LLAP instances to run on the EMR cluster. There can be at most one LLAP instance per NodeManager node (core/task). Default: the number of core/task nodes in the cluster.
hive.llap.percent-allocation – Defines the percentage of YARN NodeManager resources allocated to the LLAP instance. This only applies to nodes running an LLAP instance as defined by hive.llap.num-instances. Default: 0.6 (60%).

For example, to allocate 80% of YARN NodeManager resources to LLAP, use the following configuration:

[
  {
    "Classification": "hive",
    "Properties": {
      "hive.llap.percent-allocation": "0.8",
      "hive.llap.enabled": "true"
    },
    "Configurations": []
  }
]

You can also override the following properties, which are predefined or calculated by EMR, using the hive-site classification when launching an EMR cluster.

hive.llap.daemon.yarn.container.mb – YARN container size for each LLAP daemon
hive.llap.daemon.memory.per.instance.mb – LLAP Xmx
hive.llap.io.memory.size – Size of the LLAP IO cache in an LLAP daemon
hive.llap.daemon.num.executors – Number of executors (tasks that can execute in parallel) per LLAP daemon

For more details on configuring Hive LLAP in Amazon EMR 6.0.0, please refer to Using Hive LLAP.
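
If you launch clusters programmatically, the same classification can be passed at creation time. The following boto3 sketch shows one way to do this; the roles, instance counts and types, and the log location are placeholder assumptions rather than values prescribed by this post.

import boto3

emr = boto3.client('emr')

# Minimal sketch; adjust instance types/counts, roles, and the log URI for your environment.
response = emr.run_job_flow(
    Name='hive-llap-cluster',
    ReleaseLabel='emr-6.0.0',
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Hive'}, {'Name': 'Tez'}],
    Configurations=[
        {
            'Classification': 'hive',
            'Properties': {
                'hive.llap.enabled': 'true',
                'hive.llap.percent-allocation': '0.8'
            }
        }
    ],
    Instances={
        'MasterInstanceType': 'c4.8xlarge',
        'SlaveInstanceType': 'c4.8xlarge',
        'InstanceCount': 6,
        'KeepJobFlowAliveWhenNoSteps': True
    },
    LogUri='s3://<your-log-bucket>/emr-logs/',
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole'
)
print(response['JobFlowId'])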

Resizing LLAP Daemons

You can modify the number of LLAP instances using the YARN CLI. The following sample command decreases the number of LLAP instances by one:

$ yarn app -flex <Application Name> -component llap -1

  • Application Name – llap0 (Default)

You can check the status of the Hive LLAP daemons with the following command:

$ hive --service llapstatus

Hive LLAP Web Services

The Hive LLAP Monitor listens on port 15002 in each core and task node running the LLAP daemon.

The following table provides an overview of the Web Services available in LLAP.

http://coretask-public-dns-name:15002 – Shows an overview of heap, cache, executor, and system metrics
http://coretask-public-dns-name:15002/conf – Shows the current LLAP configuration
http://coretask-public-dns-name:15002/peers – Shows the details of LLAP nodes in the cluster, extracted from the ZooKeeper server
http://coretask-public-dns-name:15002/iomem – Shows details about the cache contents and usage
http://coretask-public-dns-name:15002/jmx – Shows the LLAP daemon’s JVM metrics
http://coretask-public-dns-name:15002/stacks – Shows JVM stack traces of all threads
http://coretask-public-dns-name:15002/conflog – Shows the current log levels
http://coretask-public-dns-name:15002/status – Shows the status of LLAP

Hive Performance (LLAP vs. Container)

In EMR 6.0.0, Hive LLAP is optional, and all Hive queries are executed using dynamically allocated containers when Hive LLAP is disabled. To illustrate the differences of running Hive queries with persistent Hive LLAP daemons versus dynamically allocated containers, we’ve used a subset of the TPC-DS benchmark. The results show an overall performance improvement of 27%, with some queries sped up by up to 4x.

The following graph shows performance improvements measured as total runtime for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) runtime.

The following graph shows performance improvements measured as geometric mean for 50 TPC-DS queries. Amazon EMR 6.0.0 using LLAP has the better (lower) geometric mean.

The following graph shows the performance improvements on a per-query basis sorted by highest performance gain. In this comparison, the higher numbers are better.

Summary

This post demonstrated the performance improvement of Hive on Amazon EMR 6.0.0 in comparison to the previous Amazon EMR 5.29 release. This improvement in performance helps to reduce query runtime and cost. You also learned how to use Hive LLAP with Amazon EMR 6.0.0, how to configure it, and how to view its status and metrics using the LLAP Monitor, and saw the performance gains when Hive LLAP is enabled. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.

 


About the Author

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

 

 

Formula 1: Using Amazon SageMaker to Deliver Real-Time Insights to Fans

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/formula-1-using-amazon-sagemaker-to-deliver-real-time-insights-to-fans-live/

The Formula One Group (F1) is responsible for the promotion of the FIA Formula One World Championship, a series of auto racing events in 21 countries where professional drivers race single-seat cars on custom tracks or through city courses in pursuit of the World Championship title.

Formula 1 works with AWS to enhance its race strategies, data tracking systems, and digital broadcasts through a wide variety of AWS services—including Amazon SageMaker, AWS Lambda, and AWS analytics services—to deliver new race metrics that change the way fans and teams experience racing.

In this special live segment of This is My Architecture, you’ll get a look at what’s under the hood of Formula 1’s F1 Insights. Hear about the machine learning algorithms the company trains on Amazon SageMaker and how inferences are made during races to deliver insights to fans.

For more content like this, subscribe to our YouTube channels This is My Architecture, This is My Code, and This is My Model, or visit the This is My Architecture AWS website, which has search functionality and the ability to filter by industry, language, and service.

Delve into the Formula 1 case study to learn more about how AWS fuels analytics through machine learning.

Speeding up Etleap models at AXS with Amazon Redshift materialized views

Post Syndicated from Christian Romming original https://aws.amazon.com/blogs/big-data/speeding-up-etleap-models-at-axs-with-amazon-redshift-materialized-views/

The materialized views feature in Amazon Redshift is now generally available and has been benefiting customers and partners in preview since December 2019. One customer, AXS, is a leading ticketing, data, and marketing solutions provider for live entertainment venues in the US, UK, Europe, and Japan. Etleap, an Amazon Redshift partner, is an extract, transform, load, and transform (ETLT) service built for AWS. AXS uses Etleap to ingest data into Amazon Redshift from a variety of sources, including file servers, Amazon S3, relational databases, and applications. These ingestion pipelines parse, structure, and load data into Amazon Redshift tables with appropriate column types and sort and distribution keys.

Improving dashboard performance with Etleap models

To analyze data, AXS typically runs queries against large tables that originate from multiple sources. One of the ways that AXS uses Amazon Redshift is to power interactive dashboards. To achieve fast dashboard load times, AXS pre-computes partial answers to the queries the dashboards use. These partial answers are orders of magnitude smaller in terms of the number of rows than the tables on which they are based. By querying the Amazon Redshift tables that hold these pre-computed partial answers, dashboards load much faster than they would if they queried the base tables directly.

Etleap supports creating and managing such pre-computations through a feature called models. A model consists of a SELECT query and triggers for when it should be updated. An example of a trigger is a change to a base table, that is, a table used by the SELECT statement that defines the model. This way, the model can remain consistent with its base tables.

The following screenshot shows an Etleap model with two base table dependencies.

Etleap represents their models as tables in Amazon Redshift. To create the model table, Etleap wraps the SELECT statement in a CREATE TABLE AS (CTAS) query. When an update is triggered, for example, due to base table inserts, updates, or deletes, Etleap recomputes the model table through the following code:

CREATE TABLE model_temporary AS SELECT …;
DROP TABLE model;
ALTER TABLE model_temporary RENAME TO model;

Analyzing CTAS performance as data grows

AXS manages a large number of Etleap models. For one particular model, the CTAS query takes over 6 minutes, on average. This query performs an aggregation on a join of three different tables, including an event table that is constantly ingesting new data and contains over a billion rows. The following graph shows that the CTAS query time increases as the event table increases in number of rows.

There are two key problems with the query taking longer:

  • There’s a longer delay before the updated model is available to analysts
  • The model update consumes more Amazon Redshift cluster resources

To address this, AXS would have to resort to workarounds that are either inconvenient or costly, such as archiving older data from the event table or expanding the Amazon Redshift cluster to increase available resources.

Comparing CTAS to materialized views

Etleap decided to run an experiment to verify that Amazon Redshift’s materialized views feature is an improvement over the CTAS approach for this AXS model. First, they built the materialized view by wrapping the SELECT statement in a CREATE MATERIALIZED VIEW AS query. For updates, instead of recreating the materialized view every time that data in a base table changes, a REFRESH MATERIALIZED VIEW query is sufficient. The expectation was that using materialized views would be significantly faster than the CTAS-based procedure. The following graph compares query times of CTAS to materialized view refresh.

Running REFRESH MATERIALIZED VIEW was 7.9 times faster than the CTAS approach—it took 49 seconds instead of 371 seconds on average at the current scale. Additionally, the update time was roughly proportional to the number of rows that were added to the base table since the last update, rather than the total size of the base table. In this use case, this number is 3.8 million, which corresponds to the approximate number of events ingested per day.

This is great news. The solution solves the previous problems because the delay caused by the model update stays constant as new data comes in, and so do the resources that Amazon Redshift consumes (assuming the growth of the base table is constant). In other words, using materialized views eliminates the need for workarounds, such as archiving or cluster expansion, as the dataset grows. It also simplifies the refresh procedure for model updates by reducing the number of SQL statements from three (CREATE, DROP, and RENAME) to one (REFRESH).
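
To make the two procedures concrete, the following sketch lays the statements out side by side. The model query is hypothetical (the actual AXS/Etleap model SQL is not shown in this post), and the helper assumes an open Redshift connection object such as one from psycopg2.

# Hypothetical model SQL; the real model definition is not shown in the post.
MODEL_SQL = "SELECT event_id, COUNT(*) AS ticket_count FROM sales GROUP BY event_id"

# CTAS-based refresh: rebuild the table from scratch on every update.
CTAS_REFRESH = [
    f"CREATE TABLE model_temporary AS {MODEL_SQL};",
    "DROP TABLE model;",
    "ALTER TABLE model_temporary RENAME TO model;",
]

# Materialized-view refresh: create once, then refresh incrementally.
CREATE_MV = f"CREATE MATERIALIZED VIEW model_mv AS {MODEL_SQL};"
REFRESH_MV = "REFRESH MATERIALIZED VIEW model_mv;"

def refresh_with_mv(conn):
    """Run the incremental refresh; `conn` is assumed to be an open Redshift connection (e.g., psycopg2)."""
    with conn.cursor() as cur:
        cur.execute(REFRESH_MV)
    conn.commit()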

Achieving fast refresh performance with materialized views

Amazon Redshift can refresh a materialized view efficiently and incrementally. It keeps track of the last transaction in the base tables up to which the materialized view was previously refreshed. During subsequent refreshes, Amazon Redshift processes only the newly inserted, updated, or deleted tuples in the base tables, referred to as a delta, to bring the materialized view up-to-date with its base tables. In other words, Amazon Redshift can incrementally maintain the materialized view by reading only base table deltas, which leads to faster refresh times.

For AXS, Amazon Redshift analyzed their materialized view definitions, which join multiple tables, filters, and aggregates, to figure out how to incrementally maintain their specific materialized view. Each time AXS refreshes the materialized view, Amazon Redshift quickly determines if a refresh is needed, and if so, incrementally maintains the materialized view. As records are ingested into the base table, the materialized view refresh times shown are much faster and grow very slowly because each refresh reads a delta that is small and roughly the same size as the other deltas. In comparison, the refresh times using CTAS are much slower because each refresh reads all the base tables. Moreover, the refresh times using CTAS grow much faster because the amount of data that each refresh reads grows with the ingest rate.

You are in full control of when to refresh your materialized views. For example, AXS refreshes their materialized views based on triggers defined in Etleap. As a result, transactions that are run on base tables do not incur additional cost to maintain dependent materialized views. Decoupling the base tables’ updates from the materialized view’s refresh gives AXS an easy way to insulate their dashboard users and offers them a well-defined snapshot to query, while ingesting new data into base tables. When AXS vets the next batch of base table data via their ETL pipelines, they can refresh their materialized views to offer the next snapshot of dashboard results.

In addition to efficiently maintaining their materialized views, AXS also benefits from the simplicity of Amazon Redshift storing each materialized view as a plain table. Queries on the materialized view perform with the same world-class speed that Amazon Redshift runs any query. You can organize a materialized view like other tables, which means that you can exploit distribution key and sort columns to further improve query performance. Finally, when you need to process many queries at peak times, Amazon Redshift’s concurrency scaling kicks in automatically to elastically scale query processing capacity.

Conclusion

Now that the materialized views feature is generally available, Etleap gives you the option of using materialized views rather than tables when creating models. You can use models more actively as part of your ETLT strategies, and also choose more frequent update schedules for your models, due to the performance benefits of incremental refreshes.

For more information about Amazon Redshift materialized views, see Materialize your Amazon Redshift Views to Speed Up Query Execution and Creating Materialized Views in Amazon Redshift.

 


About the Author

Christian Romming is the founder and CEO of Etleap.  Etleap is a managed ETL solution for AWS that doesn’t require extensive engineering work to set up, maintain, and scale.

 

 

 

 

Prasad Varakur is a Database, Big Data & Distributed Systems enthusiast, and Product Manager at Amazon Web Services. Prior to this, he has developed Database and Storage engines at SAP/Sybase, Couchbase, Huawei, Novell, EMC, and Veritas. He holds 11 patents in database systems and distributed computing, and his thesis has contributed foundational works of Parametric Query Optimization. He holds Master’s degree in Computer Science from IIT, Kanpur.

 

Vuk Ercegovac is a principal engineer for Redshift at AWS.

 

 

 

AWS Architecture Monthly Magazine: Data Lakes

Post Syndicated from Annik Stahl original https://aws.amazon.com/blogs/architecture/aws-architecture-monthly-magazine-data-lakes/

A data lake is the fastest way to get answers from all your data to all your users. It’s a centralized repository that allows you to store all your structured and unstructured data at any scale. You can store your data as-is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning—to guide better decisions.

In This Issue

In this month’s Architecture Monthly, we speak to AWS Analytics Tech Leader Taz Sayed about general architecture trends in data lakes and the questions customers need to ask themselves before considering a data lake, and we get his outlook on the role the cloud will play in future development efforts.

We also introduce you to two companies that are utilizing data lakes for deep analytics, point you to an AWS managed solution, provide some real-world videos, and more.

  • Ask an Expert: Taz Sayed, Tech Leader, AWS Analytics
  • Blog: Kayo Sports builds real-time view of the customer on AWS
  • Case Study: Yulu Uses a Data Lake on AWS to Pedal a Change
  • Solution: Data Lake on AWS
  • Managed Solution: AWS Lake Formation
  • Whitepaper: Building Big Data Storage Solutions (Data Lakes) for Maximum Flexibility

How to Access the Magazine

We hope you’re enjoying Architecture Monthly, and we’d like to hear from you—leave us a star rating and comment on the Amazon Kindle Newsstand page or contact us anytime at [email protected].

Ingest Excel data automatically into Amazon QuickSight

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/ingest-excel-data-automatically-into-amazon-quicksight/

Amazon QuickSight is a fast, cloud-powered, business intelligence (BI) service that makes it easy to deliver insights to everyone in your organization. This post demonstrates how to build a serverless data ingestion pipeline to automatically import frequently changed data into a SPICE (Super-fast, Parallel, In-memory Calculation Engine) dataset of Amazon QuickSight dashboards.

It is sometimes quite difficult to be agile in BI development. For example, end-users that perform self-service analytics may want to add their additional ad hoc data into an existing dataset and have a view of the corresponding updated dashboards and reports in a timely fashion. However, dashboards and reports are usually built on top of a single online analytic processing (OLAP) data warehouse with a rigid schema. Therefore, an end-user (who doesn’t have permission to update the dataset directly) has to go through a complicated and time-consuming procedure to have their data updated in the warehouse. Alternatively, they could open a ticket for you to edit the dataset manually, but it is still a very inconvenient solution that involves a significant amount of repetitive manual effort, especially if they frequently need to update the data.

Therefore, an automated data processing tool that can perform real-time data ingestion is very useful. This post discusses a tool that, when an end-user uploads Excel files into Amazon S3 or any other data file sharing location, performs the following end-to-end process:

  • Cleans the raw data from the Excel files, which might contain a lot of formatting and redundant information.
  • Ingests the cleaned data.
  • Performs a status check to monitor the data cleaning and ingestion process.
  • Sends a notification of the results to the end-user and BI development team.

With the recently launched feature cross data source joins, you can join across all data sources that Amazon QuickSight supports, including file-to-file, file-to-database, and database-to-database joins. For more information, see Joining across data sources on Amazon QuickSight.

In addition to cross data source joins, Amazon QuickSight has also launched new APIs for SPICE ingestion. For more information, see Importing Data into SPICE and Evolve your analytics with Amazon QuickSight’s new APIs and theming capabilities.

This post shows how you can combine these features to build an agile solution that cleans and ingests an Excel file into a SPICE dataset of Amazon QuickSight automatically. In SPICE, the real-time data from Excel joins with the Amazon Redshift OLAP data warehouse, and end-users receive Amazon SNS messages about its status throughout the process.

Solution overview

The following diagram illustrates the workflow of the solution.

The workflow includes the following steps:

  1. An end-user uploads an Excel file into an on-premises shared folder.
  2. The Excel files upload to the Amazon S3 bucket excel-raw-data. Alternatively, the end-user can skip this step and upload the Excel file into this S3 bucket directly.
  3. This upload event triggers the SNS message Excel-Raw-Data-Uploaded.
  4. Both the end-user and the BI team receive a message about the new upload event.
  5. The upload event also triggers the AWS Lambda function DataClean to process the Excel data.
  6. The Lambda function removes the formatting and redundant information from the Excel file, saves the cleaned data as a CSV file into the S3 bucket autoingestionqs, and publishes an SNS message to notify end-users about the data cleansing status (a rough sketch of this step appears after this list).
  7. This cleaned CSV file is mapped as an Amazon Athena table.
  8. In the Amazon QuickSight SPICE dataset, this table joins with the Amazon Redshift table through the cross data source join functionality.
  9. The CSV file creation event in the S3 bucket autoingestionqs triggers the Lambda function qsAutoIngestion.
  10. This function calls the data ingestion API of Amazon QuickSight and checks the data ingestion status.
  11. When the data ingestion is complete, end-users receive the Ingestion-Finished SNS message.
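
The heavy lifting in steps 5 and 6 is the data cleansing. The actual DataClean.py sample in the GitHub repo referenced later is the authoritative version; the following is only a rough sketch of the idea, with the target bucket taken from this post and the cleansing rules (drop empty rows and columns) chosen as simple assumptions.

import io
import json
import os
import urllib.parse

import boto3
import pandas as pd  # assumed to be packaged with the function, e.g., via a Lambda layer

s3 = boto3.client('s3')
TARGET_BUCKET = 'autoingestionqs'  # cleaned-data bucket used in this post

def lambda_handler(event, context):
    # The function is subscribed to the SNS topic, so the S3 notification
    # arrives wrapped inside the SNS message body.
    s3_event = json.loads(event['Records'][0]['Sns']['Message'])
    record = s3_event['Records'][0]['s3']
    bucket = record['bucket']['name']
    key = urllib.parse.unquote_plus(record['object']['key'])

    # Read the workbook; formatting is discarded, and fully empty rows/columns are dropped.
    body = s3.get_object(Bucket=bucket, Key=key)['Body'].read()
    df = pd.read_excel(io.BytesIO(body))
    df = df.dropna(how='all').dropna(axis=1, how='all')

    # Write the cleaned data back as plain CSV for Athena and QuickSight to pick up.
    csv_key = os.path.splitext(os.path.basename(key))[0] + '.csv'
    s3.put_object(Bucket=TARGET_BUCKET, Key=csv_key, Body=df.to_csv(index=False).encode('utf-8'))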

Prerequisites

For this walkthrough, you should have the following prerequisites:

Creating resources

Create your resources by launching the following AWS CloudFormation stack:

During the stack creation process, you have to provide a valid email address as the endpoint of Amazon SNS services. After the stack creation is successful, you have three SNS topics, two S3 buckets, and the corresponding IAM policies.

Walkthrough

To implement this solution, complete the following steps:

  1. Enable SNS notification of new object creation events in the S3 bucket excel-raw-data. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket? When an end-user uploads an Excel file into the excel-raw-data S3 bucket, the event triggers an Amazon SNS message. The following screenshot shows the example Excel file that this post uses. The following screenshot shows the SNS message Excel-Raw-Data-Uploaded, which includes details of the upload event.
  2. Download the sample code DataClean.py in Python 3.7 from the GitHub repo.
  3. Create a Lambda function named DataClean.
  4. Configure the function to be a subscriber of the SNS topic Excel-Raw-Data-Uploaded.
  5. Edit the SNS topic Cleaning-is-Done, and add the following code to the access policy:
    "Sid": "example-statement-ID",
          "Effect": "Allow",
          "Principal": {
            "AWS": "*"
          },
          "Action": "SNS:Publish",
          "Resource": " arn:aws:sns:us-east-1:AWS Account ID: SNS Topic Name",
          "Condition": {
            "ArnLike": {
              "aws:SourceArn": "arn:aws:lambda:us-east-1:AWSAccountID:function:DataClean"
            }
          }

    The policy allows the Lambda function DataClean to trigger the SNS message Cleaning-is-Done.

    The function DataClean saves the CSV file of the cleaned data into the S3 bucket autoingestionqs. You should see the new CSV file in this bucket. See the following screenshot.

    When the Lambda function ends, it triggers the SNS message Cleaning-is-Done. The following screenshot shows the text of the notification message.

  6. Add an event notification into the S3 bucket autoingestionqs to trigger a Lambda function named qsAutoIngestion. This function calls the Amazon QuickSight data API to ingest data into the SPICE dataset. The cleaned CSV file in the S3 bucket autoingestionqs is mapped as an Athena table. The following screenshot shows the sample data in the CSV file.

    In the Amazon QuickSight SPICE dataset, the Athena table joins with the Amazon Redshift table through the cross data source join functionality.

  7. Create the SPICE dataset. For more information, see Joining across data sources on Amazon QuickSight. The following screenshot shows the Data page in Amazon QuickSight where your dataset details appear. The Athena table joins with a Redshift table. The new object creation event in the Amazon S3 bucket autoingestionqs triggers another Lambda function named qsAutoIngestion. This function calls the data ingestion API of Amazon QuickSight and checks the data ingestion status (a minimal sketch of this call follows these steps). If the data ingestion is completed successfully, end-users receive the SNS message Ingestion-Finished. You can download the sample code of qsAutoIngestion from the GitHub repo.
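
The qsAutoIngestion sample in the GitHub repo is the authoritative reference; the following is only a minimal sketch of the two QuickSight API calls involved. The account ID, dataset ID, and region are placeholders.

import time
import uuid

import boto3

quicksight = boto3.client('quicksight', region_name='us-east-1')

# Placeholders; use your own account ID and the ID of the SPICE dataset.
account_id = '111122223333'
dataset_id = '<spice-dataset-id>'
ingestion_id = str(uuid.uuid4())

# Kick off a SPICE refresh for the dataset.
quicksight.create_ingestion(
    AwsAccountId=account_id,
    DataSetId=dataset_id,
    IngestionId=ingestion_id
)

# Poll the ingestion until it completes or fails.
while True:
    ingestion = quicksight.describe_ingestion(
        AwsAccountId=account_id,
        DataSetId=dataset_id,
        IngestionId=ingestion_id
    )['Ingestion']
    if ingestion['IngestionStatus'] in ('COMPLETED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(10)

print(ingestion['IngestionStatus'])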

Cleaning up

To avoid incurring future charges, delete the resources you created: the two Lambda functions, three SNS topics, two S3 buckets, and corresponding IAM policies.

Conclusion

This post discussed how BI developers and architects can use data API, Lambda functions, and other AWS services to complete an end-to-end automation process. End-users can have their real-time data ingested and joined with OLAP data warehouse tables and visualize their data in a timely fashion without the need to wait for nightly or hourly ETL or the need to understand the complex technical development steps. You should now be fully equipped to construct a solution in a development environment and demo it to non-technical business end-users.

 


About the Author

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.

Announcing Network Analytics

Post Syndicated from Omer Yoachimik original https://blog.cloudflare.com/announcing-network-analytics/

Our Analytics Platform


Back in March 2019, we released Firewall Analytics, which provides insights into HTTP security events across all of Cloudflare’s protection suite: Firewall rule matches, HTTP DDoS attacks, Site Security Level (which harnesses Cloudflare’s threat intelligence), and more. It helps customers tailor their security configurations more effectively. The initial release was for Enterprise customers; however, we believe that everyone should have access to powerful tools, not just large enterprises, and so in December 2019 we extended those same enterprise-level analytics to our Business and Pro customers.


Since then, we’ve built on top of our analytics platform; improved the usability, added more functionality and extended it to additional Cloudflare services in the form of Account Analytics, DNS Analytics, Load Balancing Analytics, Monitoring Analytics and more.

Our entire analytics platform harnesses the powerful GraphQL framework which is also available to customers that want to build, export and share their own custom reports and dashboards.

Extending Visibility From L7 To L3

Until recently, all of our dashboards were mostly HTTP-oriented and provided visibility into HTTP attributes such as the user agent, hosts, cached resources, etc. This is valuable to customers that use Cloudflare to protect and accelerate HTTP applications, mobile apps, or similar. We’re able to provide them visibility into the application layer (Layer 7 in the OSI model) because we proxy their traffic at L7.

DDoS Protection for Layer 3-7

However with Magic Transit, we don’t proxy traffic at L7 but rather route it at L3 (network layer). Using BGP Anycast, customer traffic is routed to the closest point of presence of Cloudflare’s network edge where it is filtered by customer-defined network firewall rules and automatic DDoS mitigation systems. Clean traffic is then routed via dynamic GRE Anycast tunnels to the customer’s data-centers. Routing at L3 means that we have limited visibility into the higher layers. So in order to provide Magic Transit customers visibility into traffic and attacks, we needed to extend our analytics platform to the packet-layer.

Magic Transit Traffic Flow

On January 16, 2020, we released the Network Analytics dashboard for Magic Transit customers and Bring Your Own IP (BYOIP) customers. This packet and bit oriented dashboard provides near real-time visibility into network- and transport-layer traffic patterns and DDoS attacks that are blocked at the Cloudflare edge in over 200 cities around the world.

Network Analytics – Packets & Bits Over Time

Analytics For A Year

The way we’ve architected the analytics data-stores enables us to provide our customers one year’s worth of insights. Traffic is sampled at the edge data-centers. From those samples we structure IP flow logs, which are similar to sFlow, and bucket one minute’s worth of traffic, grouped by destination IP, port and protocol. IP flows include multiple packet attributes such as TCP flags, source IPs and ports, Cloudflare data-center, etc. The source IP is considered PII data and is therefore only stored for 30 days, after which the source IPs are discarded and logs are rolled up into one hour groups, and then one day groups. The one hour roll-ups are stored for 6 months and the one day roll-ups for 1 year.

Similarly, attack logs are also stored efficiently. Attacks are stored as summaries with start/end timestamps, min/max/average/total bits and packets per second, attack type, action taken and more. A DDoS attack could easily consist of billions of packets which could impact performance due to the number of read/write calls to the data-store. By storing attacks as summary logs, we’re able to overcome these challenges and therefore provide attack logs for up to 1 year back.

Network Analytics via GraphQL API

We built this dashboard on the same analytics platform, meaning that our packet-level analytics are also available by GraphQL. As an example, below is an attack report query that would show the top attacker IPs, the data-center cities and countries where the attack was observed, the IP version distribution, the ASNs that were used by the attackers and the ports. The query is done at the account level, meaning it would provide a report for all of your IP ranges. In order to narrow the report down to a specific destination IP or port range, you can simply add additional filters. The same filters also exist in the UI.

{
  viewer {
    accounts(filter: { accountTag: $accountTag }) {
      topNPorts: ipFlows1mGroups(
        limit: 5
        filter: $portFilter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: sourcePort
          ipProtocol
          __typename
        }
        __typename
      }
      topNASN: ipFlows1mGroups(
        limit: 5
        filter: $filter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: sourceIPAsn
          description: sourceIPASNDescription
          __typename
        }
        __typename
      }
      topNIPs: ipFlows1mGroups(
        limit: 5
        filter: $filter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: sourceIP
          __typename
        }
        __typename
      }
      topNColos: ipFlows1mGroups(
        limit: 10
        filter: $filter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: coloCity
          coloCode
          __typename
        }
        __typename
      }
      topNCountries: ipFlows1mGroups(
        limit: 10
        filter: $filter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: coloCountry
          __typename
        }
        __typename
      }
      topNIPVersions: ipFlows1mGroups(
        limit: 2
        filter: $filter
        orderBy: [sum_packets_DESC]
      ) {
        sum {
          count: packets
          __typename
        }
        dimensions {
          metric: ipVersion
          __typename
        }
        __typename
      }
      __typename
    }
    __typename
  }
}
Attack Report Query Example

After running the query using the Altair GraphQL Client, the response is returned in JSON format.

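If you would rather script the report than use a GraphQL client, the same query can be posted to Cloudflare’s GraphQL endpoint. The sketch below assumes an API token with Analytics read access; the account tag and the filter field names in the variables are illustrative placeholders, so consult the GraphQL schema for the exact filters your datasets accept.

import requests

API_TOKEN = "<api-token-with-analytics-read>"
ACCOUNT_TAG = "<your-account-tag>"

# `query` holds the GraphQL document shown above.
query = """ ...attack report query from above... """

# Filter fields shown here are illustrative; check the schema for the exact names.
variables = {
    "accountTag": ACCOUNT_TAG,
    "filter": {"datetime_geq": "2020-01-01T00:00:00Z", "datetime_leq": "2020-01-02T00:00:00Z"},
    "portFilter": {"datetime_geq": "2020-01-01T00:00:00Z", "datetime_leq": "2020-01-02T00:00:00Z"},
}

response = requests.post(
    "https://api.cloudflare.com/client/v4/graphql",
    headers={"Authorization": f"Bearer {API_TOKEN}"},
    json={"query": query, "variables": variables},
)
response.raise_for_status()
print(response.json())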

What Do Customers Want?

As part of our product definition and design research stages, we interviewed internal customer-facing teams including Customer Support, Solution Engineering and more. I consider these stakeholders as super-user-aggregators because they’re customer-facing teams and are constantly engaging and helping our users. After the internal research phase, we expanded externally to customers and prospects; particularly network and security engineers and leaders. We wanted to know how they expect the dashboard to fit in their work-flow, what are their use cases and how we can tailor the dashboard to their needs. Long story short, we identified two main use cases: Incident Response and Reporting. Let’s go into each of these use cases in more detail.

Incident Response

I started off by asking them a simple question – “what do you do when you’re paged?” We wanted to better understand their incident response process; specifically, how they’d expect to use this dashboard when responding to an incident and what are the metrics that matter to them to help them make quick calculated decisions.

You’ve Just Been Paged

Let’s say that you’re a security operations engineer. It’s Black Friday. You’re on call. You’ve just been paged. Traffic levels to one of your data-centers have exceeded a safe threshold. Boom. What do you do? Responding quickly and resolving the issue as soon as possible is key.

If your workflows are similar to our customers’, then your objective is to resolve the page as soon as possible. However, before you can resolve it, you need to determine if there is any action that you need to take. For instance, is this a legitimate rise in traffic from excited Black Friday shoppers, perhaps a new game release or maybe an attack that hasn’t been mitigated? Do you need to shift traffic to another data-center or are levels still stable? Our customers tell us that these are the metrics that matter the most:

  1. Top Destination IP and port – helps understand what services are being impacted
  2. Top source IPs, port, ASN, data-center and data-center Country – helps identify the source of the traffic
  3. Real-time packet and bit rates – helps understand the traffic levels
  4. Protocol distribution – helps understand what type of traffic is abnormal
  5. TCP flag distribution – an abnormal distribution could indicate an attack
  6. Attack Log – shows what types of traffic are being dropped or rate-limited
Customizable DDoS Attack Log

As network and transport layer attacks can be highly distributed and the packet attributes can be easily spoofed, it’s usually not practical to block IPs. Instead, the dashboard enables you to quickly identify patterns such as an increased frequency of a specific TCP flag or increased traffic from a specific country. Identifying these patterns brings you one step closer to resolving the issue. After you’ve identified the patterns, packet-level filtering can be applied to drop or rate-limit the malicious traffic. If the attack was automatically mitigated by Cloudflare’s systems, you’ll be able to see it immediately along with the attack attributes in the activity log. By filtering by the Attack ID, the entire dashboard becomes your attack report.

Packet/Bit Distribution by Source & Destination
TCP Flag Distribution

Reporting

During our interviews with security and network engineers, we also asked them what metrics and insights they need when creating reports for their managers, C-levels, colleagues and providing evidence to law-enforcement agencies. After all, processing data and creating reports can consume over a third (36%) of a security team’s time (~3 hours a day) and is also one of the most frequent DDoS asks by our customers.

Add filters, select time range, print, and share

On top of all of these customizable insights, we wanted to also provide a one-line summary that would reflect your recent activity. The one-liner is dynamic and changes based on your activity. It tells you whether you’re currently under attack, and how many attacks were blocked. If your CISO is asking for a security update, you can simply copy-paste it and convey the efficiency of the service:

Dynamic Summary

Our customers say that they want to reflect the value of Cloudflare to their managers and peers:

  1. How much potential downtime and bandwidth did Cloudflare spare me?
  2. What are my top attacked IPs and ports?
  3. Where are the attacks coming from? What types and what are the trends?

The Secret To Creating Good Reports

What does everyone love? Cool Maps! The key to a good report is adding a map; showing where the attack came from. But given that packet attributes can be easily spoofed, including the source IP, it won’t do us any good to plot a map based on the locations of the source IPs. It would result in a spoofed source country and is therefore useless data. Instead, we decided to show the geographic distribution of packets and bits based on the Cloudflare data-center in which they were ingested. As opposed to legacy scrubbing center solutions with limited network infrastructures, Cloudflare has data-centers in more than 200 cities around the world. This enables us to provide precise geographic distribution with high confidence, making your reports accurate.

Packet/Bit Distribution by geography: Data-center City & Country

Tailored For You

One of the main challenges both our customers and we struggle with is how to process and generate actionable insights from all of the data points. This is especially critical when responding to an incident. Under this assumption, we built this dashboard with the purpose of speeding up your reporting and investigation processes. By tailoring it to your needs, we hope to make you more efficient and make the most out of Cloudflare’s services. Got any feedback or questions? Post them below in the comments section.

If you’re an existing Magic Transit or BYOIP customer, then the dashboard is already available to you. Not a customer yet? Click here to learn more.

Lower your costs with the new pause and resume actions on Amazon Redshift

Post Syndicated from Sain Das original https://aws.amazon.com/blogs/big-data/lower-your-costs-with-the-new-pause-and-resume-actions-on-amazon-redshift/

Today’s analytics workloads typically require a data warehouse to be available 24 hours a day, 7 days a week. However, there may be times when you need an Amazon Redshift cluster for a short duration of time at frequent (or infrequent) intervals. For example, you may run a periodic ETL job or use a cluster for testing and development and not use it during off-hours or weekends. In these cases, you may want an easy way to keep the data warehouse up and running only part of the time. Previously, you could accomplish this by making a backup, terminating the cluster, and restoring the cluster from the snapshot. The pause and resume actions on Amazon Redshift are a much simpler way to suspend billing, and they are designed to be used when your Amazon Redshift cluster is out of operation for hours at a time, especially on a regularly scheduled basis.

Pausing a cluster suspends compute and retains the underlying data structures and data so you can resume the cluster at a later point in time. You can configure this through the Amazon Redshift console or the use of Amazon Redshift CLIs.

When the cluster is paused, the data warehouse’s storage incurs charges. On-demand compute billing is suspended and resumed on a per-second basis. Paused clusters still appear as an entry in the console. You can also automate the pause and resume actions by using a schedule that matches your operational needs.

Using the actions via the Amazon Redshift console

To use the pause and resume actions on the Amazon Redshift console, complete the following steps:

  1. On the Amazon Redshift console, choose Clusters.
  2. Choose your desired cluster.
  3. Choose Actions.
  4. Choose Pause.
  5. To determine when to pause the cluster, choose from the following three options:
    • To pause the cluster immediately, select Pause now.
    • To pause the cluster at a later point, select Pause later.
    • To pause and resume the cluster according to a set schedule, select Pause and resume on schedule.
  6. For this walkthrough, select Pause now.
  7. Choose Pause now.

The cluster is now in Modifying status. It can take up to a few minutes for the cluster to change to a Paused state, but the cost accrual for compute resources is suspended immediately.

The following screenshot shows a view of the cluster status.

Amazon Redshift processes any outstanding queries before it pauses the cluster. When the cluster is paused, you can still view it on the Amazon Redshift console, and the Resume action is available.

To resume the cluster, complete the following steps:

  1. On the Amazon Redshift console, choose Clusters.
  2. Choose your desired cluster.
  3. Choose Actions.
  4. Choose Resume.
  5. Choose when to resume the cluster. The options are the same as those for the pause action.
  6. For this walkthrough, select Resume now.
  7. Choose Resume now.

The cluster moves to Modifying status. Depending upon the size of the cluster, it can take several minutes to resume a cluster before queries can be processed. Billing only resumes when the cluster is available.

The following screenshot shows the view of the cluster status.

Using the actions via CLI

The following two commands pause and resume the cluster:

  • pause-cluster
  • resume-cluster

To pause a given cluster, enter the following code:

aws redshift pause-cluster --cluster-identifier <insert cluster identifier here>

To resume a paused cluster, enter the following code:

aws redshift resume-cluster --cluster-identifier <insert cluster identifier here>
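
The same operations are available in the AWS SDKs. The following is a minimal boto3 sketch; the cluster identifier is a placeholder.

import boto3

redshift = boto3.client('redshift')
cluster_id = '<your-cluster-identifier>'

# Suspend on-demand compute billing for the cluster; storage charges continue.
redshift.pause_cluster(ClusterIdentifier=cluster_id)

# Later, bring the cluster back; billing resumes when the cluster is available.
redshift.resume_cluster(ClusterIdentifier=cluster_id)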

Scheduling pause and resume actions

You can schedule to pause and resume a cluster at specific times of the day and week. For example, this walkthrough pauses a cluster on Friday 8:00 p.m. and resumes it on Monday 7:00 a.m. You can configure this via the Amazon Redshift console or APIs.

Scheduling via the Amazon Redshift console

To schedule to pause and resume a cluster on the Amazon Redshift console, complete the following steps:

  1. On the Amazon Redshift console, choose Clusters.
  2. Choose your desired cluster.
  3. Choose Actions.
  4. Choose Pause.
  5. Select Pause and resume on schedule.
  6. For Schedule name, enter a name for this schedule.
  7. Optionally, for Starts on and Ends on, enter the dates and times to start and end the schedule.
  8. For Pause every and Resume every, choose the time and day to pause and resume.
  9. Choose Schedule recurring pause and resume.

You can review existing pause and resume schedules on the Schedules tab. See the following screenshot.

Scheduling via CLI

The following CLI commands allow you to create, modify, and delete scheduled pause and resume tasks.

To create a scheduled action to occur one time, enter the following code:

aws redshift create-scheduled-action --scheduled-action-name test-resume --schedule "at(2020-02-21T02:00:00)" --target-action "{\"ResumeCluster\":{\"ClusterIdentifier\":\"redshift-cluster-1\"}}" --iam-role arn:aws:iam::<Account ID>:role/<Redshift Role>

To create a recurring scheduled action, enter the following code:

aws redshift create-scheduled-action --scheduled-action-name "scheduled-pause-repetitive" --target-action "{\"PauseCluster\":{\"ClusterIdentifier\":\"redshift-cluster-1\"}}" --schedule "cron(30 20 * * ? *)" --iam-role "arn:aws:iam::<Account ID>:role/<Redshift Role>"

The preceding code example pauses the cluster daily at 8:30 p.m. UTC (20:30 in the cron expression).

To modify an existing scheduled action, enter the following code:

aws redshift modify-scheduled-action --scheduled-action-name "scheduled-pause-repetitive" --schedule "cron(30 * * * ? *)"

The preceding code example modifies the scheduled-pause-repetitive schedule to run every hour at 30 minutes past the hour.

To delete a scheduled action, enter the following code:

aws redshift delete-scheduled-action --scheduled-action-name "scheduled-pause-repetitive"
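
Scheduled actions can also be managed from the AWS SDKs. The following boto3 sketch creates the recurring pause from the example above; the cluster identifier, schedule, and role ARN mirror the CLI examples and should be treated as placeholders for your own values.

import boto3

redshift = boto3.client('redshift')

# Pause redshift-cluster-1 every day at 20:30 UTC, mirroring the recurring CLI example above.
redshift.create_scheduled_action(
    ScheduledActionName='scheduled-pause-repetitive',
    TargetAction={'PauseCluster': {'ClusterIdentifier': 'redshift-cluster-1'}},
    Schedule='cron(30 20 * * ? *)',
    IamRole='arn:aws:iam::<Account ID>:role/<Redshift Role>'
)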

Summary

The pause and resume actions on Amazon Redshift allow you to easily pause and resume clusters that may not be in operation at all times. You can schedule the pause and resume actions to run at specific times on a recurring basis, or initiate them manually. Flexible on-demand pricing and per-second billing give you greater control over the costs of your Amazon Redshift compute clusters while maintaining your data in a way that is simple to manage. You can run your data warehouse at the lowest cost possible without having to purchase a fixed amount of resources up front.

 


About the Author

Sain Das is a data warehouse specialist solutions architect with AWS.

 

 

 

Tune Hadoop and Spark performance with Dr. Elephant and Sparklens on Amazon EMR

Post Syndicated from Nivas Shankar original https://aws.amazon.com/blogs/big-data/tune-hadoop-and-spark-performance-with-dr-elephant-and-sparklens-on-amazon-emr/

Data engineers and ETL developers often spend a significant amount of time running and tuning Apache Spark jobs with different parameters to evaluate performance, which can be challenging and time-consuming. Dr. Elephant and Sparklens help you tune your Spark and Hive applications by monitoring your workloads and suggesting changes to optimize performance parameters, such as the required executor nodes, core nodes, and driver memory for Spark applications, and the mapper, reducer, memory, and data-skew configurations for Hive (Tez or MapReduce) jobs. Dr. Elephant gathers job metrics, runs analysis on them, and presents optimization recommendations in a simple way for easy consumption and corrective actions. Similarly, Sparklens makes it easy to understand the scalability limits of Spark applications and compute resources, and to run them efficiently with well-defined methods instead of learning by trial and error, which saves both developer and compute time.

This post demonstrates how to install Dr. Elephant and Sparklens on an Amazon EMR cluster and run workloads to demonstrate these tools’ capabilities. Amazon EMR is a managed Hadoop service offered by AWS to easily and cost-effectively run Hadoop and other open-source frameworks on AWS.

The following diagram illustrates the solution architecture. Data engineers and ETL developers can submit jobs to the Amazon EMR cluster and, based on the recommendations from Dr. Elephant and Sparklens, optimize their Spark applications and compute resources for better performance and efficiency.

Prerequisite Steps

Creating a new EMR cluster

To configure an EMR cluster with Dr. Elephant or Sparklens, launch an EMR cluster with your desired capacity. This post uses 10 core nodes of r4.xlarge instances and one master node of r4.4xlarge with the default settings.

You can launch the cluster via the AWS Management Console, an AWS CloudFormation template, or AWS CLI commands. Use the following CloudFormation Stack:
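
If you prefer to script the cluster creation rather than use the console or the CloudFormation stack, the following boto3 sketch launches a cluster with the same capacity; the Region, release label, key pair name, and default EMR roles are assumptions to adjust for your account.

import boto3

emr = boto3.client('emr', region_name='us-east-1')  # Region is an assumption

response = emr.run_job_flow(
    Name='dr-elephant-sparklens-demo',   # hypothetical cluster name
    ReleaseLabel='emr-5.29.0',           # assumed release label; use a current EMR 5.x release
    Applications=[{'Name': 'Hadoop'}, {'Name': 'Spark'}, {'Name': 'Hive'}],
    Instances={
        'InstanceGroups': [
            {'Name': 'Master', 'InstanceRole': 'MASTER',
             'InstanceType': 'r4.4xlarge', 'InstanceCount': 1},
            {'Name': 'Core', 'InstanceRole': 'CORE',
             'InstanceType': 'r4.xlarge', 'InstanceCount': 10},
        ],
        'Ec2KeyName': '<your-key-pair>',            # placeholder key pair name
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
    VisibleToAllUsers=True,
)
print(response['JobFlowId'])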

The following screenshot shows the EMR cluster summary for the cluster launched from the CloudFormation stack.

Enabling Dr. Elephant or Sparklens

If you already have a persistent cluster running, add these steps to enable Dr. Elephant or Sparklens. See the following code:

aws emr add-steps --cluster-id j-XXXXXXXX --steps '[{"Args":["s3://aws-bigdata-blog/artifacts/aws-blog-hadoop-spark-performance-tuning/install-dr-elephant-emr5.sh"],"Type":"CUSTOM_JAR","ActionOnFailure":"CONTINUE","Jar":"s3://elasticmapreduce/libs/script-runner/script-runner.jar","Properties":"","Name":"Install Dr. Elephant and Sparklens"}]'

Validating access to the Dr. Elephant portal and Sparklens settings

To access Dr. Elephant, point your browser to the address of the master node on port 8087:

https://<<master-node-DNS.compute-1.amazonaws.com>>:8087

Note: You need to set up an SSH tunnel to the master node using dynamic or local port forwarding.

The following screenshot shows the Dr. Elephant dashboard with the latest analysis of the jobs submitted to the EMR cluster.

To validate Sparklens, you need to SSH to the master node. For more information, see Connect to the Master Node Using SSH.

On the console, run the following command:

cd /etc/spark/conf/

Launch PySpark and check that settings are enabled. See the following code:

[hadoop@<master-node>]$ pyspark

You should see the line qubole#sparklens listed as a dependency in the console output. The following screenshot shows Sparklens enabled on the EMR cluster:

Sparklens – Testing Spark workloads

You can now test Spark workloads in an EMR cluster and observe them through the Sparklens logs.

This post tests a data generator example of 100 billion records using PySpark code and observes how Sparklens helps to optimize and provide recommendations to fine-tune the processing. Complete the following steps:

  1. Copy the code to the EMR cluster.
  2. Navigate to the /home/hadoop/ directory.
  3. Save the following code as test-spark.py:
    from pyspark.sql.functions import rand, randn
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext, SQLContext

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)
    # SparkSession entry point used for the SQL queries below
    spark = SparkSession.builder.getOrCreate()

    # Generate 100 billion rows and add two random columns
    df = sqlContext.range(0, 100000000000)
    df.count()
    df2 = df.select("id", rand(seed=1000).alias("uniform"), randn(seed=555).alias("normal"))
    row1 = df2.agg({"id": "max"}).collect()[0]
    print(row1["max(id)"])
    df2.createOrReplaceTempView("df2")
    # Write two filtered, sorted subsets of the data as Parquet
    df_part1 = spark.sql("select * from df2 where id between 1 and 999999 order by id desc")
    row2 = df_part1.agg({"id": "max"}).collect()[0]
    print(row2["max(id)"])
    df_part1.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output1/")
    df_part2 = spark.sql("select * from df2 where id > 10000000 order by id desc")
    row2 = df_part2.agg({"id": "max"}).collect()[0]
    print(row2["max(id)"])
    df_part2.write.format("parquet").mode("overwrite").save("/home/hadoop/data/output2/")

  4. Run spark-submit test-spark.py.

The following screenshot shows Sparklens job submission:

The following screenshot shows the application task metrics that Sparklens collected:

 

The following screenshot shows the job duration and timeline metrics:

The following screenshot shows the Sparklens recommendation on the minimum possible execution time for the submitted application jobs:

Sparklens suggests that the minimum possible runtime for the application is 23 seconds, compared to an execution time of 50 seconds with the default settings. It reports 76.15% of compute hours wasted and only 30.98% of compute hours used.

You can reduce the executor count and executor core in the spark-submit job and see how it changes the outcome.

Enter the following code:

spark-submit --num-executors 1 --executor-cores 1 test-spark.py

The following screenshot shows Sparklens job application metrics after tuning the job:

The job completion time is reduced to 45 seconds, and one executor node with one core is sufficient to run the job. Sparklens also helps identify specific stages (such as driver, skew, or lack of tasks) that limit the Spark application and provides contextual information about what could be going wrong in those stages.

This post also tests the Pi estimation example, which Spark provides natively in Scala, Java, and Python. For more information, see Write a Spark Application. To run the test, complete the following steps:

  1. Save the following code as test-spark2.py:
    import sys
    from random import random
    from operator import add

    from pyspark import SparkContext

    if __name__ == "__main__":
        """
            Usage: pi [partitions]
        """
        sc = SparkContext(appName="PythonPi")
        partitions = int(sys.argv[1]) if len(sys.argv) > 1 else 2
        n = 100000 * partitions

        def f(_):
            # Sample a point in the unit square; count it if it falls inside the unit circle
            x = random() * 2 - 1
            y = random() * 2 - 1
            return 1 if x ** 2 + y ** 2 < 1 else 0

        count = sc.parallelize(range(1, n + 1), partitions).map(f).reduce(add)
        print("Pi is roughly %f" % (4.0 * count / n))

        sc.stop()

  2. Copy the Spark code example into a local directory.
  3. Run the code with spark-submit. See the following code:
    spark-submit test-spark2.py

The following screenshot shows the Sparklens job submission:

The following screenshot shows the application task metrics that Sparklens collected:

The following screenshot shows the Sparklens recommendation on the minimum possible execution time for the submitted application jobs:

The following screenshot shows the Sparklens metrics on cluster compute utilization:

The following screenshot shows the Sparklens recommendation on cluster utilization with core compute nodes and estimated utilization:

In this test, Sparklens suggests that the minimum possible runtime for the application is 10 seconds, compared to an execution time of 14 seconds with the default settings. It reports 87.13% of compute hours wasted and only 12.87% used.

In a single run of a Spark application, Sparklens can estimate how your application performs given any arbitrary number of executors. This helps you understand the ROI of adding executors. Internally, Sparklens has the concept of an analyzer, which is a generic component for emitting interesting events. For more information about analyzers, see the GitHub repo.

You can reduce the executor count and executor core in a spark-submit job and see how it changes the outcome. See the following code:

spark-submit --num-executors 2 --executor-cores 2 test-spark2.py

The following screenshot shows Sparklens job submission:

The following screenshot shows Sparklens job application metrics after tuning the jobs:

The job completion time is now reduced to 12 seconds, and only one executor node and one core are needed to process the job.

Dr. Elephant – Testing the Hive and MapReduce workloads

You can test scenarios in an EMR cluster and observe them through the Dr. Elephant portal.

Testing Hive load and performance analysis

You can load example datasets through the Hive CLI console and see how the workload performs and what performance optimizations it suggests.

This post demonstrates how to analyze Elastic Load Balancer access logs stored in Amazon S3 using Hive. Complete the following steps:

  1. On the Hive CLI, enter the following code:
    CREATE EXTERNAL TABLE IF NOT EXISTS elb_logs (
    Ts STRING,
    ELBName STRING,
    RequestIP STRING,
    RequestPort INT,
    BackendIP STRING,
    BackendPort INT,
    RequestProcessingTime DOUBLE,
    BackendProcessingTime DOUBLE,
    ClientResponseTime DOUBLE,
    ELBResponseCode STRING,
    BackendResponseCode STRING,
    ReceivedBytes BIGINT,
    SentBytes BIGINT,
    RequestVerb STRING,
    URL STRING,
    Protocol STRING
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
    "input.regex" = "([^ ]*) ([^ ]*) ([^ ]*):([0-9]*) ([^ ]*):([0-9]*) ([.0-9]*) ([.0-9]*) ([.0-9]*) (-|[0-9]*) (-|[0-9]*) ([-0-9]*) ([-0-9]*) \"([^ ]*) ([^ ]*) (- |[^ ]*)\"$"
    ) LOCATION 's3://us-east-1.elasticmapreduce.samples/elb-access-logs/data/';

    The following screenshot shows the external table creation through Hive:

  2. Run a simple count query and check Dr. Elephant’s recommendations. See the following code:
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;

  3. Launch the Dr. Elephant Portal. The following screenshot shows the Dr. Elephant output. The Hive job needs some tuning for Tez mapper memory.
  4. Choose the Tez mapper memory section from the highlighted application metrics. The following screenshot shows the Dr. Elephant recommendation on over-allocated Tez mapper memory:
  5. Run the Hive query with reduced mapper memory. See the following code:
    set hive.tez.container.size=1024;
    SELECT RequestIP, COUNT(RequestIP) FROM elb_logs WHERE BackendResponseCode<>200 GROUP BY RequestIP;

The following screenshot shows application metrics for the submitted jobs:

The following screenshot shows that Dr. Elephant reports no more errors or warnings for the query:

Tuning a map reduce job

To tune a map reduce job, run the following Pi code example with the default settings:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 200 1000000

The following screenshot shows application metrics for the submitted jobs:

The following screenshot of Dr. Elephant shows that the mapper time, map memory, and reducer memory need tuning:

The following screenshot shows the Dr. Elephant recommendation on improving mapper memory:

The following screenshot shows the Dr. Elephant recommendation on improving reducer memory:

Now, you can set the memory for the mapper and reducer to the following values:

set mapreduce.map.memory.mb=4096

set mapreduce.reduce.memory.mb=4096

You can reduce the number of mappers and increase the number of samples per mapper to get the same Pi results. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 100 2000000

The following screenshot shows the improved metrics after applying the recommendation:

The job shows a significant improvement in efficiency, from approximately 60 seconds to 38 seconds.

You can run the job with 10 mappers and observe even better efficiency. See the following code:

hadoop-mapreduce-examples pi -D mapreduce.map.memory.mb=4096 -D mapreduce.reduce.memory.mb=4096 10 2000000

The following screenshot shows the improved metrics after applying the Dr. Elephant recommendation:

Dr. Elephant on the Amazon EMR cluster monitored the workloads and provided insights to optimize your Hive and Hadoop jobs.

Configuring your production workloads

To fine-tune the Dr. Elephant tool, navigate to the /mnt/dr-elephant-2.1.7/app-conf directory and edit the configuration files accordingly.

For example, you can write heuristics and plug them into the Dr. Elephant tool to set conditions that change the severity based on the number of tasks, or on how much the mapper and reducer counts deviate from cluster capacity. You can also change the number of threads used to analyze the completed jobs, or the interval between fetches from the resource manager.

The following screenshot shows the list of Dr. Elephant configuration files available for further tuning and customization:

For more information about the metrics and configurations, see the GitHub repo.

Conclusion

This post showed how to launch the Dr. Elephant and Sparklens tools on an Amazon EMR cluster and try them out for optimizing and tuning both compute- and memory-intensive jobs. Dr. Elephant and Sparklens can help you achieve faster job execution times and efficient memory management by using the parallelism of the dataset and optimal compute node usage. They also help you overcome the challenges of processing many Spark and Hive jobs by adjusting the parallelism of the workload and cluster to meet your demands.

 


About the Author

Nivas Shankar is a Senior Data Architect at Amazon Web Services. He helps and works closely with enterprise customers building data lakes and analytical applications on the AWS platform. He holds a master’s degree in physics and is highly passionate about theoretical physics concepts.

 

 

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena and Managed Blockchain. Prior to working in AWS, he has worked on Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from all over the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.

Govern how your clients interact with Apache Kafka using API Gateway

Post Syndicated from Prasad Alle original https://aws.amazon.com/blogs/big-data/govern-how-your-clients-interact-with-apache-kafka-using-api-gateway/

At some point, you may ask yourself:

  • How can I implement IAM authentication or authorization to Amazon Managed Streaming for Apache Kafka (MSK)?
  • How can I protect my Apache Kafka cluster from traffic spikes based on specific scenarios without setting quotas on the cluster?
  • How can I validate requests adhere to a JSON Schema?
  • How can I make sure parameters are included in the URI, query string, and headers?
  • How can Amazon MSK ingest messages from lightweight clients without using an agent or the native Apache Kafka protocol?

These tasks are achievable using custom proxy servers or gateways, but these options can be difficult to implement and manage. On the other hand, API Gateway has these features and is a fully managed AWS service.

In this blog post, we show you how Amazon API Gateway can answer these questions as a component between your Amazon MSK cluster and your clients.

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with just a few clicks without the need to provision servers, manage storage, or configure Apache Zookeeper manually. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Some use cases include ingesting messages from lightweight IoT devices that don’t have support for native Kafka protocol and orchestrating your streaming services with other backend services including third-party APIs.

This pattern also comes with the following trade-offs:

  • Cost and complexity due to another service to run and maintain.
  • Performance overhead, because it adds extra processing to construct and make HTTP requests. Additionally, the REST Proxy needs to parse requests and transform data between formats for both produce and consume requests.

When you implement this architecture in a production environment, you should consider these points with your business use case and SLA needs.

Solution overview

To implement the solution, complete the following steps:

  1. Create an MSK cluster, Kafka client, and Kafka REST Proxy
  2. Create a Kafka topic and configure the REST Proxy on a Kafka client machine
  3. Create an API with REST Proxy integration via API Gateway
  4. Test the end-to-end processes by producing and consuming messages to Amazon MSK

The following diagram illustrates the solution architecture.

 

Within this architecture, you create an MSK cluster and set up an Amazon EC2 instance with the REST Proxy and Kafka client. You then expose the REST Proxy through Amazon API Gateway and also test the solution by producing messages to Amazon MSK using Postman.

For the production implementation, make sure to set up the REST Proxy behind a load balancer with an Auto Scaling group.

Prerequisites

Before you get started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret access key to configure the AWS CLI
  • An Amazon EC2 keypair

Creating an MSK cluster, Kafka client, and REST Proxy

AWS CloudFormation provisions all the required resources, including VPC, subnets, security groups, Amazon MSK cluster, Kafka client, and Kafka REST Proxy. To create these resources, complete the following steps:

  1. Launch the CloudFormation stack in the us-east-1 or us-west-2 Region. It takes approximately 15 to 20 minutes to complete.
  2. From the AWS CloudFormation console, choose the AmzonMSKAPIBlog stack.
  3. Under Outputs, get the MSKClusterARN, KafkaClientEC2InstancePublicDNS, and MSKSecurityGroupID details.
  4. Get the ZooKeeperConnectionString and other information about your cluster by entering the following code (provide your Region, cluster ARN, and AWS named profile):
    $ aws kafka describe-cluster --region <Replace_With_us-east-1_or_us-west-2> --cluster-arn <Replace_With_Your_cluster-arn> --profile <Replace_With_Your_Profile>

    The following code example shows one of the lines in the output of this command:

    {
    ….
    ….
    "ZookeeperConnectString": "z-2.XXXXXX.us-east-1.amazonaws.com:2181,z-3.XXXXXX.us-east-1.amazonaws.com:2181,z-1.XXXXXX.us-east-1.amazonaws.com:2181"
    }

  5. Get the BootstrapBrokerString by entering the following code (provide your Region, cluster ARN, and AWS named profile):

    $ aws kafka get-bootstrap-brokers --region <Replace_With_us-east-1_or_us-west-2> --cluster-arn "<Replace_With_Your_cluster-arn>" --profile <Replace_With_Your_Profile>

    The following code example shows the output of this command:

    {
    "BootstrapBrokerString": "b-2.XXXXXXXXXXXX.us-east-1.amazonaws.com:9092,b-1.XXXXXXXXXXXX.amazonaws.com:9092,b-3.XXXXXXXXXXXX.us-east-1.amazonaws.com:9092"
    }

Creating a Kafka topic and configuring a Kafka REST Proxy

To create a Kafka topic and configure a Kafka REST Proxy on a Kafka client machine, complete the following steps:

  1. SSH into your Kafka client Amazon EC2 instance. See the following code:
    ssh -i <Replace_With_Your_pemfile> ec2-user@<Replace_With_Your_KafkaClientDNS>

  2. Go to the bin folder (kafka/kafka_2.12-2.2.1/bin/) of the Apache Kafka installation on the client machine.
  3. Create a topic by entering the following code (provide the value you obtained for ZookeeperConnectString in the previous step):
    ./kafka-topics.sh --create --zookeeper <Replace_With_Your_ZookeeperConnectString> --replication-factor 3 --partitions 1 --topic amazonmskapigwblog

    If the command is successful, you see the following message: Created topic amazonmskapigwblog.

  4. To connect the Kafka REST server to the Amazon MSK cluster, modify kafka-rest.properties in the directory (/home/ec2-user/confluent-5.3.1/etc/kafka-rest/) to point to your Amazon MSK’s ZookeeperConnectString and BootstrapserversConnectString information. See the following code:
    sudo vi /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties
    
    zookeeper.connect=<Replace_With_Your_ZookeeperConnectString>
    bootstrap.servers=<Replace_With_Your_BootstrapserversConnectString>

    As an additional, optional step, you can configure SSL to secure communication between REST clients and the REST Proxy (HTTPS). If SSL is not required, you can skip steps 5 and 6.

  5. Generate the server and client certificates. For more information, see Creating SSL Keys and Certificates on the Confluent website.
  6. Add the necessary property configurations to the kafka-rest.properties configuration file. See the following code example:
    listeners=http://0.0.0.0:8082,https://0.0.0.0:8085
    ssl.truststore.location=<Replace_With_Your_truststore.jks>
    ssl.truststore.password=<Replace_With_Your_truststorepassword>
    ssl.keystore.location=<Replace_With_Your_keystore.jks>
    ssl.keystore.password=<Replace_With_Your_keystorepassword>
    ssl.key.password=<Replace_With_Your_sslkeypassword>

    For more detailed instructions, see Encryption and Authentication with SSL on the Confluent website.

You have now created a Kafka topic and configured Kafka REST Proxy to connect to your Amazon MSK cluster.
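
As an optional check, you can confirm the proxy is reachable from the client machine once the kafka-rest service is running (you start it in the testing section below). The following Python sketch uses the requests library, which you may need to install first, and the default proxy port; both are assumptions.

import requests

# The REST Proxy listens on port 8082 by default (8085 for HTTPS if you enabled SSL).
REST_PROXY_URL = 'http://localhost:8082'  # run this on the Kafka client instance

# List the topics visible to the proxy; the new topic should appear in the response.
response = requests.get(REST_PROXY_URL + '/topics')
response.raise_for_status()
print(response.json())  # expect ['amazonmskapigwblog', ...]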

Creating an API with Kafka REST Proxy integration

To create an API with Kafka REST Proxy integration via API Gateway, complete the following steps:

  1. On the API Gateway console, choose Create API.
  2. For API type, choose REST API.
  3. Choose Build.
  4. Choose New API.
  5. For API Name, enter a name (for example, amazonmsk-restapi).
  6. As an optional step, for Description, enter a brief description.
  7. Choose Create API. The next step is to create a child resource.
  8. Under Resources, choose a parent resource item.
  9. Under Actions, choose Create Resource. The New Child Resource pane opens.
  10. Select Configure as proxy resource.
  11. For Resource Name, enter proxy.
  12. For Resource Path, enter /{proxy+}.
  13. Select Enable API Gateway CORS.
  14. Choose Create Resource. After you create the resource, the Create Method window opens.
  15. For Integration type, select HTTP Proxy.
  16. For Endpoint URL, enter an HTTP backend resource URL (your Kafka client Amazon EC2 instance public DNS; for example, http://KafkaClientEC2InstancePublicDNS:8082/{proxy} or https://KafkaClientEC2InstancePublicDNS:8085/{proxy}).
  17. Use the default settings for the remaining fields.
  18. Choose Save.
  19. For SSL, for Endpoint URL, use the HTTPS endpoint. In the API you just created, the API’s proxy resource path of {proxy+} becomes the placeholder for any of the backend endpoints under http://YourKafkaClientPublicIP:8082/.
  20. Choose the API you just created.
  21. Under Actions, choose Deploy API.
  22. For Deployment stage, choose New Stage.
  23. For Stage name, enter the stage name (for example, dev, test, or prod).
  24. Choose Deploy.
  25. Record the Invoke URL after you have deployed the API.

Your external Kafka REST Proxy, which was exposed through API Gateway, now looks like https://YourAPIGWInvokeURL/dev/topics/amazonmskapigwblog. You use this URL in the next step.

Testing the end-to-end processes

To test the end-to-end process, produce and consume messages to Amazon MSK by completing the following steps:

  1. SSH into the Kafka Client Amazon EC2 instance. See the following code:
    ssh -i "xxxxx.pem" [email protected]

  2. Go to the confluent-5.3.1/bin directory and start the kafka-rest service. See the following code:
    ./kafka-rest-start /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties

    If the service already started, you can stop it with the following code:

    ./kafka-rest-stop /home/ec2-user/confluent-5.3.1/etc/kafka-rest/kafka-rest.properties

  3. Open another terminal window.
  4. In the kafka/kafka_2.12-2.2.1/bin directory, start the Kafka console consumer. See the following code:
    ./kafka-console-consumer.sh --bootstrap-server "BootstrapserversConnectString" --topic amazonmskapigwblog --from-beginning 

    You can now produce messages using Postman. Postman is an HTTP client for testing web services.

    Be sure to open the required TCP ports on the Kafka client security group from the system where you are running Postman.

  5. Under Headers, choose the key Content-Type with value application/vnd.kafka.json.v2+json.
  6. Under Body, select raw.
  7. Choose JSON. This post enters the following code (a scripted alternative using Python follows these steps):
    {"records":[{"value":{"deviceid": "AppleWatch4","heartrate": "72","timestamp":"2019-10-07 12:46:13"}}]} 

    The following screenshot shows messages coming to the Kafka consumer from the API Gateway Kafka REST endpoint.
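
If you prefer to script this test instead of using Postman, the following Python sketch posts the same record through the API Gateway endpoint using the requests library; the invoke URL is a placeholder for the value you recorded when you deployed the API.

import json
import requests

# Replace with your invoke URL, stage, and topic path.
API_URL = 'https://<YourAPIGWInvokeURL>/dev/topics/amazonmskapigwblog'

headers = {'Content-Type': 'application/vnd.kafka.json.v2+json'}
payload = {
    'records': [
        {'value': {'deviceid': 'AppleWatch4', 'heartrate': '72',
                   'timestamp': '2019-10-07 12:46:13'}}
    ]
}

# On success, the REST Proxy returns the partition and offset for each record.
response = requests.post(API_URL, data=json.dumps(payload), headers=headers)
print(response.status_code, response.json())

The message should appear in the Kafka console consumer the same way it does when you send it from Postman.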

Conclusion

This post demonstrated how easy it is to set up REST API endpoints for Amazon MSK with API Gateway. This solution can help you produce and consume messages to Amazon MSK from any IoT device or programming language without depending on native Kafka protocol or clients.

If you have questions or suggestions, please leave your thoughts in the comments.

 


About the Author

Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

 

 

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.

Integrate Power BI with Amazon Redshift for insights and analytics

Post Syndicated from Vu Le original https://aws.amazon.com/blogs/big-data/integrate-power-bi-with-amazon-redshift-for-insights-and-analytics/

Amazon Redshift is a fast, fully managed, cloud-native data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Microsoft Power BI is a business analytics service that delivers insights to enable fast, informed decisions. With Power BI, you can perform ad-hoc query analysis, visualize data, and create-user friendly dashboards.

This post demonstrates how to integrate Power BI with Amazon Redshift to deliver powerful visualization and insights.

Solution architecture

This post provides code artifacts to help you create a big data environment on AWS from scratch. You can automatically provision a new Amazon Redshift data warehouse in under an hour without much technical depth required by using the AWS CloudFormation template and code examples provided.

The post also demonstrates how to configure integration for the most common deployment scenarios. For example, how do you connect Power BI to AWS services using ODBC/JDBC drivers? How do you connect to AWS services that are deployed behind a private network? What credentials do you use to connect to AWS services? This post addresses and answers these questions in the subsequent sections.

The following diagram shows the solution architecture deployed to AWS. All components inside the AWS Cloud boundary are deployed automatically using an AWS CloudFormation template to allow you to reproduce this solution quickly using your AWS account.

When deployed, the solution contains the following components:

  • Networking infrastructure that includes VPC, public and private subnets, security groups, internet gateway, NAT Gateway, and route tables
  • Linux EC2 instance provisioned in a public subnet to generate sample data
  • Windows Server EC2 instance to host Power BI Desktop
  • Windows Server EC2 instance to act as an on-premises data gateway that handles the communication between Power BI and Amazon Redshift
  • An Amazon Redshift cluster deployed in a private subnet
  • IAM user and roles with permissions to access Amazon S3 and Amazon Redshift

Prerequisites

To complete the steps in this post, you need the following prerequisites:

  • AWS account – You need an account to follow the instructions and test it with minimal cost.
    • If you are creating your account for the first time, choose the us-east-1 region.
    • Create a key pair for the selected Region. For more information, see Amazon EC2 Key Pairs.
  • Power BI tenant – You can test all the described Power BI functionalities with minimum to no cost with the following:
    • Power BI Pro license
    • Access to your Power BI admin portal

Creating and configuring your development environment

Before you can create Power BI visualizations in AWS, you need to load a fully working development environment with sample data. This section contains instructions to create and configure that environment from scratch. After completing all the deployment steps in this section, you have an AWS infrastructure with all the integration hooks between AWS and Power BI fully configured.

To create this environment, execute the following high-level tasks:

  1. Run an AWS CloudFormation template to provision the initial development environment.
  2. SSH into an Amazon EC2 Linux instance to generate a sample dataset.
  3. Configure the data warehouse by creating and loading data into Amazon Redshift tables.
  4. Install and configure the Power BI Desktop.
  5. Configure a data gateway in Power BI.
  6. Install the Power BI mobile app so you can consume the visuals from your phone.

To make the deployment quick and easy, this post automates much of the deployment steps through the use of an AWS CloudFormation template. For the tasks that could not be automated, the post provides detailed instructions along with actual code examples. You can find the template and relevant code in the GitHub repo. Remember to clone this GitHub repo to a local working folder because you need to reference these artifacts as you walk through the deployment steps in this section.

Provisioning a development environment

In this step, you run an AWS CloudFormation template to provision the initial infrastructure. Complete the following steps:

  1. On the AWS CloudFormation console, under Region, choose US East (N. Virginia). You can choose other Regions, but you need to change the EC2 AMI IDs in the AWS CloudFormation template parameters to match the chosen Region.
  2. Choose Create stack.
  3. Under Specify template, select Upload a template file.
  4. Click on Choose file.
  5. Select ProvisionDevEnv.yaml file from your working folder.
  6. Choose Next.
  7. Under Specify stack details, provide the following information:
    • Stack name – Enter a name for your CloudFormation stack.
    • AmazonLinuxImageID – Use the default value if you are running in US East (N. Virginia). Otherwise, provide the Amazon Linux AMI ID for your chosen Region. An easy way to find the AMI ID is to go to the Amazon EC2 console and launch a new instance. The Amazon Linux AMI ID should be listed on the first page. Use Amazon Linux, not Amazon Linux 2.
    • EC2KeyPair – Enter the name of your key pair for your Region.
    • MyIPAddress – Enter your public IP address in CIDR format, for example, 205.251.192.0/32. To find your public IP address, search online for what is my ip address.
    • RedshiftMasterUserPassword – Enter a password for your Amazon Redshift cluster.
    • RedshiftMasterUsername – Use the default username or provide your own.
    • WindowsServerImageID – Use the default value if you are running in US East (N. Virginia). Otherwise, provide the Microsoft Windows Server 2019 Base AMI ID for your Region.
    • For all other options, keep the default values.
  8. Choose Next.
  9. On the Configure stack options page, choose Next.
  10. On the Review page, select the “I acknowledge…” check box.
  11. Choose Create stack.

Wait for the CloudFormation stack creation to complete, which can take approximately 10 minutes. When the CloudFormation stack is complete, choose Stack Outputs. In this section, you can find all the keys and values of the resources that you need to reference later in the deployment process.

Generating a sample dataset

Now that you have provisioned the initial infrastructure, let us generate the sample data to be used in the Power BI visualizations. For more information about how to generate the sample dataset, see GenerateSampleDataset.txt in your working folder.

Replace the placeholders in this file with the following corresponding values from the AWS CloudFormation Stack Outputs section:

  • [KeyPairFileName] – Replace with the value of the EC2KeyPair template input parameter
  • [EC2IPAddress] – Replace with the public IP address of this EC2 instance with the key DataGeneratorPublicIP
  • [S3BucketName] – Replace with the actual S3 bucket name

After you generate the dataset and copy it to your S3 bucket, terminate the EC2 instance with the name Data Generator. You don’t need it anymore.
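
Before moving on, you can optionally confirm that the generated files arrived in the bucket by listing its contents with a short boto3 sketch; the bucket name is the placeholder value from the stack outputs.

import boto3

s3 = boto3.client('s3')
bucket = '<S3BucketName>'  # value from the CloudFormation Stack Outputs section

# List the generated data files and their sizes.
paginator = s3.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket):
    for obj in page.get('Contents', []):
        print(obj['Key'], obj['Size'])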

Configuring an Amazon Redshift cluster

In this step, you run a series of SQL commands to create tables in your Amazon Redshift cluster. After that, you load the data you generated earlier from Amazon S3 into the Amazon Redshift tables. Complete the following steps:

  1. On the Amazon Redshift console, choose Query Editor.
  2. Connect to the cluster by providing the database name, database user, and password that were provided as input parameters to the CloudFormation stack.
  3. From the schema drop-down menu, choose public.
  4. Open the file CreateRedshiftTables.txt in your working folder and run each SQL statement, one at a time, in the Query Editor.
  5. Run the COPY commands in the file CopyFromS3ToRedshift.txt, one at a time, to load sample data into the tables you just created. Replace the following placeholders with the actual values from the AWS CloudFormation Stack Outputs section:
    • [S3BucketName] – Replace with actual S3 bucket name.
    • [RedshiftAccessRoleName] – Replace with the IAM role name.
  6. Confirm that you see the following tables listed under the public schema. Run a select count(*) on each table to verify the row counts listed below (a scripted alternative follows this list):
    • customer: 1,500,000
    • lineitem: 59,986,052
    • nation: 25
    • orders: 15,000,000
    • part: 2,000,000
    • partsupp: 8,000,000
    • region: 5
    • supplier: 100,000
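
If you prefer to run these row-count checks programmatically instead of in the Query Editor, the following sketch uses the Amazon Redshift Data API through boto3; availability of the Data API in your Region, and the cluster identifier and database user shown, are assumptions.

import time
import boto3

client = boto3.client('redshift-data')

tables = ['customer', 'lineitem', 'nation', 'orders',
          'part', 'partsupp', 'region', 'supplier']

for table in tables:
    # Submit the count query; cluster identifier, database, and user are placeholders.
    stmt = client.execute_statement(
        ClusterIdentifier='<your-cluster-id>',
        Database='dev',
        DbUser='<your-db-user>',
        Sql='SELECT COUNT(*) FROM public.' + table,
    )
    # Poll until the statement finishes, then fetch the result.
    while client.describe_statement(Id=stmt['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
        time.sleep(1)
    result = client.get_statement_result(Id=stmt['Id'])
    print(table, result['Records'][0][0]['longValue'])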

Installing and configuring Power BI Desktop

In this step, you connect using Remote Desktop Protocol (RDP) into the Windows Server jump box and install Power BI Desktop, which you use later to create visualizations from the data that you pulled from Amazon Redshift. For more information, see Connecting to Your Windows Instance. Complete the following steps:

  • From your local machine, RDP to the Windows Server Jump Box. You can get the public IP address of the jump box from the CloudFormation Stack Outputs tab or from your EC2 Console.
  • Remember to use the EC2KeyPair you specified in the CloudFormation template to decrypt the administrator password by using the EC2 console:
    • Open the Amazon EC2 console, and then choose Instances.
    • Select the instance of your choice, choose Actions, and then choose Get Windows Password.
      Note: It can take a few minutes for this option to be available after first launching a new instance.
    • Click on Choose File, select your key pair file, and then choose Open. Alternatively, you can paste the contents of your key pair into the text box.
    • Choose Decrypt Password.
  • The first thing you’ll want to do after you’ve logged into the jump box is to turn off IE Enhanced Security Configuration. Otherwise, you will have problems logging into Power BI Service later.
    • Inside the Jump Box, go to Start > Server Manager > Local Server
    • Click the On link to the right of IE Enhanced Security Configuration
    • Click Off under the Administrators section and Click OK
  • Download and install Power BI Desktop 64-bit on the Jump Box. Remember to just download PBIDesktopSetup_x64.exe, which is the 64-bit version of Power BI Desktop.  After you download the executable, run it to install Power BI Desktop.  Accept all the default settings.

Installing and configuring a Microsoft on-premises data gateway

In this step, you install a Microsoft on-premises data gateway to enable Power BI to communicate with data stores that are not accessible from the public internet, such as your Amazon Redshift cluster, which you deployed inside a private subnet. Install the Microsoft on-premises data gateway on the data gateway instance. You can perform the following tests at minimum or no cost. Complete the following steps:

  • From the Windows Server Jump Box, you will RDP to the data gateway using its private IP address, which you can get from the EC2 Console.
  • Use the EC2KeyPair to decrypt the administrator password for the data gateway following the same instructions as outlined in the previous steps.
  • Turn off IE Enhanced Security Configuration.
    • Inside the data gateway, go to Start > Server Manager > Local Server
    • Click the On link to the right of IE Enhanced Security Configuration
    • Click Off under the Administrators section
    • Click OK
  • Download and install Microsoft On-premises Data Gateway
    • On the screen that asks you to choose the type of gateway you need, choose On-premises data gateway (recommended)
    • Accept the default values and click Install
    • When the installer asks you to sign in, type the email address associated with the admin account for the Power BI Pro tenant.
    • Click Sign in
    • After sign-in, you will be asked to register the gateway. If you are asked to register a new gateway or migrate, restore, or take over an existing gateway, choose Register a new gateway.
    • Give your gateway a name and provide a recovery key
    • Click Configure
    • You should see a green checkmark indicating the gateway is online and ready to be used.

Configuring the data gateway in Power BI

In this step, you log in to your Power BI tenant as an administrator to configure how Power BI connects to the AWS data sources. Complete the following steps:

  1. On your browser (from any machine), launch https://powerbi.com.
  2. Log in with an account that has admin privileges. Use the same account that you used for the data gateway.
  3. Choose the gear icon.
  4. Choose Manage gateways.
  5. Choose the gateway you installed earlier.
  6. Select the check box Allow user’s custom data connectors to refresh through this gateway cluster.
  7. Choose Apply.
  8. Choose ADD DATA SOURCE.
  9. Create a data source for Amazon Redshift with the following parameters:
    • Data Source Name – Amazon Redshift Private VPC.
    • Data Source Type – Amazon Redshift.
    • Server – Replace with the value of the [RedshiftClusterEndpoint] key from the AWS CloudFormation Stack Outputs
    • Database – Enter the name of the Amazon Redshift database, which you provided earlier as part of the CloudFormation stack parameters. The default is dev.
    • Username – Enter your database username, which you provided earlier as part of the CloudFormation stack parameters.
    • Password – Enter your database password, which you provided earlier as part of the CloudFormation stack parameters.
  10. Choose Add.

Installing the Power BI mobile app

In this step, you install the Power BI app on your mobile phone so you can interact with Power BI reports and dashboards later from your phone. Complete the following steps:

  1. Go to either the Apple App Store or Google Play Store and search for Microsoft Power BI.
  2. Install the app.
  3. Sign in to Power BI with the same account you’ve been using.

Visualizing Redshift Data using Power BI Desktop

In this section, you will connect Power BI Desktop to Redshift; create a report; publish the report to Power BI service; and finally consume the report from your phone.

  1. From the Jump Box, launch Power BI Desktop.
  2. If you haven’t already, sign in to the Service with your Power BI credentials
  3. Select Home > Get Data > More > Database > Amazon Redshift
  4. Click Connect
  5. On the next screen, provide the following values:
    • Server – copy the value of the key [RedshiftClusterEndpoint], which is found in the CloudFormation Stack Outputs tab
    • Databasedev (or whatever name you gave for the database)
    • Data Connectivity ModeDirectQuery
  6. If this is the first time you’re connecting to this cluster, then you’ll need to type the Redshift credentials you provided to the CloudFormation Stack earlier. Type in your Redshift username and password in the popup window and click on Connect.
  7. Select the orders, lineitem, and part tables from the Navigator window and then click Load.
  8. Once the data has finished loading, you will need to define table relationships in the in-memory model.
    • In Power BI Desktop, change to the Model view by clicking on the “table relationship” icon on the left.
    • Create relationships between the tables by dragging and dropping the following columns on each other.
      • orders.o_orderkey = lineitem.l_orderkey
      • part.p_partkey = lineitem.l_partkey
  9. Now, you are ready to create some charts. Change to Report view and add the following visualizations to the report.
    • Date Slicer
      • Visualization type – Slicer
      • Field – orders.o_orderdate
    • Sales by Date by Manufacturer
      • Visualization type – Line Chart
      • Axis – orders.o_orderdate
      • Legend – part.p_mfgr
      • Values – lineitem.l_extendedprice
    • Order Count
      • Visualization type – Card
      • Fields – Count of orders.o_orderkey
    • Line Item Count
      • Visualization type – Card
      • Fields – Count of lineitem.l_linenumber
  10. With a little formatting, your report shows Sales by Date and Manufacturer and should look similar to the following screenshot:

  11. Save the report and give it a name.

Publishing the report to Power BI

After you create a report in Power BI Desktop, you need to publish the dataset and report to Power BI to share it with others or consume it from a mobile device. Complete the following steps:

  1. In Power BI Desktop, choose Publish
  2. Choose My workspace.
  3. Choose Select.

It should not take long for the publishing to complete because no data was imported.

Configuring the gateway connection

You now need to tell Power BI to map the data source inside this report to the Amazon Redshift data source that you registered with the data gateway. Complete the following steps:

  1. From any machine, open a browser and launch https://powerbi.com.
  2. Log in to Power BI.
  3. Choose My workspace.
  4. Choose Datasets.
  5. Choose the … icon next to the dataset you just published.
  6. Choose Settings.
  7. Expand the Gateway connection section.
  8. For Use a data gateway, choose On.
  9. Under Maps to, choose the Amazon Redshift data source you created earlier.
  10. Choose Apply.

Consuming the Power BI reports and dashboard

Now that you have published the report to Power BI successfully, you can interact with it on https://powerbi.com.

  1. Launch https://powerbi.com.
  2. Log in to Power BI.
  3. Choose My workspace.
  4. Choose Reports.
  5. Choose the report you just published.

Your report should look similar to the following screenshot.  It shows the same Sales by Date and Manufacturer graph as before.

You can interact with the reports and dashboards on the Power BI mobile app the same way you can on the Power BI website. The following screenshot shows how the Sales by Date and Manufacturer report looks on the Power BI app for iPhone.

 

When you view and interact with the report, whether from the website or your mobile app, you’re always viewing the latest data because you connect directly to Amazon Redshift.

Conclusion

This post showed how to use Power BI to query data in Amazon Redshift to generate reports, visualization, and dashboards, and described a solution architecture that you can deploy in the AWS Cloud. The post also demonstrated how easy it is to set up and connect Power BI to an Amazon Redshift cluster in a VPC without public internet access and how to push Power BI dashboards to mobile devices.

Special acknowledgement goes to AWS colleagues Juan Yu, Sophia Jung and Joe Harris for their valuable comments and suggestions.

If you have any questions or suggestions, leave your feedback in the comment section. If you need any further assistance to optimize your Amazon Redshift implementation, contact your AWS account team or a trusted AWS partner.

 


About the authors

Vu Le is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services.

 

 

 

 

Po Hong, PhD, is a Senior Data Architect, Data & Analytics Global Specialty Practice, AWS Professional Services.

 

 

Streaming ETL with Apache Flink and Amazon Kinesis Data Analytics

Post Syndicated from Steffen Hausmann original https://aws.amazon.com/blogs/big-data/streaming-etl-with-apache-flink-and-amazon-kinesis-data-analytics/

Most businesses generate data continuously in real time and at ever-increasing volumes. Data is generated as users play mobile games, load balancers log requests, customers shop on your website, and temperature changes on IoT sensors. You can capitalize on time-sensitive events, improve customer experiences, increase efficiency, and drive innovation by analyzing this data quickly. The speed at which you get these insights often depends on how quickly you can load data into data lakes, data stores, and other analytics tools. As data volume and velocity increases, it becomes more important to not only load the incoming data, but also to transform and analyze it in near-real time.

This post looks at how to use Apache Flink as a basis for sophisticated streaming extract-transform-load (ETL) pipelines. Apache Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Kinesis Data Analytics, which enables you to build and run sophisticated streaming applications quickly, easily, and with low operational overhead.

This post discusses the concepts that are required to implement powerful and flexible streaming ETL pipelines with Apache Flink and Kinesis Data Analytics. It also looks at code examples for different sources and sinks. For more information, see the GitHub repo. The repo also contains an AWS CloudFormation template so you can get started in minutes and explore the example streaming ETL pipeline.

Architecture for streaming ETL with Apache Flink

Apache Flink is a framework and distributed processing engine for stateful computations over unbounded and bounded data streams. It supports a wide range of highly customizable connectors, including connectors for Apache Kafka, Amazon Kinesis Data Streams, Elasticsearch, and Amazon Simple Storage Service (Amazon S3). Moreover, Apache Flink provides a powerful API to transform, aggregate, and enrich events, and supports exactly-once semantics. Apache Flink is therefore a good foundation for the core of your streaming architecture.

To deploy and run the streaming ETL pipeline, the architecture relies on Kinesis Data Analytics. Kinesis Data Analytics enables you to run Flink applications in a fully managed environment. The service provisions and manages the required infrastructure, scales the Flink application in response to changing traffic patterns, and automatically recovers from infrastructure and application failures. You can combine the expressive Flink API for processing streaming data with the advantages of a managed service by using Kinesis Data Analytics to deploy and run Flink applications. It allows you to build robust streaming ETL pipelines and reduces the operational overhead of provisioning and operating infrastructure.

The architecture in this post takes advantage of several capabilities that you can achieve when you run Apache Flink with Kinesis Data Analytics. Specifically, the architecture supports the following:

  • Private network connectivity – Connect to resources in your Amazon Virtual Private Cloud (Amazon VPC), in your data center with a VPN connection, or in a remote region with a VPC peering connection
  • Multiple sources and sinks – Read and write data from Kinesis data streams, Apache Kafka clusters, and Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters
  • Data partitioning – Determine the partitioning of data that is ingested into Amazon S3 based on information extracted from the event payload
  • Multiple Elasticsearch indexes and custom document IDs – Fan out from a single input stream to different Elasticsearch indexes and explicitly control the document ID
  • Exactly-once semantics – Avoid duplicates when ingesting and delivering data between Apache Kafka, Amazon S3, and Amazon Elasticsearch Service (Amazon ES)

The following diagram illustrates this architecture.

The remainder of this post discusses how to implement streaming ETL architectures with Apache Flink and Kinesis Data Analytics. The architecture persists streaming data from one or multiple sources to different destinations and is extensible to your needs. This post does not cover additional filtering, enrichment, and aggregation transformations, although that is a natural extension for practical applications.

This post shows how to build, deploy, and operate the Flink application with Kinesis Data Analytics, without further focusing on these operational aspects. It is only relevant to know that you can create a Kinesis Data Analytics application by uploading the compiled Flink application jar file to Amazon S3 and specifying some additional configuration options with the service. You can then execute the Kinesis Data Analytics application in a fully managed environment. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics developer guide.
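
For orientation only, the following boto3 sketch shows the general shape of that API call; the application name, IAM role, S3 location, and Flink runtime version are assumptions, and the full set of configuration options is covered in the developer guide.

import boto3

kda = boto3.client('kinesisanalyticsv2')

kda.create_application(
    ApplicationName='streaming-etl-demo',             # hypothetical name
    RuntimeEnvironment='FLINK-1_8',                    # assumed Flink runtime version
    ServiceExecutionRole='arn:aws:iam::<account-id>:role/<kda-service-role>',
    ApplicationConfiguration={
        'ApplicationCodeConfiguration': {
            'CodeContent': {
                'S3ContentLocation': {
                    'BucketARN': 'arn:aws:s3:::<artifact-bucket>',
                    'FileKey': 'streaming-etl-app.jar',
                }
            },
            'CodeContentType': 'ZIPFILE',
        }
    },
)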

Exploring a streaming ETL pipeline in your AWS account

Before you consider the implementation details and operational aspects, you should get a first impression of the streaming ETL pipeline in action. To create the required resources, deploy the following AWS CloudFormation template:

The template creates a Kinesis data stream and an Amazon Elastic Compute Cloud (Amazon EC2) instance to replay a historic data set into the data stream. This post uses data based on the public dataset obtained from the New York City Taxi and Limousine Commission. Each event describes a taxi trip made in New York City and includes timestamps for the start and end of a trip, information on the boroughs the trip started and ended in, and various details on the fare of the trip. A Kinesis Data Analytics application then reads the events and persists them to Amazon S3 in Parquet format and partitioned by event time.

Connect to the instance by following the link next to ConnectToInstance in the output section of the CloudFormation template that you executed previously. You can then start replaying a set of taxi trips into the data stream with the following code:

$ java -jar /tmp/amazon-kinesis-replay-*.jar -noWatermark -objectPrefix artifacts/kinesis-analytics-taxi-consumer/taxi-trips-partitioned.json.lz4/dropoff_year=2018/ -speedup 3600 -streamName <Kinesis stream name>

You can obtain this command with the correct parameters from the output section of the AWS CloudFormation template. The output section also points you to the S3 bucket to which events are persisted and an Amazon CloudWatch dashboard that lets you monitor the pipeline.

For more information about enabling the remaining combinations of sources and sinks, for example, Apache Kafka and Elasticsearch, see the GitHub repo.

Building a streaming ETL pipeline with Apache Flink

Now that you have seen the pipeline in action, you can dive into the technical aspects of how to implement the functionality with Apache Flink and Kinesis Data Analytics.

Reading and writing to private resources

Kinesis Data Analytics applications can access resources on the public internet and resources in a private subnet that is part of your VPC. By default, a Kinesis Data Analytics application only enables access to resources that you can reach over the public internet. This works well for resources that provide a public endpoint, for example, Kinesis data streams or Amazon Elasticsearch Service.

If your resources are private to your VPC, either for technical or security-related reasons, you can configure VPC connectivity for your Kinesis Data Analytics application. For example, MSK clusters are private; you cannot access them from the public internet. You may run your own Apache Kafka cluster on premises that is not exposed to the public internet and is only accessible from your VPC through a VPN connection. The same is true for other resources that are private to your VPC, such as relational databases or AWS PrivateLink-powered endpoints.

To enable VPC connectivity, configure the Kinesis Data Analytics application to connect to private subnets in your VPC. Kinesis Data Analytics creates elastic network interfaces in one or more of the subnets provided in your VPC configuration for the application, depending on the parallelism of the application. For more information, see Configuring Kinesis Data Analytics for Java Applications to access Resources in an Amazon VPC.

The following screenshot shows an example configuration of a Kinesis Data Analytics application with VPC connectivity:

The application can then access resources that have network connectivity from the configured subnets. This includes resources that are not directly contained in these subnets, which you can reach over a VPN connection or through VPC peering. This configuration also supports endpoints that are available over the public internet if you have a NAT gateway configured for the respective subnets. For more information, see Internet and Service Access for a VPC-Connected Kinesis Data Analytics for Java application.
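
If you need to attach VPC connectivity to an application that already exists, the corresponding API call looks roughly like the following boto3 sketch; the application name continues the hypothetical example above, and the subnet and security group IDs are placeholders.

import boto3

kda = boto3.client('kinesisanalyticsv2')

# Look up the current application version, which the update call requires.
app = kda.describe_application(ApplicationName='streaming-etl-demo')
version_id = app['ApplicationDetail']['ApplicationVersionId']

kda.add_application_vpc_configuration(
    ApplicationName='streaming-etl-demo',
    CurrentApplicationVersionId=version_id,
    VpcConfiguration={
        'SubnetIds': ['subnet-<private-a>', 'subnet-<private-b>'],
        'SecurityGroupIds': ['sg-<flink-application>'],
    },
)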

Configuring Kinesis and Kafka sources

Apache Flink supports various data sources, including Kinesis Data Streams and Apache Kafka. For more information, see Streaming Connectors on the Apache Flink website.

To connect to a Kinesis data stream, first configure the Region and a credentials provider. As a general best practice, choose AUTO as the credentials provider. The application will then use temporary credentials from the role of the respective Kinesis Data Analytics application to read events from the specified data stream. This avoids baking static credentials into the application. In this context, it is also reasonable to increase the time between two read operations from the data stream. When you increase the default of 200 milliseconds to 1 second, the latency increases slightly, but it facilitates multiple consumers reading from the same data stream. See the following code:

Properties properties = new Properties();
properties.setProperty(AWSConfigConstants.AWS_REGION, "<Region name>");
properties.setProperty(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
properties.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "1000");

This config is passed to the FlinkKinesisConsumer with the stream name and a DeserializationSchema. This post uses the TripEventSchema for deserialization, which specifies how to deserialize a byte array that represents a Kinesis record into a TripEvent object. See the following code:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<TripEvent> events = env.addSource(
  new FlinkKinesisConsumer<>("<Kinesis stream name>", new TripEventSchema(), properties)
);

For more information, see TripEventSchema.java and TripEvent.java on GitHub. Apache Flink provides other more generic serializers that can deserialize data into strings or JSON objects.

Apache Flink is not limited to reading from Kinesis data streams. If you configure the Kinesis Data Analytics application’s VPC settings correctly, Apache Flink can also read events from Apache Kafka and MSK clusters. Specify a comma-separated list of broker and port pairs to use for the initial connection to your cluster. This config is passed to the FlinkKafkaConsumer with the topic name and a DeserializationSchema to create a source that reads from the respective topic of the Apache Kafka cluster. See the following code:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "<comma separated list of broker and port pairs>");

DataStream<TripEvent> events = env.addSource(
  new FlinkKafkaConsumer<>("<topic name>", new TripEventSchema(), properties)
);

The resulting DataStream contains TripEvent objects that have been deserialized from the data ingested into the data stream and Kafka topic, respectively. You can then use the data streams in combination with a sink to persist the events into their respective destination.

Persisting data in Amazon S3 with data partitioning

When you persist streaming data to Amazon S3, you may want to partition the data. You can substantially improve the query performance of analytic tools by partitioning data, because partitions that cannot contribute to a query can be pruned and therefore do not need to be read. For example, the right partitioning strategy can improve Amazon Athena query performance and cost by reducing the amount of data read for a query. Choose partition attributes that match your application logic and query patterns. Furthermore, when processing streaming data, it is common to base the partitioning on the time of an event, or event time, rather than on the ingestion time or some other service-side timestamp that does not reflect the time an event occurred as accurately.

For more information about taking data partitioned by ingestion time and repartitioning it by event time with Athena, see Analyze your Amazon CloudFront access logs at scale. However, you can directly partition the incoming data based on event time with Apache Flink by using the payload of events to determine the partitioning, which avoids an additional post-processing step. This capability is called data partitioning and is not limited to partitioning by time.

You can realize data partitioning with Apache Flink’s StreamingFileSink and BucketAssigner. For more information, see Streaming File Sink on the Apache Flink website.

When given a specific event, the BucketAssigner determines the corresponding partition prefix in the form of a string. See the following code:

public class TripEventBucketAssigner implements BucketAssigner<TripEvent, String> {
  public String getBucketId(TripEvent event, Context context) {
    return String.format("pickup_location=%03d/year=%04d/month=%02d",
        event.getPickupLocationId(),
        event.getPickupDatetime().getYear(),
        event.getPickupDatetime().getMonthOfYear()
    );
  }

  ...
}

The sink takes an argument for the S3 bucket as a destination path and a function that converts the TripEvent Java objects into a string. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forRowFormat(
    new Path("s3://<Bucket name>"),
    (Encoder<TripEvent>) (element, outputStream) -> {
      PrintStream out = new PrintStream(outputStream);
      out.println(TripEventSchema.toJson(element));
    }
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .withRollingPolicy(DefaultRollingPolicy.create().build())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

You can further customize the size of the objects you write to Amazon S3 and the frequency of object creation with a rolling policy. You can configure your policy to aggregate more events into fewer objects at the cost of increased latency, or vice versa. This helps you avoid many small objects on Amazon S3, which is often a desirable trade-off despite the increased latency, because a high number of small objects can negatively impact the query performance of consumers reading the data from Amazon S3. For more information, see DefaultRollingPolicy on the Apache Flink website.

The number of output files that arrive in your S3 bucket according to your rolling policy also depends on the parallelism of the StreamingFileSink and how you distribute events between Flink application operators. In the previous example, the Flink internal DataStream is partitioned by pickup location ID with the keyBy operator. The location ID is also used in the BucketAssigner as part of the prefix for objects that are written to Amazon S3. Therefore, the same node aggregates and persists all events with the same prefix, which results in larger objects on Amazon S3.

Apache Flink uses multipart uploads under the hood when writing to Amazon S3 with the StreamingFileSink. In case of failures, Apache Flink may not be able to clean up incomplete multipart uploads. To avoid unnecessary storage fees, set up the automatic cleanup of incomplete multipart uploads by configuring appropriate lifecycle rules on the S3 bucket. For more information, see Important Considerations for S3 on the Apache Flink website and Example 8: Lifecycle Configuration to Abort Multipart Uploads.
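
One way to set up this cleanup is with a bucket lifecycle rule. The following boto3 snippet is a minimal sketch that aborts incomplete multipart uploads after seven days; the bucket name is a placeholder, and you can configure the same rule from the S3 console or with AWS CloudFormation instead.

import boto3

s3 = boto3.client("s3")

# Abort multipart uploads that have not completed within 7 days of initiation.
# Note: this call replaces any existing lifecycle configuration on the bucket.
s3.put_bucket_lifecycle_configuration(
    Bucket="<Bucket name>",  # placeholder: the bucket that the StreamingFileSink writes to
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "abort-incomplete-multipart-uploads",
                "Status": "Enabled",
                "Filter": {"Prefix": ""},  # apply the rule to the entire bucket
                "AbortIncompleteMultipartUpload": {"DaysAfterInitiation": 7},
            }
        ]
    },
)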

Converting output to Apache Parquet

In addition to partitioning data before delivery to Amazon S3, you may want to compress it with a columnar storage format. Apache Parquet is a popular columnar format, which is well supported in the AWS ecosystem. It reduces the storage footprint and can substantially increase query performance and reduce cost.

The StreamingFileSink supports Apache Parquet and other bulk-encoded formats through a built-in BulkWriter factory. See the following code:

SinkFunction<TripEvent> sink = StreamingFileSink
  .forBulkFormat(
    new Path("s3://<bucket name>"),
    ParquetAvroWriters.forSpecificRecord(TripEvent.class)
  )
  .withBucketAssigner(new TripEventBucketAssigner())
  .build();

events.keyBy(TripEvent::getPickupLocationId).addSink(sink);

For more information, see Bulk-encoded Formats on the Apache Flink website.

Persisting events works a bit differently when you use the Parquet conversion. When you enable it, you can only configure the StreamingFileSink with the OnCheckpointRollingPolicy, which commits completed part files to Amazon S3 only when a checkpoint is triggered. You need to enable Apache Flink checkpoints in your Kinesis Data Analytics application to persist data to Amazon S3. The data only becomes visible to consumers when a checkpoint completes, so your delivery latency depends on how often your application checkpoints.

Moreover, with the row format you previously just needed to generate a string representation of the data to write to Amazon S3. In contrast, the ParquetAvroWriters expects an Apache Avro schema for events. For more information, see the GitHub repo. You can use and extend the schema in the repo if you want an example to start from.

In general, it is highly desirable to convert data into Parquet if you want to work with and query the persisted data effectively. Although it requires some additional effort, the benefits of the conversion usually outweigh the additional complexity compared to storing raw data.

Fanning out to multiple Elasticsearch indexes and custom document IDs

Amazon ES is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch clusters. A popular use case is to stream application and network log data into Elasticsearch. These logs are documents in Elasticsearch parlance, and you can create one document for every event and store it in an Elasticsearch index.

The Elasticsearch sink that Apache Flink provides is flexible and extensible. You can specify an index based on the payload of each event. This is useful when the stream contains different event types and you want to store the respective documents in different Elasticsearch indexes. With this capability, you can use a single sink and application, respectively, to write into multiple indexes. With newer Elasticsearch versions, a single index cannot contain multiple types. See the following code:

SinkFunction<TripEvent> sink = AmazonElasticsearchSink.buildElasticsearchSink(
  "<Elasticsearch endpoint>",
  "<AWS region>",
  new ElasticsearchSinkFunction<TripEvent>() {
    public IndexRequest createIndexRequest(TripEvent element) {
      // Derive the index name and document ID from the event payload
      String type = element.getType().toString();
      String tripId = Long.toString(element.getTripId());

      return Requests.indexRequest()
        .index(type)
        .type(type)
        .id(tripId)
        .source(TripEventSchema.toJson(element), XContentType.JSON);
    }

    public void process(TripEvent element, RuntimeContext ctx, RequestIndexer indexer) {
      indexer.add(createIndexRequest(element));
    }
  }
);

events.addSink(sink);

You can also explicitly set the document ID when you send documents to Elasticsearch. If an event with the same ID is ingested into Elasticsearch multiple times, it is overwritten rather than creating duplicates. This enables your writes to Elasticsearch to be idempotent. In this way, you can obtain exactly-once semantics of the entire architecture, even if your data sources only provide at-least-once semantics.

The AmazonElasticsearchSink used above is an extension of the Elasticsearch sink that comes with Apache Flink. The sink adds support for signing requests with IAM credentials so you can use the strong IAM-based authentication and authorization that is available from the service. To this end, the sink picks up temporary credentials from the Kinesis Data Analytics environment in which the application is running. It uses the Signature Version 4 method to add authentication information to the request that is sent to the Elasticsearch endpoint.

Leveraging exactly-once semantics

You can obtain exactly-once semantics by combining an idempotent sink with at-least-once semantics, but that is not always feasible. For instance, if you want to replicate data from one Apache Kafka cluster to another or persist transactional CDC data from Apache Kafka to Amazon S3, you may not be able to tolerate duplicates in the destination, and neither of these sinks is idempotent.

Apache Flink natively supports exactly-once semantics. Kinesis Data Analytics implicitly enables exactly-once mode for checkpoints. To obtain end-to-end exactly-once semantics, you need to enable checkpoints for the Kinesis Data Analytics application and choose a connector that supports exactly-once semantics, such as the StreamingFileSink. For more information, see Fault Tolerance Guarantees of Data Sources and Sinks on the Apache Flink website.

There are some side effects to using exactly-once semantics. For example, end-to-end latency increases for several reasons. First, you can only commit the output when a checkpoint is triggered. This is the same latency increase that occurs when you turn on Parquet conversion. The default checkpoint interval is 1 minute, which you can decrease. However, obtaining sub-second delivery latencies is difficult with this approach.
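
If you want to experiment with a shorter checkpoint interval, you can override the service-managed default through the Kinesis Data Analytics application configuration. The following is a minimal boto3 sketch; the application name is a placeholder, and you should verify that a custom checkpoint configuration fits your operational requirements before applying it.

import boto3

kda = boto3.client("kinesisanalyticsv2")

# Look up the current application version, which is required for updates
app = kda.describe_application(ApplicationName="<application name>")
version_id = app["ApplicationDetail"]["ApplicationVersionId"]

# Switch to a custom checkpoint configuration with a 30-second interval
kda.update_application(
    ApplicationName="<application name>",
    CurrentApplicationVersionId=version_id,
    ApplicationConfigurationUpdate={
        "FlinkApplicationConfigurationUpdate": {
            "CheckpointConfigurationUpdate": {
                "ConfigurationTypeUpdate": "CUSTOM",
                "CheckpointingEnabledUpdate": True,
                "CheckpointIntervalUpdate": 30000,  # milliseconds
            }
        }
    },
)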

Also, the details of end-to-end exactly-once semantics are subtle. Although the Flink application may read in an exactly-once fashion from a data stream, duplicates may already be part of the stream, so you can only obtain at-least-once semantics of the entire application. For Apache Kafka as a source and sink, different caveats apply. For more information, see Caveats on the Apache Flink website.

Be sure that you understand all the details of the entire application stack before you take a hard dependency on exactly-once semantics. In general, if your application can tolerate at-least-once semantics, it’s a good idea to use that semantic instead of relying on stronger semantics that you don’t need.

Using multiple sources and sinks

One Flink application can read data from multiple sources and persist data to multiple destinations. This is interesting for several reasons. First, you can persist the data or different subsets of the data to different destinations. For example, you can use the same application to replicate all events from your on-premises Apache Kafka cluster to an MSK cluster. At the same time, you can deliver specific, valuable events to an Elasticsearch cluster.

Second, you can use multiple sinks to increase the robustness of your application. For example, your application that applies filters and enriches streaming data can also archive the raw data stream. If something goes wrong with your more complex application logic, Amazon S3 still has the raw data, which you can use to backfill the sink.

However, there are some trade-offs. When you bundle many functionalities in a single application, you increase the blast radius of failures. If a single component of the application fails, the entire application fails and you need to recover it from the last checkpoint. This causes some downtime and increased delivery latency to all delivery destinations in the application. Also, a single large application is often harder to maintain and to change. You should strike a balance between adding functionality to an existing application and creating additional Kinesis Data Analytics applications.

Operational aspects

When you run the architecture in production, you set out to execute a single Flink application continuously and indefinitely. It is crucial to implement monitoring and proper alarming to make sure that the pipeline is working as expected and the processing can keep up with the incoming data. Ideally, the pipeline should adapt to changing throughput conditions and cause notifications if it fails to deliver data from the sources to the destinations.

Some aspects require specific attention from an operational perspective. The following section provides some ideas and further references on how you can increase the robustness of your streaming ETL pipelines.

Monitoring and scaling sources

The data stream and the MSK cluster, respectively, are the entry point to the entire architecture. They decouple the data producers from the rest of the architecture. To avoid any impact to data producers, which you often cannot control directly, you need to scale the input stream of the architecture appropriately and make sure that it can ingest messages at any time.

Kinesis Data Streams uses a throughput provisioning model based on shards. Each shard provides a certain read and write capacity. From the number of provisioned shards, you can derive the maximum throughput of the stream in terms of ingested and emitted events and data volume per second. For more information, see Kinesis Data Streams Quotas.

Kinesis Data Streams exposes metrics through CloudWatch that report on these characteristics and indicate whether the stream is over- or under-provisioned. You can use the IncomingBytes and IncomingRecords metrics to scale the stream proactively, or you can use the WriteProvisionedThroughputExceeded metrics to scale the stream reactively. Similar metrics exist for data egress, which you should also monitor. For more information, see Monitoring the Amazon Kinesis Data Streams with Amazon CloudWatch.

The following graph shows some of these metrics for the data stream of the example architecture. On average the Kinesis data stream receives 2.8 million events and 1.1 GB of data every minute.

You can even automate the scaling of your Kinesis data streams based on these metrics. For more information, see Scale Your Amazon Kinesis Stream Capacity with UpdateShardCount.
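
If you prefer to script the scaling operation, the following boto3 sketch doubles an assumed shard count of 8; the stream name and target count are placeholders that you would typically derive from the CloudWatch metrics above.

import boto3

kinesis = boto3.client("kinesis")

# UNIFORM_SCALING evenly splits or merges shards to reach the target count
kinesis.update_shard_count(
    StreamName="<Kinesis stream name>",  # placeholder
    TargetShardCount=16,                 # example: double an assumed current count of 8
    ScalingType="UNIFORM_SCALING",
)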

Apache Kafka and Amazon MSK use a node-based provisioning model. Amazon MSK also exposes metrics through CloudWatch, including metrics that indicate how much data and how many events are ingested into the cluster. For more information, see Amazon MSK Metrics for Monitoring with CloudWatch.

You can also enable open monitoring with Prometheus for MSK clusters. It is a bit harder to know the total capacity of the cluster, and you often need benchmarking to know when you should scale. For more information about important metrics to monitor, see Monitoring Kafka on the Confluent website.

Monitoring and scaling the Kinesis Data Analytics application

The Flink application is the central core of the architecture. Kinesis Data Analytics executes it in a managed environment, and you want to make sure that it continuously reads data from the sources and persists data in the data sinks without falling behind or getting stuck.

When the application falls behind, it often is an indicator that it is not scaled appropriately. Two important metrics to track the progress of the application are millisBehindLatest (when the application is reading from a Kinesis data stream) and records-lag-max (when it is reading from Apache Kafka or Amazon MSK). These metrics not only indicate that data is read from the sources, but they also tell you whether data is read fast enough. If the values of these metrics are continuously growing, the application is continuously falling behind, which may indicate that you need to scale up the Kinesis Data Analytics application. For more information, see Kinesis Data Streams Connector Metrics and Application Metrics.
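
To get notified when the application falls behind, you can create a CloudWatch alarm on the lag metric. The following boto3 sketch is built on several assumptions: it assumes the metric is published as millisBehindLatest in the AWS/KinesisAnalytics namespace with an Application dimension, and that an SNS topic exists for notifications. Verify the exact namespace and dimensions that your application reports before using it.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Alarm when the consumer lags more than 60 seconds behind the tip of the stream
# for five consecutive one-minute periods
cloudwatch.put_metric_alarm(
    AlarmName="flink-app-falling-behind",
    Namespace="AWS/KinesisAnalytics",  # assumption: check the namespace for your application
    MetricName="millisBehindLatest",
    Dimensions=[{"Name": "Application", "Value": "<application name>"}],  # assumption
    Statistic="Maximum",
    Period=60,
    EvaluationPeriods=5,
    Threshold=60000,
    ComparisonOperator="GreaterThanThreshold",
    TreatMissingData="breaching",
    AlarmActions=["<SNS topic ARN>"],  # placeholder
)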

The following graph shows the metrics for the example application in this post. During checkpointing, the maximum millisBehindLatest metric occasionally spikes up to 7 seconds. However, because the reported average of the metric is less than 1 second and the application immediately catches up to the tip of the stream again, it is not a concern for this architecture.

Although the lag of the application is one of the most important metrics to monitor, there are other relevant metrics that Apache Flink and Kinesis Data Analytics expose. For more information, see Monitoring Apache Flink Applications 101 on the Apache Flink website.

Monitoring sinks

To verify that sinks are receiving data and, depending on the sink type, do not run out of storage, you need to monitor sinks closely.

You can enable detailed metrics for your S3 buckets that track the number of requests and data uploaded into the bucket with 1-minute granularity. For more information, see Monitoring Metrics with Amazon CloudWatch. The following graph shows these metrics for the S3 bucket of the example architecture:

When the architecture persists data into a Kinesis data stream or a Kafka topic, it acts as a producer, so the same recommendations as for monitoring and scaling sources apply. For more information about operating and monitoring Amazon ES domains in production environments, see Amazon Elasticsearch Service Best Practices.

Handling errors

“Failures are a given and everything eventually fails over time”, so you should expect the application to fail at some point. For example, an underlying node of the infrastructure that Kinesis Data Analytics manages might fail, or intermittent timeouts on the network can prevent the application from reading from sources or writing to sinks. When this happens, Kinesis Data Analytics restarts the application and resumes processing by recovering from the latest checkpoint. Because the raw events have been persisted in a data stream or Kafka topic, the application can reread the events that have been persisted in the stream between the last checkpoint and when it recovered and continue standard processing.

These kinds of failures are rare and the application can gracefully recover without sacrificing processing semantics, including exactly-once semantics. However, other failure modes need additional attention and mitigation.

When an exception is thrown anywhere in the application code, for example, in the component that contains the logic for parsing events, the entire application crashes. As before, the application eventually recovers, but if the exception is from a bug in your code that a specific event always hits, it results in an infinite loop. After recovering from the failure, the application rereads the event, because it was not processed successfully before, and crashes again. The process starts again and repeats indefinitely, which effectively blocks the application from making any progress.

Therefore, you want to catch and handle exceptions in the application code to avoid crashing the application. If there is a persistent problem that you cannot resolve programmatically, you can use side outputs to redirect the problematic raw events to a secondary data stream, which you can persist to a dead letter queue or an S3 bucket for later inspection. For more information, see Side Outputs on the Apache Flink website.

When the application is stuck and cannot make any progress, it is at least visible in the metrics for application lag. If your streaming ETL pipeline filters or enriches events, failures may be much more subtle, and you may only notice them long after they have been ingested. For instance, due to a bug in the application, you may accidentally drop important events or corrupt their payload in unintended ways. Kinesis data streams store events for up to 7 days and, though technically possible, Apache Kafka is often not configured to store events indefinitely either. If you don't identify the corruption quickly enough, you risk losing information when the retention of the raw events expires.
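
One immediate mitigation is to extend the retention period of the input stream so you have more time to detect and fix such issues. The following boto3 sketch raises the retention of a Kinesis data stream to the 7-day maximum that was available at the time of writing; the stream name is a placeholder, and longer retention increases the storage cost of the stream.

import boto3

kinesis = boto3.client("kinesis")

# Extend retention from the 24-hour default to 168 hours (7 days)
kinesis.increase_stream_retention_period(
    StreamName="<Kinesis stream name>",  # placeholder
    RetentionPeriodHours=168,
)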

To protect against this scenario, you can persist the raw events to Amazon S3 before you apply any additional transformations or processing to them. You can keep the raw events and reprocess or replay them into the stream if you need to. To integrate the functionality into the application, add a second sink that just writes to Amazon S3. Alternatively, use a separate application that only reads and persists the raw events from the stream, at the cost of running and paying for an additional application.

When to choose what

AWS provides many services that work with streaming data and can perform streaming ETL. Amazon Kinesis Data Firehose can ingest, process, and persist streaming data into a range of supported destinations. There is a significant overlap of the functionality between Kinesis Data Firehose and the solution in this post, but there are different reasons to use one or the other.

As a rule of thumb, use Kinesis Data Firehose whenever it fits your requirements. The service is built with simplicity and ease of use in mind. To use Kinesis Data Firehose, you just need to configure the service. You can use Kinesis Data Firehose for streaming ETL use cases with no code, no servers, and no ongoing administration. Moreover, Kinesis Data Firehose comes with many built-in capabilities, and its pricing model allows you to only pay for the data processed and delivered. If you don’t ingest data into Kinesis Data Firehose, you pay nothing for the service.

In contrast, the solution in this post requires you to create, build, and deploy a Flink application. Moreover, you need to think about monitoring and how to obtain a robust architecture that is not only tolerant against infrastructure failures but also resilient against application failures and bugs. However, this added complexity unlocks many advanced capabilities, which your use case may require. For more information, see Build and run streaming applications with Apache Flink and Amazon Kinesis Data Analytics for Java Applications and the Amazon Kinesis Data Analytics Developer Guide.

What’s next?

This post discussed how to build a streaming ETL pipeline with Apache Flink and Kinesis Data Analytics. It focused on how to build an extensible solution that addresses some advanced use cases for streaming ingest while maintaining low operational overhead. The solution allows you to quickly enrich, transform, and load your streaming data into your data lake, data store, or another analytical tool without the need for an additional ETL step. The post also explored ways to extend the application with monitoring and error handling.

You should now have a good understanding of how to build streaming ETL pipelines on AWS. You can start capitalizing on your time-sensitive events by using a streaming ETL pipeline that makes valuable information quickly accessible to consumers. You can tailor the format and shape of this information to your use case without adding the substantial latency of traditional batch-based ETL processes.

 


About the Author

Steffen Hausmann is a Specialist Solutions Architect for Analytics at AWS. He works with customers around the globe to design and build streaming architectures so that they can get value from analyzing their streaming data. He holds a doctorate degree in computer science from the University of Munich and in his free time, he tries to lure his daughters into tech with cute stickers he collects at conferences. You can follow his ruthless attempts on Twitter (@sthmmm).

 

 

How Siemens built a fully managed scheduling mechanism for updates on Amazon S3 data lakes

Post Syndicated from Pedro Bento original https://aws.amazon.com/blogs/big-data/how-siemens-built-a-fully-managed-scheduling-mechanism-for-consistent-updates-on-amazon-s3-data-lakes/

Siemens is a global technology leader with more than 370,000 employees and 170 years of experience. To protect Siemens from cybercrime, the Siemens Cyber Defense Center (CDC) continuously monitors Siemens’ networks and assets. To handle the resulting enormous data load, the CDC built a next-generation threat detection and analysis platform called ARGOS. ARGOS is a hybrid-cloud solution that makes heavy use of fully managed AWS services for streaming, big data processing, and machine learning.

Users such as security analysts, data scientists, threat intelligence teams, and incident handlers continuously access data in the ARGOS platform. Further, various automated components update, extend, and remove data to enrich information, improve data quality, enforce PII requirements, or mutate data due to schema evolution or additional data normalization requirements. Keeping the data always available and consistent presents multiple challenges.

While object-based data lakes are highly beneficial from a cost perspective compared to traditional transactional databases in such scenarios, they hardly allow for atomic updates without highly complex and costly extensions. To overcome this problem, Siemens designed a solution that enables atomic file updates on Amazon S3-based data lakes without compromising query performance and availability.

This post presents this solution, which is an easy-to-use scheduling service for S3 data update tasks. Siemens uses it for multiple purposes, including pseudonymization, anonymization, and removal of sensitive data. This post demonstrates how to use the solution to remove values from a dataset after a predefined amount of time. Adding further data processing tasks is straightforward because the solution has a well-defined architecture and the whole stack consists of fewer than 200 lines of source code. It is solely based on fully managed AWS services and therefore achieves minimal operational overhead.

Architecture overview

This post uses an S3-based data lake with continuous data ingestion and Amazon Athena as the query mechanism. The goal is to remove certain values automatically after a predefined amount of time has passed since ingestion. Applications and users consuming the data via Athena are not impacted (for example, they do not observe downtimes or data quality issues such as duplication).

The following diagram illustrates the architecture of this solution.

Siemens built the solution with the following services and components:

  1. Scheduling trigger – New data (for example, in JSON format) is continuously uploaded to an S3 bucket.
  2. Task scheduling – As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
  3. Task execution trigger – When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
  4. Task execution – The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
  5. Data usage – The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.

About DynamoDB Streams and TTL

TTL for DynamoDB lets you define when items in a table expire so they can be deleted from the database automatically. TTL is provided at no extra cost and is a way to reduce storage use and the cost of storing irrelevant data, without consuming provisioned throughput. By enabling TTL on a table, you can set a deletion timestamp on a per-item basis and limit storage usage to only those records that are still relevant.
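
The walkthrough below enables TTL from the DynamoDB console, but you can also do it programmatically. The following boto3 sketch enables TTL on the objects-to-process table used in this post, with the numeric ttl attribute (Unix epoch seconds) as the expiration timestamp.

import boto3

dynamodb = boto3.client("dynamodb")

# Enable TTL on the table; items are deleted automatically after the 'ttl' timestamp passes
dynamodb.update_time_to_live(
    TableName="objects-to-process",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "ttl"},
)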

Solution overview

To implement this solution manually, complete the following steps:

  1. Create a DynamoDB table and configure DynamoDB Streams.
  2. Create a Lambda function to insert TTL records.
  3. Configure an S3 event notification on the target bucket.
  4. Create a Lambda function that performs data processing tasks.
  5. Use Athena to query the processed data.

If you want to deploy the solution automatically, you may skip these steps and use the AWS CloudFormation template provided.

Prerequisites

To complete this walkthrough, you must have the following:

  • An AWS account with access to the AWS Management Console.
  • A role with access to S3, DynamoDB, Lambda, and Athena.

Creating a DynamoDB table and configuring DynamoDB Streams

Start first with the time-based trigger setup. For this, you use S3 notifications, DynamoDB Streams, and a Lambda function to integrate both services. The DynamoDB table stores the items to process after a predefined time.

Complete the following steps:

  1. On the DynamoDB console, create a table.
  2. For Table name, enter objects-to-process.
  3. For Primary key, enter path and choose String.
  4. Select the table and click on Manage TTL next to “Time to live attribute” under table details.
  5. For TTL attribute, enter ttl.
  6. For DynamoDB Streams, choose Enable with view type New and old images.

Note that you can configure the TTL attribute to point at a non-numeric attribute, but TTL only works when the attribute holds a numeric value (the expiration time in Unix epoch seconds).

The DynamoDB TTL is not minute-precise. Expired items are typically deleted within 48 hours of expiration. However, you may experience shorter deviations of only 10–30 minutes from the actual TTL value. For more information, see Time to Live: How It Works.

Creating a Lambda function to insert TTL records

The first Lambda function you create is for scheduling tasks. It receives a S3 notification as input, recreates the S3 path (for example, s3://<bucket>/<key>), and creates a new item on DynamoDB with two attributes: the S3 path and the TTL (in seconds). For more information about a similar S3 notification event structure, see Test the Lambda Function.

To deploy the Lambda function, on the Lambda console, create a function named NotificationFunction with the Python 3.7 runtime and the following code:

import boto3, os, time

# Expiration delay for scheduled objects: 300 seconds (5 minutes) by default
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        # Schedule only non-empty objects that have not been tagged as processed yet
        if head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0:
            record_path = f"s3://{bucket_name}/{key}"
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
    except:
        pass  # Ignore malformed events and objects that cannot be read

Configuring S3 event notifications on the target bucket

You can take advantage of the scalability, security, and performance of S3 by using it as a data lake for storing your datasets. Additionally, you can use S3 event notifications to capture S3-related events, such as the creation or deletion of objects within a bucket. You can forward these events to other AWS services, such as Lambda.

To configure S3 event notifications, complete the following steps:

  1. On the S3 console, create an S3 bucket named data-bucket.
  2. Click on the bucket and go to “Properties” tab.
  3. Under Advanced Settings, choose Events and add a notification.
  4. For Name, enter MyEventNotification.
  5. For Events, select All object create events.
  6. For Prefix, enter dataset/.
  7. For Send to, choose Lambda Function.
  8. For Lambda, choose NotificationFunction.

This configuration restricts the scheduling to events that happen within your previously defined dataset. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket?
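
If you prefer to configure the notification programmatically instead of through the console, the following boto3 sketch is roughly equivalent to the steps above; the Lambda function ARN is a placeholder, and the function must already grant s3.amazonaws.com permission to invoke it.

import boto3

s3 = boto3.client("s3")

# Send object-created events under the dataset/ prefix to the scheduling function
s3.put_bucket_notification_configuration(
    Bucket="data-bucket",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [
            {
                "Id": "MyEventNotification",
                "LambdaFunctionArn": "<NotificationFunction ARN>",  # placeholder
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "dataset/"}]}
                },
            }
        ]
    },
)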

Creating a Lambda function that performs data processing tasks

You have now created a time-based trigger for the deletion of the record in the DynamoDB table. However, when the system delete occurs and the change is recorded in DynamoDB Streams, no further action is taken. Lambda can poll the stream to detect these change records and trigger a function to process them according to the activity (INSERT, MODIFY, REMOVE).

This post is only concerned with deleted items because it uses DynamoDB TTL in combination with DynamoDB Streams to trigger task executions. Lambda gives you the flexibility to either process the item by itself or to forward the processing effort to somewhere else (such as an AWS Glue job or an Amazon SQS queue).

This post uses Lambda directly to process the S3 objects. The Lambda function performs the following tasks:

  1. Gets the S3 object from the DynamoDB item’s S3 path attribute.
  2. Modifies the object’s data.
  3. Overwrites the old S3 object with the updated content and tags the object as processed.

Complete the following steps:

  1. On the Lambda console, create a function named JSONProcessingFunction with Python 3.7 as the runtime and the following code:
    import os, json, boto3
    from functools import partial
    from urllib.parse import urlparse

    s3 = boto3.resource('s3')

    def parse_bucket_and_key(s3_url_as_string):
        # Split an s3://bucket/key URL into its bucket and key components
        s3_path = urlparse(s3_url_as_string)
        return s3_path.netloc, s3_path.path[1:]

    def extract_s3path_from_dynamo_event(event):
        # TTL expiration surfaces as a REMOVE event that carries the S3 path as the item key
        if event["Records"][0]["eventName"] == "REMOVE":
            return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]

    def modify_json(json_dict, column_name, value):
        json_dict[column_name] = value
        return json_dict

    def get_obj_contents(bucketname, key):
        obj = s3.Object(bucketname, key)
        return obj.get()['Body'].iter_lines()

    # Clear the file_contents attribute of every JSON record
    clean_column_2_func = partial(modify_json, column_name="file_contents", value="")

    def lambda_handler(event, context):
        s3_url_as_string = extract_s3path_from_dynamo_event(event)
        if s3_url_as_string:
            bucket_name, key = parse_bucket_and_key(s3_url_as_string)
            # Rewrite every line of the object, then overwrite it and tag it as processed
            updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
            s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
        else:
            print(f"Invalid event: {str(event)}")

  2. On the Lambda function configuration webpage, click on Add trigger.
  3. For Trigger configuration, choose DynamoDB.
  4. For DynamoDB table, choose objects-to-process.
  5. For Batch size, enter 1.
  6. For Batch window, enter 0.
  7. For Starting position, choose Trim horizon.
  8. Select Enable trigger.

You use batch size = 1 because each S3 object represented on the DynamoDB table is typically large. If these files are small, you can use a larger batch size. The batch size is essentially the number of files that your Lambda function processes at a time.
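
If you want to create the same trigger programmatically rather than through the console, the following boto3 sketch sets up the event source mapping; the stream ARN is a placeholder that you can read from the DynamoDB table description.

import boto3

lambda_client = boto3.client("lambda")

# Connect the DynamoDB stream of objects-to-process to the processing function
lambda_client.create_event_source_mapping(
    EventSourceArn="<objects-to-process stream ARN>",  # placeholder
    FunctionName="JSONProcessingFunction",
    BatchSize=1,
    MaximumBatchingWindowInSeconds=0,
    StartingPosition="TRIM_HORIZON",
    Enabled=True,
)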

Because any new object on S3 (in a versioning-enabled bucket) creates an object creation event, even if its key already exists, you must make sure that your task scheduling Lambda function ignores any object creation events that your task execution function creates. Otherwise, it creates an infinite loop. This post uses tags on S3 objects: when the task execution function processes an object, it adds a processed tag. The task scheduling function ignores those objects in subsequent executions.

Using Athena to query the processed data

The final step is to create a table for Athena to query the data. You can do this manually or by using an AWS Glue crawler that infers the schema directly from the data and automatically creates the table for you. This post uses a crawler because it can handle schema changes and add new partitions automatically. To create this crawler, use the following code:

aws glue create-crawler --name data-crawler \
--role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
  \"S3Targets\": [\
    {\
      \"Path\": \"s3://<data-bucket>/dataset/\"\
    }\
  ]\
}"

Replace <AWSGlueServiceRole-crawler> and <data-bucket> with the name of your AWSGlueServiceRole and S3 bucket, respectively.

When the crawling process is complete, you can start querying the data. You can use the Athena console to interact with the table while its underlying data is being transparently updated. See the following code:

SELECT * FROM data_db.dataset LIMIT 1000

Automated setup

You can use the following AWS CloudFormation template to create the solution described on this post on your AWS account. To launch the template, choose the following link:

This CloudFormation stack requires the following parameters:

  • Stack name – A meaningful name for the stack, for example, data-updater-solution.
  • Bucket name – The name of the S3 bucket to use for the solution. The stack creation process creates this bucket.
  • Time to Live – The number of seconds to expire items on the DynamoDB table. Referenced S3 objects are processed on item expiration.

Stack creation takes up to a few minutes. Check and refresh the AWS CloudFormation Resources tab to monitor the process while it is running.

When the stack shows the state CREATE_COMPLETE, you can start using the solution.

Testing the solution

To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.

You can now upload new data into your data-bucket S3 bucket under the dataset/ prefix. Your NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.

You can also confirm that an S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.

Conclusion

This post showed how to automatically re-process objects on S3 after a predefined amount of time by using a simple and fully managed scheduling mechanism. Because you use S3 for storage, you automatically benefit from S3’s eventual consistency model, simply by using identical keys (names) both for the original and processed objects. This way, you avoid query results with duplicate or missing data. Also, incomplete or only partially uploaded objects do not result in data inconsistencies because S3 only creates new object versions for successfully completed file transfers.

You may have previously used Spark to process objects hourly. This requires you to monitor objects that must be processed, to move and process them in a staging area, and to move them back to their actual destination. The main drawback is the final step because, due to the parallel nature of Spark, files are generated with different names and contents. That prevents direct file replacement in the dataset and leads to downtimes or potential data duplicates when data is queried during a move operation. Additionally, because each copy/delete operation could potentially fail, you have to deal with possible partially processed data manually.

From an operations perspective, AWS serverless services simplify your infrastructure. You can combine the scalability of these services with a pay-as-you-go plan to start with a low-cost POC and scale to production quickly—all with a minimal code base.

Compared to hourly Spark jobs, you could potentially reduce costs by up to 80%, which makes this solution both cheaper and simpler.

Special thanks to Karl Fuchs, Stefan Schmidt, Carlos Rodrigues, João Neves, Eduardo Dixo and Marco Henriques for their valuable feedback on this post’s content.

 


About the Authors

Pedro Completo Bento is a senior big data engineer working at Siemens CDC. He holds a Master in Computer Science from the Instituto Superior Técnico in Lisbon. He started his career as a full-stack developer, specializing later on big data challenges. Working with AWS, he builds highly reliable, performant and scalable systems on the cloud, while keeping the costs at bay. In his free time, he enjoys to play boardgames with his friends.

 

 

Arturo Bayo is a big data consultant at Amazon Web Services. He promotes a data-driven culture in enterprise customers around EMEA, providing specialized guidance on business intelligence and data lake projects while working with AWS customers and partners to build innovative solutions around data and analytics.

 

 

 

 

Load data incrementally and optimized Parquet writer with AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/load-data-incrementally-and-optimized-parquet-writer-with-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large amounts of datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. The first post of the series, Best practices to scale Apache Spark jobs and partition data with AWS Glue, discusses best practices to help developers of Apache Spark applications and Glue ETL jobs, big data architects, data engineers, and business analysts scale their data processing jobs running on AWS Glue automatically.

This post shows how to incrementally load data from data sources in an Amazon S3 data lake and databases using JDBC. It also shows how to scale AWS Glue ETL jobs by reading only newly added data using job bookmarks, and processing late-arriving data by resetting the job bookmark to the end of a prior job run. The post also reviews best practices using job bookmarks with complex AWS Glue ETL scripts and workloads.

Finally, the post shows how to use the custom AWS Glue Parquet writer optimized for performance by avoiding extra passes of the data and computing the schema at runtime. The AWS Glue Parquet writer also allows schema evolution in datasets with the addition or deletion of columns.

AWS Glue job bookmarks

AWS Glue’s Spark runtime has a mechanism to store state. This mechanism is used to track data processed by a particular run of an ETL job. The persisted state information is called a job bookmark.

The snapshot above shows a view of the Glue Console with multiple job runs at different time instances of the same ETL job. Job bookmarks are used by AWS Glue jobs to process incremental data since the last job run. A job bookmark is composed of the states of various job elements, such as sources, transformations, and targets. For example, your AWS Glue job might read new partitions in an S3-backed table. AWS Glue tracks the partitions that the job has processed successfully to prevent duplicate processing and writing the same data to the target data store multiple times.

Job bookmark APIs

When using the AWS Glue console or the AWS Glue API to start a job, a job bookmark option is passed as a parameter.

There are three possible options:

  • Enable – This option causes the job to update the bookmark state after each successful run to keep track of processed data. Subsequent job runs on the same data source only process data newly added since the last checkpoint.
  • Disable – Makes sure that job bookmarks are not used, which results in the job always processing the entire dataset. This is the default option.
  • Pause – Reads the state information and processes incremental data since the last checkpoint, but does not update it. You can use this option so that every subsequent run processes data from the same point in time.

In all cases, you are responsible for managing the output from previous job runs. For more information, see the first post in this series, Best practices to scale Apache Spark jobs and partition data with AWS Glue. For details about the parameters passed to a job, and specifically for a job bookmark, see Special Parameters Used by AWS Glue.

The following code example shows how to use job bookmarks in a Glue ETL job that reads from an AWS Glue table backed by an Amazon S3 location. The job receives new files from a Kinesis Data Firehose event stream in JSON format, transforms the data to rename two columns, and converts and writes it out to Amazon Redshift. transformation_ctx is the identifier for the job bookmark associated with this data source. For proper operation, you need job.init and job.commit to initialize and persist the bookmark state.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session

job = Job(glueContext)
job.init(args['JOB_NAME'], args)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "firehose_s3_db",
                table_name = "firehose_s3_raw_table",
                transformation_ctx = "datasource0")
applymapping = ApplyMapping.apply(frame = datasource0, 
                mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")],
                transformation_ctx = "applymapping1")

glueContext.write_dynamic_frame.from_jdbc_conf(frame = applymapping, catalog_connection = "redshift", connection_options = {"dbtable": "name", "database": "kinesis_db"}, redshift_tmp_dir= "s3://redshift_tmp_dir_path")

job.commit()

When using the APIs or CLI to start a job run, you need to add the following arguments to enable the job bookmark:

Job Arguments :

--job-bookmark-option, job-bookmark-enable
--JOB_NAME, glue-job-incremental
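
For example, with the AWS SDK for Python (Boto3), a sketch of starting a run with bookmarks enabled looks like the following; the job name matches the example arguments above.

import boto3

glue = boto3.client("glue")

# Start a job run with job bookmarks enabled so only new data is processed
glue.start_job_run(
    JobName="glue-job-incremental",
    Arguments={"--job-bookmark-option": "job-bookmark-enable"},
)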

For S3 input sources, AWS Glue job bookmarks check the last modified time of the objects to verify which objects to reprocess. If new files have arrived from Kinesis Data Firehose, or existing files have changed, since your last job run, the files are reprocessed when the job runs again using a periodic Glue job trigger or an S3 event notification.

If you intend to reprocess all the data using the same job, reset the job bookmark. To reset the job bookmark state, use the AWS Glue console, the ResetJobBookmark Action (Python: reset_job_bookmark) API operation, or the AWS CLI. For example, enter the following command using the AWS CLI:

aws glue reset-job-bookmark --job-name my-job-name

You can also use the ResetJobBookmark API to reset the bookmark to a specific point for scheduled job runs by passing in the job run ID. It resets the state of the job bookmark to the state recorded after that job run completed. This functionality is similar to time travel; for example, you can now reprocess input data from a time in the past and use a different set of transformations in your ETL script or downstream jobs orchestrated with AWS Glue workflows in the ETL pipeline. From the AWS Glue console, you can use the Rewind job bookmark option to reset the job bookmark state to the commit of a previous job run.
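
With Boto3, a sketch of rewinding the bookmark to the state after a specific run looks like the following; the job name and run ID are placeholders.

import boto3

glue = boto3.client("glue")

# Reset the bookmark to the state recorded after the given (completed) job run
glue.reset_job_bookmark(JobName="my-job-name", RunId="<job run ID>")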

AWS Glue keeps track of bookmarks for each job. If you delete a job, you also delete the job bookmark. Popular S3-based storage formats, including JSON, CSV, Apache Avro, XML, and JDBC sources, support job bookmarks. Starting with AWS Glue version 1.0, columnar storage formats such as Apache Parquet and ORC are also supported.

Best practices 1: Development with job bookmarks

In some cases, you might enable AWS Glue job bookmarks but your AWS Glue job reprocesses data that it already processed in an earlier run. That could happen because of the following reasons:

  • Missing job commit – The job.commit() statement at the end of your AWS Glue ETL script updates the state of the job bookmark. If you don’t include it, the job reprocesses both the previously processed and new files. Make sure that the job commit statement is executed by all possible code paths in your user script leading to job completion.
  • Missing transformation context – Transformation context is an optional parameter in the GlueContext APIs. However, job bookmarks need it to function correctly. Confirm that you include the transformation context parameter when creating the DynamicFrame. See the following code example:
    sample_dynF=glueContext.create_dynamic_frame_from_catalog(database, 
                table_name,
                transformation_ctx="sample_dynF") 

  • JDBC sources – Job bookmarks require source tables to either have a primary key column[s] or a column[s] with incrementing values, which need to be specified in the source options, when you access relational databases using a JDBC connection. Job bookmarks can capture only newly added rows. This behavior does not apply to source tables stored on S3.
  • Last modified time – To identify which files stored on S3 to process, job bookmarks check the last modified time of the objects, not the file names. If your input objects changed since the last time the job ran, then they are reprocessed when the job runs again.

Best practices 2: Monitoring job bookmarks

There are three ways to inspect the behavior of job bookmarks for any job run:

  • File list stored in the temporary directory – All AWS Glue ETL jobs running Apache Spark and using DynamicFrames to read data output a manifest file containing a list of processed files per path. The manifest file is stored in the temporary location specified with the job. The path of the file is <temporary location>/partitionlisting/<job name>/<run id>/<source transformation_ctx>.input-files.json. This file captures the list of files read for the corresponding data source regardless of any enabled job bookmarks.
  • Job metrics – You can use the AWS Glue job metrics to inspect the S3 read and write operations and track the number of bytes read by the job using bookmarks. You can also track the data a job reads across its multiple runs in the AWS Glue console. For more information, see Monitoring the Progress of Multiple Jobs.
  • Glue job logs – An AWS Glue job also emits logs in the Spark driver log stream related to processing and skipping of partitions in S3. The logs are stored in Amazon CloudWatch.

Skipping partitions

A job skips a partition when it is empty or when the creation timestamp of that particular partition in the AWS Glue Data Catalog is older than the timestamp of the last job run as captured by the job bookmark. The following example log message indicates a skipped partition:

19/05/21 14:49:22 WARN HadoopDataSource: Skipping Partition
{"year": "2019", "month": "03", "day": "26", "hour": "13"}
has no new files detected 
@ s3://input-s3-prefix/Year=2019/Month=03/Day=26/Hour=13/ 
or path does not exist

Processing partitions

When a job finds a new S3 partition created after the last job run or that has new files to process, it generates a log message. Log messages also indicate the percentage of the total number of files in the particular partition. The initial and final job bookmark filters of the current job run process these files. The following example illustrates the job bookmark filtering logic.

If the partition is new (created after the most recent job run based on the partition creation time), then the job processes all of the files in the partition. The partition creation time is 1559235148000, which is after the last job run. See the following example log message:

19/05/31 10:39:55 INFO PartitionFilesListerUsingBookmark:
Found new partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000) 
with 47 files

An existing partition triggers the first bookmark filter. This filter selects files with modification timestamps since the last job run. In the following example log message, 15 out of 47 files in the partition are new and should be processed:

19/05/31 10:40:31 INFO PartitionFilesListerUsingBookmark:
After initial job bookmarks filter, 
processing 31.91% of 47 files 
in partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

The final bookmark filter performs additional filtering to avoid race conditions related to S3’s eventual consistency. If a significantly large number of files arrive with the same modification time, this filter may exclude them from processing. In the following example log message, the filter processed all 15 files captured by the initial bookmark filter:

19/05/31 10:50:31 INFO PartitionFilesListerUsingBookmark:
After final job bookmarks filter, processing 100.00% of 15 files 
in partition DynamicFramePartition([email protected],
s3://input-s3-prefix/Year=2018/Month=12/Day=05/Hour=13/,
1559235148000)

Optimized Apache Parquet writer

AWS Glue offers an optimized Apache Parquet writer when using DynamicFrames to improve performance. Apache Parquet format is generally faster for reads than writes because of its columnar storage layout and a pre-computed schema that is written with the data into the files. AWS Glue’s Parquet writer offers fast write performance and flexibility to handle evolving datasets. Unlike the default Apache Spark Parquet writer, it does not require a pre-computed schema or schema that is inferred by performing an extra scan of the input dataset.

You can enable the AWS Glue Parquet writer by setting the format parameter of the write_dynamic_frame.from_options function to glueparquet. As data is streamed through an AWS Glue job for writing to S3, the optimized writer computes and merges the schema dynamically at runtime, which results in faster job runtimes. The AWS Glue Parquet writer also enables schema evolution by supporting the deletion and addition of new columns.

You can tune the AWS Glue Parquet writer further by setting the format_options parameters. See the following code example:

block_size = 128*1024*1024
page_size = 1024*1024
glueContext.write_dynamic_frame.from_options(frame = dyFrame,
    connection_type = "s3", connection_options = {"path": output_dir},
    format = "glueparquet",
    format_options = {"compression": "snappy",
                      "blockSize": block_size, "pageSize": page_size})

The default values for format_options are the following:

  • compression is “snappy”
  • blockSize is 128 MB
  • pageSize is 1 MB

The blockSize specifies the size of a row group in a Parquet file that is buffered in memory. The pageSize specifies the size of the smallest unit in a Parquet file that must be read fully to access a single record.

Conclusion

This post discussed how AWS Glue job bookmarks help incrementally process data collected from S3 and relational databases. You also learned how using job bookmarks can make backfilling historical data simple. Interacting with job bookmarks is easy; you can enable, disable, pause, and rewind them to a prior point in time. You can better tune your jobs and build checks to make sure all of the data is processed correctly by monitoring the progress and state of job bookmarks.

You can also use the AWS Glue Parquet writer to optimize the performance of writing Apache Parquet files in your data lake. The optimized writer enables schema evolution for Parquet files so you can manage changes to your data automatically.

We hope you try out these features to load and write your data in your Apache Spark applications on AWS Glue.

The third post in this series discusses how AWS Glue’s automatic code generation capability enables you to process and transform complex datasets with ease. The post also shows how to execute SQL queries on your datasets directly from your AWS Glue ETL script, and how to schedule and orchestrate data pipelines with AWS Glue workflows.

 


About the Authors

Mohit Saxena is a technical lead manager at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on cloud. He also enjoys watching movies, and reading about the latest technology.

 

 

 

Bijay Bisht is a senior software development engineer at AWS.

 

 

Build machine learning-powered business intelligence analyses using Amazon QuickSight

Post Syndicated from Osemeke Isibor original https://aws.amazon.com/blogs/big-data/build-machine-learning-powered-business-intelligence-analyses-using-amazon-quicksight/

Imagine you can see the future—to know how many customers will order your product months ahead of time so you can make adequate provisions, or to know how many of your employees will leave your organization several months in advance so you can take preemptive actions to encourage staff retention. For an organization that sees the future, the possibilities are limitless. Machine learning (ML) makes it possible to predict the future with a higher degree of accuracy.

Amazon SageMaker provides every developer and data scientist the ability to build, train, and deploy ML models quickly, but for business users who usually work on creating business intelligence dashboards and reports rather than ML models, Amazon QuickSight is the service of choice. With Amazon QuickSight, you can still use ML to forecast the future. This post goes through how to create business intelligence analyses that use ML to forecast future data points and detect anomalies in data, with no technical expertise or ML experience needed.

Overview of solution

Amazon QuickSight ML Insights uses AWS-proven ML and natural language capabilities to help you gain deeper insights from your data. These powerful, out-of-the-box features make it easy to discover hidden trends and outliers, identify key business drivers, and perform powerful what-if analysis and forecasting with no technical or ML experience. You can use ML insights in sales reporting, web analytics, financial planning, and more. You can detect insights buried in aggregates, perform interactive what-if analysis, and discover what activities you need to meet business goals.

This post imports data from Amazon S3 into Amazon QuickSight and creates ML-powered analyses with the imported data. The following diagram illustrates this architecture.

Walkthrough

In this walkthrough, you create an Amazon QuickSight analysis that contains ML-powered visuals that forecast the future demand for taxis in New York City. You also generate ML-powered insights to detect anomalies in your data. This post uses the New York City Taxi and Limousine Commission (TLC) Trip Record Data on the Registry of Open Data on AWS.

The walkthrough includes the following steps:

  1. Set up and import data into Amazon QuickSight
  2. Create an ML-powered visual to forecast the future demand for taxis
  3. Generate an ML-powered insight to detect anomalies in the data set

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Amazon QuickSight Enterprise edition
  • Basic knowledge of AWS

Setting up and importing data into Amazon QuickSight

Set up Amazon QuickSight as an individual user. Complete the following steps:

  1. On the AWS Management Console, in the Region list, select US East (N. Virginia) or any other Region of your choice that Amazon QuickSight supports.
  2. Under Analytics, for Services, choose Amazon QuickSight.
     If you already have an Amazon QuickSight account, make sure it is the Enterprise edition; if it is not, upgrade to the Enterprise edition. For more information, see Upgrading your Amazon QuickSight Subscription from Standard Edition to Enterprise Edition.
     If you do not have an Amazon QuickSight account, proceed with the setup and make sure you choose Enterprise edition when setting up the account. For more information, see Setup a Free Standalone User Account in Amazon QuickSight.
     After you complete the setup, a Welcome Wizard screen appears.
  3. Choose Next on each of the Welcome Wizard screens.
  4. Choose Get Started.

Before you import the data set, make sure that you have at least 3GB of SPICE capacity. For more information, see Managing SPICE Capacity.

Importing the NYC Taxi data set into Amazon QuickSight

The NYC Taxi data set is in an S3 bucket. To import S3 data into Amazon QuickSight, use a manifest file. For more information, see Supported Formats for Amazon S3 Manifest Files. To import your data, complete the following steps:

  1. In a new text file, copy and paste the following code:
    "fileLocations": [
            {
                "URIs": [
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-01.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-02.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-03.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-04.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-05.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-06.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-07.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-08.csv", 
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-09.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-10.csv",
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-11.csv", 
              "https://nyc-tlc.s3.amazonaws.com/trip data/green_tripdata_2018-12.csv"
    
                ]
            }
        ],
        "globalUploadSettings": {
            "textqualifier": "\""
        }
    }

  2. Save the text file as nyc-taxi.json.
  3. On the Amazon QuickSight console, choose New analysis.
  4. Choose New data set.
  5. For data source, choose S3.
  6. Under New S3 data source, for Data source name, enter a name of your choice.
  7. For the Upload a manifest file field, select Upload.
  8. Choose the nyc-taxi.json file you created earlier.
  9. Choose Connect.
    The S3 bucket this post uses is a public bucket that contains a public data set. When using S3 buckets in your account with Amazon QuickSight, it is highly recommended that the buckets are not open to the public; you need to configure authentication to access your S3 bucket from Amazon QuickSight. For more information about troubleshooting, see I Can’t Connect to Amazon S3.
    After you choose Connect, the Finish data set creation screen appears.
  10. Choose Visualize.
  11. Wait for the import to complete.

You can see the progress on the top right corner of the screen. When the import is complete, the result shows the number of rows imported successfully and the number of rows skipped.

Creating an ML-powered visual

After you import the data set into Amazon QuickSight SPICE, you can start creating analyses and visuals. Your goal is to create an ML-powered visual to forecast the future demand for taxis. For more information, see Forecasting and Creating What-If Scenarios with Amazon QuickSight.

To create your visual, complete the following steps:

  1. From the Data source details pop-up screen, choose Visualize.
  2. From the field list, select Lpep_pickup_datetime.
  3. Under Visual types, select the first visual.
     Amazon QuickSight automatically uses the best visual based on the number and data type of fields you selected. From your selection, Amazon QuickSight displays a line chart visual.
     From the preceding graph, you can see that the bulk of your data clusters are around December 31, 2017, to January 1, 2019, for the Lpep_pickup_datetime field. There are a few data points with date ranges up to June 2080. These values are incorrect and can impact your ML forecasts.
     To clean up your data set, filter out the incorrect data using the Lpep_pickup_datetime field. This post only uses data in which Lpep_pickup_datetime falls between January 1, 2018, and December 18, 2018, because there is a more consistent amount of data within this date range.
  4. Use the filter menu to create a filter using the Lpep_pickup_datetime field.
  5. Under Filter type, choose Time range and Between.
  6. For Start date, enter 2018-01-01 00:00.
  7. Select Include start date.
  8. For End date, enter 2018-12-18 00:00.
  9. Select Include end date.
  10. Choose Apply.

The line chart should now contain only data with Lpep_pickup_datetime between January 1, 2018, and December 18, 2018. You can now add the forecast for the next 31 days to the line chart visual.

Adding the forecast to your visual

To add your forecast, complete the following steps:

  1. On the visual, choose the arrow.
  2. From the drop-down menu, choose Add forecast.
  3. Under Forecast properties, for Forecast length, enter 31 for Periods forward.
  4. For Periods backward, enter 0.
  5. For Prediction interval, leave at the default value 90.
  6. For Seasonality, leave at the default selection Automatic.
  7. Choose Apply.

You now see an orange line on your graph, which is the forecasted pickup quantity per day for the next 31 days after December 18, 2018. You can explore the different dates by hovering your cursor over different points on the forecasted pickup line. For example, hovering your cursor over January 10, 2019, shows that the expected forecasted number of pickups for that day is approximately 22,000. The forecast also provides an upper bound (maximum number of pickups forecasted) of about 26,000 and a lower bound (minimum number of pickups forecasted) of about 18,000.

You can create multiple visuals with forecasts and combine them into a sharable Amazon QuickSight dashboard. For more information, see Working with Dashboards.

Generating an ML-powered insight to detect anomalies

In Amazon QuickSight, you can add insights, autonarratives, and ML-powered anomaly detection to your analyses without ML expertise or knowledge. Amazon QuickSight generates suggested insights and autonarratives automatically, but for ML-powered anomaly detection, you need to perform additional steps. For more information, see Using ML-Powered Anomaly Detection.

This post checks if there are any anomalies in the total fare amount over time from select locations. For example, if the total fare charged for taxi rides is about $1,000 and above from the first pickup location (for example, the airport) for most dates in your data set, an anomaly is when the total fare charged deviates from the standard pattern. Anomalies are not necessarily negative, but rather abnormalities that you can choose to investigate further.

To create an anomaly insight, complete the following steps:

  1. From the top right corner of the analysis creation screen, click the Add drop-down menu and choose Add insight.
  2. On the Computation screen, for Computation type, select Anomaly detection.
  3. Under Fields list, choose the following fields:
  • fare_amount
  • lpep_pickup_datetime
  • PULocationID
  4. Choose Get started.
     The Configure anomaly detection screen appears.
  5. Choose Analyze all combinations of these categories.
  6. Leave the other settings at their defaults.
     You can now perform a contribution analysis and discover how the drop-off location contributed to the anomalies. For more information, see Viewing Top Contributors.
  7. Under Contribution analysis, choose DOLocationID.
  8. Choose Save.
  9. Choose Run Now.
     The anomaly detection can take up to 10 minutes to complete. If it is still running after about 10 minutes, your browser may have timed out. Refresh your browser and you should see the anomalies displayed in the visual.
  10. Choose Explore anomalies.

By default, the anomalies you see are for the last date in your data set. You can explore the anomalies across the entire date range of your data set by choosing SHOW ANOMALIES BY DATE and dragging the slider at the bottom of the visual to display the entire date range from January 1, 2018, to December 30, 2018.

This graph shows that March 21, 2018, has the highest number of fare anomalies in the entire data set. For example, the total fare amount charged by taxis that picked up passengers from location 74 on March 21, 2018, was 7,181, a difference of -64% (about 19,728.5) compared with the total fare charged by taxis for the same pickup location on March 20, 2018. When you explore the anomalies of other pickup locations for that same date, you can see that they all show similar drops in the total fare charged. You can also see the top DOLocationID contributors to these anomalies.

What happened in New York City on March 21, 2018, to cause this drop? A quick online search reveals that New York City experienced a severe weather condition on March 21, 2018.

Publishing your analyses to a dashboard, sharing the dashboard, and setting up email alerts

You can create additional visuals to your analyses, publish the analyses as a dashboard, and share the dashboard with other users. QuickSight anomaly detection allows you to uncover hidden insights in your data by continuously analyzing billions of data points. You can subscribe to receive alerts to your inbox if an anomaly occurs in your business metrics. The email alert also indicates the factors that contribute to these anomalies. This allows you to act immediately on the business metrics that need attention.

From the QuickSight dashboard, you can configure an anomaly alert to be sent to your email with Severity set to High and above and Direction set to Lower than expected. Also make sure to schedule a data refresh so that the anomaly detection runs on your most recent data. For more information, see Refreshing Data.

Cleaning up

To avoid incurring future charges, cancel your Amazon QuickSight subscription.

Conclusion

This post walked you through how to use ML-powered insights with Amazon QuickSight to forecast future data points, detect anomalies, and derive valuable insights from your data without needing prior experience or knowledge of ML. If you want to do more forecasting without ML experience, check out Amazon Forecast.

If you have questions or suggestions, please leave a comment.

 


About the Author

Osemeke Isibor is a partner solutions architect at AWS. He works with AWS Partner Network (APN) partners to design secure, highly available, scalable and cost optimized solutions on AWS. He is a science-fiction enthusiast and a fan of anime.

 

 

 

Analyze your Amazon S3 spend using AWS Glue and Amazon Redshift

Post Syndicated from Shayon Sanyal original https://aws.amazon.com/blogs/big-data/analyze-your-amazon-s3-spend-using-aws-glue-and-amazon-redshift/

The AWS Cost & Usage Report (CUR) tracks your AWS usage and provides estimated charges associated with that usage. You can configure this report to present the data at hourly or daily intervals, and it is updated at least one time per day until it is finalized at the end of the billing period. The Cost & Usage Report is delivered automatically to an Amazon S3 bucket that you specify, and you can download it from there directly. You can also integrate the report into Amazon Redshift, query it with Amazon Athena, or upload it to Amazon QuickSight. For more information, see Query and Visualize AWS Cost and Usage Data Using Amazon Athena and Amazon QuickSight.

This post presents a solution that uses AWS Glue Data Catalog and Amazon Redshift to analyze S3 usage and spend by combining the AWS CUR, S3 inventory reports, and S3 server access logs.

Prerequisites

Before you begin, complete the following prerequisites:

  • You need an S3 bucket for your S3 inventory and server access log data files. For more information, see Create a Bucket and What is Amazon S3?
  • You must have the appropriate IAM permissions for Amazon Redshift to be able to access the S3 buckets. For simplicity, this post uses an IAM role with two broad managed policies attached (AmazonS3FullAccess and AWSGlueConsoleFullAccess); restrict access appropriately for your own scenarios.

Amazon S3 inventory

Amazon S3 inventory is one of the tools S3 provides to help manage your storage. You can use it to audit and report on the replication and encryption status of your objects for business, compliance, and regulatory needs. Amazon S3 inventory provides comma-separated values (CSV), Apache optimized row columnar (ORC), or Apache Parquet output files that list your objects and their corresponding metadata on a daily or weekly basis for a given S3 bucket.

Amazon S3 server access logs

Server access logging provides detailed records of the requests made to a bucket. Server access logs are useful for many applications, for example in security and access audits. They can also help you learn about your customer base and understand your S3 bill.

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue consists of a central metadata repository known as the Data Catalog, a crawler to populate the Data Catalog with tables, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. AWS Glue is serverless, so there’s no infrastructure to set up or manage. This post uses AWS Glue to catalog S3 inventory data and server access logs, which makes it available for you to query with Amazon Redshift Spectrum.

Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can use Amazon Redshift to efficiently query and retrieve structured and semi-structured data from files in S3 without having to load the data into Amazon Redshift native tables. You can create Amazon Redshift external tables by defining the structure for files and registering them as tables in the AWS Glue Data Catalog.

Setting up S3 inventory reports for analysis

This post uses the Parquet file format for its inventory reports and delivers the files daily to S3 buckets. You can select both the frequency of delivery and output file formats under Advanced settings as shown in the screenshot below:

For more information about configuring your S3 inventory, see How Do I Configure Amazon S3 Inventory?

The following diagram shows the data flow for this solution:

The following steps summarize the preceding data flow diagram:

  • S3 Inventory Reports are delivered to an S3 bucket that you configure.
  • The AWS Glue crawler then crawls this S3 bucket and populates the metadata in the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is then accessible through an external schema in Redshift.
  • The S3 Inventory Reports (available in the AWS Glue Data Catalog) and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The inventory reports are delivered to an S3 bucket. The following screenshot shows the S3 bucket structure for the S3 inventory reports:

There is a data folder in this bucket. This folder contains the Parquet data you want to analyze. The following screenshot shows the content of the folder.

Because these are daily files, there is one file per day.

Configuring an AWS Glue crawler

You can use an AWS Glue crawler to discover this dataset in your S3 bucket and create the table schemas in the Data Catalog. After you create these tables, you can query them directly from Amazon Redshift.

To configure your crawler to read S3 inventory files from your S3 bucket, complete the following steps:

  1. Choose a crawler name.
  2. Choose S3 as the data store and specify the S3 path up to the data folder.
  3. Choose an IAM role that can read the data from S3 (for this post, a role with the AmazonS3FullAccess and AWSGlueConsoleFullAccess policies attached).
  4. Set a frequency schedule for the crawler to run.
  5. Configure the crawler’s output by selecting a database and adding a prefix (if any).

This post uses the database s3spendanalysis.
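
If you prefer to script the crawler setup rather than use the console, the following is a minimal boto3 sketch of the same configuration. The crawler name, IAM role, and bucket path below are placeholders, not values from this post:

import boto3

glue = boto3.client('glue')

# Create a crawler that points at the S3 inventory data folder and writes
# table metadata into the s3spendanalysis database in the Data Catalog.
glue.create_crawler(
    Name='s3-inventory-crawler',                       # hypothetical crawler name
    Role='AWSGlueServiceRole-s3spendanalysis',         # an IAM role that can read the bucket
    DatabaseName='s3spendanalysis',
    Targets={'S3Targets': [{'Path': 's3://<your-inventory-bucket>/data/'}]},
    Schedule='cron(0 1 * * ? *)'                       # optional: run daily at 01:00 UTC
)

# Run the crawler on demand to populate the Data Catalog now.
glue.start_crawler(Name='s3-inventory-crawler')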

The following screenshot shows the completed crawler configuration.

Run this crawler to add tables to your Glue Data Catalog. After the crawler has completed successfully, go to the Tables section on your AWS Glue console to verify the table details and table metadata. The following screenshot shows the table details and table metadata after your AWS Glue crawler has completed successfully:

Creating an external schema

Before you can query the S3 inventory reports, you need to create an external schema (and subsequently, external tables) in Amazon Redshift. An Amazon Redshift external schema references an external database in an external data catalog. Because you are using an AWS Glue Data Catalog as your external catalog, after you create an external schema in Amazon Redshift, you can see all the external tables in your Data Catalog in Amazon Redshift. To create the external schema, enter the following code:

create external schema spectrum_schema from data catalog
database 's3spendanalysis'
iam_role 'arn:aws:iam::<AWS_IAM_ROLE>';

Querying the table

On the Amazon Redshift dashboard, under Query editor, you can see the data table. You can also query the svv_external_schemas system table to verify that your external schema has been created successfully. See the following screenshot.

You can now query the S3 inventory reports directly from Amazon Redshift without having to move the data into Amazon Redshift first. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:
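
If you would rather run this kind of verification query from code than from the Query Editor, the following sketch uses the Amazon Redshift Data API through boto3 as an alternative to the approach in this post. The cluster identifier, database, and database user are assumptions you would replace with your own values:

import time
import boto3

rsd = boto3.client('redshift-data')

# Submit a query against the external (Spectrum) table; the Data API runs it
# asynchronously and returns a statement ID to poll.
resp = rsd.execute_statement(
    ClusterIdentifier='my-redshift-cluster',    # placeholder
    Database='dev',                             # placeholder
    DbUser='awsuser',                           # placeholder
    Sql='select * from spectrum_schema.data limit 10;'
)

statement_id = resp['Id']
status = rsd.describe_statement(Id=statement_id)['Status']
while status not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)
    status = rsd.describe_statement(Id=statement_id)['Status']

if status == 'FINISHED':
    # Print the result rows returned by the query.
    for row in rsd.get_statement_result(Id=statement_id)['Records']:
        print(row)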

Setting up S3 server access logs for analysis

The following diagram shows the data flow for this solution.

The following steps summarize the preceding data flow diagram:

  • S3 Server Access Logs are delivered to an S3 bucket that you configure.
  • These server access logs are then directly accessible to be queried from Amazon Redshift (note that we’ll be using CREATE EXTERNAL TABLE in Redshift Spectrum for this purpose, explained below).
  • The S3 Server Access Logs and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The S3 server access logs are delivered to an S3 bucket. For more information about setting up server access logging, see Amazon S3 Server Access Logging.

The following screenshot shows the S3 bucket structure for the server access logs.

The server access log files consist of a sequence of new-line delimited log records. Each log record represents one request and consists of space-delimited fields. The following code is an example log record:

b8ad5f5cfd3c09418536b47b157851fb7bea4a00486471093a7d765e35a4f8ef s3spendanalysisblog [23/Sep/2018:22:10:52 +0000] 72.21.196.65 arn:aws:iam::<AWS Account #>:user/shayons D5633DAD1063C5CA REST.GET.LIFECYCLE - "GET /s3spendanalysisblog?lifecycle= HTTP/1.1" 404 NoSuchLifecycleConfiguration 332 - 105 - "-" "S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.408 Linux/4.9.119-0.1.ac.277.71.329.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.181-b13 java/1.8.0_181" -

Creating an external table

You can define the S3 server access logs as an external table. Because you already have an external schema, create an external table using the following code. This post uses RegEx SerDe to create a table that allows you to correctly parse all the fields present in the S3 server access logs. See the following code:

CREATE EXTERNAL TABLE spectrum_schema.s3accesslogs(
BucketOwner                   varchar(256), 
Bucket                        varchar(256), 
RequestDateTime               varchar(256), 
RemoteIP                      varchar(256), 
Requester                     varchar(256), 
RequestID                     varchar(256), 
Operation                     varchar(256), 
Key                           varchar(256), 
RequestURI_operation          varchar(256),
RequestURI_key                varchar(256),
RequestURI_httpProtoversion   varchar(256),
HTTPstatus                    varchar(256), 
ErrorCode                     varchar(256), 
BytesSent                     varchar(256), 
ObjectSize                    varchar(256), 
TotalTime                     varchar(256), 
TurnAroundTime                varchar(256), 
Referrer                      varchar(256), 
UserAgent                     varchar(256), 
VersionId                     varchar(256))
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
  )
STORED AS TEXTFILE
LOCATION
  's3://s3spendanalysisblog/accesslogs/';
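
It can be helpful to confirm that this regular expression splits an access log record into the 20 expected fields before querying at scale. The following Python sketch applies the same pattern to a shortened, illustrative record (the account ID and user agent are placeholders):

import re

# Python version of the RegEx SerDe pattern: 20 capture groups,
# one per column in the s3accesslogs external table.
LOG_PATTERN = re.compile(
    r'([^ ]*) ([^ ]*) \[(.*?)\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) '
    r'"([^ ]*) ([^ ]*) ([^ ]*)" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) '
    r'([^ ]*) ([^ ]*) ("[^"]*") ([^ ]*)'
)

sample = (
    'b8ad5f5cfd3c09418536b47b157851fb7bea4a00486471093a7d765e35a4f8ef '
    's3spendanalysisblog [23/Sep/2018:22:10:52 +0000] 72.21.196.65 '
    'arn:aws:iam::111111111111:user/shayons D5633DAD1063C5CA REST.GET.LIFECYCLE - '
    '"GET /s3spendanalysisblog?lifecycle= HTTP/1.1" 404 NoSuchLifecycleConfiguration '
    '332 - 105 - "-" "S3Console/0.4, aws-internal/3" -'
)

fields = LOG_PATTERN.match(sample).groups()
print(len(fields))    # 20 fields, matching the external table definition
print(fields[6])      # REST.GET.LIFECYCLE (the Operation column)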

Validating the data

You can validate the external table data in Amazon Redshift. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:

You are now ready to analyze the data.

Analyzing the data using Amazon Redshift

In this post, you have a CUR file per day in your S3 bucket. The files themselves are organized in a monthly hierarchy. See the following screenshot.

Each day’s file consists of the following files for CUR data:

  • myCURReport-1.csv.gz – A zipped file of the data itself
  • myCURReport-Manifest.json – A JSON file that contains the metadata for the file
  • myCURReport-RedshiftCommands.sql – Amazon Redshift table creation scripts and a COPY command to create the CUR table from a Redshift manifest file
  • myCURReport-RedshiftManifest.json – The Amazon Redshift manifest file to create the CUR table

Using Amazon Redshift is one of the many ways to carry out this analysis. Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing Business Intelligence (BI) tools. Amazon Redshift gives you fast querying capabilities over structured data using familiar SQL-based clients and BI tools using standard ODBC and JDBC connections. Queries are distributed and parallelized across multiple physical resources.

You are now ready to run SQL queries with the Amazon Redshift SQL Query Editor. This post also uses the psql client tool, a terminal-based front end from PostgreSQL, to query the data in the cluster.

To query the data, complete the following steps:

  1. Create a custom schema to contain your tables for analysis. See the following code:
    create schema if not exists redshift_schema;

    You should create your table in a schema other than public to control user access to database objects.

  2. Create a CUR table for the latest month in Amazon Redshift using the CUR SQL file in S3. See the following code:
    create table redshift_schema.AWSBilling201910 (
    identity_LineItemId VARCHAR(256),
    identity_TimeInterval VARCHAR(100),
    bill_InvoiceId VARCHAR(100),
    bill_BillingEntity VARCHAR(10),
    bill_BillType VARCHAR(100),
    bill_PayerAccountId VARCHAR(100),
    bill_BillingPeriodStartDate TIMESTAMPTZ,
    bill_BillingPeriodEndDate TIMESTAMPTZ,
    lineItem_UsageAccountId VARCHAR(100),
    lineItem_LineItemType VARCHAR(100),
    lineItem_UsageStartDate TIMESTAMPTZ,
    lineItem_UsageEndDate TIMESTAMPTZ,
    lineItem_ProductCode VARCHAR(100),
    lineItem_UsageType VARCHAR(100),
    lineItem_Operation VARCHAR(100),
    lineItem_AvailabilityZone VARCHAR(100),
    lineItem_ResourceId VARCHAR(256),
    lineItem_UsageAmount DECIMAL(11,2),
    lineItem_NormalizationFactor VARCHAR(10),
    lineItem_NormalizedUsageAmount DECIMAL(11,2),
    lineItem_CurrencyCode VARCHAR(10),
    lineItem_UnblendedRate DECIMAL(11,2),
    lineItem_UnblendedCost DECIMAL(11,2),
    lineItem_BlendedRate DECIMAL(11,2),
    lineItem_BlendedCost DECIMAL(11,2),
    lineItem_LineItemDescription VARCHAR(100),
    lineItem_TaxType VARCHAR(100),
    lineItem_LegalEntity VARCHAR(100),
    product_ProductName VARCHAR(100),
    product_alarmType VARCHAR(100),
    product_automaticLabel VARCHAR(100),
    product_availability VARCHAR(100),
    product_availabilityZone VARCHAR(100),
    product_clockSpeed VARCHAR(100),
    product_currentGeneration VARCHAR(100),
    product_databaseEngine VARCHAR(100),
    product_dedicatedEbsThroughput VARCHAR(100),
    product_deploymentOption VARCHAR(100),
    product_durability VARCHAR(100),
    product_ecu VARCHAR(100),
    product_edition VARCHAR(100),
    product_engineCode VARCHAR(100),
    product_enhancedNetworkingSupported VARCHAR(100),
    product_eventType VARCHAR(100),
    product_feeCode VARCHAR(100),
    product_feeDescription VARCHAR(100),
    product_fromLocation VARCHAR(100),
    product_fromLocationType VARCHAR(100),
    product_gpu VARCHAR(100),
    product_gpuMemory VARCHAR(100),
    product_group VARCHAR(100),
    product_groupDescription VARCHAR(100),
    product_instanceFamily VARCHAR(100),
    product_instanceType VARCHAR(100),
    product_instanceTypeFamily VARCHAR(100),
    product_io VARCHAR(100),
    product_labelingTaskType VARCHAR(100),
    product_licenseModel VARCHAR(100),
    product_location VARCHAR(100),
    product_locationType VARCHAR(100),
    product_maxThroughputvolume VARCHAR(100),
    product_maxVolumeSize VARCHAR(100),
    product_memory VARCHAR(100),
    product_messageDeliveryFrequency VARCHAR(100),
    product_messageDeliveryOrder VARCHAR(100),
    product_minVolumeSize VARCHAR(100),
    product_networkPerformance VARCHAR(100),
    product_normalizationSizeFactor VARCHAR(100),
    product_operation VARCHAR(100),
    product_physicalCpu VARCHAR(100),
    product_physicalGpu VARCHAR(100),
    product_physicalProcessor VARCHAR(100),
    product_processorArchitecture VARCHAR(100),
    product_processorFeatures VARCHAR(100),
    product_productFamily VARCHAR(100),
    product_protocol VARCHAR(100),
    product_queueType VARCHAR(100),
    product_region VARCHAR(100),
    product_servicecode VARCHAR(100),
    product_servicename VARCHAR(100),
    product_sku VARCHAR(100),
    product_storage VARCHAR(100),
    product_storageClass VARCHAR(100),
    product_storageMedia VARCHAR(100),
    product_subscriptionType VARCHAR(100),
    product_toLocation VARCHAR(100),
    product_toLocationType VARCHAR(100),
    product_transferType VARCHAR(100),
    product_usageFamily VARCHAR(100),
    product_usagetype VARCHAR(100),
    product_vcpu VARCHAR(100),
    product_version VARCHAR(100),
    product_volumeType VARCHAR(100),
    product_workforceType VARCHAR(100),
    pricing_RateId VARCHAR(100),
    pricing_publicOnDemandCost DECIMAL(11,2),
    pricing_publicOnDemandRate DECIMAL(11,2),
    pricing_term VARCHAR(100),
    pricing_unit VARCHAR(100),
    reservation_AmortizedUpfrontCostForUsage DECIMAL(11,2),
    reservation_AmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_EffectiveCost DECIMAL(11,2),
    reservation_EndTime TIMESTAMPTZ,
    reservation_ModificationStatus VARCHAR(100),
    reservation_NormalizedUnitsPerReservation BIGINT,
    reservation_RecurringFeeForUsage DECIMAL(11,2),
    reservation_StartTime TIMESTAMPTZ,
    reservation_SubscriptionId VARCHAR(100),
    reservation_TotalReservedNormalizedUnits BIGINT,
    reservation_TotalReservedUnits BIGINT,
    reservation_UnitsPerReservation BIGINT,
    reservation_UnusedAmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_UnusedNormalizedUnitQuantity BIGINT,
    reservation_UnusedQuantity BIGINT,
    reservation_UnusedRecurringFee DECIMAL(11,2),
    reservation_UpfrontValue BIGINT
    );

  3. Load the data into Amazon Redshift for the latest month, using the provided CUR Manifest file. See the following code:
    copy AWSBilling201910 from 's3://ss-cur//myCURReport/20191001-20191101/fd76beee-0709-42d5-bcb2-bb45f8ba1aae/myCURReport-RedshiftManifest.json'
    credentials 'arn:aws:iam::<AWS_IAM_ROLE>'
    GZIP CSV IGNOREHEADER 1 TIMEFORMAT 'auto' manifest;

  4. Validate the data loaded in the Amazon Redshift table. See the following code:
    select * from AWSBilling201910
    where lineItem_ProductCode = 'AmazonS3'
    and lineItem_ResourceId = 's3spendanalysisblog' limit 10;

    The following screenshot shows that data has been loaded correctly in the Amazon Redshift table:

Managing database security

You can manage database security in Amazon Redshift by controlling which users have access to which database objects. To make sure your objects are secure, create two groups: FINANCE and ADMIN, with two users in FINANCE and one user in ADMIN. Complete the following steps:

  1. Create the groups where the user accounts are assigned. The following code creates two different user groups:
    create group finance;
    create group admin;

    To view all user groups, query the PG_GROUP system catalog table (you should see finance and admin here):

    select * from pg_group;

  2. Create three database users with different privileges and add them to the groups. See the following code:
    create user finance1 password 'finance1Pass'
    in group finance;
    
    create user finance2 password 'finance2Pass'
    in group finance;
    
    create user admin1 password 'admin1Pass'
    in group admin;

    Validate the users have been successfully created. To view a list of users, query the PG_USER catalog table:

  3. Grant SELECT privileges to the FINANCE group and ALL privileges to the ADMIN group for your table AWSBilling201910 in redshift_schema. See the following code:
    grant select on table redshift_schema.AWSBilling201910 to group finance; 
    grant all on table redshift_schema.AWSBilling201910 to group admin;

    You can verify that database security is enforced correctly. When the user finance1 tried to rename the table AWSBilling201910 in redshift_schema, the attempt failed with a permission denied error message (due to restricted access). The following screenshot shows this scenario and the subsequent error message:

Example S3 inventory analysis

S3 charges are split per bucket. The following query identifies the data storage and transfer costs for each separate S3 bucket:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class",
  SUM(CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN "lineitem_usageamount"/1024
    ELSE "lineitem_usageamount"
  END) as "Usage",
  CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN 'TBs'
    ELSE 'Requests'
  END as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.data b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class"
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the results of executing the above query:

Costs are split by type of storage (for example, Glacier versus standard storage).

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 storage class (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation,
  b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
    a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Server access logs sample analysis queries

The S3 access logs let you break down charges per operation type. The following query identifies the data storage and transfer costs for each separate HTTP operation:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."operation",
  b."httpstatus",
  b."bytessent",
  SUM(CASE
      WHEN "lineitem_usagetype" like '%Byte%'
        THEN "lineitem_usageamount" / 1024
      ELSE "lineitem_usageamount"
      END) as "Usage",
  CASE
  WHEN "lineitem_usagetype" like '%Byte%'
    THEN 'TBs'
  ELSE 'Requests'
  END  as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.s3accesslogs b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  1, 2, 3, 4, 5, 6
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 operation and HTTP status (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4,5
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Overall data flow diagram

The following diagram shows the complete data flow for this solution.

Conclusion

AWS Glue provides an easy and convenient way to automatically discover data stored in your S3 buckets in a cloud-native, secure, and efficient way. This post demonstrated how to use AWS Glue and Amazon Redshift to analyze your S3 spend using Cost and Usage Reports. You also learned best practices for managing database security in Amazon Redshift through users and groups. Using this framework, you can start analyzing your S3 bucket spend with a few clicks in a matter of minutes on the AWS Management Console!

If you have questions or suggestions, please leave your thoughts in the comments section below.

 


About the Author

 Shayon Sanyal is a Data Architect, Data Lake for Global Financial Services at AWS.

 

 

 

How FactSet automated exporting data from Amazon DynamoDB to Amazon S3 Parquet to build a data analytics platform

Post Syndicated from Arvind Godbole original https://aws.amazon.com/blogs/big-data/how-factset-automated-exporting-data-from-amazon-dynamodb-to-amazon-s3-parquet-to-build-a-data-analytics-platform/

This is a guest post by Arvind Godbole, Lead Software Engineer with FactSet and Tarik Makota, AWS Principal Solutions Architect. In their own words “FactSet creates flexible, open data and software solutions for tens of thousands of investment professionals around the world, which provides instant access to financial data and analytics that investors use to make crucial decisions. At FactSet, we are always working to improve the value that our products provide.”

One area that we’ve been looking into is the relevancy of search results for our clients. Given the wide variety of client use cases and the large number of searches per day, we needed a platform to store anonymized usage data and allow us to analyze that data to boost results using our custom scoring algorithm. Amazon EMR was the obvious choice to host the calculations, but the question arose on how to get our anonymized data into a form that Amazon EMR could use. We worked with AWS and chose to use Amazon DynamoDB to prepare the data for usage in Amazon EMR.

This post walks you through how FactSet takes data from a DynamoDB table and converts that data into Apache Parquet. We store the Parquet files in Amazon S3 to enable near real-time analysis with Amazon EMR. Along the way, we encountered challenges related to data type conversion, which we explain, along with how we were able to overcome them.

Workflow overview

Our workflow contained the following steps:

  1. Anonymized log data is stored into DynamoDB tables. These entries have different fields, depending on how the logs were generated. Whenever we create items in the tables, we use DynamoDB Streams to write out a record. The stream records contain information from a single item in a DynamoDB table.
  2. An AWS Lambda function is hooked into the DynamoDB stream to capture the new items stored in a DynamoDB table. We built our Lambda function off of the lambda-streams-to-firehose project on GitHub to convert the DynamoDB stream image to JSON, which we stringify and push to Amazon Kinesis Data Firehose (a minimal sketch of such a handler follows this list).
  3. Kinesis Data Firehose transforms the JSON data into Parquet using data contained within an AWS Glue Data Catalog table.
  4. Kinesis Data Firehose stores the Parquet files in S3.
  5. An AWS Glue crawler discovers the schema of DynamoDB items and stores the associated metadata into the Data Catalog.
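
As a rough illustration of step 2 above, the following is a minimal sketch of such a Lambda handler. It is not the actual FactSet or lambda-streams-to-firehose code; the delivery stream name and the type handling are assumptions for illustration only.

import json
import boto3
from boto3.dynamodb.types import TypeDeserializer

firehose = boto3.client('firehose')
deserializer = TypeDeserializer()

DELIVERY_STREAM = 'ddb-to-s3-parquet'   # hypothetical delivery stream name

def lambda_handler(event, context):
    records = []
    for record in event['Records']:
        # Only forward newly written items; skip records without a new image.
        new_image = record.get('dynamodb', {}).get('NewImage')
        if not new_image:
            continue
        # Convert DynamoDB-typed attributes ({'S': ...}, {'N': ...}) into plain Python values.
        item = {k: deserializer.deserialize(v) for k, v in new_image.items()}
        # Stringify to JSON; Decimals and sets are coerced to strings in this simplified sketch.
        records.append({'Data': (json.dumps(item, default=str) + '\n').encode('utf-8')})

    if records:
        # Kinesis Data Firehose buffers the records and converts them to Parquet
        # using the schema in the AWS Glue Data Catalog before writing to S3.
        firehose.put_record_batch(DeliveryStreamName=DELIVERY_STREAM, Records=records)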

The following diagram illustrates this workflow.

AWS Glue provides tools to help with data preparation and analysis. A crawler can run on a DynamoDB table to take inventory of the table data and store that information in a Data Catalog. Other services can use the Data Catalog as an index to the location, schema, and types of the table data. There are other ways to add metadata into a Data Catalog, but the key idea is that you can update and modify the metadata easily. For more information, see Populating the AWS Glue Data Catalog.

Problem: Data type disparities

Using a variety of technologies to build a solution often requires mapping and converting data types between these technologies. The cloud is no exception. In our case, log items stored in DynamoDB contained attributes of type String Set. String Set values caused data conversion exceptions when Kinesis tried to transform the data to Parquet. After investigating the problem, we found the following:

  • As the crawler indexes the DynamoDB table, Set data types (StringSet, NumberSet) are stored in the Glue metadata catalog as set<string> and set<bigint>.
  • Kinesis Data Firehose uses that same catalog when it performs the conversion to Apache Parquet. The conversion requires valid Hive data types.
  • set<string> and set<bigint> are not valid Hive data types, so the conversion fails, and an exception is generated. The exception looks similar to the following code:
    [{
       "lastErrorCode": "DataFormatConversion.InvalidSchema",
       "lastErrorMessage": "The schema is invalid. Error parsing the schema: Error: type expected at the position 38 of 'array,used:bigint>>' but 'set' is found."
    }]

Solution: Construct data mapping

While working with the AWS team, we confirmed that the Kinesis Data Firehose converter needs valid Hive data types in the Data Catalog to succeed. When it comes to complex data types, Hive doesn’t support set<data_type>, but it does support the following:

  • ARRAY<data_type>
  • MAP<primitive_type, data_type>
  • STRUCT<col_name : data_type [COMMENT col_comment], ...>
  • UNIONTYPE<data_type, data_type, ...>

In our case, this meant that we must convert set<string> and set<bigint> into array<string> and array<bigint>. Our first step was to manually change the types directly in the Data Catalog. After we updated the Data Catalog to change all occurrences of set<data_type> to array<data_type>, the Kinesis transformation to Parquet completed successfully.

Our business case calls for a data store that can store items with different attributes in the same table and the addition of new attributes on-the-fly. We took advantage of DynamoDB’s schema-less nature and ability to scale up and down on-demand so we could focus on our functionality and not the management of the underlying infrastructure. For more information, see Should Your DynamoDB Table Be Normalized or Denormalized?

If our data had a static schema, a manual change would be good enough. Given our business case, a manual solution wasn’t going to scale. Every time we introduced new attributes to the DynamoDB table, we needed to run the crawler, which re-created the metadata and overwrote the change.

Serverless event architecture

To automate the data type updates to the Data Catalog, we used Amazon EventBridge and Lambda to implement the modifications to the data type mapping. EventBridge is a serverless event bus that connects applications using events. An event is a signal that a system’s state has changed, such as the status of a Data Catalog table.

The following diagram shows the previous workflow with the new architecture.

  1. The crawler stays as-is and crawls the DynamoDB table to obtain the metadata.
  2. The metadata obtained by the crawler is stored in the Data Catalog. Previous metadata is updated or removed, and changes (manual or automated) are overwritten.
  3. The GlueTableChanged rule in EventBridge listens for any changes to the Data Catalog tables. When we receive the event that a table has changed, we trigger the Lambda function.
  4. The Lambda function uses the AWS SDK to update the Glue Data Catalog table using the glue.update_table() API, replacing occurrences of set<data_type> with array<data_type>.

To set up EventBridge, we set Event pattern to “Pre-defined pattern by service”. For Service provider, we selected AWS; for Service, we selected Glue; and for Event type, we selected “Glue Data Catalog Table State Change”. The following screenshot shows the EventBridge configuration that sends events to the Lambda function that updates the Data Catalog.
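
For reference, an equivalent rule can also be created programmatically. The following is a minimal boto3 sketch, not the configuration used in production; the rule name, target ID, and Lambda function ARN are placeholders, and you still need to grant EventBridge permission to invoke the function (for example, with the Lambda add-permission API):

import json
import boto3

events = boto3.client('events')

# Match the same events the console configuration above selects:
# Glue Data Catalog table state changes.
pattern = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Data Catalog Table State Change"]
}

events.put_rule(
    Name='glue-table-changed',      # placeholder rule name
    EventPattern=json.dumps(pattern),
    State='ENABLED'
)

# Send matching events to the Lambda function that fixes the data types.
events.put_targets(
    Rule='glue-table-changed',
    Targets=[{
        'Id': 'fix-set-types-lambda',
        'Arn': 'arn:aws:lambda:us-east-1:111111111111:function:fix-set-types'  # placeholder ARN
    }]
)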

The following is the baseline Lambda code:

# This is NOT production worthy code please modify and implement error handling routines as appropriate
import json
import logging
import boto3

glue = boto3.client('glue')

logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Define subsegments manually
def table_contains_set(databaseName, tableName):
    
    # returns Glue Catalog description for Table structure
    response = glue.get_table( DatabaseName=databaseName,Name=tableName)
    logger.info(response)  
    
    # loop thru all the Columns of the table 
    isModified = False
    for i in response['Table']['StorageDescriptor']['Columns']: 
        logger.info("## Column: " + str(i['Name']))
        # if Column datatype starts with set< then change it to array<
        if i['Type'].find("set<") != -1:
            i['Type'] = i['Type'].replace("set<", "array<")
            isModified = True
            logger.info(i['Type'])
    
    if isModified:
        # following 3 statements simply clean up the response JSON so that update_table API call works
        del response['Table']['DatabaseName']
        del response['Table']['CreateTime']
        del response['Table']['UpdateTime']
        glue.update_table(DatabaseName=databaseName,TableInput=response['Table'],SkipArchive=True)
        
    logger.info("============ ### =============") 
    logger.info(response)
    
    return True
    
def lambda_handler(event, context):
    logger.info('## EVENT')
    # logger.info(event)
    # This is Sample of the event payload that would be received
    # { 'version': '0', 
    #   'id': '2b402842-21f5-1d76-1a9a-c90076d1d7da', 
    #   'detail-type': 'Glue Data Catalog Table State Change', 
    #   'source': 'aws.glue', 
    #   'account': '1111111111', 
    #   'time': '2019-08-18T02:53:41Z', 
    #   'region': 'us-east-1', 
    #   'resources': ['arn:aws:glue:us-east-1:111111111:table/ddb-glue-fh/ddb_glu_fh_sample'], 
    #   'detail': {
    #           'databaseName': 'ddb-glue-fh', 
    #           'changedPartitions': [], 
    #           'typeOfChange': 'UpdateTable', 
    #           'tableName': 'ddb_glu_fh_sample'
    #    }
    # }
    
    # get the database and table name of the Glue table triggered the event
    databaseName = event['detail']['databaseName']
    tableName = event['detail']['tableName']
    logger.info("DB: " + databaseName + " | Table: " + tableName)
    
    table_contains_set(databaseName, tableName)
   
    # TODO implement and modify
    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

The Lambda function is straightforward; this post provides a basic skeleton. You can use this as a template to implement your own functionality for your specific data.

Conclusion

Simple things such as data type conversion and mapping can create unexpected outcomes and challenges when data crosses service boundaries. One of the advantages of AWS is the wide variety of tools with which you can create robust and scalable solutions tailored to your needs. Using event-driven architecture, we solved our data type conversion errors and automated the process to eliminate the issue as we move forward.

 


About the Authors

Arvind Godbole is a Lead Software Engineer at FactSet Research Systems. He has experience in building high-performance, high-availability client facing products and services, ranging from real-time financial applications to search infrastructure. He is currently building an analytics platform to gain insights into client workflows. He holds a B.S. in Computer Engineering from the University of California, San Diego

 

 

 

Tarik Makota is a Principal Solutions Architect with the Amazon Web Services. He provides technical guidance, design advice and thought leadership to AWS’ customers across US Northeast. He holds an M.S. in Software Development and Management from Rochester Institute of Technology.

 

 

Amazon Redshift at re:Invent 2019

Post Syndicated from Corina Radovanovich original https://aws.amazon.com/blogs/big-data/amazon-redshift-at-reinvent-2019/

The annual AWS re:Invent learning conference is an exciting time full of new product and program launches. At the first re:Invent conference in 2012, AWS announced Amazon Redshift. Since then, tens of thousands of customers have started using Amazon Redshift as their cloud data warehouse. In 2019, AWS shared several significant launches and dozens of sessions. This post presents highlights of what happened with Amazon Redshift at re:Invent 2019.

Andy Jassy’s AWS re:Invent 2019 keynote

When Andy Jassy takes the stage to talk about what’s new at AWS, he launches the new Amazon Redshift node type, RA3 with managed storage; the new Federated Query (preview) feature, Export to Data Lake; and Advanced Query Accelerator (AQUA) (preview) for Amazon Redshift. Watch AWS re:Invent 2019 – Keynote with Andy Jassy on YouTube, or jump ahead for the Amazon RedShift announcements.

Deep dive and best practices for Amazon Redshift

Every year the Amazon Redshift deep dive session rates highly, and people continue to watch and re-watch it after the event. This year was no different. Specialist Solution Architects Harshida Patel and Tony Gibbs take an in-depth look at best practices for data warehousing with Amazon Redshift. It’s a must-see for existing Amazon Redshift users. Watch AWS re:Invent 2019: Deep dive and best practices for Amazon Redshift (ANT418) on YouTube.

What’s new with Amazon Redshift, featuring Yelp and Workday

With over 200 new features and capabilities launched in the last 18 months, there’s a lot to cover in a session about what’s new with Amazon Redshift. Join one of the Product Managers driving RA3 and managed storage, Himanshu Raja, to catch up on the recent performance, concurrency, elasticity, and manageability enhancements behind Amazon Redshift’s record price-to-performance ratio. You also get more insight into the architectural evolution of Amazon Redshift with RA3 and managed storage, and how it uses machine learning to create a self-optimizing data warehouse. In the second half of the session, you hear from Steven Moy, a software engineer at Yelp, about how Amazon Redshift’s latest features have helped Yelp achieve optimization and scale for an organization with an enormous amount of data and sophisticated analytics. Watch AWS re:Invent 2019: What’s new with Amazon Redshift, featuring Yelp (ANT320-R1) on YouTube.

The session repeated with Michalis Petropoulos, Director of Engineering, and Erol Guney of Workday. Watch the full session to get a slightly different take on what’s new, or jump to the customer presentation to hear from Erol Guney, Architect, Data Platform, at Workday about how Amazon Redshift empowers their Data as a Service product team to focus on architecture goals and business logic.

Migrate your data warehouse to the cloud in record time, featuring Nielsen and Fannie Mae

In this session, you learn about important concepts and tips for migrating your legacy on-premises data warehouse to the cloud. You hear from Tejas Desai, VP of Technology at Nielsen, about their migration journey and benefits. Watch AWS re:Invent 2019: Migrate your data warehouse to the cloud, featuring Nielsen (ANT334-R1) on YouTube.

The repeat of this session features Amy Tseng from Fannie Mae. If you don’t want to listen to Tony’s overview again, skip ahead to learn how Fannie Mae embraced a data lake architecture with Amazon Redshift for analytics to save costs, maximize performance, and scale. Amy’s presentation was a crowd favorite, with some of the most positive customer feedback and a wealth of great information about how Fannie Mae managed their migration.

How to scale data analytics with Amazon Redshift, featuring Duolingo and Warner Brothers Interactive Entertainment

Data is growing fast, and so is the value that business users need to gain from business data. When AWS first announced Amazon Redshift in 2012, it could handle up to 640 TB of compressed data. It can now scale to 8 PB of compressed data. Learn more about Amazon Redshift’s unique ability to deliver top performance at the lowest and most predictable cost from Vinay Shukla, Principal Product Manager. This is an especially important session if you want to learn more about the newest Amazon Redshift node type RA3. You also hear from Jonathan Burket of Duolingo about their experience in the preview of RA3 nodes and how Duolingo uses Amazon Redshift. Duolingo is a wildly popular language-learning platform and the most downloaded education app in the world, with over 300 million users. Enabling data-driven decisions with A/B tests and ad hoc analysis has been a driver of their success. Watch AWS re:Invent 2019: How to scale data analytics with Amazon Redshift (ANT335-R2) on YouTube.

The repeat session features Redshift Product Manager Maor Kleider with an in-depth case study from Matt Howell, Executive Director, Analytics, and Kurt Larson, Technical Director, Analytics, at Warner Brothers Interactive Entertainment. Watch the full session for another perspective about how to scale with the latest Amazon Redshift features, with unique insights about analytics across Amazon Redshift and your data lake. You can also jump to the customer presentation. Not only is this session packed with interesting insights about how data analytics drives the success of games like Batman and Mortal Kombat, it also has an action-packed trailer about all the awesome Warner Brothers games.

If you prefer to see a session without the announcements from the keynote and with demos, watch Debu Panda showcase the new Amazon Redshift console and share practical tips about using Amazon Redshift.

Amazon Redshift reimagined: RA3 and AQUA

This embargoed session is the first opportunity to learn more about AQUA for Amazon Redshift, and how it improves query performance to up to 10 times faster. Britt Johnston, Director of Product Management, kicks off with an intro into the next generation of Amazon Redshift, and Senior Principal Engineer Andy Caldwell jumps in to share the origin and vision of the exciting new technology. The enthusiasm Andy feels about sharing AQUA with customers for the first time is palpable. Watch AWS re:Invent 2019: [NEW LAUNCH!] Amazon Redshift reimagined: RA3 and AQUA (ANT230) on YouTube.

State-of-the-art cloud data warehousing, featuring Asurion and Comcast

This session serves as a great introduction to cloud data warehousing at AWS, with insightful presentations from a different customer in each delivery. You can hear from Asurion about how they use data analytics to serve over 300 million people with excellent customer satisfaction scores. You learn about how to use AWS services with Amazon Redshift and why Asurion believes in their data lake-based architecture. Watch AWS re:Invent 2019: State-of-the-art cloud data warehousing, featuring Asurion (ANT213-R1) on YouTube.

In the repeat session, Rajat Garg, Senior Principal Architect from Comcast, talks about moving to Amazon Redshift from a legacy on-premises Oracle Exadata environment. He shares their strategy, approach, and performance improvements.

What’s next and more information

In addition to these sessions at re:Invent, there are also hands-on workshops, intimate builder roundtables, and interactive chalk talks that weren’t recorded.

Keep exploring the following links for more information about new releases:

We hope to see you in Las Vegas for re:Invent 2020, or at one of the hundreds of other AWS virtual and in-person events running around the world. For more information, see AWS Events and Webinars.


About the authors

Corina Radovanovich leads product marketing for cloud data warehousing at AWS.

 

 

 

How Verizon Media Group migrated from on-premises Apache Hadoop and Spark to Amazon EMR

Post Syndicated from Lev Brailovskiy original https://aws.amazon.com/blogs/big-data/how-verizon-media-group-migrated-from-on-premises-apache-hadoop-and-spark-to-amazon-emr/

This is a guest post by Verizon Media Group.

At Verizon Media Group (VMG), one of the major problems we faced was the inability to scale out computing capacity within the required amount of time; hardware acquisitions often took months to complete. Scaling and upgrading hardware to accommodate workload changes was not economically viable, and upgrading redundant management software required significant downtime and carried a large amount of risk.

At VMG, we depend on technologies such as Apache Hadoop and Apache Spark to run our data processing pipelines. We previously managed our clusters with Cloudera Manager, which was subject to slow release cycles. As a result, we ran older versions of available open-source releases and couldn’t take advantage of the latest bug fixes and performance improvements on Apache projects. These reasons, combined with our already existing investment in AWS, made us explore migrating our distributed computing pipelines to Amazon EMR.

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark.

This post discusses the issues we encountered and solved while building a pipeline to address our data processing needs.

About us

Verizon Media is, ultimately, an online advertising company. Most online advertising today is done through display ads, also known as banners, or through video ads. Regardless of format, all internet ads typically fire various kinds of beacons to tracking servers, which are highly scalable web server deployments whose sole responsibility is to log received beacons to one or more event sinks.

Pipeline architecture

In our group, which deals mostly with video advertising, we use NGINX web servers deployed in multiple geographical locations, which log events fired from our video player directly to Apache Kafka for real-time processing and to Amazon S3 for batch processing. A typical data pipeline in our group involves processing such input feeds, applying validation and enrichment routines, aggregating resulting data, and replicating it to further destinations for reporting purposes. The following diagram shows a typical pipeline that we created.

Data first arrives on our NGINX beacon servers, where it is stored on local disk in gzip files at 1-minute intervals. Every minute, we move the data from the NGINX servers to a raw data location in S3. When a file lands in S3, an event notification sends a message to Amazon SQS. Apache NiFi listens to these SQS messages and starts working on the files: it groups smaller files into larger ones and stores the outcome under a special path in a temporary location on S3. The path name incorporates an inverse timestamp so that writes are spread across key prefixes rather than landing in sequential order, which avoids read bottlenecks. A sketch of this naming scheme follows.
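To make the inverse-timestamp idea concrete, the following minimal sketch shows one way to build such a key. It is illustrative rather than our production code; the helper name, constant, and paths are placeholders.

import time

# Hypothetical helper: build an S3 key whose leading component is an inverse
# timestamp (a large constant minus the current epoch time). Newly written keys
# are then non-sequential, so reads and writes spread across key prefixes.
def temp_output_key(prefix, file_name):
    inverse_ts = 9999999999 - int(time.time())
    return "{}/{}/{}".format(prefix, inverse_ts, file_name)

# Example: temp_output_key("temp/merged", "beacons-0001.gz")
# -> "temp/merged/<inverse-timestamp>/beacons-0001.gz"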

Every hour, we scale out a Spark cluster on Amazon EMR to process the raw data. This processing includes enriching and validating the data, and the output is stored in a permanent location on S3 in the Apache ORC columnar format. We also update the AWS Glue Data Catalog to expose this data in Amazon Athena in case we need to investigate it for issues. After raw data processing finishes, we downscale the Spark EMR cluster and aggregate the data based on predefined aggregation templates using Presto on Amazon EMR. The aggregated data is stored in ORC format in a separate location on S3 reserved for aggregated output.
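One common way to update the Data Catalog after an hourly write, sketched below, is to run an Athena DDL statement that registers the new partition. This is an assumption for illustration rather than our exact mechanism; the database, table, partition keys (dt/hour), and S3 paths are placeholders.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Register one hourly partition so Athena can see the newly written ORC files.
def register_hourly_partition(database, table, dt, hour, s3_location, query_output):
    ddl = (
        "ALTER TABLE {} ADD IF NOT EXISTS "
        "PARTITION (dt='{}', hour='{}') LOCATION '{}'"
    ).format(table, dt, hour, s3_location)
    return athena.start_query_execution(
        QueryString=ddl,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": query_output},  # Athena query results location
    )

# register_hourly_partition("video_events", "enriched_events", "2019-11-01", "13",
#                           "s3://example-permanent-bucket/enriched/dt=2019-11-01/hour=13/",
#                           "s3://example-athena-results/")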

We also update our Data Catalog with the location of the aggregated data so we can query it with Athena. Additionally, we replicate the data from S3 into Vertica for reporting, to expose it to internal and external customers. In this scenario, we use Athena as the disaster recovery (DR) solution for Vertica: whenever our reporting platform detects that Vertica is in bad health, we automatically fail over to Amazon Athena. This solution proved to be extremely cost-effective for us. We have another use case for Athena in our real-time analytics that we do not discuss in this post.

Migration challenges

Migration to Amazon EMR required us to make some design changes to get the best results. When running big data pipelines on the cloud, operational cost optimization is the name of the game. The two major costs are storage and compute. In traditional on-premises Hadoop warehouses, these are coupled as storage nodes that also serve as computation nodes. This can provide a performance advantage due to data locality. However, the disadvantage of this coupling is that any changes to the storage layer, such as maintenance, can also affect the computational layer. In an environment such as AWS, we can decouple storage and computation by using S3 for storage and Amazon EMR for computation. This provides a major flexibility advantage when dealing with cluster maintenance because all clusters are ephemeral.

To further save costs, we had to figure out how to achieve maximum utilization on our computational layer. This meant that we had to switch our platform to using multiple clusters for different pipelines, where each cluster is automatically scaled depending on the pipeline’s needs.

Switching to S3

Running a Hadoop data warehouse on S3 introduces additional considerations. S3 is not a file system like HDFS and does not provide the same immediate consistency guarantees. You can consider S3 as an eventually consistent object store with a REST API to access it.

The rename

A key difference with S3 is that rename is not an atomic operation: every rename on S3 runs a copy followed by a delete, which makes renames undesirable because of their running-time cost. To use S3 efficiently, you must eliminate rename operations. Renames are commonly used in Hadoop warehouses at various commit stages, such as moving a temporary directory to its final destination as an atomic operation. The best approach is to avoid renames altogether and instead write data once, directly to its final destination.
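For illustration, here is roughly what a "rename" amounts to at the S3 API level. This is a sketch with placeholder bucket and key names, not code from our pipeline.

import boto3

s3 = boto3.client("s3")

# There is no rename primitive in the S3 API: a "rename" is a full object copy
# followed by a delete, so its cost grows with object size and it is not atomic.
# A failure partway through can leave the object under both keys or neither.
def s3_rename(bucket, src_key, dst_key):
    s3.copy_object(
        Bucket=bucket,
        Key=dst_key,
        CopySource={"Bucket": bucket, "Key": src_key},
    )
    s3.delete_object(Bucket=bucket, Key=src_key)

# s3_rename("example-bucket", "tmp/part-0000.orc", "final/part-0000.orc")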

Output committers

Both Spark and Apache MapReduce jobs have commit stages that commit output files produced by multiple distributed workers to final output directories. Explaining how output committers work is beyond the scope of this post, but the important thing is that the standard default output committers, designed to work on HDFS, depend on rename operations, which, as explained previously, carry a performance penalty on storage systems like S3. A simple strategy that worked for us was to disable speculative execution and switch the output committer's algorithm version. It is also possible to write your own custom committers, which do not depend on renames. For example, as of Amazon EMR 5.19.0, AWS released a custom OutputCommitter for Spark that optimizes writes to S3.
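As a rough sketch (not our exact configuration), the two settings mentioned above map to Spark properties like the following; exact names and defaults vary by Spark and EMR version, so treat this as a starting point.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("write-once-to-s3")
    # Speculative execution launches duplicate task attempts that race to commit
    # output, which is risky without atomic renames on the storage layer.
    .config("spark.speculation", "false")
    # Version 2 of the Hadoop FileOutputCommitter algorithm commits task output
    # directly to the final location instead of renaming a job-level temporary
    # directory at the end of the job.
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .getOrCreate()
)

# On Amazon EMR 5.19.0 and later, the EMRFS S3-optimized committer can be used
# for Spark Parquet writes; see the EMR documentation for the relevant
# spark.sql.parquet.fs.optimized.committer.* settings and their defaults.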

Eventual consistency

One of the major challenges of working with S3 is that it is eventually consistent, whereas HDFS is strongly consistent. S3 does offer read-after-write guarantees for PUTs of new objects, but this is not always enough to build consistent distributed pipelines on. One scenario that comes up a lot in big data processing is one job outputting a list of files to a directory and another job reading from that directory. For the second job to run, it has to list the directory to find all the files it needs to read. In S3, there are no directories; we simply list files with the same prefix, which means you might not see all the new files immediately after the first job finishes running.

To address this issue, AWS offers EMRFS, which is a consistency layer added on top of S3 to make it behave like a consistent file system. EMRFS uses Amazon DynamoDB to keep metadata about every file on S3. In simple terms, with EMRFS enabled, listing an S3 prefix compares the actual S3 response to the metadata in DynamoDB. If there is a mismatch, the S3 driver polls a little longer and waits for the data to show up on S3.
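EMRFS consistent view is enabled through cluster configuration rather than application code. A minimal sketch of what that could look like follows; the property values are illustrative, and this is one possible way to enable it, not a description of our exact setup.

# An "emrfs-site" configuration classification that turns on consistent view.
# It can be passed in the Configurations parameter of boto3's emr.run_job_flow
# call (or supplied when creating the cluster in the console or CLI).
emrfs_consistent_view = [
    {
        "Classification": "emrfs-site",
        "Properties": {
            "fs.s3.consistent": "true",                             # enable consistent view
            "fs.s3.consistent.metadata.tableName": "EmrFSMetadata", # DynamoDB metadata table
            "fs.s3.consistent.retryCount": "5",                     # retries when a mismatch is detected
        },
    }
]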

In general, we found that EMRFS was necessary to ensure data consistency. For some of our data pipelines, we use PrestoDB to aggregate data that is stored on S3, where we chose to run PrestoDB without EMRFS support. While this has exposed us to the eventual consistency risk for our upstream jobs, we found that we can work around these issues by monitoring for discrepancies between downstream and upstream data and rerunning the upstream jobs if needed. In our experience, consistency issues happen very rarely, but they are possible. If you choose to run without EMRFS, you should design your system accordingly.

Automatic scaling strategies

An important yet seemingly trivial challenge was figuring out how to take advantage of the Amazon EMR automatic scaling capabilities. To achieve optimal operational costs, we want to make sure no server is sitting idle.

To achieve that, the answer might seem obvious: create a long-running EMR cluster and use the readily available automatic scaling features to control the cluster's size based on a parameter such as available free memory. However, some of our batch pipelines start every hour, run for exactly 20 minutes, and are computationally very intensive. Because processing time is very important, we want to make sure we don't waste any of it. The optimal strategy for us is to preemptively resize the cluster through custom scripts before particular big batch pipelines start.

Additionally, it would be difficult to run multiple data pipelines on a single cluster and attempt to keep it at optimal capacity at any given moment because every pipeline is slightly different. We have instead opted to run all our major pipelines on independent EMR clusters. This has a lot of advantages and only a minor disadvantage. The advantages are that each cluster can be resized at exactly the required time, run the software version required by its pipeline, and be managed without affecting other pipelines. The minor disadvantage is that there’s a small amount of computational waste by running extra name nodes and task nodes.

When developing an automatic scaling strategy, we first tried creating and dropping clusters every time we needed to run our pipelines. However, we quickly found that bootstrapping a cluster from scratch can take more time than we'd like. Instead, we keep these clusters always running, and we upsize a cluster by adding task nodes before its pipeline starts and remove the task nodes as soon as the pipeline ends. We found that by simply adding task nodes, we can start running our pipelines much faster. If we run into issues with long-running clusters, we can quickly recycle them and create new ones from scratch. We continue to work with AWS on these issues.

Our custom automatic scaling scripts are simple Python scripts that usually run before a pipeline starts. For example, assume that our pipeline consists of a simple MapReduce job with a single map and reduce phase, and that the map phase is the more computationally expensive of the two. We can write a simple script that looks at the amount of data that needs to be processed in the next hour and figures out the number of mappers needed to process it, in the same way that a Hadoop job does. Once we know the number of mapping tasks, we can decide how many servers we need to run all the mapper tasks in parallel. A minimal sketch of such a script follows.
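The sketch below is illustrative rather than our actual script: the split size, containers-per-node figure, and cluster IDs are placeholder assumptions. The resize itself uses the EMR ModifyInstanceGroups API via boto3.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Rough sizing heuristic (all numbers are illustrative): derive the number of
# mappers from the input size and the split size, then convert mappers into
# task nodes based on how many containers one node can run in parallel.
SPLIT_SIZE_BYTES = 128 * 1024 * 1024  # 128 MiB input split
CONTAINERS_PER_NODE = 16              # depends on instance type and YARN settings

def desired_task_nodes(input_bytes):
    mappers = max(1, input_bytes // SPLIT_SIZE_BYTES)
    return int(-(-mappers // CONTAINERS_PER_NODE))  # ceiling division

def resize_task_group(cluster_id, instance_group_id, input_bytes):
    """Preemptively resize the task instance group before the hourly batch starts."""
    emr.modify_instance_groups(
        ClusterId=cluster_id,
        InstanceGroups=[{
            "InstanceGroupId": instance_group_id,
            "InstanceCount": desired_task_nodes(input_bytes),
        }],
    )

# resize_task_group("j-EXAMPLE12345", "ig-EXAMPLETASK", input_bytes=2_500_000_000_000)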

When running Spark real-time pipelines, things are a little trickier because we sometimes have to remove computational resources while the application is running. A simple strategy that worked for us is to create a separate real-time cluster in parallel to the existing one, scale it up to the required size based on the amount of data processed during the last hour plus some extra capacity, and restart the real-time application on the new cluster.

Operational costs

You can evaluate all AWS costs up front with the EC2 calculator. The main costs when running big data pipelines are storage and computation, with some extra minor costs such as DynamoDB when using EMRFS.

Storage costs

The first cost to consider is storage. Take 1 PB of data as an example: because HDFS has a default replication factor of 3, storing it would require 3 PB of actual storage capacity rather than 1 PB.

Storing 1 GB on S3 costs roughly $0.023 per month. S3 is already highly redundant, so you don't need to take the replication factor into account, which immediately reduces the required capacity, and therefore the cost, by about 67%. You should also account for the costs of write and read requests, but these usually tend to be small.
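As a back-of-the-envelope check on those figures, using the $0.023 per GB-month price quoted above and ignoring request costs, the arithmetic looks like this:

# Back-of-the-envelope storage comparison for a 1 PB dataset (illustrative;
# request costs and storage classes other than S3 Standard are ignored).
S3_PRICE_PER_GB_MONTH = 0.023
DATASET_PB = 1
GB_PER_PB = 1024 ** 2  # 1 PB = 1,048,576 GB

s3_monthly_cost = DATASET_PB * GB_PER_PB * S3_PRICE_PER_GB_MONTH
raw_hdfs_capacity_pb = DATASET_PB * 3  # HDFS replication factor of 3

print("S3: ~${:,.0f} per month for {} PB".format(s3_monthly_cost, DATASET_PB))
print("HDFS: {} PB of raw disk for the same {} PB of data".format(raw_hdfs_capacity_pb, DATASET_PB))
print("Capacity avoided by not replicating: {:.0%}".format(1 - 1 / 3))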

Computation costs

The second-largest cost after storage is the cost of computation. To reduce computation costs, take advantage of Reserved Instance pricing as much as possible. An m4.4xlarge instance with 16 vCPUs costs $0.301 an hour when reserved for 3 years with all fees paid up front, whereas an On-Demand Instance costs $0.80 an hour, a 62% difference in price. This is easier to achieve in larger organizations that perform regular capacity planning. An extra hourly fee of $0.24 is added to every Amazon EMR machine for the use of the Amazon EMR platform. It is possible to reduce costs even further by using Amazon EC2 Spot Instances. For more information, see Instance Purchasing Options.

To achieve optimal operational costs, try to make sure that your computation clusters are never sitting idle and try to downscale dynamically based on the amount of work your clusters are doing at any given moment.

Final thoughts

We have been operating our big data pipelines on Amazon EMR for over a year and storing all our data on S3. At times, our real-time processing pipelines have peaked at handling more than 2 million events per second, with a total processing latency from the initial event to updated aggregates of 1 minute. We’ve been enjoying the flexibility around Amazon EMR and its ability to tear down and recreate clusters in a matter of minutes. We are satisfied with the overall stability of the Amazon EMR platform and we will continue working with AWS to improve it.

As we have mentioned before, cost is a major factor to consider, and you could argue that it might be cheaper to run Hadoop in your own data centers. However, this argument hinges on your organization's ability to do so efficiently; doing so may carry hidden operational costs as well as reduced elasticity. We know through first-hand experience that running on-premises is not an undertaking to take lightly, and it requires a lot of planning and maintenance. We believe that platforms such as Amazon EMR bring a lot of advantages when designing big data systems.

Disclaimer: The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.


About the authors

Lev Brailovskiy is Director of Engineering leading the Service Engineering Group in the Supply Side Platform (SSP) at Verizon Media. He has over 15 years of experience designing and building software systems. In the past six years, Lev has spent his time designing, developing, and running large-scale reporting and data processing software, both in private data centers and in the public cloud. He can be contacted via LinkedIn.

 

 

Zilvinas Shaltys is Technical Lead for the Video Syndication cloud data warehouse platform at Verizon. Zilvinas has years of experience working with a wide variety of big data technologies deployed at considerable scale. He was responsible for migrating big data pipelines from AOL data centers to Amazon EMR. Zilvinas is currently working on improving the stability and scalability of existing batch and real-time big data systems. He can be contacted via LinkedIn.