
Automating bucketing of streaming data using Amazon Athena and AWS Lambda

Post Syndicated from Ahmed Saef Zamzam original https://aws.amazon.com/blogs/big-data/automating-bucketing-of-streaming-data-using-amazon-athena-and-aws-lambda/

In today’s world, data plays a vital role in helping businesses understand and improve their processes and services to reduce cost. You can use several tools to gain insights from your data, such as Amazon Kinesis Data Analytics or open-source frameworks like Apache Spark Structured Streaming and Apache Flink to analyze the data in real time. Alternatively, you can batch analyze the data by ingesting it into a centralized storage known as a data lake. Data lakes allow you to import any amount of data that can come in real time or batch. With Amazon Simple Storage Service (Amazon S3), you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% (11 9s) of durability.

After the data lands in your data lake, you can start processing this data using any Big Data processing tool of your choice. Amazon Athena is a fully managed interactive query service that enables you to analyze data stored in an Amazon S3-based data lake using standard SQL. You can also integrate Athena with Amazon QuickSight for easy visualization of the data.

When working with Athena, you can employ a few best practices to reduce cost and improve performance. Converting to columnar formats, partitioning, and bucketing your data are some of the best practices outlined in Top 10 Performance Tuning Tips for Amazon Athena. Bucketing is a technique that groups data together within a single partition based on the values of specific columns. These columns are known as bucket keys. By grouping related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thus improving query performance and reducing cost. For example, imagine collecting and storing clickstream data. If you frequently filter or aggregate by user ID, then within a single partition it’s better to store all rows for the same user together. If user data isn’t stored together, then Athena has to scan multiple files to retrieve the user’s records. This leads to more files being scanned, and therefore, an increase in query runtime and cost.

Like partitioning, columns that are frequently used to filter the data are good candidates for bucketing. However, unlike partitioning, with bucketing it’s better to use columns with high cardinality as a bucketing key. For example, Year and Month columns are good candidates for partition keys, whereas userID and sensorID are good examples of bucket keys. By doing this, you make sure that all buckets have a similar number of rows. For more information, see Bucketing vs Partitioning.

For real-time data (such as data coming from sensors or clickstream data), streaming tools like Amazon Kinesis Data Firehose can convert the data to columnar formats and partition it while writing to Amazon S3. With Kafka, you can do the same thing with connectors. But what about bucketing? This post shows how to continuously bucket streaming data using AWS Lambda and Athena.

Overview of solution

The following diagram shows the high-level architecture of the solution.

The architecture includes the following steps:

  1. We use the Amazon Kinesis Data Generator (KDG) to simulate streaming data. Data is then written into Kinesis Data Firehose, a fully managed service that enables you to load streaming data to an Amazon S3-based data lake.
  2. Kinesis Data Firehose partitions the data by hour and writes new JSON files into the current partition under the /raw prefix. Each new partition looks like /raw/dt=<YYYY-MM-dd-HH>. Every hour, a new partition is created.
  3. Two Lambda functions are triggered every hour by Amazon CloudWatch Events:
    • Function 1 (LoadPartition) runs every hour to load new /raw partitions to Athena SourceTable, which points to the /raw prefix.
    • Function 2 (Bucketing) runs the Athena CREATE TABLE AS SELECT (CTAS) query.
  4. The CTAS query copies the previous hour’s data from /raw to /curated and buckets the data while doing so. It loads the new data as a new partition to TargetTable, which points to the /curated prefix.

Overview of walkthrough

In this post, we cover the following high-level steps:

  1. Install and configure the KDG.
  2. Create a Kinesis Data Firehose delivery stream.
  3. Create the database and tables in Athena.
  4. Create the Lambda functions and schedule them.
  5. Test the solution.
  6. Create a view that combines data from both tables.
  7. Clean up.

Installing and configuring the KDG

First, we need to install and configure the KDG in our AWS account. To do this, we use the following AWS CloudFormation template.

For more information about installing the KDG, see the KDG Guide in GitHub.

To configure the KDG, complete the following steps:

  1. On the AWS CloudFormation console, locate the stack you just created.
  2. On the Outputs tab, record the value for KinesisDataGeneratorUrl.
  3. Log in to the KDG main page using the credentials created when you deployed the CloudFormation template.
  4. In the Record template section, enter the following template. Each record has three fields: sensorID, currentTemperature, and status.
    {
        "sensorId": {{random.number(4000)}},
        "currentTemperature": {{random.number(
            {
                "min":10,
                "max":50
            }
        )}},
        "status": "{{random.arrayElement(
            ["OK","FAIL","WARN"]
        )}}"
    }
    

  5. Choose Test template.

The result should look like the following screenshot.

We don’t start sending data now; we do this after creating all other resources.

Creating a Kinesis Data Firehose delivery stream

Next, we create the Kinesis Data Firehose delivery stream that is used to load the data to the S3 bucket.

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose.
  2. Choose Create delivery stream.
  3. For Delivery stream name, enter a name, such as AutoBucketingKDF.
  4. For Source, select Direct PUT or other sources.
  5. Leave all other settings at their default and choose Next.
  6. On the Process records page, leave everything at its default and choose Next.
  7. Choose Amazon S3 as the destination and choose your S3 bucket from the drop-down menu (or create a new one). For this post, I already have a bucket created.
  8. For S3 Prefix, enter the following prefix:
    raw/dt=!{timestamp:yyyy}-!{timestamp:MM}-!{timestamp:dd}-!{timestamp:HH}/

We use custom prefixes to tell Kinesis Data Firehose to create a new partition every hour. Each partition looks like this: dt=YYYY-MM-dd-HH. This partition-naming convention conforms to the Hive partition-naming convention, <PartitionKey>=<PartitionValue>. In this case, <PartitionKey> is dt and <PartitionValue> is YYYY-MM-dd-HH. By doing this, we implement a flat partitioning model instead of hierarchical (year=YYYY/month=MM/day=dd/hour=HH) partitions. This model can be much simpler for end-users to work with, and you can use a single column (dt) to filter the data. For more information on flat vs. hierarchical partitions, see Data Lake Storage Foundation on GitHub.

  9. For S3 error prefix, enter the following prefix:
    myFirehoseFailures/!{firehose:error-output-type}/

  10. On the Settings page, leave everything at its default.
  11. Choose Create delivery stream.

Creating an Athena database and tables

In this solution, the Athena database has two tables: SourceTable and TargetTable. Both tables have identical schemas and will have the same data eventually. However, each table points to a different S3 location. Moreover, because data is stored in different formats, Athena uses a different SerDe for each table to parse the data. SourceTable uses JSON SerDe and TargetTable uses Parquet SerDe. One other difference is that SourceTable’s data isn’t bucketed, whereas TargetTable’s data is bucketed.

In this step, we create both tables and the database that groups them.

  1. On the Athena console, create a new database by running the following statement:
    CREATE DATABASE mydatabase

  2. Choose the database that was created and run the following query to create SourceTable. Replace <s3_bucket_name> with the bucket name you used when creating the Kinesis Data Firehose delivery stream.
    CREATE EXTERNAL TABLE mydatabase.SourceTable(
      sensorid string, 
      currenttemperature int, 
      status string)
    PARTITIONED BY ( 
      dt string)
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://<s3_bucket_name>/raw/'
    

  3. Run the following CTAS statement to create TargetTable:
    CREATE TABLE TargetTable
    WITH (
          format = 'PARQUET', 
          external_location = 's3://<s3_bucket_name>/curated/', 
          partitioned_by = ARRAY['dt'], 
          bucketed_by = ARRAY['sensorID'], 
          bucket_count = 3) 
    AS SELECT *
    FROM SourceTable

SourceTable doesn’t have any data yet. However, the preceding query creates the table definition in the Data Catalog. We configured this data to be bucketed by sensorID (bucketing key) with a bucket count of 3. Ideally, the number of buckets should be chosen so that the resulting files are of optimal size.

Creating Lambda functions

The solution has two Lambda functions: LoadPartition and Bucketing. We use an AWS Serverless Application Model (AWS SAM) template to create, deploy, and schedule both functions.

Follow the instructions in the GitHub repo to deploy the template. When deploying the template, it asks you for some parameters. You can use the default parameters, but you have to change S3BucketName and AthenaResultLocation. For more information, see Parameter Details in the GitHub repo.

LoadPartition function

The LoadPartition function is scheduled to run the first minute of every hour. Every time Kinesis Data Firehose creates a new partition in the /raw folder, this function loads the new partition to the SourceTable. This is crucial because the second function (Bucketing) reads this partition the following hour to copy the data to /curated.
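
The exact function code lives in the GitHub repo; as a rough illustration of the idea only, a handler along these lines could compute the current hour and add the matching partition to SourceTable through the Athena API. The environment variable names and the database name mydatabase are assumptions for this sketch, not the repo's actual parameters:

import os
from datetime import datetime, timezone

import boto3

athena = boto3.client("athena")

def lambda_handler(event, context):
    # Partition that Kinesis Data Firehose is currently writing to, for example dt=2021-01-01-13
    current_hour = datetime.now(timezone.utc).strftime("%Y-%m-%d-%H")
    bucket = os.environ["S3_BUCKET_NAME"]  # assumed environment variable
    query = (
        "ALTER TABLE SourceTable ADD IF NOT EXISTS "
        f"PARTITION (dt = '{current_hour}') "
        f"LOCATION 's3://{bucket}/raw/dt={current_hour}/'"
    )
    # Submit the DDL statement to Athena; query results land in the configured output location
    athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": "mydatabase"},
        ResultConfiguration={"OutputLocation": os.environ["ATHENA_RESULT_LOCATION"]},
    )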

Bucketing function

The Bucketing function is scheduled to run the first minute of every hour. It copies the last hour’s data from SourceTable to TargetTable. It does so by creating a tempTable using a CTAS query. This tempTable points to the new date-hour folder under /curated; this folder is then added as a single partition to TargetTable.

To implement this, the function runs three queries sequentially. The queries use two parameters:

  • <s3_bucket_name> – Defined by an AWS SAM parameter and should be the same bucket used throughout this solution
  • <last_hour_partition> – Calculated by the function based on the hour in which it runs

The function first creates TempTable as the result of a SELECT statement from SourceTable. It stores the results in a new folder under /curated. The results are bucketed and stored in Parquet format. See the following code:

CREATE TABLE TempTable
    WITH (
      format = 'PARQUET', 
      external_location = 's3://<s3_bucket_name>/curated/dt=<last_hour_partition>/', 
      bucketed_by = ARRAY['sensorID'], 
      bucket_count = 3) 
    AS SELECT *
    FROM SourceTable
    WHERE dt='<last_hour_partition>';

We create a new subfolder in /curated, which is a new partition for TargetTable. So, after the TempTable creation is complete, we load the new partition to TargetTable:

ALTER TABLE TargetTable
                ADD IF NOT EXISTS
                PARTITION (dt = '<last_hour_partition>');

Finally, we delete tempTable from the Data Catalog:

DROP TABLE TempTable
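
Putting the three statements together, the Bucketing function can be sketched roughly as follows. This is not the exact code from the GitHub repo; the run_query helper, environment variable names, and polling logic are illustrative assumptions. The function derives the previous hour's partition value, then runs the CTAS, ALTER TABLE, and DROP TABLE queries in order, waiting for each to finish before starting the next:

import os
import time
from datetime import datetime, timedelta, timezone

import boto3

athena = boto3.client("athena")
DATABASE = "mydatabase"


def run_query(query, result_location):
    """Start an Athena query and block until it completes."""
    query_id = athena.start_query_execution(
        QueryString=query,
        QueryExecutionContext={"Database": DATABASE},
        ResultConfiguration={"OutputLocation": result_location},
    )["QueryExecutionId"]
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)[
            "QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            return
        if state in ("FAILED", "CANCELLED"):
            raise RuntimeError(f"Query {query_id} ended in state {state}")
        time.sleep(2)


def lambda_handler(event, context):
    bucket = os.environ["S3_BUCKET_NAME"]                   # assumed environment variable
    result_location = os.environ["ATHENA_RESULT_LOCATION"]  # assumed environment variable
    last_hour = (datetime.now(timezone.utc) - timedelta(hours=1)).strftime("%Y-%m-%d-%H")

    # 1. CTAS: bucket the previous hour's data into a new folder under /curated
    run_query(
        f"""CREATE TABLE TempTable
            WITH (format = 'PARQUET',
                  external_location = 's3://{bucket}/curated/dt={last_hour}/',
                  bucketed_by = ARRAY['sensorID'],
                  bucket_count = 3)
            AS SELECT * FROM SourceTable WHERE dt = '{last_hour}'""",
        result_location,
    )

    # 2. Register the new folder as a partition of TargetTable
    run_query(
        f"ALTER TABLE TargetTable ADD IF NOT EXISTS PARTITION (dt = '{last_hour}')",
        result_location,
    )

    # 3. Drop the temporary table definition (the data files stay in /curated)
    run_query("DROP TABLE TempTable", result_location)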

Testing the solution

Now that we have created all resources, it’s time to test the solution. We start by generating data from the KDG and waiting for an hour to start querying data in TargetTable (the bucketed table).

  1. Log in to the KDG. You should find the template you created earlier. For the configuration, choose the following:
    1. The Region used.
    2. For the delivery stream, choose the Kinesis Data Firehose you created earlier.
    3. For records/sec, enter 3000.
  2. Choose Send data.

The KDG starts sending simulated data to Kinesis Data Firehose. After 1 minute, a new partition should be created in Amazon S3.

The Lambda function that loads the partition to SourceTable runs on the first minute of the hour. If you started sending data after the first minute, this partition is missed because the next run loads the next hour’s partition, not this one. To mitigate this, run MSCK REPAIR TABLE SourceTable only for the first hour.

  3. To benchmark the performance between both tables, wait for an hour so that the data is available for querying in TargetTable.
  4. When the data is available, choose one sensorID and run the following query on SourceTable and TargetTable.
    SELECT sensorID, avg(currenttemperature) as AverageTemperature 
    FROM <TableName>
    WHERE dt='<YYYY-MM-dd-HH>' AND sensorID ='<sensorID_selected>'
    GROUP BY 1

The following screenshot shows the query results for SourceTable. It shows the runtime in seconds and amount of data scanned.

The following screenshot shows the query results for TargetTable.

If you look at these results, you don’t see a huge difference in runtime for this specific query and dataset; for other datasets, this difference should be more significant. However, from a data scanning perspective, after bucketing the data, we reduced the data scanned by approximately 98%. Therefore, for this specific use case, bucketing the data led to a 98% reduction in Athena costs because you’re charged based on the amount of data scanned by each query.

Querying the current hour’s data

Data for the current hour isn’t available immediately in TargetTable. It’s available for querying after the first minute of the following hour. To query this data immediately, we have to create a view that UNIONS the previous hour’s data from TargetTable with the current hour’s data from SourceTable. If data is required for analysis after an hour of its arrival, then you don’t need to create this view.

To create this view, run the following query in Athena:

CREATE OR REPLACE VIEW combined AS

SELECT *, "$path" AS file
FROM SourceTable
WHERE dt >= date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

UNION ALL 

SELECT *, "$path" AS file
FROM TargetTable
WHERE dt < date_format(date_trunc('hour', (current_timestamp)), '%Y-%m-%d-%H')

Cleaning up

Delete the resources you created if you no longer need them.

  1. Delete the Kinesis Data Firehose delivery stream.
  2. In Athena, run the following statements:
    1. DROP TABLE SourceTable
    2. DROP TABLE TargetTable
    3. DROP DATABASE mydatabase CASCADE
  3. Delete the AWS SAM template to delete the Lambda functions.
  4. Delete the CloudFormation stack for the KDG. For more information, see Deleting a stack on the AWS CloudFormation console.

Conclusion

Bucketing is a powerful technique and can significantly improve performance and reduce Athena costs. In this post, we saw how to continuously bucket streaming data using Lambda and Athena. We used a simulated dataset generated by Kinesis Data Generator. The same solution can apply to any production data, with the following changes:

  • The DDL statements for your schema.
  • The Lambda functions, which expect data partitioned by hour with the partition key dt and partition value <YYYY-MM-dd-HH>. If your data is partitioned differently, edit the functions accordingly.
  • The frequency of the Lambda triggers.

About the Author

Ahmed Zamzam is a Solutions Architect with Amazon Web Services. He supports SMB customers in the UK in their digital transformation and their cloud journey to AWS, and specializes in Data Analytics. Outside of work, he loves traveling, hiking, and cycling.

Bringing the power of embedded analytics to your apps and services with Amazon QuickSight

Post Syndicated from Dorothy Li original https://aws.amazon.com/blogs/big-data/bringing-the-power-of-embedded-analytics-to-your-apps-and-services-with-amazon-quicksight/

In the world we live in today, companies need to quickly react to change—and to anticipate it. Customers tell us that their reliance on data has never been greater than what it is today. To improve your decision-making, you have two types of data transformation needs: data agility, the speed at which data turns into insights, and data transparency, the need to present insights to decision makers. Going forward, we expect data transformation projects to become a centerpiece in every organization, big or small.

Furthermore, applications are migrating to the cloud faster than ever. Applications need to scale quickly to potentially millions of users, have global availability, manage petabytes of data, and respond in milliseconds. Such modern applications are built with a combination of these new architecture patterns, operational models, and software delivery processes, and allow businesses to innovate faster while reducing risk, time-to-market, and total cost of ownership.

An emerging area from these two trends is to combine the power of application modernization with data transformation. This emerging trend is often called embedded analytics, and is the focus of this post.

The case for embedded analytics

Applications generate a high volume of structured and unstructured data. This could be clickstream data, sales data, data from IoT devices, social data, and more. Customers who are building these applications (such as software-as-a-service (SaaS) apps or enterprise portals) often tell us that their end-users find it challenging to derive meaning from this data because traditional business intelligence (BI) approaches don’t always work.

Traditional BI tools live in disparate systems and require data engineering teams to provide connectivity and continuous integration with the application, adding to complexity and delays in the overall process. Even after the connectivity is built, you must switch back and forth between your application and the BI tool, causing frustration and decreasing the overall pace of decision-making. Customers tell us that their development teams are constantly looking for new ways to delight their users, and embedding the BI capability directly into their applications is one of the most requested asks from their end-users.

Given the strategic importance of this capability, you can use it to differentiate your offering and up-sell it as a new service in your applications. Gartner research demonstrates that 63% of CEOs expect to adopt a product-as-a-service model in the next two years, making this a major market opportunity. For example, if you provide financial services software, you can empower users to perform detailed analysis of portfolio performance trends. An HR solution might enable managers to visualize and predict turnover rates. A supply chain management solution could embed the ability to slice and dice KPIs and better understand the efficiency of logistics routes.

Comparing common approaches to embedded analytics

The approach to building an embedded analytics capability needs to deliver on the requirements of modern applications. It must be scalable, handle large amounts of data without compromising agility, and seamlessly integrate with the application’s user experience. Choosing the right methodology becomes especially important in the face of these needs.

You can build your own embedded analytics solution, but although this gives you maximum control, it has a number of disadvantages. You have to hire specialized resources (such as data engineers for building data connectivity and UX developers for building dashboards) and maintain dedicated infrastructure to manage the data processing needs of the application. This can be expensive, resource-intensive, and complex to build.

Embedding traditional BI solutions that are available in the market has limitations as well, because they’re not purpose-built for embedding use cases. Most solutions are server-based, meaning that they’re challenging to scale and require additional infrastructure setup and ongoing maintenance. These solutions also have restrictive, pay-per-server pricing, which doesn’t fully meet the needs of end-users that are consuming applications or portals via a session-based usage model.

A new approach to embedded analytics

At AWS re:Invent 2019, we launched new capabilities in Amazon QuickSight that make it easy to embed analytics into your applications and portals, empowering your customers to gain deeper insights into your application’s data. Unlike building your own analytics solution, which can be time-consuming and hard to scale, QuickSight allows you to quickly embed interactive dashboards and visualizations into your applications without compromising on the ability to personalize the look and feel of these new features.

QuickSight has a serverless architecture that automatically scales your applications from a few to hundreds of thousands of users without the need to build, set up, and manage your own analytics infrastructure. These capabilities allow you to deliver embedded analytics at hyperscale. So, why does hyperscale matter? Traditional BI tools run on a fixed amount of hardware resources, therefore more users, more concurrency, or more complex queries impact performance across all users, which requires you to add more capacity (leading to higher costs).

The following diagram illustrates a traditional architecture, which requires additional servers (and higher upfront cost) to scale.

With QuickSight, you have access to the power and scale of the AWS Cloud. You get auto scaled, consistent performance no matter the concurrency or scale of the userbase, and a truly pay-per-use architecture, meaning you only pay when your users access the dashboards or reports. The following diagram illustrates how QuickSight scales seamlessly with its serverless architecture, powered by the AWS cloud.

Furthermore, QuickSight enables your users to perform machine learning based insights such as anomaly detection, forecasting, and natural language queries. It also has a rich set of APIs that allow you to programmatically manage your analytics workflows, such as moving dashboards across accounts, automating deployments, and managing access for users with single sign-on (SSO).

New features in QuickSight Embedded Analytics

We recently announced the launch of additional embedding capabilities that allow you to do even more with QuickSight embedded analytics. QuickSight now allows you to embed dashboard authoring within applications (such as SaaS applications and enterprise portals), allowing you to empower your end-users to create their own visualizations and reports.

These ad hoc data analysis and self-service data exploration capabilities mean you don’t have to repeatedly create custom dashboards based on requests from your end-users, and can provide end-users with even greater agility and transparency with their data. This capability helps create product differentiation and up-sell opportunities within customer applications.

With this launch, QuickSight also provides namespaces, a multi-tenant capability that allows you to easily maintain data isolation while supporting multiple workloads within the same QuickSight account. For example, if you’re an independent software vendor (ISV), you can now assign dedicated namespaces to different customers within the same QuickSight account. This allows you to securely manage multiple customer workloads as users (authors or readers) within one namespace, and they can only discover and share content with other users within the same namespace, without exposing any data to other parties.

Without namespaces, you could set up your own embedded dashboards for hundreds of thousands of users with QuickSight. For example, see the following dashboard for our fictional company, Oktank Analytica.

With namespaces in place, you can extend this to provide ad-hoc authoring capabilities using curated datasets specific to each customer, created and shared by the developer or ISV. See the following screenshot.

For more information about these new features, see Embed multi-tenant analytics in applications with Amazon QuickSight.
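
To make this concrete, a typical embedding flow has your application's backend call the QuickSight API to generate a short-lived embed URL, which the front end then renders (usually with the amazon-quicksight-embedding-sdk). The following minimal boto3 sketch uses placeholder values for the account ID, dashboard ID, namespace, and registered reader; adapt them to your own setup:

import boto3

# Placeholder identifiers -- replace with your own account, dashboard, namespace, and user
ACCOUNT_ID = "111122223333"
DASHBOARD_ID = "oktank-analytics-dashboard"
READER_ARN = f"arn:aws:quicksight:us-east-1:{ACCOUNT_ID}:user/customer-a-namespace/embed-reader"

quicksight = boto3.client("quicksight", region_name="us-east-1")

response = quicksight.get_dashboard_embed_url(
    AwsAccountId=ACCOUNT_ID,
    DashboardId=DASHBOARD_ID,
    IdentityType="QUICKSIGHT",        # embed for a registered QuickSight user
    SessionLifetimeInMinutes=60,
    UserArn=READER_ARN,               # reader registered in the tenant's namespace
)

# Hand this short-lived URL to the front end, which renders it with the
# amazon-quicksight-embedding-sdk inside your application or portal.
embed_url = response["EmbedUrl"]

Because the URL is generated per user and per session, this is also where pay-per-session pricing and namespace-based isolation come together: the UserArn points at a reader registered in a specific customer's namespace, so that user only sees content shared within that namespace.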

Customer success stories

Customers are already using embedded analytics in QuickSight to great success. In this section, we share the stories of a few customers.

Blackboard

Blackboard is a leading EdTech company, serving higher education, K-12, business, and government clients around the world.

“The recent wave in digital transformation in the global education community has made it clear that it’s time for a similar transformation in the education analytics tools that support that community,” says Rachel Scherer, Sr. Director of Data & Analytics at Blackboard. “We see a need to support learners, teachers, and leaders in education by helping to change their relationship with data and information—to reduce the distance between information and experience, between ‘informed’ and ‘acting.’

“A large part of this strategy involves embedding information directly where our users are collaborating, teaching, and learning—providing tools and insights that aid in assessment, draw attention to opportunities learners may be missing, and help strategic and academic leadership identify patterns and opportunities for intervention. We’re particularly interested in making the experience of being informed much more intuitive—favoring insight-informed workflows and/or embedded prose over traditional visualizations that require interpretation.

“By removing the step of interpretation, embedded visualizations make insights more useful and actionable. With QuickSight, we were able to deliver on our promise of embedding visualizations quickly, supporting the rapid iteration that we require, at the large scale needed to support our global user community.”

For more information about Blackboard’s QuickSight use case, see the AWS Online Tech Talk Embedding Analytics in your Applications with Amazon QuickSight at the 25:50 mark.

Comcast

Syndication Insights (SI) enables Comcast’s syndicated partners to access the same level of rich data insights that Comcast uses for platform and operational improvements.

“The SI platform enables partners to gain deeper business insights, such as early detection into anomalies for users, while ensuring a seamless experience through embedded, interactive reports,” says Ajay Gavagal, Sr. Manager of Software Development at Comcast. “From the start, scalability was a core requirement for us. We chose QuickSight as it is scalable, enabling SI to extend to multiple syndicated partners without having to provision or manage additional infrastructure. Furthermore, QuickSight provides interactive dashboards that can be easily embedded into an application. Lastly, QuickSight’s rich APIs abstract away a lot of functionality that would otherwise need to be custom built.”

For more information about how Comcast uses QuickSight, see the AWS Online Tech Talk Embedding Analytics in your Applications with Amazon QuickSight at the 38:05 mark.

Panasonic Avionics Corporation

Panasonic Avionics Corporation provides customized in-flight entertainment and communications systems to more than 300 airlines worldwide.

“Our cloud-based solutions collect large amounts of anonymized data that help us optimize the experience for both our airline partners and their passengers,” says Anand Desikan, Director of Cloud Operations at Panasonic Avionics Corporation. “We started using Amazon QuickSight to report on in-flight Wi-Fi performance, and with its rich APIs, pay-per-session pricing, and ability to scale, we quickly rolled out QuickSight dashboards to hundreds of users. The constant evolution of the platform has been impressive: ML-powered anomaly detection, Amazon SageMaker integration, embedding, theming, and cross-visual filtering. Our users consume insights via natural language narratives, which allows them to read all their information right off the dashboard with no complex interpretation needed.”

EHE Health

EHE Health is a national preventive health and primary care Center of Excellence provider system.

“As a 106-year-old organization moving toward greater agility and marketplace nimbleness, we needed to drastically upgrade our ability to be transparent within our internal and external ecosystems,” says David Buza, Chief Technology Officer at EHE Health. “With QuickSight, we are not constrained by pre-built BI reports, and can easily customize and track the right operational metrics, such as product utilization, market penetration, and available inventory to gain a holistic view of our business. These inputs help us to understand current performance and future opportunity so that we can provide greater partnership to our clients, while delivering on our brand promise of creating healthier employee populations.

“QuickSight allowed our teams to seamlessly communicate with our clients—all viewing the same information, simultaneously. QuickSight’s embedding capabilities, along with its secure platform, intuitive design, and flexibility, allowed us to service all stakeholders—both internally and externally. This greater flexibility and customization allowed us to fit the client’s needs seamlessly.”

Conclusion

Where data agility and transparency are critical to business success, embedded analytics can open a universe of possibilities, and we are excited to see what our customers will do with these new capabilities.

About the Author

Dorothy Li is the Vice President and General Manager for Amazon QuickSight.

Building an AWS Glue ETL pipeline locally without an AWS account

Post Syndicated from Adnan Alvee original https://aws.amazon.com/blogs/big-data/building-an-aws-glue-etl-pipeline-locally-without-an-aws-account/

If you’re new to AWS Glue and looking to understand its transformation capabilities without incurring an added expense, or if you’re simply wondering if AWS Glue ETL is the right tool for your use case and want a holistic view of AWS Glue ETL functions, then please continue reading. In this post, we walk you through several AWS Glue ETL functions with supporting examples, using a local PySpark shell in a containerized environment with no AWS artifact dependency. If you’re already familiar with AWS Glue and Apache Spark, you can use this solution as a quick cheat sheet for AWS Glue PySpark validations.

You don’t need an AWS account to follow along with this walkthrough. We use small example datasets for our use case and go through the transformations of several AWS Glue ETL PySpark functions: ApplyMapping, Filter, Map, SplitRows, SelectFields, Join, DropFields, Relationalize, SelectFromCollection, RenameField, ResolveChoice, Unbox, Unnest, DropNullFields, SplitFields, Spigot, and Write Dynamic Frame.

This post provides an introduction of the transformation capabilities of AWS Glue and provides insights towards possible uses of the supported functions. The goal is to get up and running with AWS Glue ETL functions in the shortest possible time, at no cost and without any AWS environment dependency.

Prerequisites

To follow along, you should have the following resources:

  • Basic programming experience
  • Basic Python and Spark knowledge (not required but good to have)
  • A desktop or workstation with Docker installed and running

If you prefer to set up the environment locally outside of a Docker container, you can follow the instructions provided in the GitHub repo, which hosts libraries used in AWS Glue. These libraries extend Apache Spark with additional data types and operations for ETL workflows.

Setting up resources

For this post, we use the amazon/aws-glue-libs:glue_libs_1.0.0_image_01 image from Docker Hub. This image has only been tested for the AWS Glue 1.0 Spark shell (PySpark). The image also supports Jupyter and Zeppelin notebooks and a CLI interpreter. For the purpose of this post, we use the CLI interpreter. For more information on the container, please read Developing AWS Glue ETL jobs locally using a container.

To pull the relevant image from the Docker repository, enter the following command in a terminal prompt:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

To test on the command prompt, enter the following code:

docker run -itd --name glue_without_notebook amazon/aws-glue-libs:glue_libs_1.0.0_image_01
docker exec -it glue_without_notebook bash
/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

To test on Jupyter notebooks, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter \
  amazon/aws-glue-libs:glue_libs_1.0.0_image_01 \
  /home/jupyter/jupyter_start.sh

Browse to ‘localhost:8888’ in a browser to open Jupyter notebooks.

Importing GlueContext

To get started, enter the following import statements in the PySpark shell. We import GlueContext, which wraps the Spark SQLContext, thereby providing mechanisms to interact with Apache Spark:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.transforms import *
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.types import *
from pyspark.sql import Row
glueContext = GlueContext(SparkContext.getOrCreate())

Dataset 1

We first generate a Spark DataFrame consisting of dummy data of an order list for a fictional company. We process the data using AWS Glue PySpark functions.

Enter the following code into the shell:

order_list = [
               ['1005', '623', 'YES', '1418901234', '75091'],\
               ['1006', '547', 'NO', '1418901256', '75034'],\
               ['1007', '823', 'YES', '1418901300', '75023'],\
               ['1008', '912', 'NO', '1418901400', '82091'],\
               ['1009', '321', 'YES', '1418902000', '90093']\
             ]

# Define schema for the order_list
order_schema = StructType([  
                      StructField("order_id", StringType()),
                      StructField("customer_id", StringType()),
                      StructField("essential_item", StringType()),
                      StructField("timestamp", StringType()),
                      StructField("zipcode", StringType())
                    ])

# Create a Spark Dataframe from the python list and the schema
df_orders = spark.createDataFrame(order_list, schema = order_schema)

The following .show() command allows us to view the DataFrame in the shell:

df_orders.show()

# Output
+--------+-----------+--------------+----------+-------+
|order_id|customer_id|essential_item| timestamp|zipcode|
+--------+-----------+--------------+----------+-------+
|    1005|        623|           YES|1418901234|  75091|
|    1006|        547|            NO|1418901256|  75034|
|    1007|        823|           YES|1418901300|  75023|
|    1008|        912|            NO|1418901400|  82091|
|    1009|        321|           YES|1418902000|  90093|
+--------+-----------+--------------+----------+-------+

DynamicFrame

A DynamicFrame is similar to a DataFrame, except that each record is self-describing, so no schema is required initially. Instead, AWS Glue computes a schema on-the-fly when required. We convert the df_orders DataFrame into a DynamicFrame.

Enter the following code in the shell:

dyf_orders = DynamicFrame.fromDF(df_orders, glueContext, "dyf") 

Now that we have our Dynamic Frame, we can start working with the datasets with AWS Glue transform functions.

ApplyMapping

The columns in our data might be in different formats, and you may want to change their respective names. ApplyMapping is the best option for changing the names and formatting all the columns collectively. For our dataset, we change some of the columns to Long from String format to save storage space later. We also shorten the column zipcode to zip. See the following code:

# Input 
dyf_applyMapping = ApplyMapping.apply( frame = dyf_orders, mappings = [ 
  ("order_id","String","order_id","Long"), 
  ("customer_id","String","customer_id","Long"),
  ("essential_item","String","essential_item","String"),
  ("timestamp","String","timestamp","Long"),
  ("zipcode","String","zip","Long")
])

dyf_applyMapping.printSchema()

# Output
root
|-- order_id: long
|-- customer_id: long
|-- essential_item: string
|-- timestamp: long
|-- zip: long

Filter

We now want to prioritize our order delivery for essential items. We can achieve that using the Filter function:

# Input 
dyf_filter = Filter.apply(frame = dyf_applyMapping, f = lambda x: x["essential_item"] == 'YES')

dyf_filter.toDF().show()

# Output 
+--------------+-----------+-----+----------+--------+
|essential_item|customer_id|  zip| timestamp|order_id|
+--------------+-----------+-----+----------+--------+
|           YES|        623|75091|1418901234|    1005|
|           YES|        823|75023|1418901300|    1007|
|           YES|        321|90093|1418902000|    1009|
+--------------+-----------+-----+----------+--------+

Map

Map allows us to apply a transformation to each record of a Dynamic Frame. For our case, we want to target a certain zip code for next day air shipping. We implement a simple “next_day_air” function and pass it to the Dynamic Frame:

# Input 

# This function takes in a dynamic frame record and checks if zipcode
# 75034 is present in it. If present, it adds another column
# "next_day_air" with value as True

def next_day_air(rec):
  if rec["zip"] == 75034:
    rec["next_day_air"] = True
  return rec

mapped_dyF =  Map.apply(frame = dyf_applyMapping, f = next_day_air)

mapped_dyF.toDF().show()

# Output
+--------------+-----------+-----+----------+--------+------------+
|essential_item|customer_id|  zip| timestamp|order_id|next_day_air|
+--------------+-----------+-----+----------+--------+------------+
|           YES|        623|75091|1418901234|    1005|        null|
|            NO|        547|75034|1418901256|    1006|        TRUE|
|           YES|        823|75023|1418901300|    1007|        null|
|            NO|        912|82091|1418901400|    1008|        null|
|           YES|        321|90093|1418902000|    1009|        null|
+--------------+-----------+-----+----------+--------+------------+

Dataset 2

To ship essential orders to the appropriate addresses, we need customer data. We demonstrate this by generating a custom JSON dataset consisting of zip codes and customer addresses. In this use case, this data represents the customer data of the company that we want to join later on.

We generate JSON strings consisting of customer data and use the Spark json function to convert them to a JSON structure (enter each jsonStr variable one at a time in case the terminal errors out):

# Input 
jsonStr1 = u'{ "zip": 75091, "customers": [{ "id": 623, "address": "108 Park Street, TX"}, { "id": 231, "address": "763 Marsh Ln, TX" }]}'
jsonStr2 = u'{ "zip": 82091, "customers": [{ "id": 201, "address": "771 Peek Pkwy, GA" }]}'
jsonStr3 = u'{ "zip": 75023, "customers": [{ "id": 343, "address": "66 P Street, NY" }]}'
jsonStr4 = u'{ "zip": 90093, "customers": [{ "id": 932, "address": "708 Fed Ln, CA"}, { "id": 102, "address": "807 Deccan Dr, CA" }]}'
df_row = spark.createDataFrame([
  Row(json=jsonStr1),
  Row(json=jsonStr2),
  Row(json=jsonStr3),
  Row(json=jsonStr4)
])

df_json = spark.read.json(df_row.rdd.map(lambda r: r.json))
df_json.show()

# Output
+-----------------------------------------------------+-----+
|customers                                            |zip  |
+-----------------------------------------------------+-----+
|[[108 Park Street, TX, 623], [763 Marsh Ln, TX, 231]]|75091|
|[[771 Peek Pkwy, GA, 201]]                           |82091|
|[[66 P Street, NY, 343]]                             |75023|
|[[708 Fed Ln, CA, 932], [807 Deccan Dr, CA, 102]]    |90093|
+-----------------------------------------------------+-----+
# Input
df_json.printSchema()

# Output
root
 |-- customers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- id: long (nullable = true)
 |-- zip: long (nullable = true)

To convert the DataFrame back to a DynamicFrame to continue with our operations, enter the following code:

# Input
dyf_json = DynamicFrame.fromDF(df_json, glueContext, "dyf_json")

SelectFields

To join with the order list, we don’t need all the columns, so we use the SelectFields function to shortlist the columns we need. In our use case, we need the zip code column, but we can add more columns as the argument paths accepts a list:

# Input
dyf_selectFields = SelectFields.apply(frame = dyf_filter, paths=['zip'])

dyf_selectFields.toDF().show()

# Output
+-----+
|  zip|
+-----+
|75091|
|75023|
|90093|
+-----+

Join

The Join function is straightforward and manages duplicate columns. We had two columns named zip from both datasets. AWS Glue added a period (.) in one of the duplicate column names to avoid errors:

# Input
dyf_join = Join.apply(dyf_json, dyf_selectFields, 'zip', 'zip')
dyf_join.toDF().show()

# Output
+--------------------+-----+-----+
|           customers| .zip|  zip|
+--------------------+-----+-----+
|[[108 Park Street...|75091|75091|
|[[66 P Street, NY...|75023|75023|
|[[708 Fed Ln, CA,...|90093|90093|
+--------------------+-----+-----+

DropFields

Because we don’t need two columns with the same name, we can use DropFields to drop one or multiple columns all at once. The backticks (`) around .zip inside the function call are needed because the column name contains a period (.):

# Input
dyf_dropfields = DropFields.apply(
  frame = dyf_join,
  paths = "`.zip`"
)

dyf_dropfields.toDF().show()

# Output
+--------------------+-----+
|           customers|  zip|
+--------------------+-----+
|[[108 Park Street...|75091|
|[[66 P Street, NY...|75023|
|[[708 Fed Ln, CA,...|90093|
+--------------------+-----+

Relationalize

The Relationalize function can flatten nested structures and create multiple dynamic frames. Our customer column from the previous operation is a nested structure, and Relationalize can convert it into multiple flattened DynamicFrames:

# Input
dyf_relationize = dyf_dropfields.relationalize("root", "/home/glue/GlueLocalOutput")

To see the DynamicFrames, we can’t run a .show() yet because it’s a collection. We need to check what keys are present. See the following code:

# Input
dyf_relationize.keys()

# Output
dict_keys(['root', 'root_customers'])

In the follow-up function in the next section, we show how to pick the DynamicFrame from a collection of multiple DynamicFrames.

SelectFromCollection

The SelectFromCollection function allows us to retrieve the specific DynamicFrame from a collection of DynamicFrames. For this use case, we retrieve both DynamicFrames from the previous operation using this function.

To retrieve the first DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root')

dyf_selectFromCollection.toDF().show()

# Output
+---------+-----+
|customers|  zip|
+---------+-----+
|        1|75091|
|        2|75023|
|        3|90093|
+---------+-----+

To retrieve the second DynamicFrame, enter the following code:

# Input
dyf_selectFromCollection = SelectFromCollection.apply(dyf_relationize, 'root_customers')

dyf_selectFromCollection.toDF().show()

# Output
+---+-----+---------------------+----------------+
| id|index|customers.val.address|customers.val.id|
+---+-----+---------------------+----------------+
|  2|    0|      66 P Street, NY|             343|
|  3|    0|       708 Fed Ln, CA|             932|
|  3|    1|    807 Deccan Dr, CA|             102|
|  1|    0|  108 Park Street, TX|             623|
|  1|    1|     763 Marsh Ln, TX|             231|
+---+-----+---------------------+----------------+

RenameField

The second DynamicFrame we retrieved from the previous operation introduces a period (.) into our column names and is very lengthy. We can change that using the RenameField function:

# Input
dyf_renameField_1 = RenameField.apply(dyf_selectFromCollection, "`customers.val.address`", "address")

dyf_renameField_2 = RenameField.apply(dyf_renameField_1, "`customers.val.id`", "cust_id")

dyf_dropfields_rf = DropFields.apply(
  frame = dyf_renameField_2,
  paths = ["index", "id"]
)

dyf_dropfields_rf.toDF().show()

# Output
+-------------------+-------+
|            address|cust_id|
+-------------------+-------+
|    66 P Street, NY|    343|
|     708 Fed Ln, CA|    932|
|  807 Deccan Dr, CA|    102|
|108 Park Street, TX|    623|
|   763 Marsh Ln, TX|    231|
+-------------------+-------+

ResolveChoice

ResolveChoice can gracefully handle column type ambiguities. For more information about the full capabilities of ResolveChoice, see the GitHub repo.

# Input
dyf_resolveChoice = dyf_dropfields_rf.resolveChoice(specs = [('cust_id','cast:String')])

dyf_resolveChoice.printSchema()

# Output
root
|-- address: string
|-- cust_id: string

Dataset 3

We generate another dataset to demonstrate a few other functions. In this use case, the company’s warehouse inventory data is in a nested JSON structure, which is initially in a String format. See the following code:

# Input
warehouse_inventory_list = [
              ['TX_WAREHOUSE', '{\
                          "strawberry":"220",\
                          "pineapple":"560",\
                          "mango":"350",\
                          "pears":null}'
               ],\
              ['CA_WAREHOUSE', '{\
                         "strawberry":"34",\
                         "pineapple":"123",\
                         "mango":"42",\
                         "pears":null}\
              '],
              ['CO_WAREHOUSE', '{\
                         "strawberry":"340",\
                         "pineapple":"180",\
                         "mango":"2",\
                         "pears":null}'
              ]
            ]


warehouse_schema = StructType([StructField("warehouse_loc", StringType())\
                              ,StructField("data", StringType())])

df_warehouse = spark.createDataFrame(warehouse_inventory_list, schema = warehouse_schema)
dyf_warehouse = DynamicFrame.fromDF(df_warehouse, glueContext, "dyf_warehouse")

dyf_warehouse.printSchema()

# Output
root
|-- warehouse_loc: string
|-- data: string

Unbox

We use Unbox to extract JSON from String format for the new data. Compare the preceding printSchema() output with the following code:

# Input
dyf_unbox = Unbox.apply(frame = dyf_warehouse, path = "data", format="json")
dyf_unbox.printSchema()
# Output
root
|-- warehouse_loc: string
|-- data: struct
|    |-- strawberry: int
|    |-- pineapple: int
|    |-- mango: int
|    |-- pears: null

# Input 
dyf_unbox.toDF().show()

# Output
+-------------+----------------+
|warehouse_loc|            data|
+-------------+----------------+
| TX_WAREHOUSE|[220, 560, 350,]|
| CA_WAREHOUSE|  [34, 123, 42,]|
| CO_WAREHOUSE|  [340, 180, 2,]|
+-------------+----------------+

Unnest

Unnest allows us to flatten a single DynamicFrame to a more relational table format. We apply Unnest to the nested structure from the previous operation and flatten it:

# Input
dyf_unnest = UnnestFrame.apply(frame = dyf_unbox)

dyf_unnest.printSchema()

# Output 
root
|-- warehouse_loc: string
|-- data.strawberry: int
|-- data.pineapple: int
|-- data.mango: int
|-- data.pears: null

dyf_unnest.toDF().show()

# Output
+-------------+---------------+--------------+----------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|data.pears|
+-------------+---------------+--------------+----------+----------+
| TX_WAREHOUSE|            220|           560|       350|      null|
| CA_WAREHOUSE|             34|           123|        42|      null|
| CO_WAREHOUSE|            340|           180|         2|      null|
+-------------+---------------+--------------+----------+----------+

DropNullFields

The DropNullFields function makes it easy to drop columns with all null values. Our warehouse data indicated that it was out of pears and can be dropped. We apply the DropNullFields function on the DynamicFrame, which automatically identifies the columns with null values and drops them:

# Input
dyf_dropNullfields = DropNullFields.apply(frame = dyf_unnest)

dyf_dropNullfields.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

SplitFields

SplitFields allows us to split a DynamicFrame into two. The function takes the field names that go into the first DynamicFrame, followed by the names of the two resulting DynamicFrames:

# Input
dyf_splitFields = SplitFields.apply(frame = dyf_dropNullfields, paths = ["`data.strawberry`", "`data.pineapple`"], name1 = "a", name2 = "b")

For the first DynamicFrame, see the following code:

# Input
dyf_retrieve_a = SelectFromCollection.apply(dyf_splitFields, "a")
dyf_retrieve_a.toDF().show()

# Output
+---------------+--------------+
|data.strawberry|data.pineapple|
+---------------+--------------+
|            220|           560|
|             34|           123|
|            340|           180|
+---------------+--------------+

For the second Dynamic Frame, see the following code:

# Input
dyf_retrieve_b = SelectFromCollection.apply(dyf_splitFields, "b")
dyf_retrieve_b.toDF().show()

# Output
+-------------+----------+
|warehouse_loc|data.mango|
+-------------+----------+
| TX_WAREHOUSE|       350|
| CA_WAREHOUSE|        42|
| CO_WAREHOUSE|         2|
+-------------+----------+

SplitRows

SplitRows allows us to split a DynamicFrame into two based on a comparison condition on a column’s values; here, rows are split on whether the pineapple count falls within a specific range:

# Input
dyf_splitRows = SplitRows.apply(frame = dyf_dropNullfields, comparison_dict = {"`data.pineapple`": {">": "100", "<": "200"}}, name1 = 'pa_200_less', name2 = 'pa_200_more')

For the first Dynamic Frame, see the following code:

# Input
dyf_pa_200_less = SelectFromCollection.apply(dyf_splitRows, 'pa_200_less')
dyf_pa_200_less.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| CA_WAREHOUSE|             34|           123|        42|
| CO_WAREHOUSE|            340|           180|         2|
+-------------+---------------+--------------+----------+

For the second Dynamic Frame, see the following code:

# Input
dyf_pa_200_more = SelectFromCollection.apply(dyf_splitRows, 'pa_200_more')
dyf_pa_200_more.toDF().show()

# Output
+-------------+---------------+--------------+----------+
|warehouse_loc|data.strawberry|data.pineapple|data.mango|
+-------------+---------------+--------------+----------+
| TX_WAREHOUSE|            220|           560|       350|
+-------------+---------------+--------------+----------+

Spigot

Spigot allows you to write a sample dataset to a destination during transformation. For our use case, we write the top 10 records locally:

# Input
dyf_splitFields = Spigot.apply(dyf_pa_200_less, '/home/glue/GlueLocalOutput/Spigot/', 'top10')

Depending on your local environment configuration, Spigot may run into errors. Alternatively, you can use an AWS Glue endpoint or an AWS Glue ETL job to run this function.

Write Dynamic Frame

The write_dynamic_frame function writes a DynamicFrame using the specified connection and format. For our use case, we write locally (we use a connection_type of S3 with a POSIX path argument in connection_options, which allows writing to local storage):

# Input
glueContext.write_dynamic_frame.from_options(\
frame = dyf_splitFields,\
connection_options = {'path': '/home/glue/GlueLocalOutput/'},\
connection_type = 's3',\
format = 'json')

Conclusion

This article discussed the PySpark ETL capabilities of AWS Glue. Further testing with an AWS Glue development endpoint or directly adding jobs in AWS Glue is a good next step to take the learning forward. For more information, see General Information about Programming AWS Glue ETL Scripts.


About the Authors

Adnan Alvee is a Big Data Architect for AWS ProServe Remote Consulting Services. He helps build solutions for customers leveraging their data and AWS services. Outside of AWS, he enjoys playing badminton and drinking chai.

Imtiaz (Taz) Sayed is the World Wide Tech Leader for Data Analytics at AWS. He is an ardent data engineer and relishes connecting with the data analytics community.

How Our Paths Brought Us to Data and Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-our-paths-brought-us-to-data-and-netflix-4eced44a6872

Part of our series on who works in Analytics at Netflix — and what the role entails

by Julie Beckley & Chris Pham

This Q&A provides insights into the diverse set of skills, projects, and culture within Data Science and Engineering (DSE) at Netflix through the eyes of two team members: Chris Pham and Julie Beckley.

Photo from a team curling offsite — There’s us to the right!

[Chris] Julie and I joined the Streaming DSE team at Netflix a few years ago and have been close colleagues and friends since then. At work, we regularly lean on each other for help based on our respective areas of expertise — I bring my breadth of big data tools and technologies while Julie has been building statistical models for the past decade. Outside of work, we share a love of good food and coffee, exchanging tips on making espresso.

1. What was your path to working in data?

[Julie] I took a traditional path to data science. Since mathematics was my favorite subject in school, I decided to pursue it for my bachelor’s degree at McGill University (while indulging in French culture in the beautiful city of Montreal). Over the course of the four years, it became clear that I enjoyed combining analytical skills with solving real world problems, so a PhD in Statistics was a natural next step. After completing my education, I was still not certain whether I wanted a job in academia or industry. I took a role as a Research Staff Member at IBM Research, which served as a middle ground with a joint focus on real world applications, academic research, and even allowed me to teach a graduate Machine Learning course! I then transitioned to a full industry role at Netflix.

[Chris] I initially wanted to build a career in consulting after receiving my graduate degree in Economics because I had a passion for analytical problem solving and statistical modeling. A role in data science eventually seemed like a natural transition, but it wasn’t without its hurdles: With my consulting background, I had to go through a few other roles first while learning how to code on the side. A lot of my learning and training was self-guided until 2016, when a manager at my last company took a chance on me and helped me make the rare transfer from a role in HR to Data Science.

2. Tell me about some of the exciting projects you’re a part of.

[Julie] Chris and I have the same primary stakeholders (or engineering team that we support): Encoding Technologies. They are continuously innovating compression algorithms to efficiently send high quality audio and video files to our customers over the internet. I focus on improving experimentation methodology to test how well the newest files are working: do they need less bits to stream while providing a higher video quality? Do they cause less errors? My work is typically developed in R or Python. I love the cross-functional nature of my work, as it allows me to learn from others and creatively explore new statistical methodologies to improve the Netflix service.

[Chris] When I first started working with Encoding Technologies, there was so much data waiting to be translated into actionable insights. It was fun starting from almost nothing and transforming all of that data into self-serve tools and dashboards for the team to understand their contribution to the Netflix streaming experience. These projects have involved using Spark, Python, SQL, Tableau, and Jupyter notebooks. Over the last year, I’ve spent a lot of time analyzing data to inform how we roll out new encoding innovations to the diverse ecosystem of devices that stream Netflix.

3. How do your projects impact the business at Netflix?

[Julie] Encoding experimentation (and more broadly, streaming experimentation) is critical for ensuring our customers have a good Quality of Experience when watching Netflix. In other words, the content you’re about to watch needs to load quickly with high video quality. When we test new encodes, we need effective data science methods to quickly and accurately understand whether customers are having a better experience. With these insights, the engineering teams can quickly understand what’s working well and what needs to be improved. It’s super exciting to see the impact of my work when I hear from friends and family that Netflix is streaming well for them!

[Chris] There are a lot of things to consider when we roll out a new compression algorithm. Which devices get this treatment? What is the benefit to the streaming experience? Is the benefit uniform, or do certain cohorts of members — such as those who stream over a cellular connection — benefit more? How does a decision of this scale affect the efficiency of our globally distributed content delivery network, Open Connect? It’s one big optimization problem that requires balancing several different factors. Streaming DSE is at the center of it all, bringing together different teams at Netflix and using data to drive decisions that impact our members around the world.

4. What does it take to succeed at Netflix in a data role?

[Julie] One of the special things about working at Netflix is that a diverse set of skills and backgrounds is truly appreciated, since there are many ways to add value to the company. From my experience, being proactive in pushing forward on your ideas is key. The values in the Netflix culture document create a framework where everyone can act as a leader, because we expect initiative, direct and candid feedback, and transparency in everything we do. This leads to a great environment where I am constantly challenged, learning, and receiving constructive feedback on how I can do better!

[Chris] I think a big part of our jobs is continuously thinking about how data can benefit our stakeholders. Julie and I will never know as much about video and audio compression algorithms as our talented Encoding Technologies team, but we should be the ones most familiar with the data: How to access, analyze, and visualize it; how to transform it into metrics that act as strong and accurate proxies for a member’s experience; and how to guide others to draw the right conclusions from data so they can act on it. Writing memos is a big part of Netflix culture, which I’ve found has been helpful for sharing ideas, soliciting feedback, and documenting project details. So writing well, especially the ability to translate technical concepts for a non-technical audience, is also very useful.

5. What piece of advice would you pass along to those just starting out their career in data?

[Julie] One piece of advice I would pass along (and wish I could give to my younger self) is not to stress and try to plan every step of your data science career. Your career is long (and unpredictable!), so as long as you work hard and stay motivated, it will move in an exciting direction.

[Chris] Everyone wants to build fancy models or tools, but fewer are willing to do the foundational things like cleaning the data and writing the documentation. I’ve found that volunteering and being proactive (no matter the task) has been an effective way of building trust with others, and it opened my career up to many more opportunities early on.

If this post resonates with you and you’d like to explore opportunities with Netflix, check out our analytics site, search open roles, and learn about our culture. You can also find more stories like this here.


How Our Paths Brought Us to Data and Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Analytics at Netflix: Who we are and what we do

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/analytics-at-netflix-who-we-are-and-what-we-do-7d9c08fe6965


An Introduction to Analytics and Visualization Engineering at Netflix

by Molly Jackman & Meghana Reddy

Explained: Season 1 (Photo Credit: Netflix)

Across nearly every industry, there is recognition that data analytics is key to driving informed business decision-making. But there is far less agreement on what that term “data analytics” actually means — or what to call the people responsible for the work.

Even within Netflix, we have many groups that do some form of data analysis, including business strategy and consumer insights. But here we are talking about Netflix’s Data Science and Engineering group, which specializes in analytics at scale. The group has technical, engineering-oriented roles that fall under two broad category titles: “Analytics Engineers” and “Visualization Engineers.” In this post, we refer to these two titles collectively as the “analytics role.” These professionals come from a wide range of backgrounds and bring different skills to their work, while sharing a common drive to generate and scale business impact through data.


What’s the purpose of the analytics role at Netflix?

When you think about data at Netflix, what comes to mind? Oftentimes it is our content recommendation algorithm or the online delivery of video to your device at home. Both are integral parts of the business, but far from the whole picture. Data is used to inform a wide range of questions — ‘How can we make the product experience even better?’, ‘Which shows and films bring the most joy to our members?’, ‘Who can we partner with to expand access to our service in new markets?’. Our Analytics and Visualization Engineers are taking on these and other big questions for the company, informing decision-making across every corner of the business.

We align our analytic teams with business area verticals

Since the problem space is so varied, we align our analytics professionals with business area verticals rather than organizing them within a single functional horizontal. The expectation is that individuals in these roles possess deep business context and are thought leaders alongside their business counterparts. This enables them to fully understand where their partners are coming from. It also means Analytics and Visualization Engineers are a specialized resource and a rare commodity. There are many more questions and stakeholders than analytics team members, and the job is not to take on every request. Instead, these individual contributors are given freedom to choose their projects and are responsible for prioritizing the ones that will have the most business impact (and deprioritizing the rest). This requires a lot of judgment and embodies our “context not control” culture.

“OK, but what do they actually do…?”

What does the job entail?

You’ve probably caught on to some common themes: People in the analytics role are highly connected to the business, solve end-to-end problems, and are directly responsible for improving business outcomes. But what makes this group really shine are their differences. They come from lots of backgrounds, which yields different perspectives on how to approach problems. We use the catch-all titles of Analytics and Visualization Engineers so as to not get too hung up on specific credentials. Instead, people are empowered to leverage their unique skills to make Netflix better.

A couple other defining characteristics of the role are full ownership of the problem (in Netflix lingo, you are the “informed captain” of your space) and creating trustworthy outputs. These are only possible through the one-two punch of deep business context 👊 and technical excellence 👊. Full ownership often means building new data pipelines, navigating complex schemas and large data sets, developing or improving metrics for business performance, and creating intuitive visualizations and dashboards — always with an eye towards actionable insights.


Because these professionals vary in their expertise, so too does their day-to-day. Below are three broadly defined personas to help illustrate some of the different backgrounds, motivations, and activities of individuals in the analytics role at Netflix. Many of our colleagues have come in with expertise that spans multiple personas. Others have grown into new areas as part of their professional development at Netflix. Ultimately, these skills are all on a continuum, some broad and some deep, and these are just a few examples of such expertise. So if you find yourself connecting with any part of these descriptions, the analytics role could be for you.

  • The Analyst is motivated by delivering metrics, findings, or dashboards that drive analytical insights and business decisions. They love to communicate their discoveries to nontechnical audiences, explain caveats, and debate analytic choices and strategic implications with peers and stakeholders. Their expertise is descriptive analytic methodology, but they have the necessary tools to be scrappy (e.g. coding, math, stats), and do what’s required to answer the highest priority business questions.
  • The Engineer enjoys making data available by piping it in from new sources in optimal ways, building robust data models, prototyping systems, and doing project-specific engineering. They’re still analysts at heart but, similar to data engineers, they have a deep understanding of data warehouse capabilities and are pros at data processing optimization and performance tuning. Being at this intersection of disciplines allows them to produce full-stack outputs, layering visualizations and analytics on their projects.
  • The Visualizer is passionate about the scalability, beauty, and functionality of dashboards and their capability for telling a visual story. They also have an eye for principled engineering, i.e. managing the data under the surface. They want to pick the perfect chart type for the narrative while also focusing on delivering key analytic insights. They may use industry tools (e.g. Tableau, Looker, Power BI) to their fullest extent, developing a deeper understanding of analytics by examining these tools under the hood. Or they may create sophisticated visuals from scratch and build the type of custom UI that enterprise tools don’t offer (e.g. JavaScript web apps).

Introducing Analytics at Netflix

Whether you’re a data professional, student, or Netflix enthusiast, we invite you to meet our stunning colleagues and hear their stories. If this series resonates with you and you’d like to explore opportunities with us, check out our analytics site, search open roles, and learn about our culture.

Welcome to Analytics at Netflix!



Analytics at Netflix: Who we are and what we do was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure” or “right to be forgotten” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to the S3 keys of objects that contain that user’s data.

This post walks you through a framework that helps you purge individual user data within your organization’s AWS hosted data lake, and an analytics solution that uses different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us to locate them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectRemoved events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (Amazon RDS), Amazon Aurora, or Amazon Elasticsearch Service (Amazon ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach works with any other technology.

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is then created and picked up by a Node.js-based Lambda worker that sends an email to the approver through Amazon Simple Email Service (SES) with approve and reject links.

The following diagram shows a graphical representation of the Step Functions state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If you choose the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or fail email to the user.

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow covers the use case of an existing data lake for which the index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates the metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow.

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in homogenous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
				userid TEXT,
				s3path TEXT,
				recordline INTEGER
			);

You can index on userid to optimize query performance. On object upload, for each row you need to insert into the user_objects table a row that indicates the user ID, the URI of the target Amazon S3 object, and the line number that corresponds to the record. For instance, consider uploading the following JSON input:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body ":"…"}

When this object is uploaded to the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json, we insert the following tuples into user_objects:

("V34qejxNsCbcgD8C0HVk-Q", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 0)
("ofKDkJKXSKZXu5xJNGiiBQ", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 1)
("UgMW8bLE0QMJDCkQ1Ax5Mg", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.
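
The following is a minimal sketch of such a handler, assuming the user_objects table above, newline-delimited JSON objects with a user_id field, and database credentials supplied through environment variables (psycopg2 must be packaged with the function). It is an illustration only, and it omits handling of delete events.

import json
import os
from urllib.parse import unquote_plus

import boto3
import psycopg2  # not in the default Lambda runtime; package it with the function or a layer

s3 = boto3.client("s3")

def lambda_handler(event, context):
    # Database credentials are assumed to come from environment variables.
    conn = psycopg2.connect(
        host=os.environ["DB_HOST"],
        dbname=os.environ["DB_NAME"],
        user=os.environ["DB_USER"],
        password=os.environ["DB_PASSWORD"],
    )
    with conn, conn.cursor() as cur:
        for record in event["Records"]:
            bucket = record["s3"]["bucket"]["name"]
            key = unquote_plus(record["s3"]["object"]["key"])
            s3path = f"s3://{bucket}/{key}"
            body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
            # One JSON record per line; the line number becomes the recordline value.
            for line_no, line in enumerate(body.splitlines()):
                user_id = json.loads(line)["user_id"]
                cur.execute(
                    "INSERT INTO user_objects (userid, s3path, recordline) VALUES (%s, %s, %s)",
                    (user_id, s3path, line_no),
                )
    conn.close()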

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
       ARRAY_AGG(recordline)
FROM user_objects
WHERE userid = 'V34qejxNsCbcgD8C0HVk-Q'
GROUP BY s3path;

The preceding example SQL query returns rows like the following:

("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.
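
As a rough illustration of this read-filter-write step (a sketch only, not the deleteUserRecords.py implementation), the purge could look like the following:

import boto3

s3 = boto3.client("s3")

def purge_lines(s3path, lines_to_purge):
    """Replace the given line numbers of an S3 object with empty JSON objects."""
    bucket, key = s3path.replace("s3://", "").split("/", 1)
    body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
    lines = body.splitlines()
    for line_no in lines_to_purge:
        lines[line_no] = "{}"  # keep the line in place so other index entries stay valid
    s3.put_object(Bucket=bucket, Key=key, Body="\n".join(lines).encode("utf-8"))

# Example, using the output of the index query shown above
purge_lines("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", [2102, 529])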

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid:": 1001, "s3":{"s3://path1", "s3://path2"}, "RDS":{"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.
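
A simplified sketch of the same idea follows. It assumes a DynamoDB table named index_metastore with userid as the partition key, a string set per storage layer, and file names that start with the user identifier; it is not the repo implementation.

from urllib.parse import unquote_plus

import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("index_metastore")  # assumed table name

def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = unquote_plus(record["s3"]["object"]["key"])
        # File names are assumed to start with the user identifier, for example 1001-<suffix>.csv
        userid = int(key.split("/")[-1].split("-")[0])
        # ADD appends the new S3 path to the user's string set for the s3 attribute
        table.update_item(
            Key={"userid": userid},
            UpdateExpression="ADD #s3 :path",
            ExpressionAttributeNames={"#s3": "s3"},
            ExpressionAttributeValues={":path": {f"s3://{bucket}/{key}"}},
        )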

On delete request, we query the metastore table, which is DynamoDB, and generate a purge report that contains details on which storage layers contain user records, along with storage layer specifics that can speed up locating the records. We store the purge report in Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.
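
A sketch of this report-generation step might look like the following, again assuming the index_metastore table, a report bucket supplied through an environment variable, and an input event that carries the requested user IDs (all of these names are assumptions, and the actual generate-purge-report.py may differ):

import json
import os

import boto3

dynamodb = boto3.resource("dynamodb")
s3 = boto3.client("s3")

def lambda_handler(event, context):
    table = dynamodb.Table("index_metastore")     # assumed table name
    report_bucket = os.environ["REPORT_BUCKET"]   # assumed environment variable
    report = []
    for userid in event["userids"]:               # assumed input shape
        item = table.get_item(Key={"userid": userid}).get("Item")
        if item:
            report.append(item)
    report_key = f"purge-reports/{context.aws_request_id}.json"
    # default=str stringifies DynamoDB sets and Decimal values so the report serializes cleanly
    s3.put_object(Bucket=report_bucket, Key=report_key,
                  Body=json.dumps(report, default=str).encode("utf-8"))
    return {"report": f"s3://{report_bucket}/{report_key}"}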

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted to our metadata storing layer. On delete request, we fetch the metadata records for requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing object. This is the most flexible approach because it supports a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because it’s unexpected to have multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the PutObjectTagging Amazon S3 operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, we fetch the objects with that tag and delete them. This option is straightforward to implement using the existing Amazon S3 API and can therefore serve as a first version of your implementation. However, it involves significant limitations. It assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your data as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.

The key implementation decision of our approach is separating the storage layer we use for our data from the one we use for our metadata. As a result, our design is versatile and can be plugged into any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but is comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use that for your index for no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.

 


About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years in the Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and traveling.

Streaming data from Amazon S3 to Amazon Kinesis Data Streams using AWS DMS

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/streaming-data-from-amazon-s3-to-amazon-kinesis-data-streams-using-aws-dms/

Stream processing is very useful in use cases where we need to detect a problem quickly and improve the outcome based on data, for example, production line monitoring or supply chain optimization.

This blog post walks you through the process of streaming existing data files and ongoing changes from Amazon Simple Storage Service (Amazon S3) to Amazon Kinesis. You achieve this by using AWS Database Migration Service (AWS DMS). AWS DMS enables you to seamlessly migrate data from supported sources to relational databases, data warehouses, streaming platforms, and other data stores in the AWS Cloud.

Many SaaS and third-party applications already integrate with Amazon S3 and can deliver records to S3 buckets. In certain use cases, you need to further process this data in near-real time to generate alerts. Use cases like threat detection and application monitoring require generating insights in seconds. Waiting for batch processes often leads to a delay in data analysis and reduces the ability of systems to respond quickly to critical situations. For such use cases, you need a way to convert batch to stream processing by expanding the existing integrations of your applications with Amazon S3.

You can use AWS DMS for such data-processing requirements. AWS DMS lets you expand your application’s existing integration with Amazon S3 to produce data in Amazon Kinesis Data Streams for real-time analytics without writing and maintaining new code. AWS DMS supports specifying Amazon S3 as the source and streaming services like Kinesis and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as the target. AWS DMS allows migration of full load and change data capture (CDC) files to these services. AWS DMS performs this task out of the box without any complex configuration or code development. You can also configure an AWS DMS replication instance to scale up or down depending on the workload.

For this post, we focus on streaming data to Kinesis. We deploy an AWS CloudFormation template to get started in minutes and explore the streaming pipeline.

Architecture overview

Third-party applications such as web, API, and data-integration services produce data and log files in S3 buckets. Data lakes built on AWS process and store data in Amazon S3 at different stages. AWS DMS supports Amazon S3 as the source and Kinesis as the target, so data stored in an S3 bucket is streamed to Kinesis. Several consumers, such as AWS Lambda, Amazon Kinesis Data Firehose, Amazon Kinesis Data Analytics, and the Kinesis Client Library (KCL), can consume the data concurrently to perform real-time analytics on the dataset. Each AWS service in this architecture can scale independently as needed.

The following diagram shows the architecture of this solution.

Deploying AWS CloudFormation

To get started, you first deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and accounts with the least amount of effort and time. To create these resources, complete the following steps:

  1. Sign in to the AWS Management Console and choose the us-west-2 Region.
  2. Choose Launch Stack:
  3. Choose Next.

 This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template on the console.

  1. For Stack name, enter a stack name.
  2. On the next screen, choose your VPC and subnet IDs.
  3. For Does DMS VPC and Cloudwatch role Exists?, enter Y if the managed AWS Identity and Access Management (IAM) roles dms-vpc-role and dms-cloudwatch-logs-role exist in your account. Otherwise, leave at the default N.

If you want to deploy the AWS DMS endpoint in a private subnet, enable the VPC endpoints for Kinesis and Amazon S3 before deploying the template.

  1. Choose Next.
  2. Acknowledge resource creation under Capabilities on the final screen and choose Create.

The stack takes 5–10 minutes to complete.

The files required for this demo don’t come with the template. Download blog_sample_file.zip and upload it to the source bucket before starting the AWS DMS task.

Using Amazon S3 as the source

When you use Amazon S3 as the source, the data files (full load and CDC) must be in comma-separated value (CSV) format.

In addition to the data files, AWS DMS also requires an external table definition. An external table definition is a JSON document that describes how AWS DMS should interpret the data from Amazon S3.

Amazon S3 file paths for full load and CDC files are required for AWS DMS to run the task. Make sure that file names are sequentially numbered to replicate the data in the correct order. In addition, AWS DMS allows you to specify the column delimiter, row delimiter, and other parameters using extra connection attributes.

AWS DMS identifies the operation to perform for each load record from the record’s keyword value: INSERT or I.

For more information, see Using Amazon S3 as a source for AWS DMS.

Using Amazon Kinesis as the target

AWS DMS publishes records to a Kinesis data stream in JSON format. During conversion, AWS DMS serializes each record from the source Amazon S3 files into an attribute-value pair in JSON format.

AWS DMS publishes each record in the source Amazon S3 file as one JSON data record in a data stream regardless of the action specified in the source file.

Additionally, AWS DMS allows object mapping to migrate data from source files to a data stream. Object mapping determines the structure of data records in the stream.

AWS DMS also supports multi-threaded migration for full load and CDC with task settings. You can improve performance by configuring the number of threads, the buffer size, and parallel queues.

For more information, see Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.

Walkthrough

The AWS CloudFormation deployment takes care of all the infrastructure. Now you need files to complete this use case.

  1. Download blog_sample_file.zip, which contains full and CDC load files in CSV format.

If your source files aren’t in CSV format, convert them to CSV. One conversion method is to use AWS Glue. For more information, see Format Options for ETL Inputs and Outputs in AWS Glue.

The following screenshot shows the sample records of the full load files that you use for this use case.

CDC files require additional attributes for AWS DMS to identify the action, table, and schema.

  1. Reformat the files as follows:
  • Operation – The change operation to be performed: INSERT or I, UPDATE or U, or DELETE or D.
  • Table name – The name of the source table.
  • Schema name – The name of the source schema.
  • Data – One or more columns that represent the data to be changed.

The following screenshot shows sample records of the CDC file.
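
For illustration, CDC records with these attributes might look like the following lines (hypothetical values that match the table definition shown in the next step):

INSERT,table01,schema01,2020-08-03 22:06:28,10.1000/sample-doi,1001,25.50,SIG0000001
UPDATE,table01,schema01,2020-08-03 22:07:12,10.1000/sample-doi,1001,27.75,SIG0000001
DELETE,table01,schema01,2020-08-03 22:08:45,10.1000/sample-doi,1001,27.75,SIG0000001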

External table definition is required in the source endpoint configuration. For this post, the definition is embedded in AWS CloudFormation.

  1. Enter the following code for the table definition for the full and CDC files:
    {
        "TableCount": "1",
        "Tables": [{
            "TableName": "table01",
            "TablePath": "schema01/table01/",
            "TableOwner": "schema01",
            "TableColumns": [{
                "ColumnName": "ingest_time",
                "ColumnType": "TIMESTAMP",
                "ColumnNullable": "false",
                "ColumnIsPk": "true"
            }, {
                "ColumnName": "doi",
                "ColumnType": "STRING",
                "ColumnLength": "30"
            }, {
                "ColumnName": "id",
                "ColumnType": "INT8"
            }, {
                "ColumnName": "value",
                "ColumnType": "NUMERIC",
                "ColumnPrecision": "5",
                "ColumnScale": "2"
            }, {
                "ColumnName": "data_sig",
                "ColumnType": "STRING",
                "ColumnLength": "10"
            }],
            "TableColumnsTotal": "5"
        }]
    }
    

  2. Create folder structures under the source S3 bucket created through the CloudFormation template.
    1. Create folders schema01/table01/ for full load and cdcfile/ for CDC data files.
    2. Also, file names should be numbered incrementally, as listed in the following CLI output.
      $aws s3 ls s3://blog-xxxxxxxx/schema01/table01 --recursive --human-readable --summarize
      2020-08-03 22:05:57    5.0 MiB schema01/table01/full_000
      2020-08-03 22:05:51    5.0 MiB schema01/table01/full_001
      2020-08-03 22:06:00    5.0 MiB schema01/table01/full_002
      2020-08-03 22:05:56    5.0 MiB schema01/table01/full_003
      2020-08-03 22:05:59    3.1 MiB schema01/table01/full_004
      
      $aws s3 ls s3://blog-xxxxxxxx/cdcfile --recursive --human-readable --summarize
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_000
      2020-08-03 22:06:28    4.8 MiB cdc/cdc_001
      2020-08-03 22:06:26    4.8 MiB cdc/cdc_002
      2020-08-03 22:06:19    4.8 MiB cdc/cdc_003
      

  3. After the files are copied, on the AWS DMS console, choose Replication instances.
  4. Validate the instance status and configuration.
  5. Choose Endpoints.
  6. Validate the status and configuration of the Amazon S3 source endpoint and make sure that the connection to the replication instance is successful.
  7. Similarly, validate the status and configuration of Kinesis target endpoint and make sure that the connection to the replication instance is successful.
  8. Choose Database migration task.
  9. Verify that the source and target are mapped correctly.
  10. After validating all the configurations, restart the AWS DMS task. Because the task has been created and never started, choose Restart/Resume to start full load and CDC.

After data migration starts, you can see it listed under Table statistics. For more information, see How do I use table statistics to monitor an AWS DMS task?

AWS DMS completes the full load first and migrates change data as files are uploaded to the bucket location specified in the cdcPath parameter.

  1. While the migration is in progress, on the Kinesis console, check the IncomingBytes metrics on the Monitoring tab to confirm the data is streaming to Kinesis Data Streams.
  2. To confirm that the data streamed is being consumed by the Lambda consumer, use the GetRecords.Bytes metric.

You’re now ready to validate the records in Lambda. Lambda is configured to read from Kinesis through a trigger.

The Lambda consumer for this post is a sample function that consumes the records from the Kinesis data stream, decodes the base64 encoded data, and prints the records to the Amazon CloudWatch log group.
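
A consumer of this kind can be as small as the following sketch (an illustration of the described behavior, not necessarily the deployed function):

import base64
import json

def lambda_handler(event, context):
    # Kinesis delivers record payloads base64-encoded; decode each one and print it.
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        print(json.loads(payload))  # printed output lands in the function's CloudWatch log group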

  1. On the Monitoring tab, open the recent logstream under CloudWatch Log Insights to see the printed records.

For more information about monitoring, see Monitoring functions in the AWS Lambda console.

You can add processing logic to the Lambda function as per your requirements to aggregate or process the records. You can also configure a Lambda destination for further processing. Lambda asynchronous invocations can put an event or message on Amazon Simple Notification Service (Amazon SNS), Amazon Simple Queue Service (Amazon SQS), or Amazon EventBridge. For more information, see Introducing AWS Lambda Destinations.

Best practice considerations

When implementing this solution, consider the following best practices:

  • Full load allows you to stream existing data from an S3 bucket to Kinesis. You can use full load to migrate previously stored data before streaming CDC data. The full load data should already exist before the task starts. For new CDC files, the data is streamed to Kinesis on a file delivery event in real time.
  • For loading multiple tables, you can specify the table count and table properties in an external table definition file. The CDC path remains the same and AWS DMS maps the records to tables based on the metadata fields.
  • During a heavy workload, the AWS DMS instance can be constrained to resources like CPU, memory, storage, and I/O. For optimal transfer speed, monitor the CloudWatch metrics and scale the replication instance.
  • For migrating a large number of tables, you can speed up the transfer by setting the multi-threading parameter to higher values.
  • The CloudFormation template creates a data stream with two shards. As the data flow rate to the stream increases, you can scale the number of shards in the stream to adapt to changes (see the sketch after this list). Monitoring the IncomingRecords and WriteProvisionedThroughputExceeded CloudWatch metrics provides insight into when to scale the shards.
  • Object mapping in the AWS DMS task defines the partition key. This partition key is used to group data by shard within a stream. The default partition key AWS DMS uses is TableName. You can use attribute mapping to change the partition key to a value of one of the fields in the JSON, or the primary key of the table in the source database. You can also set the partition key to a constant value to stream all the data to a single shard in the stream.
  • By default, Lambda invokes the function as soon as records are available in the stream. To avoid invoking the function with a small number of records, configure the event source to buffer records for up to 5 minutes by configuring a batch window. For more information, see Using AWS Lambda with Amazon Kinesis.
  • When Kinesis is configured as a trigger for Lambda, you can increase the concurrency to process multiple batches from each shard in parallel. Lambda can process up to 10 batches in each shard simultaneously. For more information about concurrency, see New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.
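
Building on the shard-scaling consideration above, resharding is a single API call. The following sketch doubles the open shard count of a stream (the stream name is a placeholder):

import boto3

kinesis = boto3.client("kinesis")

def double_shards(stream_name):
    """Double the open shard count of a stream using uniform scaling."""
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    current = summary["StreamDescriptionSummary"]["OpenShardCount"]
    kinesis.update_shard_count(
        StreamName=stream_name,
        TargetShardCount=current * 2,
        ScalingType="UNIFORM_SCALING",
    )

double_shards("blog-dms-stream")  # placeholder stream name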

Cleaning up

After successful testing and validation, you should delete all the resources deployed through the CloudFormation template to avoid any unwanted costs. First empty the S3 bucket and stop the AWS DMS task. Then delete the appropriate stacks on the AWS CloudFormation console.

Summary

This post describes a solution for converting batch processing to near-real-time processing using AWS DMS. This solution greatly simplifies the process of migrating records from Amazon S3 to Kinesis for analysis. Kinesis as an AWS DMS target allows multiple systems to consume data simultaneously. Having a near-real-time streaming pipeline allows you to make sense of all the changes as they happen, which ultimately improves your organization’s decision-making. All the resources used in this solution scale seamlessly and allow you to focus on analysis, alerting, reporting, and fraud detection instead of platform setup and maintenance. This promotes cost-effectiveness while reducing operational burden.


About the Author

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.

Charishma Makineni is a Technical Account Manager at AWS. She works with enterprise customers to help them build secure and scalable solutions on the AWS Cloud. She is focused on big data and analytics technologies. Outside of work, Charishma enjoys being outdoors, gardening, and experimenting with cooking.

Suresh Patnam is a Solutions Architect at AWS. He helps customers innovate on the AWS platform by building highly available, scalable, and secure architectures for big data and AI/ML. In his spare time, Suresh enjoys playing tennis and spending time with his family.

Analyzing Amazon S3 server access logs using Amazon ES

Post Syndicated from Mahesh Goyal original https://aws.amazon.com/blogs/big-data/analyzing-amazon-s3-server-access-logs-using-amazon-es/

When you use Amazon Simple Storage Service (Amazon S3) to store corporate data and host websites, you need additional logging to monitor access to your data and the performance of your application. An effective logging solution enhances security and improves the detection of security incidents. As your data storage needs grow, you may rely on Amazon S3 for a range of use cases while simultaneously looking for ways to analyze your logs to ensure compliance, perform audits, and discover risks.

Amazon S3 lets you monitor the traffic using the server access logging feature. With server access logging, you can capture and monitor the traffic to your S3 bucket at any time, with detailed information about the source of the request. The logs are stored in the S3 bucket you own in the same Region. This addresses the security and compliance requirements of most organizations. The logs are critical for establishing baselines, analyzing access patterns, and identifying trends. For example, the logs could answer a financial organization’s question about how many requests are made to a bucket and who is making what type of access requests to the objects.

You can discover insights from server access logs through several different methods. One common option is to use Amazon Athena or Amazon Redshift Spectrum to query the log files stored in Amazon S3. However, this approach suffers from high latency as the log volume grows, and it requires further integration with Amazon QuickSight to add visualization capabilities.

You can address this by using Amazon Elasticsearch Service (Amazon ES). Amazon ES is a managed service that makes it easier to deploy, operate, and scale Elasticsearch clusters in the AWS Cloud. Elasticsearch is a popular open-source search and analytics engine for use cases such as log analytics, real-time application monitoring, and clickstream analysis. The service provides support for open-source Elasticsearch APIs, managed Kibana, and integration with other AWS services such as Amazon S3 and Amazon Kinesis for loading streaming data into Amazon ES.

This post walks you through automating ingestion of server access logs from Amazon S3 into Amazon ES using AWS Lambda and visualizing the data in Kibana.

Architecture overview

Server access logging is enabled on source buckets, and logs are delivered to an access log bucket. The access log bucket is configured to send an event to the Lambda function when a log file is created. On an event trigger, the Lambda function reads the file, processes the access log, and sends it to Amazon ES. When the logs are available, you can use Kibana to create interactive visuals and analyze the logs over a time period.
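
At its core, the Lambda function reads the log object, splits it into entries, and indexes each entry as a JSON document. The following is a minimal sketch; the field parsing is deliberately simplified, the requests library must be bundled with the function, and the endpoint, credentials, and index name are assumptions (the function in the GitHub project is more complete):

import os

import boto3
import requests  # not in the default Lambda runtime; package it with the function

s3 = boto3.client("s3")

ES_ENDPOINT = os.environ["ES_ENDPOINT"]  # assumed, e.g. https://vpc-mydomain.us-east-1.es.amazonaws.com
ES_AUTH = (os.environ["ES_USER"], os.environ["ES_PASSWORD"])  # fine-grained access control user

def lambda_handler(event, context):
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]
        body = s3.get_object(Bucket=bucket, Key=key)["Body"].read().decode("utf-8")
        for line in body.splitlines():
            # Deliberately simplified parsing: real access log entries contain quoted
            # and bracketed fields, so production code should use a proper regex.
            parts = line.split(" ")
            doc = {
                "bucket_owner": parts[0],
                "bucket": parts[1],
                "requester": parts[5] if len(parts) > 5 else None,
                "operation": parts[7] if len(parts) > 7 else None,
                "raw": line,
            }
            requests.post(f"{ES_ENDPOINT}/access-log/_doc", json=doc, auth=ES_AUTH, timeout=10)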

When designing a log analytics solution for high-frequency incoming data, you should consider buffering layers to avoid instability in the system. Buffering helps you streamline processes for unpredictable incoming log data. For such use cases, you can take advantage of managed services like Amazon Kinesis Data Streams, Amazon Kinesis Data Firehose, and Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Streaming services buffer data before delivering it to Amazon ES. This helps you avoid overwhelming your cluster with spiky ingestion events. Kinesis Data Firehose can reliably load data into Amazon ES. Kinesis Data Firehose lets you choose a buffer size of 1–100 MiBs and a buffer interval of 60–900 seconds when Amazon ES is selected as the destination. Kinesis Data Firehose also scales automatically to match the throughput of your data and requires no ongoing administration. For more information, see Ingest streaming data into Amazon Elasticsearch Service within the privacy of your VPC with Amazon Kinesis Data Firehose.

The following diagram illustrates the solution architecture.

Prerequisites

Before creating resources in AWS CloudFormation, you must enable server access logging on the source bucket. Open the S3 bucket properties and look for the server access logging settings and the log delivery bucket. See the following screenshot.

You also need an AWS Identity and Access Management (IAM) user with sufficient permissions to interact with the AWS Management Console and related AWS services. The user must have access to create IAM roles and policies via the CloudFormation template.

Setting up the resources with AWS CloudFormation

First, deploy the CloudFormation template to create the core components of the architecture. AWS CloudFormation automates the deployment of technology and infrastructure in a safe and repeatable manner across multiple Regions and multiple accounts with the least amount of effort and time.

  1. Sign in to the console and choose the Region of the bucket storing the access log. For this post, I use us-east-1.
  2. Launch the stack:
  3. Choose Next.
  4. For Stack name, enter a name.
  5. On the Parameters page, enter the following parameters:
    1. VPC Configuration – Select any VPC that has at least two private subnets. The template deploys the Amazon ES service domain and Lambda within the VPC.
    2. Private subnets – Select two private subnets of the VPC. The route tables associated with subnets must have a NAT gateway configuration and VPC endpoint for Amazon S3 to privately connect the bucket from Lambda.
    3. Access log S3 bucket – Enter the S3 bucket where access logs are delivered. The template configures event notification on the bucket to trigger the Lambda function.
    4. Amazon ES domain name – Specify the Amazon ES domain name to be deployed through the template.
  6. Choose Next.
  7. On the next page, choose Next.
  8. Acknowledge resource creation under Capabilities and transforms and choose Create.

The stack takes about 10–15 minutes to complete. The CloudFormation stack does the following:

  • Creates an Amazon ES domain with fine-grained access control enabled on it. Fine-grained access control is configured with a primary user in the internal user database.
  • Creates an IAM role for the Lambda function with the required permissions to read from the S3 bucket and write to Amazon ES.
  • Creates the Lambda function within the same VPC as the Amazon ES elastic network interfaces (ENIs). Amazon ES places an ENI in the VPC for each of your data nodes. The communication from Lambda to the Amazon ES domain is via these ENIs.
  • Configures a file create event notification on the access log S3 bucket to trigger the Lambda function. The function code segments are discussed in detail in this GitHub project.

You must make several considerations before you proceed with a production-grade deployment. For this post, I use one primary shard with no replicas. As a best practice, we recommend deploying your domain into three Availability Zones with at least two replicas. This configuration lets Amazon ES distribute replica shards to different Availability Zones than their corresponding primary shards and improves the availability of your domain. For more information about sizing your Amazon ES, see Get started with Amazon Elasticsearch Service: T-shirt-size your domain.

We recommend setting the shard count based on your estimated index size, using 50 GB as a maximum target shard size. You should also define an index template to set the primary and replica shard counts before index creation. For more information about best practices, see Best practices for configuring your Amazon Elasticsearch Service domain.
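
For example, an index template along the following lines sets those counts for any index matching the access-log pattern before the first index is created (illustrative values; the endpoint and credentials are assumed to match the domain and primary user created by the template):

import os

import requests

template = {
    "index_patterns": ["access-log*"],
    "settings": {
        "number_of_shards": 1,     # this post uses one primary shard
        "number_of_replicas": 0,   # use at least two replicas across three Availability Zones in production
    },
}

# Legacy _template API; endpoint and credentials are assumed to match the domain created earlier.
requests.put(f"{os.environ['ES_ENDPOINT']}/_template/access-log-template",
             json=template,
             auth=(os.environ["ES_USER"], os.environ["ES_PASSWORD"]),
             timeout=10)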

For high-frequency incoming data, you can rotate indexes either per day or per week depending on the size of data being generated. You can use Index State Management to define custom management policies to automate routine tasks and apply them to indexes and index patterns.

Creating the Kibana user

With Amazon ES, you can configure fine-grained users to control access to your data. Fine-grained access control adds multiple capabilities to give you tighter control over your data. This feature includes the ability to use roles to define granular permissions for indexes, documents, or fields and to extend Kibana with read-only views and secure multi-tenant support. For more information on granular access control, see Fine-Grained Access Control in Amazon Elasticsearch Service.

For this post, you create a fine-grained role for Kibana access and map it to a user.

  1. Navigate to Kibana and enter the primary user credentials:
    1. User name – adminuser01
    2. Password – [email protected]

To access Kibana, you must have access to the VPC. For more information about accessing Kibana, see Controlling Access to Kibana.

  1. Choose Security, Roles.
  2. For Role name, enter kibana_only_role.
  3. For Cluster-wide permissions, choose cluster_composite_ops_ro.
  4. For Index patterns, enter access-log and kibana.
  5. For Permissions: Action Groups, choose read, delete, index, and manage.
  6. Choose Save Role Definition.
  7. Choose Security, Internal User Database, and Create a New User.
  8. For Open Distro Security Roles, choose kibana_only_role (created earlier).
  9. Choose Submit.

The user kibanauser01 now has full access to Kibana and access-logs indexes. You can log in to Kibana with this user and create the visuals and dashboards.

Building dashboards

You can use Kibana to build interactive visuals and analyze the trends and combine the visuals for different use cases in a dashboard. For example, you may want to see the number of requests made to the buckets in the last two days.

  1. Log in to Kibana using kibanauser01.
  2. Create an index pattern and set the time range.
  3. On the Visualize section of your Kibana dashboard, add a new visualization.
  4. Choose Vertical Bar.

You can select any time range and visual based on your requirements.

  1. Choose the index pattern and then configure your graph options.
  2. In the Metrics pane, expand Y-Axis.
  3. For Aggregation, choose Count.
  4. For Custom Label, enter Request Count.
  5. Expand the X-Axis.
  6. For Aggregation, choose Terms.
  7. For Field, choose bucket.
  8. For Order By, choose metric: Request Count.
  9. Choose Apply changes.
  10. Choose Add sub-bucket and expand the Split Series.
  11. For Sub Aggregation, choose Date Histogram.
  12. For Field, choose requestdatetime.
  13. For Interval, choose Daily.
  14. Apply the changes by choosing the play icon at the top of the page.

You should see the visual on the right side, similar to the following screenshot.

You can combine graphs of different use cases into a dashboard. I have built some example graphs for general use cases like the number of operations per bucket, user action breakdown for buckets, HTTPS status rate, top users, and tabular formatted error details. See the following screenshots.

Cleaning up

Delete all the resources deployed through the CloudFormation template to avoid any unintended costs.

  1. Disable the access log on the source bucket.
  2. On the AWS CloudFormation console, identify the appropriate stacks and delete them.

Summary

This post detailed a solution to visualize and monitor Amazon S3 access logs using Amazon ES to ensure compliance, perform security audits, and discover risks and patterns at scale with minimal latency. To learn about best practices of Amazon ES, see Amazon Elasticsearch Service Best Practices. To learn how to analyze and create a dashboard of data stored in Amazon ES, see the AWS Security Blog.


About the Authors

Mahesh Goyal is a Data Architect in Big Data at AWS. He works with customers in their journey to the cloud with a focus on big data and data warehouses. In his spare time, Mahesh likes to listen to music and explore new food places with his family.

Implementing LDAP authentication for Hive on a multi-tenant Amazon EMR cluster

Post Syndicated from Kiran Erra original https://aws.amazon.com/blogs/big-data/implementing-ldap-authentication-for-hive-on-a-multi-tenant-amazon-emr-cluster/

As Amazon EMR continues its widespread adoption, it’s important to enforce separation of duties using role-based access when submitting your Hive jobs on EMR clusters in multi-tenant environments. In this post, we walk through the steps to set up authentication for Hive using Lightweight Directory Access Protocol (LDAP) and Microsoft Active Directory Domain Controller.

Solution overview

In a multi-tenant environment, it’s critical to enforce role-based access when submitting Hive jobs to an EMR cluster. The default way of submitting a Hive job to an EMR cluster is by using the Add Step functionality; although you may add Hive steps to an existing cluster, such a setup doesn’t enforce role-based access, because Amazon EMR steps are always submitted using the default Hive user. This post outlines the process by which you can enforce EMRFS role mappings when an Active Directory user submits a Hive job after authenticating via LDAP and Microsoft Active Directory Domain Controller. The following diagram illustrates the provisioned infrastructure from AWS CloudFormation.

The following AWS services are used as part of the recommended solution:

  • AWS Secrets Manager – AWS Secrets Manager helps you protect secrets needed to access your applications, services, and IT resources. The service enables you to easily rotate, manage, and retrieve database credentials, API keys, and other secrets throughout their lifecycle.
  • Amazon EMR – Amazon EMR makes it easy to process large amounts of data efficiently. Amazon EMR uses Hadoop processing combined with several AWS products to do tasks such as web indexing, data mining, log file analysis, machine learning, scientific simulation, and data warehousing.
  • Amazon EC2 – Amazon Elastic Compute Cloud (Amazon EC2) provides secure, resizable compute capacity in the cloud. It’s designed to make web-scale cloud computing easier for developers.

In our solution (as we discuss it in this post), the corporate user base is maintained in the Microsoft Active Directory Domain Controller. The EMR cluster is integrated with AD using a bootstrap action so that you can securely submit Hive jobs through Beeline by establishing an LDAP connection from an edge node (represented by an EC2 instance). The user credentials are stored in and fetched from Secrets Manager when establishing the Beeline connection.

Prerequisites

Before getting started, you must have the following prerequisites:

  • Microsoft Active Directory Domain Controller needs to be installed and set up. For a quick setup of Microsoft Active Directory Domain Controller and VPC, see the step Launch and configure an Active Directory domain controller in the Deploying each component individually section of the post Implement perimeter security in Amazon EMR using Apache Knox.
  • A valid AWS account with access to AWS services.
  • An Amazon VPC with a public subnet.
  • An AWS Identity and Access Management (IAM) policy for Secrets Manager permissions.

Implementing the solution

We provide the CloudFormation template in this post as a general guide. Please review and customize it as needed. You should also be aware that some of the resources deployed by this stack incur costs when they remain in use. The CloudFormation template has the following steps:

  1. Start an EMR cluster with the configuration from the parameters.
  2. Integrate the EMR cluster with AD using a bootstrap action.
  3. Create and launch an EC2 instance to test the integration.
  4. Add an inbound rule to the additional security group of the Amazon EMR primary node to allow traffic on port 10000 from the newly launched EC2 instance.

This section describes how to use the CloudFormation template to launch an EMR cluster with the following parameters:

  • ClusterName (default: emr-ldap4hive) – The name of the cluster.
  • CoreInstanceType (default: m4.xlarge) – The instance type of the core nodes.
  • CoreNodeCount (default: 2) – The number of core nodes in the cluster.
  • CreateLogBucket (default: FALSE) – A Boolean flag that indicates whether to create a bucket for logs.
  • KeyPair – The key pair used to log in to the EC2 instance for validation.
  • MasterInstanceType (default: m4.xlarge) – The instance type of the master node.
  • ReleaseLabel (default: emr-6.0.0) – The Amazon EMR version. This template is tested with emr-6.0.0 and emr-5.29.0.
  • RemoteAccessCIDR – The CIDR range allowed to access Amazon EMR. This is usually the same as the IP address of the local machine.
  • VPCID – The VPC ID used in the Amazon EMR configuration. Make sure you select a public VPC.
  • SubnetId – The subnet ID used in the Amazon EMR configuration. Make sure you select a subnet that belongs to the selected VPC.
  • ldapurl – The LDAP URL of the AD domain controller, in the format ldap://<Private IP of AD domain controller>:389. Refer to the first item in the Prerequisites section.
  • passwd4awsadmin – The AD admin password. Must be at least eight characters and contain letters, numbers, and symbols.
  • EC2 AMI (default: ami-0ac80df6eff0e70b5) – The AMI used to create the EC2 instance for validation.
  • My IP – The IP address of the local machine.

The following screenshot shows the Specify stack details page when launching your template.

A bootstrap script ldap-bootstrap.sh is invoked during the cluster creation to perform the following actions:

  • Fetch the login credentials for the Active Directory domain admin from Secrets Manager
  • Perform the realm join using the credentials fetched
  • Enable password-based authentication to the cluster
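The actual ldap-bootstrap.sh isn’t reproduced in this post. A minimal sketch of those three actions might look like the following (the secret name, AD domain, package list, and realm join approach are assumptions; adapt them to your environment):

#!/bin/bash
# Hypothetical sketch of an AD-join bootstrap action; not the exact script used by the stack.
sudo yum install -y realmd sssd adcli jq        # assumed packages

# Fetch the AD domain admin credentials from Secrets Manager
SECRET_ID="ADAdminCredentials"                  # assumed secret name
CREDS=$(aws secretsmanager get-secret-value --secret-id "$SECRET_ID" --query SecretString --output text)
AD_USER=$(echo "$CREDS" | jq -r '.username')
AD_PASS=$(echo "$CREDS" | jq -r '.password')

# Perform the realm join using the fetched credentials
echo "$AD_PASS" | sudo realm join -U "$AD_USER" <your-ad-domain>

# Enable password-based SSH authentication so AD users can log in with their AD passwords
sudo sed -i 's/^PasswordAuthentication no/PasswordAuthentication yes/' /etc/ssh/sshd_config
sudo systemctl restart sshd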

To deploy the template into your account, choose Launch Stack:

The following screenshot shows the EMR cluster that the CloudFormation stack created.

Validating the solution

To validate the solution, SSH to the Ubuntu EC2 instance using the EC2 key pair, as shown in the following screenshot. Refer to the Outputs tab from your AWS CloudFormation stack.

For this post, we used the Ubuntu Server 18.04 LTS (HVM), SSD Volume Type – ami-07ebfd5b3428b6f4d (64-bit x86) / ami-0400a1104d5b9caa1 (64-bit Arm) AMI.

You should see the Python Hive beeline script in /home/ubuntu:

Run demo-hive-beeline.py as shown in the following screenshot. This Python script fetches the AD credentials from Secrets Manager, establishes a beeline connection for Hive on Amazon EMR, submits Hive commands to create an external table for the NYC taxi dataset located in your Amazon Simple Storage Service (Amazon S3) bucket, and runs a sample select statement on the table.

The script has the following parameters:

  • -r or --region_name – AWS Region
  • -s or --secret-id – Secret ARN
  • -h or --host-name – Amazon EMR public DNS address
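For example (the Region, secret ARN, and cluster DNS name below are placeholders, not values from this stack), the script might be invoked as follows:

python3 demo-hive-beeline.py \
    -r us-east-1 \
    -s arn:aws:secretsmanager:us-east-1:123456789012:secret:ADAdminPassword-AbC123 \
    -h ec2-203-0-113-25.compute-1.amazonaws.com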

Cleaning up

Delete the CloudFormation stack to clean up all the resources created in this post. Also, stop the EC2 Ubuntu instance that you created in the verification step. If you used the nested stack, AWS CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.

Conclusion

In this post, we went through the setup and validation of LDAP authentication for Hive on an EMR cluster. This approach decouples the authentication mechanism from Hive and Amazon EMR and uses LDAP and the Active Directory Domain Controller as the system of record.


About the authors

Kiran Erra is a data architect with AWS. He works with AWS customers to provide guidance and technical assistance about Big Data, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.


Rajarao Vijjapu is a security data architect with AWS. He works with AWS customers and partners to provide guidance and technical assistance about Big Data, Analytics, AI/ML and Security projects, helping them improve the value of their solutions when using AWS.


Enhanced monitoring and automatic scaling for Apache Flink

Post Syndicated from Karthi Thyagarajan original https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/

Thousands of developers use Apache Flink to build streaming applications to transform and analyze data in real time. Apache Flink is an open-source framework and engine for processing data streams. It’s highly available and scalable, delivering high throughput and low latency for the most demanding stream-processing applications. Monitoring and scaling your applications is critical to keep your applications running successfully in a production environment.

Amazon Kinesis Data Analytics reduces the complexity of building and managing Apache Flink applications. Amazon Kinesis Data Analytics manages the underlying Apache Flink components that provide durable application state, metrics and logs, and more. Kinesis Data Analytics recently announced new Amazon CloudWatch metrics and the ability to create custom metrics to provide greater visibility into your application.

In this post, we show you how to easily monitor and automatically scale your Apache Flink applications with Amazon Kinesis Data Analytics. We walk through three examples. First, we create a custom metric in the Kinesis Data Analytics for Apache Flink application code. Second, we use application metrics to automatically scale the application. Finally, we share a CloudWatch dashboard for monitoring your application and recommend metrics that you can alarm on.

Custom metrics

Kinesis Data Analytics uses Apache Flink’s metrics system to send custom metrics to CloudWatch from your applications. For more information, see Using Custom Metrics with Amazon Kinesis Data Analytics for Apache Flink.

We use a basic word count program to illustrate the use of custom metrics. The following code shows how to extend RichFlatMapFunction to track the number of words it sees. This word count is then surfaced via the Flink metrics API.

// Imports used by this snippet (these belong at the top of the enclosing file)
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.util.Collector;

private static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {

    private transient Counter counter;

    @Override
    public void open(Configuration config) {
        // Register a custom counter under the kinesisanalytics metric group
        this.counter = getRuntimeContext().getMetricGroup()
                .addGroup("kinesisanalytics")
                .addGroup("Service", "WordCountApplication")
                .addGroup("Tokenizer")
                .counter("TotalWords");
    }

    @Override
    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs and increment the custom counter for every word
        for (String token : tokens) {
            if (token.length() > 0) {
                counter.inc();
                out.collect(new Tuple2<>(token, 1));
            }
        }
    }
}

Custom metrics emitted through the Flink metrics API are forwarded to CloudWatch metrics by Kinesis Data Analytics for Apache Flink. The following screenshot shows the word count metric in CloudWatch.

Custom automatic scaling

This section describes how to implement an automatic scaling solution for Kinesis Data Analytics for Apache Flink based on CloudWatch metrics. You can configure Kinesis Data Analytics for Apache Flink to perform CPU-based automatic scaling. However, you can automatically scale your application based on something other than CPU utilization. To perform custom automatic scaling, use Application Auto Scaling with the appropriate metric.

For applications that read from a Kinesis stream source, you can use the metric millisBehindLatest. This captures how far behind your application is from the head of the stream.

A target tracking policy is one of two scaling policy types offered by Application Auto Scaling. You can specify a threshold value around which to vary the degree of parallelism of your Kinesis Data Analytics application. The following sample code on GitHub configures Application Auto Scaling when millisBehindLatest for the consuming application exceeds 1 minute. This increases the parallelism, which increases the number of KPUs.
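The CloudFormation template in the sample repository creates these scaling resources for you. Purely as an illustration of the Application Auto Scaling API calls involved (the endpoint URL, application name, metric dimensions, and threshold values below are assumptions, not the sample’s actual values), registering a custom scalable target and attaching a target tracking policy from the AWS CLI might look like the following:

# Register the application as a custom scalable target behind the API Gateway endpoint
aws application-autoscaling register-scalable-target \
    --service-namespace custom-resource \
    --scalable-dimension "custom-resource:ResourceType:Property" \
    --resource-id "https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/scalableTargetDimensions/<flink-app-name>" \
    --min-capacity 1 \
    --max-capacity 8

# Keep millisBehindLatest around 60,000 ms (1 minute) by varying the scalable target's capacity
aws application-autoscaling put-scaling-policy \
    --policy-name flink-millisbehindlatest-target-tracking \
    --service-namespace custom-resource \
    --scalable-dimension "custom-resource:ResourceType:Property" \
    --resource-id "https://<api-id>.execute-api.us-east-1.amazonaws.com/prod/scalableTargetDimensions/<flink-app-name>" \
    --policy-type TargetTrackingScaling \
    --target-tracking-scaling-policy-configuration '{
        "TargetValue": 60000,
        "CustomizedMetricSpecification": {
            "MetricName": "millisBehindLatest",
            "Namespace": "AWS/KinesisAnalytics",
            "Dimensions": [{"Name": "Id", "Value": "<stream-or-consumer-id>"}],
            "Statistic": "Maximum"
        },
        "ScaleInCooldown": 600,
        "ScaleOutCooldown": 600
    }'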

The following diagram shows how Application Auto Scaling, used with Amazon API Gateway and AWS Lambda, scales a Kinesis Data Analytics application in response to a CloudWatch alarm.

The sample code includes examples for automatic scaling based on the target tracking policy and step scaling policy.

Automatic scaling solution components

The following is a list of key components used in the automatic scaling solution. You can find these components in the AWS CloudFormation template in the GitHub repo accompanying this post.

  • Application Auto Scaling scalable target – A scalable target is a resource that Application Auto Scaling can scale in and out. It’s uniquely identified by the combination of resource ID, scalable dimension, and namespace. For more information, see RegisterScalableTarget.
  • Scaling policy – The scaling policy defines how your scalable target should scale. As described in the PutScalingPolicy, Application Auto Scaling supports two policy types: TargetTrackingScaling and StepScaling. In addition, you can configure a scheduled scaling action using Application Auto Scaling. If you specify TargetTrackingScaling, Application Auto Scaling also creates corresponding CloudWatch alarms for you.
  • API Gateway – Because the scalable target is a custom resource, we have to specify an API endpoint that Application Auto Scaling invokes to perform scaling and to get information about the current state of our scalable resource. We use API Gateway and a Lambda function to implement this endpoint.
  • Lambda – API Gateway invokes the Lambda function, which performs the scaling actions when Application Auto Scaling calls the endpoint. The function also fetches information such as the current scale value and returns the information requested by Application Auto Scaling.

Additionally, you should be aware of the following:

  • When scaling out or in, this sample only updates the application’s overall parallelism. It doesn’t adjust parallelism per KPU.
  • When scaling occurs, the Kinesis Data Analytics application experiences downtime.
  • The throughput of a Flink application depends on many factors, such as complexity of processing and destination throughput. The step-scaling example assumes a relationship between incoming record throughput and scaling. The millisBehindLatest metric used for target tracking automatic scaling works the same way.
  • We recommend using the default scaling policy provided by Kinesis Data Analytics for CPU-based scaling, the target tracking auto scaling policy for the millisBehindLatest metric, and a step scaling auto scaling policy for a metric such as numRecordsInPerSecond. However, you can use any automatic scaling policy for the metric you choose.

CloudWatch operational dashboard

Customers often ask us about best practices and the operational aspects of Kinesis Data Analytics for Apache Flink. We created a CloudWatch dashboard that captures the key metrics to monitor. We categorize the most common metrics in this dashboard with the recommended statistics for each metric.

This GitHub repo contains a CloudFormation template to deploy the dashboard for any Kinesis Data Analytics for Apache Flink application. You can also deploy a demo application with the dashboard. The dashboard includes the following:

  • Application health metrics:
    • Use uptime to see how long the job has been running without interruption and downtime to determine if a job failed to run. Non-zero downtime can indicate issues with your application.
    • Higher-than-normal job restarts can indicate an unhealthy application.
    • Checkpoint information size, duration, and number of failed checkpoints can help you understand application health and progress. Increasing checkpoint duration values can signify application health problems like backpressure and the inability to keep up with input data. Increasing checkpoint size over time can point to an infinitely growing state that can lead to out-of-memory errors.
  • Resource utilization metrics:
    • You can check the CPU and heap memory utilization along with the thread count. You can also check the garbage collection time taken across all Flink task managers.
  • Flink application progress metrics:
    • numRecordsInPerSecond and numRecordsOutPerSecond show the number of records accepted and emitted per second.
    • numLateRecordsDropped shows the number of records this operator or task has dropped due to arriving late.
    • Input and output watermarks are valid only when using event time semantics. You can use the difference between these two values to calculate event time latency.
  • Source metrics:
    • The Kinesis Data Streams-specific metric millisBehindLatest shows how far behind the head of the stream the consumer is reading, which indicates how far behind the current time the consumer is. We used this metric to demonstrate Application Auto Scaling earlier in this post.
    • The Kafka-specific metric recordsLagMax shows the maximum lag in terms of number of records for any partition in this window.

The dashboard contains useful metrics to gauge the operational health of a Flink application. You can modify the threshold, configure additional alarms, and add other system or custom metrics to customize the dashboard for your use. The following screenshot shows a section of the dashboard.
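Beyond the dashboard widgets, you can also alarm directly on these metrics. For example, a simple starting point is to alarm whenever downtime is greater than zero. A sketch of such an alarm from the AWS CLI follows (the namespace and dimension are what Kinesis Data Analytics metrics typically use, but verify them against your application’s metrics in CloudWatch; the SNS topic is a placeholder):

aws cloudwatch put-metric-alarm \
    --alarm-name flink-application-downtime \
    --namespace "AWS/KinesisAnalytics" \
    --metric-name downtime \
    --dimensions Name=Application,Value=<your-application-name> \
    --statistic Maximum \
    --period 60 \
    --evaluation-periods 1 \
    --threshold 0 \
    --comparison-operator GreaterThanThreshold \
    --alarm-actions arn:aws:sns:us-east-1:123456789012:flink-operational-alerts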

Summary

In this post, we covered how to use the enhanced monitoring features for Kinesis Data Analytics for Apache Flink applications. We created custom metrics for an Apache Flink application within application code and emitted it to CloudWatch. We also used Application Auto Scaling to scale an application. Finally, we shared a CloudWatch dashboard to monitor the operational health of Kinesis Data Analytics for Apache Flink applications. For more information about using Kinesis Data Analytics, see Getting Started with Amazon Kinesis Data Analytics.


About the Authors

Karthi Thyagarajan is a Principal Solutions Architect on the Amazon Kinesis team.


Deepthi Mohan is a Sr. TPM on the Amazon Kinesis Data Analytics team.

Stream CDC into an Amazon S3 data lake in Parquet format with AWS DMS

Post Syndicated from Viral Shah original https://aws.amazon.com/blogs/big-data/stream-cdc-into-an-amazon-s3-data-lake-in-parquet-format-with-aws-dms/

Most organizations generate data in real time and in ever-increasing volumes. Data is captured from a variety of sources, such as transactional and reporting databases, application logs, customer-facing websites, and external feeds. Companies want to capture, transform, and analyze this time-sensitive data to improve customer experiences, increase efficiency, and drive innovations. With increased data volume and velocity, it’s imperative to capture the data from source systems as soon as it is generated and store it on a secure, scalable, and cost-efficient platform.

AWS Database Migration Service (AWS DMS) performs continuous data replication using change data capture (CDC). Using CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume and act on. Most database management systems manage a transaction log that records changes made to the database contents and metadata. AWS DMS reads the transaction log by using engine-specific API operations and functions and captures the changes made to the database in a nonintrusive manner.

Amazon Simple Storage Service (Amazon S3) is the largest and most performant object storage service for structured and unstructured data and the storage service of choice to build a data lake. With Amazon S3, you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by 99.999999999% of durability.

AWS DMS offers multiple options to capture data changes from relational databases and store the data in columnar format (Apache Parquet) in Amazon S3:

  • Replicate with AWS DMS directly to Amazon S3 as the target endpoint, writing the output in Parquet format.
  • Stream changes with AWS DMS to Amazon Kinesis Data Streams, and use Amazon Kinesis Data Firehose to convert the records to Parquet before storing them in Amazon S3.

The second option helps you build a flexible data pipeline that ingests data into an Amazon S3 data lake from several relational and non-relational data sources, whereas the former option supports only relational data sources. Kinesis Data Firehose provides pre-built AWS Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats, or you can write your own custom functions. It can also convert the format of incoming data from JSON to Parquet or Apache ORC before storing the data in Amazon S3. Data stored in columnar format gives you faster and lower-cost queries with downstream analytics services like Amazon Athena.

In this post, we focus on the technical challenges outlined in the second option and how to address them.

As shown in the following reference architecture, data is ingested from a database into Parquet format in Amazon S3 via AWS DMS integrating with Kinesis Data Streams and Kinesis Data Firehose.

Our solution provides flexibility to ingest data from several sources using Kinesis Data Streams and Kinesis Data Firehose with built-in data format conversion and integrated data transformation capabilities before storing data in a data lake. For more information about data ingestion into Kinesis Data Streams, see Writing Data into Amazon Kinesis Data Streams. You can then query Parquet data in Amazon S3 efficiently with Athena.

Implementing the architecture

AWS DMS can migrate data to and from most widely used commercial and open-source databases. You can migrate and replicate data directly to Amazon S3 in CSV and Parquet formats, and store data in Amazon S3 in Parquet because it offers efficient compression and encoding schemes. Parquet format allows compression schemes on a per-column level, and is future-proofed to allow adding more encodings as they are invented and implemented.

AWS DMS supports Kinesis Data Streams as a target. Kinesis Data Streams is a massively scalable and durable real-time data streaming service that can collect and process large streams of data records in real time. AWS DMS publishes records to the data stream in JSON format. For more information about configuration details, see Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams.

Kinesis Data Firehose can pull data from Kinesis Data Streams. It’s a fully managed service that delivers real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose can convert the format of input data from JSON to Parquet or ORC before sending it to Amazon S3. It needs a reference schema to interpret the AWS DMS streaming data in JSON and convert it into Parquet. In this post, we use AWS Glue, a fully managed ETL service, to create a schema in the AWS Glue Data Catalog for Kinesis Data Firehose to reference.

When AWS DMS migrates records, it creates additional fields (metadata) for each migrated record. The metadata provides additional information about the record being migrated, such as the source table name, schema name, and type of operation. Most metadata field names contain a hyphen (-), for example record-type, schema-name, table-name, and transaction-id. See the following code:

{
        "data": {
            "MEET_CODE": 5189459,
            "MEET_DATE": "2020-02-21T19:20:04Z",
            "RACE_CODE": 5189459,
            "LAST_MODIFIED_DATE": "2020-02-24T19:20:04Z",
            "RACE_ENTRY_CODE": 11671651,
            "HORSE_CODE": 5042811
        },
        "metadata": {
            "transaction-id": 917505,
            "schema-name": "SH",
            "operation": "insert",
            "table-name": "RACE_ENTRY",
            "record-type": "data",
            "timestamp": "2020-02-26T00:20:07.482592Z",
            "partition-key-type": "schema-table"
        }
    }

The additional metadata added by AWS DMS leads to an error during the data format conversion phase in Kinesis Data Firehose. Kinesis Data Firehose follows Hive-style formatting and therefore doesn’t recognize the hyphen (-) character in the metadata field names during data conversion from JSON into Parquet, and it returns the error message expected at the position 30 of ‘struct’ but ‘-’ is found. For example, see the following code:

{
	"deliveryStreamARN": "arn:aws:firehose:us-east-1:1234567890:deliverystream/abc-def-KDF",
	"destination": "arn:aws:s3:::abc-streaming-bucket",
	"deliveryStreamVersionId": 13,
	"message": "The schema is invalid. Error parsing the schema:
	 Error: : expected at the position 30 of 'struct<timestamp:string,record-type:string,operation:string,partition-key-type:string,schema-name:string,table-name:string,transaction-id:int>' but '-' is found.",
	"errorCode": "DataFormatConversion.InvalidSchema"
}

You can resolve the issue by making the following changes: specifying JSON key mappings and creating a reference table in AWS Glue before configuring Kinesis Data Firehose.

Specifying JSON key mappings

In your Kinesis Data Firehose configuration, specify JSON key mappings for fields with - in their names. The mapping transforms these metadata field names to use _ instead (for example, record-type becomes record_type).

Use AWS Command Line Interface (AWS CLI) to create Kinesis Data Firehose with the JSON key mappings. Modify the parameters to meet your specific requirements.

Kinesis Data Firehose configuration mapping is only possible through the AWS CLI or API and not through the AWS Management Console.
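As a rough sketch (the stream name, ARNs, and file path are placeholders), the delivery stream could be created with a command like the following, where s3-destination.json contains an ExtendedS3DestinationConfiguration that includes the data format conversion settings shown next:

aws firehose create-delivery-stream \
    --delivery-stream-name dms-cdc-parquet-stream \
    --delivery-stream-type KinesisStreamAsSource \
    --kinesis-stream-source-configuration "KinesisStreamARN=arn:aws:kinesis:us-east-1:123456789012:stream/dms-cdc-stream,RoleARN=arn:aws:iam::123456789012:role/firehose-source-role" \
    --extended-s3-destination-configuration file://s3-destination.json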

The following code configures Kinesis Data Firehose so that five columns with - in their field names are mapped to new field names with _:

"S3BackupMode": "Disabled",
                    "DataFormatConversionConfiguration": {
                        "SchemaConfiguration": {
                            "RoleARN": "arn:aws:iam::123456789012:role/sample-firehose-delivery-role",
                            "DatabaseName": "sample-db",
                            "TableName": "sample-table",
                            "Region": "us-east-1",
                            "VersionId": "LATEST"
                        },
                        "InputFormatConfiguration": {
                            "Deserializer": {
                                "OpenXJsonSerDe": {
                                "ColumnToJsonKeyMappings":
                                {
                                 "record_type": "record-type","partition_key_type": "partition-key-type","schema_name":"schema-name","table_name":"table-name","transaction_id":"transaction-id"
                                }
                                }

Creating a reference table in AWS Glue

Because Kinesis Data Firehose uses the Data Catalog to reference the schema for Parquet format conversion, you must first create a reference table in AWS Glue before configuring Kinesis Data Firehose. Use Athena to create the Data Catalog table. For instructions, see CREATE TABLE. In the table, make sure that the column names use _ instead of -; if needed, modify them manually in advance through the Edit schema option for the referenced table in AWS Glue.
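The post doesn’t prescribe an exact DDL, but one possible shape for the reference table, inferred from the sample record and the error message earlier (column names, types, and the S3 location are assumptions), is the following Athena statement:

CREATE EXTERNAL TABLE `sample-db`.`sample-table` (
  data struct<
    meet_code:int,
    meet_date:string,
    race_code:int,
    last_modified_date:string,
    race_entry_code:int,
    horse_code:int
  >,
  metadata struct<
    timestamp:string,
    record_type:string,
    operation:string,
    partition_key_type:string,
    schema_name:string,
    table_name:string,
    transaction_id:int
  >
)
STORED AS PARQUET
LOCATION 's3://abc-streaming-bucket/reference/';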

Use Athena to query the results of data ingested by Kinesis Data Firehose into Amazon S3.

This solution is only applicable in the following use cases:

  • Capturing data changes from your source with AWS DMS
  • Converting data into Parquet with Kinesis Data Firehose

If you want to store data in a non-Parquet format (such as CSV or JSON) or ingest into Kinesis through other routes, then you don’t need to modify your Kinesis Data Firehose configuration.

Conclusion

This post demonstrated how to convert AWS DMS data into Parquet format and specific configurations to make sure metadata follows the expected format of Kinesis Data Streams and Kinesis Data Firehose. We encourage you to try this solution and take advantage of all the benefits of using AWS DMS with Kinesis Data Streams and Kinesis Data Firehose. For more information, see Getting started with AWS Database Migration Service and Setting up Amazon Kinesis Firehose.

If you have questions or suggestions, please leave a comment.

 


About the Author

Viral Shah is a Data Lab Architect with Amazon Web Services. Viral helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab. He has over 20 years of experience working with enterprise customers and startups primarily in the Data and Database space.


Developing AWS Glue ETL jobs locally using a container

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/developing-aws-glue-etl-jobs-locally-using-a-container/

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. In the fourth post of the series, we discussed optimizing memory management. In this post, we focus on writing ETL scripts for AWS Glue jobs locally. AWS Glue is built on top of Apache Spark and therefore uses all the strengths of open-source technologies. AWS Glue comes with many improvements on top of Apache Spark and has its own ETL libraries that can fast-track the development process and reduce boilerplate code.

The AWS Glue team released the AWS Glue binaries, which let you set up an environment on your desktop to test your code. We have used these libraries to create an image with all the right dependencies packaged together. The image has AWS Glue 1.0, Apache Spark, OpenJDK, Maven, Python3, the AWS Command Line Interface (AWS CLI), and boto3. We have also bundled Jupyter and Zeppelin notebook servers in the image so you don’t have to configure an IDE and can start developing AWS Glue code right away.

The AWS Glue team will release new images for various AWS Glue updates. The tags of the new images will follow the following convention: glue_libs_<glue-version>_image_<image-version>. For example, glue_libs_1.0.0_image_01. In this name, 1.0 is the AWS Glue major version, .0 is the patch version, and 01 is the image version. The patch version will be incremented for updates to the AWS Glue libraries of a major release. Image version will be incremented for the release of a new image of a major AWS Glue release. Both these increments will be reset with every major AWS Glue release. So, the first image released for AWS Glue 2.0 will be glue_libs_2.0.0_image_01.

We recommend pulling the highest image version for an AWS Glue major version to get the latest updates.

Prerequisites

Before you start, make sure that Docker is installed and the Docker daemon is running. For installation instructions, see the Docker documentation for Mac, Windows, or Linux. The machine running the Docker hosts the AWS Glue container. Also make sure that you have at least 7 GB of disk space for the image on the host running the Docker.

For more information about restrictions when developing AWS Glue code locally, see Local Development Restrictions.

Solution overview

In this post, we use amazon/aws-glue-libs:glue_libs_1.0.0_image_01 from Docker Hub. This image has only been tested for an AWS Glue 1.0 Spark shell (both for PySpark and Scala). It hasn’t been tested for an AWS Glue 1.0 Python shell.

We organize this post into the following three sections. You only have to complete one of the three sections (not all three) depending on your requirement:

  • Setting up the container to use Jupyter or Zeppelin notebooks
  • Setting up the Docker image with PyCharm Professional
  • Running against the CLI interpreter

This post uses the following two terms frequently:

  • Client – The system from which you access the notebook. You open a web browser on this system and put the notebook URL.
  • Host – The system that hosts the Docker daemon. The container runs on this system.

Sometimes, your client and host can be the same system.

Setting up the container to use Jupyter or Zeppelin notebooks

Setting up the container to run PySpark code in a notebook includes three high-level steps:

  1. Pulling the image from Docker Hub.
  2. Running the container.
  3. Opening the notebook.

Pulling the image from Docker Hub

If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

Open cmd on Windows or terminal on Mac and run the following command:

docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

Running the container

We pulled the image from Docker Hub in the previous step. We now run a container using this image.

The general format of the run command is:

docker run -itd -p <port_on_host>:<port_on_container_either_8888_or_8080> -p 4040:4040 <credential_setup_to_access_AWS_resources> --name <container_name> amazon/aws-glue-libs:glue_libs_1.0.0_image_01 <command_to_start_notebook_server>

The code includes the following information:

  • <port_on_host> – The local port of your host that is mapped to the port of the container. For our use case, the container port is either 8888 (for a Jupyter notebook) or 8080 (for a Zeppelin notebook). To keep things simple, we use the same port number as the notebook server ports on the container in the following examples.
  • <port_on_container_either_8888_or_8080> – The port of the notebook server on the container. The default port of Jupyter is 8888; the default port of Zeppelin is 8080.
  • 4040:4040 – This is required for SparkUI. 4040 is the default port for SparkUI. For more information, see Web Interfaces.
  • <credential_setup_to_access_AWS_resources> – In this section, we go with the typical case of mounting the host’s directory, containing the credentials. We assume that your host has the credentials configured using aws configure. The flow chart in the Appendix section explains various ways to set the credentials if the assumption doesn’t hold for your environment.
  • <container_name> – The name of the container. You can use any text here.

  • amazon/aws-glue-libs:glue_libs_1.0.0_image_01 – The name of the image that we pulled in the previous step.
  • <command_to_start_notebook_server> – We run /home/zeppelin/bin/zeppelin.sh for a Zeppelin notebook and /home/jupyter/jupyter_start.sh for a Jupyter notebook. If you want to run your code against the CLI interpreter, you don’t need a notebook server and can leave this argument blank.

The following example code starts a Jupyter notebook and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The following example code starts a Jupyter notebook and passes read-write credentials from a Windows host:

docker run -itd -p 8888:8888 -p 4040:4040 -v %UserProfile%\.aws:/root/.aws:rw --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

To run a Zeppelin notebook, replace 8888:8888 with 8080:8080, glue_jupyter with glue_zeppelin, and /home/jupyter/jupyter_start.sh with /home/zeppelin/bin/zeppelin.sh. For example, the following command starts a Zeppelin notebook server and passes read-only credentials from a Mac or Linux host:

docker run -itd -p 8080:8080 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_zeppelin amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/zeppelin/bin/zeppelin.sh

You can now run the following command to make sure that the container is running:

docker ps

The Jupyter notebook is configured to allow connections from all IP addresses without authentication, and the Zeppelin notebook is configured to use anonymous access. This configuration makes sure that you can start working on your local machine with just two commands (docker pull and docker run). If your scenario mandates a different configuration, run the container without running the notebook startup script (/home/jupyter/jupyter_start.sh or /home/zeppelin/bin/zeppelin.sh). This starts the container but not the notebook server. You can then run the bash shell on the container using the following command, edit the required notebook configurations, and start the notebook server:

docker exec -it <container_name> bash

For example,

docker exec -it glue_jupyter bash.

The following example code is the docker run command without the notebook server startup:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01

If you’re running the container on Amazon Elastic Compute Cloud (Amazon EC2) instance, you have to set up your inbound rules in the security group to allow communication on the ports used by the notebook server. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

Opening the notebook

If your client and host are the same machine, enter the following URL for Jupyter: http://localhost:8888.

You can write PySpark code in the notebook as shown here. You can also use SQL magic (%%sql) to directly write SQL against the tables in the AWS Glue Data Catalog. If your catalog table is on top of JSON data, you have to place json-serde.jar in the /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/jars directory of the container and restart the kernel in your Jupyter notebook. You can place the jar in this directory by first running the bash shell on the container using the following command:

docker exec -it <container_name> bash

If you have a local directory that holds your notebooks, you can mount it to /home/jupyter/jupyter_default_dir using the -v option. These notebooks are available to you when you open the Jupyter notebook URL. For example, see the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -v ~/.aws:/root/.aws:ro -v C:\Users\admin\Documents\notebooks:/home/jupyter/jupyter_default_dir --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

The URL for Zeppelin is http://localhost:8080.

For Zeppelin notebooks, include %spark.pyspark on the top to run PySpark code.

If your host is Amazon EC2 and your client is your laptop, replace localhost in the preceding URLs with your host’s public IP.

Depending on your network or if you’re on a VPN, you might have to set an SSH tunnel. The general format of the tunnel is the following code:

ssh -i <absolute_path_to_your_private_key_for_EC2> -v -N -L <port_on_client>:<ip_of_the_container>:<port_8888_or_8080> <user_name>@<public_ip_address_of_ec2_host>

Your security group controlling the EC2 instance should allow inbound on port 22 from the client. A broad inbound rule can create security risks. For more information, see AWS Security Best Practices.

You can get the <ip_of_the_container> under the IPAddress field when you run docker inspect <container_name>. For example: docker inspect glue_jupyter.

If you set up the tunnel, the URL to access the notebook is: http://localhost:<port_on_client>.

Use 8888 or 8080 for <port_8888_or_8080>, depending on if you’re running a Jupyter or Zeppelin notebook.

You can now use the following sample code to test your notebook:

from pyspark import SparkContext
from awsglue.context import GlueContext

glueContext = GlueContext(SparkContext.getOrCreate()) 
inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
inputDF.toDF().show()

Although awsglue-datasets is a public bucket, you at least need the following permissions, attached to the AWS Identity and Access Management (IAM) user used for your container, to view the data:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3ReadOnly",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::awsglue-datasets/*"
        }
    ]
}

You can also see the databases in your AWS Glue Data Catalog using the following code:

spark.sql("show databases").show()

You need AWS Glue permissions to run the preceding command. The following are the minimum permissions required to run the code. Replace <account_number> with your account number and <region> with your Region:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GlueAccess",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetDatabases"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account_number>:database/*",
                "arn:aws:glue:<region>:<account_number>:catalog"
            ]
        }
    ]
}

Similarly, you can query the AWS Glue Data Catalog tables too. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

You can stop here if you want to develop AWS Glue code locally using only notebooks.

Setting up the Docker image with PyCharm Professional

This section talks about setting up PyCharm Professional to use the image. For this post, we use Windows. There may be a few differences when using PyCharm on a Mac.

  1. Open cmd (or terminal for Mac) and pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01 using the following command:
    docker pull amazon/aws-glue-libs:glue_libs_1.0.0_image_01

    If you’re running Docker on Windows, choose the Docker icon (right-click) and choose Switch to Linux containers… before pulling the image.

  2. Choose the Docker icon (right-click) and choose Settings (this step isn’t required for Mac or Linux).
  3. In the General section, select Expose daemon on tcp://localhost:2375 without TLS (this step isn’t required for Mac or Linux). Note the warning listed under the checkbox. This step is based on PyCharm documentation.
  4. Choose Apply & Restart (this step isn’t required for Mac or Linux).
  5. Choose the Docker icon (right-click) and choose Restart… if the Docker doesn’t restart automatically (this step isn’t required for Mac or Linux).
  6. Open PyCharm and create a Pure Python project (if you don’t have one).
  7. Under File, choose Settings… (for Mac, under PyCharm, choose Preferences).
  8. Under Settings, choose Project Interpreter. In the following screenshot, GlueProject is the name of my project. Your project name might be different.
  9. Choose Show All… from the drop-down menu.
  10. Choose the + icon.

  11. Choose Docker.
  12. Choose New.
  13. For Name, enter a name (for example, Docker-Glue).
  14. Keep other settings at their default.
  15. If running on Windows, for Connect to Docker daemon with, select TCP socket and enter the Engine API URL.
    For this post, we enter tcp://localhost:2375 because Docker and PyCharm are on the same Windows machine.
    If running on a Mac, select Docker for Mac. No API URL is required.
  16. Make sure you see the message Connection successful.

For Windows, if you don’t see this message, Docker may not have restarted after you changed the settings in Step 4. Restart Docker and repeat these steps. For more information about connection settings, see the PyCharm documentation.

The following screenshots show steps 13-16 in Windows and Mac.

  17. Choose OK.

You should now see the image listed in the drop-down menu.

  18. Choose the image that you pulled from Docker Hub (amazon/aws-glue-libs:glue_libs_1.0.0_image_01).
  19. Choose OK.

You now see the interpreter listed.

  20. Choose OK.

This lists all the packages in the image.

  21. Choose OK.

Steps 22-27 help you get AWS Glue-related code completion suggestions from PyCharm.

  22. Download the following file: https://s3.amazonaws.com/aws-glue-jes-prod-us-east-1-assets/etl-1.0/python/PyGlue.zip.
  23. Under File, choose Settings (for Mac, under PyCharm, choose Preferences).
  24. Under Project: <Project name>, choose Project Structure.
  25. Choose Add Content Root.
  26. Choose the newly downloaded PyGlue.zip file.
  27. In the Settings window, choose OK.
  28. Choose the project (right-click) and choose New, Python File.
  29. Enter a name for the Python file and press Enter.
  30. Enter the following code in the file and save it. For more information about the minimum permissions required to run this code, see this section.
    from pyspark import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate()) 
    inputDF = glueContext.create_dynamic_frame_from_options(connection_type = "s3", connection_options = {"paths": ["s3://awsglue-datasets/examples/us-legislators/all/memberships.json"]}, format = "json")
    inputDF.toDF().show()
    

  31. Choose Add Configuration.
  32. Choose the + icon.
  33. Under Add New Configuration, choose Python.
  34. For Name, enter a name.
  35. For Environment variables, enter the following:
    PYTHONPATH=/home/aws-glue-libs/awsglue.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/pyspark.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python/lib/py4j-0.10.7-src.zip:/home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/python

  36. For Script path, select the newly created script in Step 29.
  37. For Python interpreter, choose the newly created interpreter.
  38. Choose Docker Container Settings.
  39. Under Volume bindings, choose the + icon.
  40. For Host path, add the absolute path of the .aws folder that holds the credentials and config files.
  41. For Container path, add /root/.aws.
  42. Choose OK.
  43. For Run/Debug Configurations, choose OK.
  44. Run the code by choosing the green button on the top right.

You can also see the databases in your AWS Glue Data Catalog using the following code. For more information about the minimum permissions required to run this code, see this section.

spark.sql("show databases").show()

Similarly, you can also query the catalog tables. If your host is an Amazon EC2 instance, you see the catalog of the Region of your EC2 instance. If your host is local, you see the catalog of the Region set in your aws configure or your AWS_REGION variable.

PyCharm gives code completion suggestions for AWS Glue (see the following screenshot). This is possible because of the steps you completed earlier.

Running against the CLI interpreter

You can always run the bash shell on the container and run your PySpark code directly against the CLI interpreter in the container.

  1. Complete the Pulling the image from Docker Hub and Running the container steps in the section Setting up the container to use Jupyter or Zeppelin notebooks.
  2. Run the bash shell on the container by entering the following code. Replace <container_name> with the name (--name argument) you used earlier.
    docker exec -it <container_name> bash

  3. Run one of the following commands:
    1. For PySpark, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/pyspark

    2. For Scala, enter the following code:
      /home/spark-2.4.3-bin-spark-2.4.3-bin-hadoop2.8/bin/spark-shell

Conclusion

In this post, we learned about a three-step process to get started on AWS Glue and Jupyter or Zeppelin notebook. Although notebooks are a great way to get started and a great asset to data scientists and data wranglers, data engineers generally have a source control repository, an IDE, and a well-defined CI/CD process. Because PyCharm is a widely used IDE for PySpark development, we showed how to use the image with PyCharm Professional. You can develop your code locally in your IDE and test it locally using the container, and your CI/CD process can run as it does with any other IDE and source control tool in your organization. Although we showed integration with PyCharm, you can similarly integrate the container with any IDE that you use to complete your CI/CD story with AWS Glue.


Appendix

The following section discusses various ways to set the credentials to access AWS resources (such as Amazon Simple Storage Service (Amazon S3), AWS Step Functions, and more) from the container.

You need to provide your AWS credentials to connect to an AWS service from the container. The AWS SDKs and CLIs use provider chains to look for AWS credentials in several different places, including system or user environment variables and in local AWS configuration files. For more information about how to set up credentials, see https://docs.aws.amazon.com/sdk-for-java/v2/developer-guide/credentials.html. To generate the credentials using the AWS Management Console, see Managing Access Keys (Console). For instructions on generating credentials with the AWS CLI, see create-access-key. For more information about generating credentials with an API, see CreateAccessKey.

The following flow chart shows the various ways to set up AWS credentials for the container. Most of these mechanisms don’t work with PyCharm because we use the image there and not the container. You can use the container as an SSH interpreter in PyCharm and then use one of the credential setting mechanisms listed here. However, that discussion is out of the scope of this post.

The numbers in brackets match the code snippets that follow the chart.

(1) To find more info about the syntax of setting up the tunnel, see this.

(2) To set credentials using the docker cp command to copy credentials from the Windows host to the container, enter the following code (this example code uses the container name glue_jupyter):

docker cp %UserProfile%\.aws\.  glue_jupyter:/root/.aws

(3) To mount the host’s .aws directory on the container with rw option, see this.

(4) To mount the host’s .aws directory on the container with ro option, see this.

(5) To set the credentials in a file, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --env-file /datalab_pocs/glue_local/env_variables.txt --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

/datalab_pocs/glue_local/env_variables.txt is the absolute path of the file holding the environment variables. The file should have the following variables:

  • AWS_ACCESS_KEY_ID=<Access_id>
  • AWS_SECRET_ACCESS_KEY=<Access_key>
  • AWS_REGION=<Region>

For more information about Regions, see Regions, Availability Zones, and Local Zones.

(6) To set the credentials in the docker run command, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 -e AWS_ACCESS_KEY_ID=<ID> -e AWS_SECRET_ACCESS_KEY=<Key> -e AWS_REGION=<Region>  --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh

(7) To set credentials using aws configure on the container, enter the following code:

docker run -itd -p 8888:8888 -p 4040:4040 --name glue_jupyter amazon/aws-glue-libs:glue_libs_1.0.0_image_01 /home/jupyter/jupyter_start.sh
docker exec -it glue_jupyter bash
aws configure


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, data warehouse, and data lake projects in the US and Australia.


Amazon EMR supports Apache Hive ACID transactions

Post Syndicated from Suthan Phillips original https://aws.amazon.com/blogs/big-data/amazon-emr-supports-apache-hive-acid-transactions/

Apache Hive is an open-source data warehouse package that runs on top of an Apache Hadoop cluster. You can use Hive for batch processing and large-scale data analysis. Hive uses Hive Query Language (HiveQL), which is similar to SQL.

ACID (atomicity, consistency, isolation, and durability) properties make sure that the transactions in a database are atomic, consistent, isolated, and reliable.

Amazon EMR 6.1.0 adds support for Hive ACID transactions so it complies with the ACID properties of a database. With this feature, you can run INSERT, UPDATE, DELETE, and MERGE operations in Hive managed tables with data in Amazon Simple Storage Service (Amazon S3). This is a key feature for use cases like streaming ingestion, data restatement, bulk updates using MERGE, and slowly changing dimensions.

This post demonstrates how to enable Hive ACID transactions in Amazon EMR, how to create a Hive transactional table, how it can achieve atomic and isolated operations, and the concepts, best practices, and limitations of using Hive ACID in Amazon EMR.

Enabling Hive ACID in Amazon EMR

To enable Hive ACID as the default for all Hive managed tables in an EMR 6.1.0 cluster, use the following hive-site configuration:

[
   {
      "classification": "hive-site",
      "properties": {
         "hive.support.concurrency": "true",
         "hive.exec.dynamic.partition.mode": "nonstrict",
         "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager"
      }
   }
]

For the complete list of configuration parameters related to Hive ACID and descriptions of the preceding parameters, see Hive Transactions.

Hive ACID use case

In this section, we explain the Hive ACID transactions with a straightforward use case in Amazon EMR.

Enter the following Hive command in the master node of an EMR cluster (6.1.0 release) and replace <s3-bucket-name> with the bucket name in your account:

hive --hivevar location=<s3-bucket-name> -f s3://aws-bigdata-blog/artifacts/hive-acid-blog/hive_acid_example.hql 

After Hive ACID is enabled on an Amazon EMR cluster, you can run the CREATE TABLE DDLs for Hive transaction tables.

To define a Hive table as transactional, set the table property transactional=true.

The following CREATE TABLE DDL is used in the script that creates a Hive transaction table acid_tbl:

CREATE TABLE acid_tbl (key INT, value STRING, action STRING)
PARTITIONED BY (trans_date DATE)
CLUSTERED BY (key) INTO 3 BUCKETS
STORED AS ORC
LOCATION 's3://${hivevar:location}/acid_tbl' 
TBLPROPERTIES ('transactional'='true');

This script generates three partitions in the provided Amazon S3 path. See the following screenshot.

The first partition, trans_date=2020-08-01, has the data generated as a result of sample INSERT, UPDATE, DELETE, and MERGE statements. We use the second and third partitions when explaining minor and major compactions later in this post.
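The script’s exact statements aren’t reproduced here, but the DML it runs against acid_tbl is of the following general form (the values and the staging_tbl source table are illustrative):

-- Insert a row into the first partition
INSERT INTO acid_tbl PARTITION (trans_date='2020-08-01') VALUES (1, 'initial value', 'insert');

-- Update and delete rows in place; Hive records these edits as delta and delete_delta files
UPDATE acid_tbl SET value = 'updated value' WHERE key = 1;
DELETE FROM acid_tbl WHERE key = 2 AND trans_date = '2020-08-01';

-- Merge changes from a staging table into the transactional table
MERGE INTO acid_tbl AS t
USING staging_tbl AS s
ON t.key = s.key AND t.trans_date = s.trans_date
WHEN MATCHED THEN UPDATE SET value = s.value, action = 'update'
WHEN NOT MATCHED THEN INSERT VALUES (s.key, s.value, 'insert', s.trans_date);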

ACID is achieved in Apache Hive using three types of files: base, delta, and delete_delta. Edits are written in delta and delete_delta files.

The base file is created by the Insert Overwrite Table query or as the result of major compaction over a partition, where all the files are consolidated into a single base_<write id> file, where the write ID is allocated by the Hive transaction manager for every write. This helps achieve isolation of Hive write queries and enables them to run in parallel.

The INSERT operation creates a new delta_<write id>_<write id> directory.

The DELETE operation creates a new delete_delta_<write id>_<write id> directory.

To support deletes, a unique row__id is added to each row on writes. When a DELETE statement runs, the corresponding row__id gets added to the delete_delta_<write id>_<write id> directory, which should be ignored on reads. See the following screenshot.

The UPDATE operation creates a new delta_<write id>_<write id> directory and a delete_delta_<write id>_<write id> directory.

The following screenshot shows the second partition in Amazon S3, trans_date=2020-08-02.

A Hive transaction provides snapshot isolation for reads. When an application or query reads the transaction table, it opens all the files of a partition/bucket and returns the records from the last transaction committed.

Hive compactions

With the previously mentioned logic for Hive writes on a transactional table, many small delta and delete_delta files are created, which could adversely impact read performance over time because each read over a particular partition has to open all the files (including delete_delta) to eliminate the deleted rows.

This brings the need for a compaction logic for Hive transactions. In the following sections, we use the same use case to explain minor and major compactions in Hive.

Minor compaction

A minor compaction merges all the delta and delete_delta files within a partition or bucket to a single delta_<start write id>_<end write id> and delete_delta_<start write id>_<end write id> file.

We can trigger the minor compaction manually for the second partition (trans_date=2020-08-02) in Amazon S3 with the following code:

ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-02') COMPACT 'minor';

If you check the same second partition in Amazon S3, after a minor compaction, it looks like the following screenshot.

You can see all the delta and delete_delta files from write ID 0000005–0000009 merged to single delta and delete_delta files, respectively.

Major compaction

A major compaction merges the base, delta, and delete_delta files within a partition or bucket to a single base_<latest write id>. Here the deleted data gets cleaned.

A major compaction is automatically triggered in the third partition (trans_date='2020-08-03') because the default Amazon EMR compaction threshold is met, as described in the next section. See the following screenshot.
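You can also trigger a major compaction manually, analogous to the minor compaction shown earlier, for example:

ALTER TABLE acid_tbl PARTITION (trans_date='2020-08-03') COMPACT 'major';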

To check the progress of compactions, enter the following command:

hive> show compactions;

The following screenshot shows the output.

Compaction in Amazon EMR

Compaction is enabled by default in Amazon EMR 6.1.0. The following property determines the number of concurrent compaction tasks:

  • hive.compactor.worker.threads – Number of worker threads to run in the instance. The default is 1 or vCores/8, whichever is greater.

Automatic compaction is triggered in Amazon EMR 6.1.0 based on the following configuration parameters:

  • hive.compactor.check.interval – Time period in seconds to check if any partition requires compaction. The default is 300 seconds.
  • hive.compactor.delta.num.threshold – Triggers minor compaction when the total number of delta files is greater than this value. The default is 10.
  • hive.compactor.delta.pct.threshold – Triggers major compaction when the total size of delta files is greater than this percentage size of base file. The default is 0.1, or 10%.
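If the defaults don’t suit your workload, you can override these properties in the same hive-site classification used to enable Hive ACID; the values below are purely illustrative:

[
   {
      "classification": "hive-site",
      "properties": {
         "hive.compactor.check.interval": "120",
         "hive.compactor.delta.num.threshold": "20",
         "hive.compactor.delta.pct.threshold": "0.2"
      }
   }
]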

Best practices

The following are some best practices when using this feature:

  • Use an external Hive metastore for Hive ACID tables – Our customers use EMR clusters for compute purposes and Amazon S3 as storage for cost-optimization. With this architecture, you can stop the EMR cluster when the Hive jobs are complete. However, if you use a local Hive metastore, the metadata is lost upon stopping the cluster, and the corresponding data in Amazon S3 becomes unusable. To persist the metastore, we strongly recommend using an external Hive metastore like an Amazon RDS for MySQL instance or Amazon Aurora. Also, if you need multiple EMR clusters running ACID transactions (read or write) on the same Hive table, you need to use an external Hive metastore. A sketch of such a configuration follows this list.
  • Use ORC format – Use ORC format to get full ACID support for INSERT, UPDATE, DELETE, and MERGE statements.
  • Partition your data – This technique helps improve performance for large datasets.
  • Enable an EMRFS consistent view if using Amazon S3 as storage – Because you have frequent movement of files in Amazon S3, we recommend using an EMRFS consistent view to mitigate the issues related to the eventual consistency nature of Amazon S3.
  • Use Hive authorization – Because Hive transactional tables are Hive managed tables, to prevent users from deleting data in Amazon S3, we suggest implementing Hive authorization with required privileges for each user.
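The following hive-site classification is a minimal sketch of pointing the cluster at an external metastore on Amazon RDS for MySQL (the endpoint, database name, and credentials are placeholders; in practice, consider sourcing the password from Secrets Manager instead of hardcoding it):

[
   {
      "classification": "hive-site",
      "properties": {
         "javax.jdo.option.ConnectionURL": "jdbc:mysql://<rds-endpoint>:3306/hivemetastore?createDatabaseIfNotExist=true",
         "javax.jdo.option.ConnectionDriverName": "org.mariadb.jdbc.Driver",
         "javax.jdo.option.ConnectionUserName": "<metastore-user>",
         "javax.jdo.option.ConnectionPassword": "<metastore-password>"
      }
   }
]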

Limitations

Keep in mind the following limitations of this feature:

  • The AWS Glue Data Catalog doesn’t support Hive ACID transactions.
  • Hive external tables don’t support Hive ACID transactions.
  • Bucketing is optional in Hive 3, but in Amazon EMR 6.1.0 (as of this writing), if the table is partitioned, it needs to be bucketed. You can mitigate this issue in Amazon EMR 6.1.0 using the following bootstrap action:
    --bootstrap-actions '[{"Path":"s3://aws-bigdata-blog/artifacts/hive-acid-blog/make_bucketing_optional_for_hive_acid_EMR_6_1.sh","Name":"Set bucketing as optional for Hive ACID"}]'

Conclusion

This post introduced the Hive ACID feature in EMR 6.1.0 clusters, explained how it works and its concepts with a straightforward use case, described the default behavior of Hive ACID on Amazon EMR, and offered some best practices. Stay tuned for additional updates on new features and further improvements in Apache Hive on Amazon EMR.


About the Authors

Suthan Phillips is a big data architect at AWS. He works with customers to provide them architectural guidance and helps them achieve performance enhancements for complex applications on Amazon EMR. In his spare time, he enjoys hiking and exploring the Pacific Northwest.

Chao Gao is a Software Development Engineer at Amazon EMR. He mainly works on Apache Hive project at EMR, and has some in-depth knowledge of distributed database and database internals. In his spare time, he enjoys making roadtrips, visiting all the national parks and traveling around the world.

Zoopla drives KPIs with centralized data using Fivetran ELT for Amazon Redshift

Post Syndicated from Steven Collings original https://aws.amazon.com/blogs/big-data/zoopla-drives-kpis-with-centralized-data-using-fivetran-elt-for-amazon-redshift/

This is a guest post by Steven Collings, Senior Data Consultant at Zoopla

Zoopla is a property website that enables users to find residential or commercial property to buy or rent in the UK and overseas. Since acquiring Property Software Group and Expert Agent, we also offer backend software that agents can use to build their businesses. Amid this growth and these acquisitions, we needed a way to bring together data from disparate systems, such as Salesforce and NetSuite, in Amazon Redshift to drive key performance indicators (KPIs).

Building a flexible and scalable data warehouse with Amazon Redshift

Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse solution that makes it simple and cost-effective to efficiently analyze all of your data using your existing business intelligence tools. We have used Amazon Redshift as our data warehouse for more than 5 years and have developed deep knowledge of the AWS analytics stack. Amazon Redshift has always performed well for us and integrates with other services we rely on, such as Amazon Simple Storage Service (Amazon S3), AWS Glue, and Microsoft Power BI, among others. Importantly, Amazon Redshift has evolved along with our needs. For example, we adopted Amazon Redshift Spectrum to query data directly in our Amazon S3 data lake so that we can scale efficiently from a cost and performance perspective, and easily combine data in the warehouse and the lake. In general, we’re pleased that AWS has continuously allowed us to scale and move forward.

Complicated custom scripts and disparate data

We had custom-built scripts pulling the data into Amazon Redshift from different places, including NetSuite and Salesforce. These were built by different people, often in different languages, and not documented. Each script required maintenance to keep up with changes to source systems and APIs. We wanted a solution to help us integrate data more quickly and efficiently, using less developer time.

In addition to custom scripts, we were using native connectors from Power BI to shortcut data straight into reports. We were integrating data too high up in the stack to be able to reuse it in the ways that we wanted to. A proliferation of Power BI models was causing data to become siloed, and we ended up with a series of point solutions. We wanted our data centralized in our Amazon Redshift data warehouse so we could ensure its quality, join it together, and create enterprise data models.

We recognized that feeding the data directly into Power BI wasn’t scalable. Power BI has a key role in our data stack for dashboarding and self-service analytics, and we wanted to keep our use of the tool squarely in its sweet spot. We didn’t want to push every piece of fine-grained data into Power BI just so we could use it for a deep-dive analysis. Not only would this approach be expensive, it also had performance implications and reduced the freedom of our analyst community to use the best tool for the job. It made more sense to have that data in Amazon Redshift (as our existing data warehouse solution), a platform that is well suited for running fine-grained, large-scale analyses using whichever tool best suits the use case.

Fivetran for automated data pipelines

We selected Fivetran to ingest the data. Fivetran replicates data from applications, databases, events, and files into Amazon Redshift. Fivetran connectors deploy in minutes, require zero maintenance, and automatically adjust to source changes so our data team can stop worrying about engineering and focus on driving insights. With Fivetran bringing data into Amazon Redshift, we have increased data quality and can easily integrate new datasets.

Freeing up engineering resources

Due to competing priorities for data engineering resources, my team faced a reduced level of support. With Fivetran, we could push ahead and make progress while working with fewer resources. We enabled existing members of the BI team to perform data integration tasks that previously required engineering effort (such as importing new sources, modifying existing sources, implementing data cleansing, and shaping logic) and freed up our data engineers to apply their skill set to value add activities beyond maintaining data pipelines.

We estimate that Fivetran currently does the work of up to one full-time engineer, and we expect that number to increase. We’re interested in adding more sources that aren’t being integrated at the moment (such as campaign performance or customer helpdesk), which will increase the number of engineering hours that Fivetran saves us.

Building out comprehensive KPIs

One of the biggest drivers for bringing on Fivetran was a project that required centralizing NetSuite and Salesforce data for a large KPI project. We had a custom-built Salesforce connector but we didn’t have the skill set on the team to maintain it, and we didn’t want to spend development resources when we could buy it off the shelf.

The project entailed building a KPI overview for the senior leadership team. The weekly dashboard monitors about 40 different KPIs and metrics across Sales, Product, Marketing, Financials, HR, and other departments. It’s constantly available to the senior leadership team and allows them to understand overall business performance and also drill down into areas of concern that require further investigation and analysis. A streamlined version of the dashboard is displayed on screens around the office so that everyone feels informed and connected to our mission.

While some of these KPIs were already available, they were spread across different systems, lived in different reports, or were never surfaced at all. Where they were attainable, the process was often manual and prone to errors. This dashboard has been the key deliverable. It was always in our mind that we didn’t want to build a point solution. We wanted to ensure that all the data we were landing could be leveraged for other purposes, and we wanted to make this data available in a self-service capacity. By providing faster, simpler access to data, we enable quicker, more informed decision-making and open up the next wave of questions as people understand what is possible.

Conclusion

By centralizing data into the existing Amazon Redshift data warehouse, using Fivetran to automate data ingestion, and building dashboards with Power BI, we’ve created a consistent and efficient analytics process. It has saved our team time and ensured that we can continue to deliver valuable insight to our stakeholders.

Learn more about Zoopla, Fivetran and Amazon Redshift.


About the Author

Steven Collings is a Data Consultant (formerly Head of Data) at Zoopla, with 15 years of experience in data storage, ETL, data modelling, and reporting and visualisation techniques and technologies.

Fast and predictable performance with serverless compilation using Amazon Redshift

Post Syndicated from Kiran Chinta original https://aws.amazon.com/blogs/big-data/fast-and-predictable-performance-with-serverless-compilation-using-amazon-redshift/

Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Customers tell us that they want extremely fast query response times so they can make equally fast decisions.

This post presents the recently launched, massively scalable serverless compilation capability for Amazon Redshift, which can now concurrently compile query segments with additional compute resources at no extra cost. We also share how our customers have enjoyed faster performance (in several cases, twice as fast) because of this new capability.

Amazon Redshift query compilation

When a query is sent to Amazon Redshift, the query processing engine parses it into multiple segments and compiles these segments to produce optimized object files that are processed during query execution. When the same or similar queries are sent to Amazon Redshift, the corresponding segments are already present in the cluster’s code compilation cache. Query segments that use already compiled code in the cache run faster because there’s no query compilation overhead.

You can also accelerate workloads made up of one-time and first-time queries, which don’t have query segments compiled in the cache. Depending on the query’s complexity, Amazon Redshift usually compiles those queries within seconds. However, some mission-critical workloads require even faster response times. This is where the massively scalable serverless compilation capability in Amazon Redshift makes a big difference.

Amazon Redshift serverless query compilation

Amazon Redshift breaks down a query into a set of segments, and each segment is a set of operations, such as SCAN or BUILD HASH TABLE. With the launch of the massively scalable serverless compilation capability, Amazon Redshift can now compile the query segments faster and in parallel because the compilation isn’t limited by the specific cluster being used and its available CPU and memory resources.

The Amazon Redshift compilation capability is managed with an external resource that your Amazon Redshift cluster uses based on your workload. During query processing, Amazon Redshift generates query segments and sends the segments that aren’t present in the cluster’s local cache to the external compilation farm to be compiled with massive parallelism. At the time of running the query, the segments are quickly fetched from the compilation service and saved in the cluster’s local cache for future processing. This makes sure that one-time and first-time queries are processed with high performance in a transparent way, without any additional cost.
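Although this happens transparently, you can observe the effect of the cache: Amazon Redshift records compile activity in the SVL_COMPILE system view. The following is a small sketch (not part of the feature setup) that shows, for recent queries, which segments were compiled (compile = 1) and which were reused from the cache (compile = 0):

-- Inspect segment compilation for recent queries
SELECT query, segment, compile, starttime, endtime
FROM   svl_compile
ORDER BY query DESC, segment
LIMIT 20;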

Design and usage

The massively scalable serverless compilation capabilities benefit you whenever you need query compilation, especially with complex and highly concurrent workloads. The following are some specific use cases where this capability helps:

  • Dashboard applications that require fast query performance experience lower query compilation time, leading to improved user experience.
  • Dynamic one-time queries with new query segments that aren’t present in the code cache can be processed faster.
  • Scheduled ETL or reporting jobs with a strict SLA benefit from lower query compilation times.
  • Highly complex and concurrent workloads run with high performance without impacting the overall cluster performance.
  • Clusters that are resized, upgraded, or paused and resumed use the external code cache. No warmup is needed.

The following diagram illustrates the architecture of the Amazon Redshift serverless compilation.

Compilation improvements

Although the serverless compilation has already been improving query performance significantly since its launch, the Amazon Redshift team is working to further improve its effectiveness and performance. More recently, we announced an unlimited cache size to store compiled objects and increase cache hits across the Amazon Redshift fleet from 99.60% to 99.95%.

The following graph shows how the cache hit percentage has improved beyond the local cache across releases.

Faster performance

During a standard maintenance window, an Amazon Redshift patch flushes the compilation cache. Before we launched the new compilation capabilities, your cluster’s performance was impacted after being patched during maintenance periods. Now, that performance impact is almost unnoticeable with this feature.

Many Amazon Redshift customers are benefiting from these performance improvements and saving time and cost for their Amazon Redshift environments. In this section, we share the stories of two organizations.

Aptos

Aptos is the largest provider of enterprise software focused exclusively on retail. They use Amazon Redshift to power the analytics solution for retail clients. Jonathan Strohl, a cloud engineer on the Aptos team, shared this anecdote with us:

“Prior to last week’s Redshift maintenance, we sent our clients the typical notification letting them know to expect performance delays the following morning due to the object cache being flushed during the maintenance. However, the morning after the maintenance, a couple of our clients emailed back asking whether the maintenance had actually occurred, because there had been no noticeable delay. The performance delays they had previously noticed were now eliminated due to the serverless compilation recently released by Amazon Redshift. This is the best result we could have hoped for—our clients were unable to tell that a cache-flushing maintenance had even occurred!”

Manthan

Manthan delivers BI, analytics, and artificial intelligence solutions to more than 200 leading retailers across 22 countries. Vijay Chidambaram, Head of Cloud Engineering at Manthan, shared the following with us:

“The normal ETL runtimes are around 90–100 minutes. The ETL runtime would go to around 290 minutes post an upgrade without the serverless compilation feature. That value has come down to about 150 minutes, which is a 2X improvement. Across the clusters, there is no increase in the ETL wall clock runtime compared to normal runtimes on day two and beyond.”

Intentwise

Intentwise is an Amazon Advertising optimization platform that empowers brands, sellers, and agencies with insights, automation, and expertise. They use Amazon Redshift to power the analytics for their SaaS offering. Raghavendra, a Software Architect at Intentwise, shared the following with us:

“The new serverless compilation feature improves the query compilation time by 3x. This makes Amazon Redshift an even more powerful data warehouse for our analytical platform because it continues to innovate to offer better performance and lower costs, all with no efforts on our end.”

Summary

This post explained how the massively scalable serverless compilation capability for Amazon Redshift works and gave examples of the benefits you can expect from the performance improvements. The capability is free and automatically enabled on all new and existing Amazon Redshift clusters.

For more information about Amazon Redshift query planning and workflow, see Query planning and execution workflow. For more information about improving query performance, see Factors affecting query performance.


About the Authors

Kiran Chinta is a Senior Software Development Engineer at Amazon Redshift. He has been working on distributed databases for over 13 years and has focused on high availability, disaster recovery, SQL language features and performance features for on-prem and cloud databases. In his spare time, he enjoys reading and playing various sports.

Naresh Chainani is a Senior Software Development Manager at Amazon Redshift. He leads Query Processing, Query Performance, Distributed Systems and Workload Management with a strong team. Naresh is passionate about building high-performance databases to enable customers to gain timely insights and make critical business decisions. In his spare time, Naresh enjoys reading and playing tennis.

Maor Kleider is a product and database engineering leader for Amazon Redshift. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

Quan Li is a Senior Database Engineer at Amazon Redshift. His focus is enabling customers to deliver maximum business value. Quan is passionate about optimizing high performance analytical databases. During his spare time, he enjoys traveling and experiencing different types of cuisines with his family.

Power data analytics, monitoring, and search use cases with the Open Distro for Elasticsearch SQL Engine on Amazon ES

Post Syndicated from Viraj Phanse original https://aws.amazon.com/blogs/big-data/power-data-analytics-monitoring-and-search-use-cases-with-the-open-distro-for-elasticsearch-sql-engine-on-amazon-es/

Amazon Elasticsearch Service (Amazon ES) is a popular choice for log analytics, search, real-time application monitoring, clickstream analysis, and more. One commonality among these use cases is the need to write and run queries to obtain search results at lightning speed. However, doing so requires expertise in the JSON-based Elasticsearch query domain-specific language (Query DSL). Although Query DSL is powerful, it has a steep learning curve, and wasn’t designed as a human interface to easily create one-time queries and explore user data.

To solve this problem, we provided the Open Distro for Elasticsearch SQL Engine on Amazon ES, which we have been expanding since the initial release. The Structured Query Language (SQL) engine is powered by Open Distro for Elasticsearch, an Apache 2.0 licensed distribution of Elasticsearch. For more information about the Open Distro project, see Open Distro for Elasticsearch. For more information about the SQL engine capabilities, see SQL.

As part of this continued investment, we’re happy to announce new capabilities, including a Kibana-based SQL Workbench and a new SQL CLI that makes it even easier for Amazon ES users to use the Open Distro for Elasticsearch SQL Engine to work with their data.

SQL is the de facto standard for data and analytics and one of the most popular languages among data engineers and data analysts. Introducing SQL in Amazon ES allows you to manifest search results in a tabular format, with documents represented as rows, fields as columns, and indexes as table names. This provides a straightforward and declarative way to express complex DSL queries in a readable format. The newly added tools act as a powerful yet simplified way to extract and analyze data, and can support complex analytics use cases.
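To illustrate this mapping, the following is a minimal sketch of the kind of query you can run through the SQL engine; the index name (logs) and field names are hypothetical:

-- Aggregate documents in a hypothetical "logs" index by HTTP status code
SELECT status, COUNT(*) AS request_count
FROM logs
WHERE status >= 400
GROUP BY status
ORDER BY request_count DESC
LIMIT 10;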

Features overview

The following is a brief overview of the features of Open Distro for Elasticsearch SQL Engine on Amazon ES:

  • Query tools
    • SQL Workbench – A comprehensive and integrated visual tool to run on-demand SQL queries, translate SQL into its REST equivalent, and view and save results as text, JSON, JDBC, or CSV. The following screenshot shows a query on the SQL Workbench page.

    • SQL CLI – An interactive, standalone command line tool to run on-demand SQL queries, translate SQL into its REST equivalent, and view and save results as text, JSON, JDBC, or CSV. The following screenshot shows a query on the CLI.

  • Connectors and drivers
    • ODBC driver – The Open Database Connectivity (ODBC) driver enables connecting with business intelligence (BI) applications such as Tableau and exporting data to CSV and JSON.
    • JDBC driver – The Java Database Connectivity (JDBC) driver also allows you to connect with BI applications such as Tableau and export data to CSV and JSON.
  • Query support
    • Basic queries – You can use the SELECT clause, along with FROM, WHERE, GROUP BY, HAVING, ORDER BY, and LIMIT to search and aggregate data.
    • Complex queries – You can perform complex queries such as subquery, join, and union on more than one Elasticsearch index.
    • Metadata queries – You can query basic metadata about Elasticsearch indexes using the SHOW and DESCRIBE commands.
  • Delete support
    • Delete – You can delete all the documents or documents that satisfy predicates in the WHERE clause from search results. However, it doesn’t delete documents from the actual Elasticsearch index.
  • JSON and full-text search support
    • JSON – Support for JSON by following PartiQL specification, a SQL-compatible query language, lets you query semi-structured and nested data for any data format.
    • Full-text search support – Full-text search on millions of documents is possible by letting you specify the full range of search options using SQL commands such as match and score.
  • Functions and operator support
    • Functions and operators – Support for string functions and operators, numeric functions and operators, and date-time functions is possible by enabling fielddata in the document mapping.
  • Settings
    • Settings – You can view, configure, and modify settings to control the behavior of SQL without needing to restart or bounce the Elasticsearch cluster.
  • Interfaces
    • Endpoints – The explain endpoint allows translating SQL into Query DSL, and the cursor helps obtain a paginated response for the SQL query result.
  • Monitoring
    • Monitoring – You can obtain node-level statistics by using the stats endpoint.
  • Request and response protocols

Conclusion

Open Distro for Elasticsearch SQL Engine on Amazon ES provides a comprehensive, flexible, and user-friendly set of features to obtain search results from Amazon ES in a declarative manner using SQL. For more information about querying with SQL, see SQL Support for Amazon Elasticsearch Service.


About the Author

Viraj Phanse (@vrphanse) is a product management leader at Amazon Web Services for Search Services/Analytics. Prior to AWS, he was in product management/strategy and go-to-market leadership roles at Oracle, Aerospike, INSZoom and Persistent Systems. He is a Fellow and Selection Committee member at Berkeley Angel Network, and a Big Data Advisory Board Member at San Francisco State University. He has completed his M.S. in Computer Science from UCLA and MBA from UC Berkeley’s Haas School of Business.

How Aruba Networks built a cost analysis solution using AWS Glue, Amazon Redshift, and Amazon QuickSight

Post Syndicated from Siddharth Thacker original https://aws.amazon.com/blogs/big-data/how-aruba-networks-built-a-cost-analysis-solution-using-aws-glue-amazon-redshift-and-amazon-quicksight/

This is a guest post co-written by Siddharth Thacker and Swatishree Sahu from Aruba Networks.

Aruba Networks is a Silicon Valley company based in Santa Clara that was founded in 2002 by Keerti Melkote and Pankaj Manglik. Aruba is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba Networks provides a cloud-based platform called Aruba Central for network management and AI Ops. The Aruba cloud platform supports thousands of workloads for the customer-facing production environment, as well as a separate development platform for Aruba engineering.

The motivation to build the solution presented in this post was to understand the unit economics of the AWS resources used by multiple product lines across different organization pillars. Aruba wanted a faster, effective, and reliable way to analyze cost and usage data and visualize that into a dashboard. This solution has helped Aruba in multiple ways, including:

  • Visibility into costs – Multiple Aruba teams can now analyze the cost of their application via data surfaced with this solution
  • Cost optimization – The solution helps teams identify new cost-optimization opportunities by making them aware of the higher-cost resources with low utilization so they can optimize accordingly
  • Cost management – The Cloud DevOps organization, the group who built this solution, can effectively plan at the application level and have a direct positive impact on gross margins
  • Cost savings – With daily cost data available, engineers can see the monetary impact of right-sizing compute and other AWS resources almost immediately
  • Big picture as well as granular – Users can visualize cost data from the top down and track cost at a business level and a specific resource level

Overview of the solution

This post describes how Aruba Networks automated the solution, from generating the AWS Cost & Usage Report (AWS CUR) to its final visualization on Amazon QuickSight. In this solution, they start by configuring the CUR on their primary payer account, which publishes the billing reports to an Amazon Simple Storage Service (Amazon S3) bucket. Then they use an AWS Glue crawler to define and catalog the CUR data. As the new CUR data is delivered daily, the data catalog is updated, and the data is loaded into an Amazon Redshift database using Amazon Redshift Spectrum and SQL. The reporting and visualization layer is built using QuickSight. Finally, the entire pipeline is automated by using AWS Data Pipeline.

The following diagram illustrates this architecture.

Aruba prefers the AWS CUR to AWS Cost Explorer because Cost Explorer provides usage information at a high level, without enough granularity for detailed operations such as data transfer cost. The AWS CUR provides the most detailed information available about your AWS costs and usage at an hourly granularity. This allows the Aruba team to drill down into costs by the hour or day, by product or product resource, or by custom tags, enabling them to achieve their goals.

Aruba implemented the solution with the following steps:

  1. Set up the CUR delivery to a primary S3 bucket from the billing dashboard.
  2. Use Amazon S3 replication to copy the primary payer S3 bucket to the analytics bucket. Having a separate analytics account helps prevent direct access to the primary account.
  3. Create and schedule the crawler to crawl the CUR data. This is required to make the metadata available in the Data Catalog and update it quickly when new data arrives.
  4. Create respective Amazon Redshift schema and tables.
  5. Orchestrate an ETL flow to load data to Amazon Redshift using Data Pipeline.
  6. Create and publish dashboards using QuickSight for executives and stakeholders.

Insights generated

The Aruba DevOps team built various reports that provide the cost classifications on AWS services, weekly cost by applications, cost by product, infrastructure, resource type, and much more using the detailed CUR data as shown by the following screenshot.

For example, using the following screenshot, Aruba can conveniently figure out that compute cost is the biggest contributor compared to other costs. To reduce the cost, they can consider using various cost-optimization methods like buying reserved instances, savings plans, or Spot Instances wherever applicable.

Similarly, the following screenshot highlights that the cost doubled compared to the first week of April. This helps Aruba identify anomalies quickly and make informed decisions.

Setting up the CUR delivery

For instructions on setting up a CUR, see Creating Cost and Usage Reports.

To reduce complexity in the workflow, Aruba chose to create resources in the same region with hourly granularity, mainly to see metrics more frequently.

To lower the storage costs for data files and maximize the effectiveness of querying data with serverless technologies like Amazon Athena, Amazon Redshift Spectrum, and Amazon S3 data lake, save the CUR in Parquet format. The following screenshot shows the configuration for delivery options.

The following table shows some example CUR data.

bill_payer_account_id | line_item_usage_account_id | line_item_usage_start_date | line_item_usage_end_date | line_item_product_code | line_item_usage_type | line_item_operation
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:VolumeP-IOPS.piops | CreateVolume-P-IOPS
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-APN1-AWS-In-Bytes | LoadBalancing-PublicIP-In
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataProcessing-Bytes | LoadBalancing
123456789 | 111222333444 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-EBS:SnapshotUsage | CreateSnapshot
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataTransfer-Regional-Bytes | InterZone-In
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonS3 | USW2-Requests-Tier2 | ReadLocation
123456789 | 555666777888 | 00:00.0 | 00:00.0 | AmazonEC2 | USW2-DataTransfer-Regional-Bytes | InterZone-In

Replicating the CUR data to your analytics account

For security purposes, other teams aren’t allowed to access the primary (payer) account, and therefore can’t access CUR data generated from that account. Aruba replicated the data to their analytics account and build the cost analysis solution there. Other teams can access the cost data without getting access permission for the primary account. The data is replicated across accounts by adding an Amazon S3 replication rule in the bucket. For more information, see Adding a replication rule when the destination bucket is in a different AWS account.

Cataloging the data with a crawler and scheduling it to run daily

Because AWS delivers all daily reports in a report date range report-prefix/report-name/yyyymmdd-yyyymmdd folder, Aruba uses AWS Glue crawlers to crawl through the data and update the catalog.

AWS Glue is a fully managed ETL service that makes it easy to prepare and load the data for analytics. Once the AWS Glue is pointed to the data stored on AWS, it discovers the data and stores the associated metadata (such as table definition and schema) in the Data Catalog. After the data is cataloged, the data is immediately searchable, queryable, and available for ETL. For more information, see Populating the AWS Glue Data Catalog.

The following screenshot shows the crawler created on Amazon S3 location of the CUR data.

The following code is an example table definition populated by the crawler:

CREATE EXTERNAL TABLE `cur_parquet`(
  `identity_line_item_id` string, 
  `identity_time_interval` string, 
  `bill_invoice_id` string, 
………
………
  `resource_tags_user_infra_role` string)

PARTITIONED BY ( 
  `year` string, 
  `month` string )

ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://curS3bucket/Parquet/'

Transforming and loading using Amazon Redshift

Next, for the analytics service, Aruba chose Amazon Redshift over Athena. Aruba needs to integrate the cost data with other tables already present in Amazon Redshift, and using the same service makes that integration easy. To further filter and transform the data at the same time, and to simplify the multi-step ETL, Aruba chose Amazon Redshift Spectrum, which helps to efficiently query and load CUR data from Amazon S3. For more information, see Getting started with Amazon Redshift Spectrum.

Use the following query to create an external schema and map it to the AWS Glue database created earlier in the Data Catalog:

--Choose a schema name of your choice; cur_redshift_spectrum_external_schema is just an example--
 create external schema cur_redshift_spectrum_external_schema from data catalog database 
 'aruba_curr_db' iam_role 'arn:aws:iam::xxxxxxxxxxxxx:role/redshiftclusterrole' 
 create external database if not exists;

The table created in the Data Catalog appears under the Amazon Redshift Spectrum schema. The schema, table, and records created can be verified with the following SQL code:

SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE>; 

--Query the right partition, year=2020 and month=2 is used an example
SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE> 
WHERE  year=2020 
AND    month=2;
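You can also run ad hoc drill-downs directly against the Spectrum table before loading the data into Amazon Redshift. The following sketch summarizes unblended cost by product code for a single month partition; the column names come from the CUR schema, and <TABLE> is the table created by the crawler:

--Unblended cost per AWS product for the year=2020, month=2 partition
SELECT line_item_product_code, 
       SUM(line_item_unblended_cost) AS total_unblended_cost 
FROM   cur_redshift_spectrum_external_schema.<TABLE> 
WHERE  year=2020 
AND    month=2 
GROUP BY line_item_product_code 
ORDER BY total_unblended_cost DESC;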

Next, transform and load the data into the Amazon Redshift table. Aruba started by creating an Amazon Redshift table to contain the data. The following SQL code can be used to create the production table with the desired columns:

CREATE TABLE redshift_schema.redshift_table 
  ( 
     usage_start_date TIMESTAMP, 
     usage_end_date   TIMESTAMP, 
     service_region   VARCHAR (256), 
     service_az       VARCHAR (256), 
     aws_resource_id  VARCHAR (256), 
     usage_amount     FLOAT (17), 
     charge_currency  VARCHAR (256), 
     aws_product_name VARCHAR (256), 
     instance_family  VARCHAR (256), 
     instance_type    VARCHAR (256), 
     unblended_cost   FLOAT (17), 
     usage_cost       FLOAT (17)
  ); 

CUR is dynamic in nature, which means that some columns may appear or disappear with each update. When creating the table, we take static columns only. For more information, see Line item details.

Next, insert and update to ingest the data from Amazon S3 to the Amazon Redshift table. Each CUR update is cumulative, which means that each version of the CUR includes all the line items and information from the previous version.

The reports generated throughout the month are estimated and subject to change during the rest of the month. AWS finalizes the report at the end of each month. Finalized reports have the calculations for the blended and unblended costs, and cover all the usage for the month. For this use case, Aruba updates the last 45 days of data to make sure the finalized cost is captured. The following sample statement inserts and updates that data:

-- Insert and update the last 45 days of CUR data
 INSERT INTO redshift_schema.redshift_table
            (usage_start_date, 
             usage_end_date, 
             service_region, 
             service_az, 
             aws_resource_id, 
             usage_amount, 
             charge_currency, 
             aws_product_name, 
             instance_family, 
             instance_type, 
             unblended_cost,
             Usage_Cost ) 
 SELECT line_item_usage_start_date, 
       line_item_usage_end_date, 
       line_item_operation, 
       line_item_availability_zone, 
       line_item_resource_id, 
       line_item_usage_amount, 
       line_item_currency_code, 
       product_product_name, 
       product_instance_family, 
       product_instance_type, 
       line_item_unblended_cost,
       case when line_item_type='Usage' then line_item_unblended_cost
            else 0
            end as usage_cost 
 FROM   cur_redshift_spectrum_external_schema.cur_parquet
 WHERE  line_item_usage_start_date >= date_add('day', -45, getdate()) 
       AND line_item_usage_start_date < date_add('day', 1, getdate()); 
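After the table is loaded, reporting queries for QuickSight or ad hoc analysis can aggregate the data directly. The following is an example sketch that computes weekly unblended cost by product, using the columns defined in the table above:

-- Weekly unblended cost by AWS product
SELECT DATE_TRUNC('week', usage_start_date) AS usage_week, 
       aws_product_name, 
       SUM(unblended_cost) AS weekly_unblended_cost 
FROM   redshift_schema.redshift_table 
GROUP BY 1, 2 
ORDER BY usage_week, weekly_unblended_cost DESC;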

Using Data Pipeline to orchestrate the ETL workflow

To automate this ETL workflow, Aruba chose Data Pipeline. Data Pipeline helps to reliably process and move data between different AWS compute and storage services, as well as on-premises data sources. With Data Pipeline, Aruba can regularly access their data where it’s stored, transform and process it at scale, and efficiently transfer the results to AWS services such as Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, and Amazon EMR. Although the detailed steps of setting up this pipeline are out of scope for this blog, there is a sample workflow definition JSON file, which can be imported after making the necessary changes.

Data Pipeline workflow

The following screenshot shows the multi-step ETL workflow using Data Pipeline. Data Pipeline is used to run the INSERT query daily, which inserts and updates the latest CUR data into our Amazon Redshift table from the external table.

To copy data to Amazon Redshift, you can use RedshiftDataNode and RedshiftCopyActivity, scheduled to run periodically.

Sharing metrics and creating visuals with QuickSight

To share the cost and usage with other teams, Aruba choose QuickSight using Amazon Redshift as the data source. QuickSight is a native AWS service that seamlessly integrates with other AWS services such as Amazon Redshift, Athena, Amazon S3, and many other data sources.

As a fully managed service, QuickSight lets Aruba easily create and publish interactive dashboards that include ML Insights. In addition to building powerful visualizations, QuickSight provides data preparation tools that make it easy to filter and transform the data into the exact dataset needed. As a cloud-native service, dashboards can be accessed from any device and embedded into applications and portals, allowing other teams to monitor their resource usage easily. For more information about creating a dataset, see Creating a Dataset from a Database. QuickSight visuals can then be created from this dataset.

The following screenshot shows a visual comparison of device cost and count to help find the cost per device. This visual helped Aruba quickly identify the cost per device increase in April and take necessary actions.

Similarly, the following visualization helped Aruba identify an increase in data transfer cost and helped them decide to invest in rearchitecting their application.

The following visualization classifies the cost spend per resource.

Conclusion

In this post, we discussed how Aruba Networks was able to successfully achieve the following:

  • Generate CUR and use AWS Glue to define data, catalog the data, and update the metadata
  • Use Amazon Redshift Spectrum to transform and load the data to Amazon Redshift tables
  • Query, visualize, and share the data stored using QuickSight
  • Automate and orchestrate the entire solution using Data Pipeline

Aruba uses this solution to automatically generate a daily cost report and share it with their stakeholders, including executives and the cloud operations team.


About the Authors

Siddharth Thacker works in Business & Finance Strategy in the Cloud Software division at Aruba Networks. Siddharth has a Master’s in Finance, with experience in industries like banking, investment management, and cloud software, and focuses on business analytics, margin improvement, and strategic partnerships at Aruba. In his spare time, he likes exploring the outdoors and participating in team sports.

Swatishree Sahu is a Technical Data Analyst at Aruba Networks. She lived and worked in India for 7 years as an SME for SOA-based integration tools before coming to the US to pursue her master’s in Business Analytics from UT Dallas. Breaking down and analyzing data is her passion. She is a Star Wars geek, and in her free time, she loves gardening, painting, and traveling.

Ritesh Chaman is a Technical Account Manager at Amazon Web Services. With 10 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, and Big Data systems. In his spare time, he loves cooking (spicy Indian food), watching sci-fi movies, and playing sports.

Kunal Ghosh is a Solutions Architect at AWS. His passion is to build efficient and effective solutions on the cloud, especially involving Analytics, AI, Data Science, and Machine Learning. Besides family time, he likes reading and watching movies, and is a foodie.

Build a self-service environment for each line of business using Amazon EMR and AWS Service Catalog

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/build-a-self-service-environment-for-each-line-of-business-using-amazon-emr-and-aws-service-catalog/

Enterprises often want to centralize governance and compliance requirements, and provide a common set of policies on how Amazon EMR instances should be set up. You can use AWS Service Catalog to centrally manage commonly deployed Amazon EMR cluster configurations, and this helps you achieve consistent governance and meet your compliance requirements, while at the same time enabling your end users to quickly deploy only the approved EMR cluster configurations on a self-service basis.

In this post, we demonstrate how enterprise administrators can use AWS Service Catalog to create and manage catalogs that data engineers and data scientists use to quickly discover and deploy clusters in a self-service environment. With AWS Service Catalog, you can control which EMR release versions are available, the cluster configuration, and access permissions by individual, group, department, or cost center.

The following are a few key AWS Service Catalog concepts:

  • An AWS Service Catalog product is a blueprint for building the AWS resources that you want available for deployment. You create your products by importing AWS CloudFormation templates.
  • A portfolio is a collection of products. With AWS Service Catalog, you can create a customized portfolio for each type of user in your organization and selectively grant access to the appropriate portfolio.
  • A provisioned product is a collection of resources that result from instantiating an AWS CloudFormation template.

Use cases

You can use AWS Service Catalog to provide Amazon EMR as a self-serve Extract, Transform, Load (ETL) platform at scale while hiding all the security and network configurations from end users.

As an administrator in AWS Service Catalog, you can create one or more Service Catalog products that define different configurations to be used for EMR clusters. In those products, you can define the security and network configurations for the EMR cluster, define auto-scaling rules, instance configurations, and different purchase options, or preconfigure EMR to run different EMR step jobs. As a user in Service Catalog, you can browse the different EMR templates exposed as products and provision one based on your requirements. By following this approach, you can make your EMR usage self-serviceable, reduce the EMR learning curve for your users, and ensure adherence to security standards and best practices.

The following image illustrates how the interactions look between Amazon EMR administrators and end-users when using AWS Service Catalog to provision EMR clusters.

The use cases in this post have three AWS Identity and Access Management (IAM) users with different access permissions:

  • emr-admin: This user is the administrator and has access to all the resources. This user creates EMR clusters for their end-users based on their requirements.
  • emr-data-engineer: The data engineer uses Spark and Hive most of the time. They run different ETL scripts on Hive and Spark to process, transform, and enrich their datasets.
  • emr-data-analyst: This user is very familiar with SQL and mostly uses Hue to submit queries to Hive.

You can solve several Amazon EMR operational use cases using AWS Service Catalog. The following sections discuss three different use cases. Later in this post, you walk through each of the use cases with a solution.

Use case 1: Ensuring least privilege and appropriate access

The administrator wants to enforce a few organizational standards. The first one is no default EMR_EC2_ROLE for any EMR cluster. Instead, the administrator wants a role that has limited access to Amazon Simple Storage Service (Amazon S3) and is assigned automatically every time an EMR cluster is launched. Second, end-users sometimes forget to add appropriate tags to their resources, which often makes it hard for the administrator to identify those resources and allocate cost appropriately. So, the administrator wants a mechanism that assigns tags to EMR clusters automatically when they launch.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

Data engineers use Spark and Hive applications, and they prefer to have a platform where they just submit their jobs without spending time creating the cluster. They also want to try out different Amazon EMR versions to see how their jobs run on different Spark or Hive versions. They don’t want to spend time learning AWS or Amazon EMR. Additionally, the administrator doesn’t want to give full Amazon EMR access to all users.

Use case 3: Automatically scaling the Hive cluster for analysts

Data analysts have strong SQL backgrounds, so they typically use Hue to submit their Hive queries. They run queries against a large dataset, so they want to have an EMR cluster that can scale when needed. They also don’t have access to the Amazon EMR console and don’t know how to configure automatic scaling for Amazon EMR.

Solution overview

With AWS Service Catalog, you can self-serve your Amazon EMR users, enforce best practices and compliance, and speed up the adoption process.

At a high level, the solution includes the following steps:

  1. Configuring the AWS environment to run this solution.
  2. Creating a CloudFormation template.
  3. Setting up AWS Service Catalog products and portfolios.
  4. Managing access to AWS Service Catalog and provisioning products.
  5. Demonstrating the self-service Amazon EMR platform for users.
  6. Enforcing best practices and compliance through AWS Service Catalog.
  7. Executing ETL workloads on Amazon EMR using AWS Service Catalog.
  8. Optionally, setting up AWS Service Catalog and launching Amazon EMR products through the AWS Command Line Interface (AWS CLI).

The following section looks at the CloudFormation template, which you use to set up the AWS environment to run this solution.

Setting up the AWS environment

To set up this solution, you need to create a few AWS resources. The CloudFormation template provided in this post creates all the required AWS resources. This template requires you to pass the following parameters during the launch:

  • A password for your test users.
  • An Amazon Elastic Compute Cloud (Amazon EC2) key pair.
  • The latest AMI ID for the EC2 helper instance. This instance configures the environment and sets up the required files and templates for this solution.

This template is designed only to show how you can use Amazon EMR with AWS Service Catalog. This setup isn’t intended for production use without modification.

To launch the CloudFormation stack, choose Launch Stack:

Launching this stack creates several AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next step:

Key | Description
ConsoleLoginURL | URL you use to switch between multiple users
EMRSCBlogBucket | Name of the S3 bucket to store blog-related files
UserPassword | Password to use for all the test users
DataAdminUsername | IAM user name for the administrator user
DataEngineerUsername | IAM user name for the data engineer user
DataAnalystUsername | IAM user name for the data analyst user
HiveScriptURL | Amazon S3 path for the Hive script
HiveETLInputParameter | Path for the Hive input parameter
HiveETLOutputParameter | Path for the Hive output parameter
SparkScriptURL | Amazon S3 path for the Spark script
SparkETLInputParameter | Path for the Spark input parameter
SparkETLOutputParameter | Path for the Spark output parameter

When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console. See the following screenshot.

(Optional) Configuring the AWS CLI

The AWS CLI is a unified tool to manage your AWS services. In the optional step, you use the AWS CLI to create AWS Service Catalog products and portfolios. Installation of AWS CLI isn’t required for this solution. For instructions on configuring the AWS CLI in your environment, see Configuring the AWS CLI.

Provisioning EMR clusters through AWS Service Catalog

You can create AWS Service Catalog products from the existing CloudFormation template and use those products to provision a variety of EMR clusters. You can create an EMR cluster and consume the cluster’s services without having access to the cluster, which improves the Amazon EMR adoption process.

The following CloudFormation template creates an EMR cluster. This template takes two parameters:

  • Cluster size – You select how many core nodes you want to have in the EMR cluster
  • Compute type – Based on the compute type you choose, the template selects the respective EC2 instance type

As an account administrator, you can define the internal configuration for the EMR cluster. End users aren’t required to know all the security groups, subnet IDs, key pairs, and other details, and they don’t need to access the EMR cluster or spend time setting it up. As an administrator, you define a template for the cluster, enforce compliance, versions, applications, and automatic scaling rules through the CloudFormation template, and expose that template as a product through AWS Service Catalog.

The following section walks you through the solution for each use case.

Use cases walkthrough

The CloudFormation template already configured AWS Service Catalog portfolios and products. You can review these on the AWS Service Catalog console.

  1. Use the ConsoleLoginURL from the AWS CloudFormation console Outputs tab and sign in as an emr-admin user.
  2. On the AWS Service Catalog console, you can see two portfolios for engineers and analysts. In each of those portfolios, you can see two products.

The Data Analysts Stack contains products for the analyst and is assigned to the user emr-data-analyst. The Data Engineering Stack contains products for engineers and is assigned to the emr-data-engineer user. Upon logging in, they can see their respective products and portfolios.

Use case 1: Ensuring least privilege and appropriate access

The cluster administrator creates the least privilege IAM role for their users and associated that role through the Service Catalog product. Similarly, the administrator also assigns appropriate tags for each product. When data engineers or analysts launch an EMR cluster using any of their assigned products, the cluster has the least privilege access and resources are tagged automatically. To confirm this access is in place, complete the following steps:

  1. Sign in to the AWS Management Console as either emr-data-engineer user or emr-data-analyst user.

Your console looks slightly different because end-users don’t manage the products; they just use the products to launch clusters or run jobs on them.

  1. Choose Default EMR and provision this product by choosing Launch Product.
  2. For the name of the provisioned product, enter SampleEMR.

The next screen shows a list of allowed parameters your administrator thinks you may need.

  1. Leave all parameters as default.
  2. For the cluster name, enter Sample EMR.
  3. Review all the information and launch the product.

It takes few minutes to spin up the cluster. When the cluster is ready, the status changes to Succeeded. The provision product page also shows you a list of outputs your product owner wants you to see. For example, using output values, your product owner can share Master DNS Address, Resource Manager URL, and Hue URL as shown in the following figure.

To verify if this launched EMR cluster has the expected IAM role and tags, sign in as emr-admin user and go to the AWS EMR Console to review the service role for EC2 instances and tags.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

For this use case, data engineers have two different ETL scripts:

  • A Spark script that reads Amazon reviews stored in Amazon S3 and converts them into Parquet before writing back to Amazon S3
  • A Hive script that reads Amazon reviews data from Amazon S3 and finds out the top toys based on customer ratings.

The administrator creates a product to self-serve these users; the product defines the job type and the job parameters. End users select the job type and pass the script, input, and output locations.

  1. Sign in as emr-data-engineer.
  2. Select the EMR ETL Engine product.
  3. Choose Launch.

The next page shows if the product has multiple versions. Because the engineer wants to try out two different Amazon EMR versions, the administrator provided both options through the product version. You can launch the EMR cluster with the required version by selecting your preferred product version.

  1. Enter the name of the product.
  2. For this post, select EMR 5.29.0.
  3. Choose Next.

  1. For JobType, choose Spark.
  2. For JobArtifacts, enter the following value (you can get these values from the AWS CloudFormation output):
s3://blog-emr-sc-<account-id>/scripts/spark_converter.py s3://amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz s3://blog-emr-sc-<account-id>/spark/
  1. Choose Next.

Based on your configuration, an EMR cluster launches. When the cluster is ready, the Spark job runs.

  1. In a different browser, sign in as emr-admin using the ConsoleLoginURL (from the AWS CloudFormation output).

You can see the cluster status, job status, and output path from the Amazon EMR console.

Now, go to Amazon S3 console to check the output path:

The Parquet files are written inside the Spark folder.

  1. To test the Hive job, go back to the first browser where you already signed in as emr-data-engineer.
  2. Choose Provisioned products list.
  3. Choose the product options menu (right-click) and choose Update provisioned product.

  1. On the next page, you can select a different version or the same version.
  2. In the Parameters section, choose Hive.
  3. In the JobArtifacts field, enter the following Hive parameters:
s3://blog-emr-sc-<account-id>/scripts/hive_converter.sql -d INPUT=s3://amazon-reviews-pds/tsv/ -d OUTPUT=s3://blog-emr-sc-<account-id>/hive/
  1. Choose Update.

If you select the same version, AWS Service Catalog compares the old provisioned product with the updated product and only runs the portion that you changed. For this post, I chose the same Amazon EMR version and only updated the job type and parameters. You can see that the same EMR cluster is still there, but on the Steps tab, a new step is executed for Hive.

  1. On the Amazon S3 console using the second browser, verify that a new folder hive is created with data that represents top toys based on Amazon reviews.

To recap, you saw how to use AWS Service Catalog to provide a product to run your ETL jobs. Your data engineers can focus on their ETL scripts and your platform can self-serve them to run their ETL jobs on the EMR cluster.

Use case 3: Automatically scaling the Hive cluster for data analysts

To automatically scale the Hive cluster for data analysts, complete the following steps:

  1. Using the console login URL from the AWS CloudFormation output, and sign in as emr-data-analyst and go to AWS Service Catalog console.

You can see a different set of products for this user.

For this use case, your data analysts want to have an automatically scaling EMR cluster with Hive application. The administrator set up the Auto-scaling EMR product with preconfigured rules.

  1. Choose Auto-scaling EMR.
  2. Enter a provisioned product name.
  3. Select Hive Auto-scaling.
  4. Choose Next.
  5. In the Parameters section, leave the options at their default and enter a cluster name.
  6. Launch the product.

The product owner also provided a client URL (for example, Hue URL) through the product output so business analysts can connect to it.

  1. Sign in as emr-admin and validate if this new cluster is configured with the expected automatic scaling rules.
  2. On the Amazon EMR console, choose the cluster.

You can see the configuration on the Hardware tab.

In this use case, you learned how to use AWS Service Catalog to provide business analyst users a preconfigured, automatically scaled EMR cluster.

(Optional) Setting up AWS Service Catalog for Amazon EMR using AWS CLI

In the previous section, I demonstrated the solution using the AWS Service Catalog console. In the following section, I show you how to use AWS Service Catalog with the AWS CLI. You can create AWS Service Catalog products and portfolios, assign IAM principals, and launch products.

  1. Create a portfolio named CLI – Stack for the user emr-admin. See the following command:
aws --region us-east-1 servicecatalog create-portfolio --display-name "CLI - Stack" --provider-name "@emr-admin" --description "Sample stack for pre-defined EMR clusters"

You receive a JSON output.

  1. Record the portfolio id port-xxxxxxxx from the output to use later.

The emr-admin user is the provider for this portfolio. The user is created with power user access, so the user can see the full AWS Service Catalog console and can manage products and portfolios.

You can associate this portfolio with multiple users. By assigning them to a portfolio, they can use the portfolio, browse through its products, and provision new products. For this use case, you associate a portfolio to emr-admin and the AWS CLI user name (the name of the user that you used to configure your AWS CLI). Make sure to update the portfolio and AWS account ID.

  1. Enter the following code:
aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/emr-admin

aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/<aws-cli-user-name>
  1. To verify the principal-to-portfolio association, enter the following command with the portfolio ID:
aws --region us-east-1 servicecatalog list-principals-for-portfolio --portfolio-id port-xxxxxxxxx

This lists the principals associated with the portfolio, as shown in the following figure:

The CloudFormation template already copied the Amazon EMR template into your Amazon S3 account at the path s3://blog-emr-sc-<account-id>/products.

  1. To create the product CLI - Sample EMR using that template from Amazon S3, enter the following command:
aws --region us-east-1 servicecatalog create-product --name "CLI - Sample EMR" --owner "@emr-admin" --description "Sample EMR cluster with default" --product-type CLOUD_FORMATION_TEMPLATE --provisioning-artifact-parameters '{"Name": "Initial revision", "Description": "", "Info":{"LoadTemplateFromURL":"https://s3.amazonaws.com/blog-emr-sc-<account-id>/products/sample-cluster.template"},"Type":"CLOUD_FORMATION_TEMPLATE"}'

  1. Record the product ID and provisioning artifact ID from the JSON output.

You now have a product and a portfolio. A portfolio can have one to many products, and each product can have multiple versions.

  1. To assign the CLI - Sample EMR product to the portfolio you created in Step 1, enter the following command:
aws --region us-east-1 servicecatalog associate-product-with-portfolio --product-id prod-xxxxxx --portfolio-id port-xxxxxx

A launch constraint specifies the IAM role that AWS Service Catalog assumes when an end-user launches a product. With a launch constraint, you can control end-user access to your AWS resources and limit usage.

The CloudFormation template already created the role Blog-SCLaunchRole; create a launch constraint using that IAM role. Use the portfolio and product IDs that you collected from the previous step and your AWS account ID.

  1. To create the launch constraint, enter the following command:
aws --region us-east-1 servicecatalog create-constraint --type LAUNCH --portfolio-id port-xxxxxx --product-id prod-xxxxxx --parameters '{"RoleArn" : "arn:aws:iam::<account-id>:role/Blog-SCLaunchRole"}'

  1. Record the launch constraint ID to use later.

You now have an AWS Service Catalog product that you can use to provision an EMR cluster. The CloudFormation template that you used to create the CLI - Sample EMR product takes three parameters (ClusterName, ComputeRequirements, ClusterSize).

  1. To pass those three parameters as key-value pairs, enter the following command (use the product ID and provisioning artifact ID that you recorded earlier):
aws --region us-east-1 servicecatalog provision-product --product-id prod-xxxxxx --provisioning-artifact-id pa-xxxxx --provisioned-product-name cli-emr --provisioning-parameters Key=ClusterName,Value=cli-emr-cluster Key=ComputeRequirements,Value=CPU Key=ClusterSize,Value=2

  1. Check the provisioned product’s status by using the provisioned product ID:
aws --region us-east-1 servicecatalog describe-provisioned-product --id pp-xxxxx

To recap, in this section you learned how to use the AWS Service Catalog CLI to configure products and portfolios, and how to provision an EMR cluster through an AWS Service Catalog product.

Cleaning up

To clean up the resources you created, complete the following steps:

  1. Terminate the product that you provisioned in the previous step:
aws --region us-east-1 servicecatalog terminate-provisioned-product --provisioned-product-id pp-xxxxx
  1. Disassociate the product CLI – Sample EMR from the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog disassociate-product-from-portfolio --product-id prod-xxxxx --portfolio-id port-xxxxx
  1. Disassociate IAM principals from the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/emr-admin

aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/<aws-cli-user-name> 
  1. Delete the launch constraint created in the previous step:
aws --region us-east-1 servicecatalog delete-constraint --id cons-xxxxx
  1. Delete the product CLI – Sample EMR:
aws --region us-east-1 servicecatalog delete-product --id prod-xxxxx
  1. Delete the portfolio CLI – Stack:
aws --region us-east-1 servicecatalog delete-portfolio --id port-xxxxx

Cleaning up additional resources

You must also clean up the resources you created with the CloudFormation template.

  1. On the AWS Service Catalog console, choose Provisioned products list.
  2. Terminate each product that you provisioned for these use cases.
  3. Check each of the users and their provisioned products to make sure they’re terminated.
  4. On the Amazon S3 console, empty the bucket blog-emr-sc-<account-id>.
  5. If you are using the AWS CLI, delete the objects in the blog-emr-sc-<account-id> bucket with the following command (make sure you’re running this command on the correct bucket):
aws s3 rm s3://blog-emr-sc-<account-id> --recursive
  1. If you ran the optional AWS CLI section, make sure you follow the cleanup process mentioned in that section.
  2. On the AWS CloudFormation console or AWS CLI, delete the stack named Blog-EMR-Service-Catalog.

Next steps

To enhance this solution, you can explore the following options:

  • In this post, I enforced resource tagging through AWS CloudFormation. You can also use the AWS Service Catalog TagOptions library to provide a consistent taxonomy and tagging of AWS Service Catalog resources. During a product launch (provisioning), AWS Service Catalog aggregates the associated portfolio and product TagOptions and applies them to the provisioned product.
  • This solution demonstrates the usage of launch constraints and how you can provide limited access to your AWS resources to your users. You can also use template constraints to manage parameters. Template constraints make sure that end-users only have options that you allow them when launching products. This can help you maintain your organization’s compliance requirements.
  • You can integrate AWS Budgets with AWS Service Catalog. By associating AWS Budgets with your products and portfolios, you can track your usage and service costs. You can set a custom budget for each of the portfolios and trigger alerts when your costs exceed your threshold.

Summary

In this post, I showed you how you can simplify your Amazon EMR provisioning process using AWS Service Catalog, how to make Amazon EMR a self-service platform for your end-users, and how you can enforce best practices and compliance for your EMR clusters. You also walked through three different use cases and implemented solutions with AWS Service Catalog. Give this solution a try and share your experience with us!

 


About the Author

Tanzir Musabbir is a Data & Analytics Architect with AWS. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

Top 10 performance tuning techniques for Amazon Redshift

Post Syndicated from Matt Scaer original https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-techniques-for-amazon-redshift/

Customers use Amazon Redshift for everything from accelerating existing database environments, to ingesting weblogs for big data analytics. Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. Amazon Redshift provides an open standard JDBC/ODBC driver interface, which allows you to connect your existing business intelligence (BI) tools and reuse existing analytics queries.

Amazon Redshift can run any type of data model, from a production transaction system third-normal-form model to star and snowflake schemas, data vault, or simple flat tables.

This post takes you through the most common performance-related opportunities when adopting Amazon Redshift and gives you concrete guidance on how to optimize each one.

What’s new

This post refreshes the Top 10 post from early 2019. We’re pleased to share the advances we’ve made since then, and want to highlight a few key points.

Query throughput is more important than query concurrency.

Configuring concurrency, like memory management, can be relegated to Amazon Redshift’s internal ML models through Automatic WLM with Query Priorities. On production clusters across the fleet, we see the automated process assigning a much higher number of active statements for certain workloads and a lower number for other types of use cases. This is done to maximize throughput, a measure of how much work the Amazon Redshift cluster can do over a period of time. Examples are 300 queries a minute, or 1,500 SQL statements an hour. It’s recommended to focus on increasing throughput over concurrency, because throughput is the metric with much more direct impact on the cluster’s users.

In addition to the optimized Automatic WLM settings to maximize throughput, the concurrency scaling functionality in Amazon Redshift extends the throughput capability of the cluster to up to 10 times greater than what’s delivered with the original cluster. The tenfold increase is a current soft limit; you can reach out to your account team to increase it.

Investing in the Amazon Redshift driver.

AWS now recommends the Amazon Redshift JDBC or ODBC driver for improved performance. Each driver has optional configurations to further tune it for workloads with a higher or lower number of statements and with smaller or larger result sets.

Ease of use by automating all the common DBA tasks.

In 2018, the SET DW “backronym” summarized the key considerations to drive performance (sort key, encoding, table maintenance, distribution, and workload management). Since then, Amazon Redshift has added automation to inform 100% of SET DW, absorbed table maintenance into the service’s (and no longer the user’s) responsibility, and enhanced out-of-the-box performance with smarter default settings. Amazon Redshift Advisor continuously monitors the cluster for additional optimization opportunities, even if the mission of a table changes over time. AWS publishes the benchmark used to quantify Amazon Redshift performance, so anyone can reproduce the results.

Scaling compute separately from storage with RA3 nodes and Amazon Redshift Spectrum.

Although the convenient cluster building blocks of the Dense Compute and Dense Storage nodes continue to be available, you now have a variety of tools to further scale compute and storage separately. Amazon Redshift Managed Storage (the RA3 node family) allows for focusing on using the right amount of compute, without worrying about sizing for storage. Concurrency scaling lets you specify entire additional clusters of compute to be applied dynamically as-needed. Amazon Redshift Spectrum uses the functionally-infinite capacity of Amazon Simple Storage Service (Amazon S3) to support an on-demand compute layer up to 10 times the power of the main cluster, and is now bolstered with materialized view support.

Pause and resume feature to optimize cost of environments

All Amazon Redshift clusters can use the pause and resume feature. For clusters created using On Demand, the per-second grain billing is stopped when the cluster is paused. Reserved Instance clusters can use the pause and resume feature to define access times or freeze a dataset at a point in time.

Tip #1: Precomputing results with Amazon Redshift materialized views

Materialized views can significantly boost query performance for repeated and predictable analytical workloads such as dash-boarding, queries from BI tools, and extract, load, transform (ELT) data processing. Data engineers can easily create and maintain efficient data-processing pipelines with materialized views while seamlessly extending the performance benefits to data analysts and BI tools.

Materialized views are especially useful for queries that are predictable and repeated over and over. Instead of performing resource-intensive queries on large tables, applications can query the pre-computed data stored in the materialized view.

When the data in the base tables changes, you refresh the materialized view by issuing the Amazon Redshift SQL statement “refresh materialized view“. After issuing a refresh statement, your materialized view contains the same data as a regular view. Refreshes can be incremental or full refreshes (recompute). When possible, Amazon Redshift incrementally refreshes data that changed in the base tables since the materialized view was last refreshed.

To demonstrate how it works, we can create an example schema that stores sales information: each sales transaction and details about the store where the sale took place.
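
A minimal version of this schema might look like the following sketch (the table definitions and sample rows here are assumptions chosen to line up with the materialized view and the query results shown later in this section):

CREATE TABLE store (
  id    INT,
  city  VARCHAR(64)
);

CREATE TABLE sales (
  id          INT,
  item        VARCHAR(128),
  store_id    INT,
  customer_id INT,
  amount      INT
);

-- one store in Paris and one sale of 690, matching the first query result below
INSERT INTO store VALUES (1, 'Paris');
INSERT INTO sales VALUES (1, 'Board game bundle', 1, 1, 690);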

To view the total amount of sales per city, we create a materialized view with the create materialized view SQL statement (city_sales) joining records from two tables and aggregating sales amount (sum(sales.amount)) per city (group by city):

CREATE MATERIALIZED VIEW city_sales AS 
  (
  SELECT st.city, SUM(sa.amount) as total_sales
  FROM sales sa, store st
  WHERE sa.store_id = st.id
  GROUP BY st.city
  );

Now we can query the materialized view just like a regular view or table and issue statements like “SELECT city, total_sales FROM city_sales” to get the following results. The join between the two tables and the aggregate (sum and group by) are already computed, resulting in significantly less data to scan.

When the data in the underlying base tables changes, the materialized view doesn’t automatically reflect those changes. You can refresh the data stored in the materialized view on demand with the latest changes from the base tables using the SQL refresh materialized view command. For example, see the following code:

-- let's add a row in the sales base table

INSERT INTO sales (id, item, store_id, customer_id, amount) 
VALUES(8, 'Gaming PC Super ProXXL', 1, 1, 3000);

SELECT city, total_sales FROM city_sales WHERE city = 'Paris'

|city |total_sales|
|-----|-----------|
|Paris|        690|

-- the new sale is not taken into account yet
-- let's refresh the materialized view
REFRESH MATERIALIZED VIEW city_sales;

SELECT city, total_sales FROM city_sales WHERE city = 'Paris'

|city |total_sales|
|-----|-----------|
|Paris|       3690|

-- now the view has the latest sales data

The full code for this use case is available as a simple demo in a gist on GitHub.

You can also extend the benefits of materialized views to external data in your Amazon S3 data lake and federated data sources. With materialized views, you can easily store and manage the pre-computed results of a SELECT statement referencing both external tables and Amazon Redshift tables. Subsequent queries referencing the materialized views run much faster because they use the pre-computed results stored in Amazon Redshift, instead of accessing the external tables. This also helps you reduce the associated costs of repeatedly accessing the external data sources, because you can only access them when you explicitly refresh the materialized views.

Tip #2: Handling bursts of workload with concurrency scaling and elastic resize

The legacy, on-premises model requires you to estimate what the system will need 3-4 years in the future to make sure you’re leasing enough horsepower at the time of purchase. But the ability to resize a cluster allows for right-sizing your resources as you go. Amazon Redshift extends this ability with elastic resize and concurrency scaling.

Elastic resize lets you quickly increase or decrease the number of compute nodes, doubling or halving the original cluster’s node count, or even change the node type. You can expand the cluster to provide additional processing power to accommodate an expected increase in workload, such as Black Friday for internet shopping, or a championship game for a team’s web business. Choose classic resize when you’re resizing to a configuration that isn’t available through elastic resize. Classic resize is slower but allows you to change the node type or expand beyond the doubling or halving size limitations of an elastic resize. 

Elastic resize completes in minutes and doesn’t require a cluster restart. For anticipated workload spikes that occur on a predictable schedule, you can automate the resize operation using the elastic resize scheduler feature on the Amazon Redshift console, the AWS Command Line Interface (AWS CLI), or API.

Concurrency scaling allows your Amazon Redshift cluster to add capacity dynamically in response to the workload arriving at the cluster.

By default, concurrency scaling is disabled, and you can enable it for any workload management (WLM) queue to scale to a virtually unlimited number of concurrent queries, with consistently fast query performance. You can control the maximum number of concurrency scaling clusters allowed by setting the “max_concurrency_scaling_clusters” parameter value from 1 (default) to 10 (contact support to raise this soft limit). The free billing credits provided for concurrency scaling are often enough, and the majority of customers using this feature don’t end up paying extra for it. For more information about the concurrency scaling billing model, see Concurrency Scaling pricing.

You can monitor and control the concurrency scaling usage and cost by creating daily, weekly, or monthly usage limits and instruct Amazon Redshift to automatically take action (such as logging, alerting or disabling further usage) if those limits are reached. For more information, see Managing usage limits in Amazon Redshift.

Together, these options open up new ways to right-size the platform to meet demand. Before these options, you needed to size your WLM queue, or even an entire Amazon Redshift cluster, beforehand in anticipation of upcoming peaks.

Tip #3: Using the Amazon Redshift Advisor to minimize administrative work

Amazon Redshift Advisor offers recommendations specific to your Amazon Redshift cluster to help you improve its performance and decrease operating costs.

Advisor bases its recommendations on observations regarding performance statistics or operations data. Advisor develops observations by running tests on your clusters to determine if a test value is within a specified range. If the test result is outside of that range, Advisor generates an observation for your cluster. At the same time, Advisor creates a recommendation about how to bring the observed value back into the best-practice range. Advisor only displays recommendations that can have a significant impact on performance and operations. When Advisor determines that a recommendation has been addressed, it removes it from your recommendation list. In this section, we share some examples of Advisor recommendations:

Distribution key recommendation

Advisor analyzes your cluster’s workload to identify the most appropriate distribution key for the tables that can significantly benefit from a KEY distribution style. Advisor provides ALTER TABLE statements that alter the DISTSTYLE and DISTKEY of a table based on its analysis. To realize a significant performance benefit, make sure to implement all SQL statements within a recommendation group.

The following screenshot shows recommendations regarding distribution keys.

If you don’t see a recommendation, that doesn’t necessarily mean that the current distribution styles are the most appropriate. Advisor doesn’t provide recommendations when there isn’t enough data or the expected benefit of redistribution is small.
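
When Advisor does produce a distribution recommendation, the generated statements resemble the following sketch (the table and column names here are hypothetical):

ALTER TABLE public.store_sales ALTER DISTSTYLE KEY DISTKEY ss_item_sk;
ALTER TABLE public.item ALTER DISTSTYLE ALL;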

Sort key recommendation

Sorting a table on an appropriate sort key can accelerate query performance, especially queries with range-restricted predicates, by requiring fewer table blocks to be read from disk.

Advisor analyzes your cluster’s workload over several days to identify a beneficial sort key for your tables. See the following screenshot.

If you don’t see a recommendation for a table, that doesn’t necessarily mean that the current configuration is the best. Advisor doesn’t provide recommendations when there isn’t enough data or the expected benefit of sorting is small.
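
When a sort key recommendation does appear, applying it is typically a single statement along these lines (hypothetical table and column names):

ALTER TABLE public.store_sales ALTER SORTKEY (ss_sold_date_sk);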

Table compression recommendation

Amazon Redshift is optimized to reduce your storage footprint and improve query performance by using compression encodings. When you don’t use compression, data consumes additional space and requires additional disk I/O. Applying compression to large uncompressed columns can have a big impact on your cluster.

The compression analysis in Advisor tracks uncompressed storage allocated to permanent user tables. It reviews storage metadata associated with large uncompressed columns that aren’t sort key columns.

The following screenshot shows an example of table compression recommendation.
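
On recent Amazon Redshift versions, you can act on such a recommendation with an in-place encoding change, for example (hypothetical table and column names):

ALTER TABLE public.store_sales ALTER COLUMN ss_ext_list_price ENCODE az64;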

Table statistics recommendation

Maintaining current statistics helps complex queries run in the shortest possible time. The Advisor analysis tracks tables whose statistics are out-of-date or missing. It reviews table access metadata associated with complex queries. If tables that are frequently accessed with complex patterns are missing statistics, Amazon Redshift Advisor creates a critical recommendation to run ANALYZE. If tables that are frequently accessed with complex patterns have out-of-date statistics, Advisor creates a suggested recommendation to run ANALYZE.

The following screenshot shows a table statistics recommendation.
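
Acting on a statistics recommendation is usually as simple as the following (hypothetical table name):

ANALYZE public.store_sales PREDICATE COLUMNS;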

Tip #4: Using Auto WLM with priorities to increase throughput

Auto WLM simplifies workload management and maximizes query throughput by using ML to dynamically manage memory and concurrency, which ensures optimal utilization of the cluster resources.

Amazon Redshift runs queries using the queuing system (WLM). You can define up to eight queues to separate workloads from each other.

Amazon Redshift Advisor automatically analyzes the current WLM usage and can make recommendations to get more throughput from your cluster. Periodically reviewing the suggestions from Advisor helps you get the best performance.

Query priorities is a feature of Auto WLM that lets you assign priority ranks to different user groups or query groups, to ensure that higher priority workloads get more resources for consistent query performance, even during busy times. It is a good practice to set up query monitoring rules (QMR) to monitor and manage resource intensive or runaway queries. QMR also enables you to dynamically change a query’s priority based on its runtime performance and metrics-based rules you define.

For more information on migrating from manual to automatic WLM with query priorities, see Modifying the WLM configuration.

It’s recommended to take advantage of Amazon Redshift’s short query acceleration (SQA). SQA uses ML to run short-running jobs in their own queue. This keeps small jobs processing, rather than waiting behind longer-running SQL statements. SQA is enabled by default in the default parameter group and for all new parameter groups. You can enable and disable SQA via a check box on the Amazon Redshift console, or by using the Amazon Redshift CLI.

If you enable concurrency scaling, Amazon Redshift can automatically and quickly provision additional clusters should your workload begin to back up. This is an important consideration when deciding the cluster’s WLM configuration.

A common pattern is to optimize the WLM configuration to run most SQL statements without the assistance of supplemental memory, reserving additional processing power for short jobs. Some queueing is acceptable because additional clusters spin up if your needs suddenly expand. To enable concurrency scaling on a WLM queue, set the concurrency scaling mode value to AUTO. You can best inform your decisions by reviewing the concurrency scaling billing model. You can also monitor and control the concurrency scaling usage and cost by using the Amazon Redshift usage limit feature.

In some cases, unless you enable concurrency scaling for the queue, the user or query’s assigned queue may be busy, and you must wait for a queue slot to open. During this time, the system isn’t running the query at all. If this becomes a frequent problem, you may have to increase concurrency.

First, determine if any queries are queuing, using the queuing_queries.sql admin script. Review the maximum concurrency that your cluster needed in the past with wlm_apex.sql, or get an hour-by-hour historical analysis with wlm_apex_hourly.sql. Keep in mind that increasing concurrency allows more queries to run, but each query gets a smaller share of the memory. You may find that by increasing concurrency, some queries must use temporary disk storage to complete, which is also sub-optimal.
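
If you don’t have the admin scripts at hand, a quick look at the WLM state gives a similar picture; the following sketch queries STV_WLM_QUERY_STATE (queue_time is reported in microseconds):

SELECT service_class,
       state,
       COUNT(*) AS queries,
       AVG(queue_time) / 1000000.0 AS avg_queue_seconds
FROM stv_wlm_query_state
GROUP BY service_class, state
ORDER BY service_class, state;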

Tip #5: Taking advantage of Amazon Redshift data lake integration

Amazon Redshift is tightly integrated with other AWS-native services such as Amazon S3, which lets the Amazon Redshift cluster interact with the data lake in several useful ways.

Amazon Redshift Spectrum lets you query data directly from files on Amazon S3 through an independent, elastically sized compute layer. Use these patterns independently or apply them together to offload work to the Amazon Redshift Spectrum compute layer, quickly create a transformed or aggregated dataset, or eliminate entire steps in a traditional ETL process.

  • Use the Amazon Redshift Spectrum compute layer to offload workloads from the main cluster, and apply more processing power to the specific SQL statement. Amazon Redshift Spectrum automatically assigns compute power up to approximately 10 times the processing power of the main cluster. This may be an effective way to quickly process large transform or aggregate jobs.
  • Skip the load in an ELT process and run the transform directly against data on Amazon S3. You can run transform logic against partitioned, columnar data on Amazon S3 with an INSERT … SELECT statement. It’s easier than going through the extra work of loading a staging dataset, joining it to other tables, and running a transform against it.
  • Use Amazon Redshift Spectrum to run queries as the data lands in Amazon S3, rather than adding a step to load the data onto the main cluster. This allows for real-time analytics.
  • Land the output of a staging or transformation cluster on Amazon S3 in a partitioned, columnar format. The main or reporting cluster can either query from that Amazon S3 dataset directly or load it via an INSERT … SELECT statement.

Within Amazon Redshift itself, you can export the data into the data lake with the UNLOAD command, or by writing to external tables. Both options export SQL statement output to Amazon S3 in a massively parallel fashion. You can do the following:

  • Using familiar CREATE EXTERNAL TABLE AS SELECT and INSERT INTO SQL commands, create and populate external tables on Amazon S3 for subsequent use by Amazon Redshift or other services participating in the data lake without the need to manually maintain partitions. Materialized views can also cover external tables, further enhancing the accessibility and utility of the data lake.
  • Using the UNLOAD command, Amazon Redshift can export SQL statement output to Amazon S3 in a massively parallel fashion. This technique greatly improves the export performance and lessens the impact of running the data through the leader node. You can compress the exported data on its way off the Amazon Redshift cluster. As the size of the output grows, so does the benefit of using this feature. For writing columnar data to the data lake, UNLOAD can write partition-aware Parquet data, as sketched in the example following this list.
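
For example, an UNLOAD that lands partitioned Parquet in the data lake might look like the following sketch (the bucket, IAM role, and column names are placeholders):

UNLOAD ('SELECT ss_sold_date_sk, ss_item_sk, ss_sales_price FROM store_sales')
TO 's3://your-datalake-bucket/store_sales/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-unload-role>'
FORMAT AS PARQUET
PARTITION BY (ss_sold_date_sk);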

Tip #6: Improving the efficiency of temporary tables

Amazon Redshift provides temporary tables, which act like normal tables but have a lifetime of a single SQL session. The proper use of temporary tables can significantly improve performance of some ETL operations. Unlike regular permanent tables, data changes made to temporary tables don’t trigger automatic incremental backups to Amazon S3, and they don’t require synchronous block mirroring to store a redundant copy of data on a different compute node. Due to these reasons, data ingestion on temporary tables involves reduced overhead and performs much faster. For transient storage needs like staging tables, temporary tables are ideal.

You can create temporary tables using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. The CREATE TABLE statement gives you complete control over the definition of the temporary table. The SELECT … INTO and C(T)TAS commands use the input data to determine column names, sizes and data types, and use default storage properties. Consider default storage properties carefully, because they may cause problems. By default, for temporary tables, Amazon Redshift applies EVEN table distribution with no column encoding (such as RAW compression) for all columns. This data structure is sub-optimal for many types of queries.

If you employ the SELECT…INTO syntax, you can’t set the column encoding, column distribution, or sort keys. The CREATE TABLE AS (CTAS) syntax instead lets you specify a distribution style and sort keys, and Amazon Redshift automatically applies LZO encoding for everything other than sort keys, Booleans, reals, and doubles. You can exert additional control by using the CREATE TABLE syntax rather than CTAS.

If you create temporary tables, remember to convert all SELECT…INTO syntax into the CREATE statement. This ensures that your temporary tables have column encodings and don’t cause distribution errors within your workflow. For example, you may want to convert a statement using this syntax:

SELECT column_a, column_b INTO #my_temp_table FROM my_table;

You need to analyze the temporary table for optimal column encoding:

Master=# analyze compression #my_temp_table;
Table | Column | Encoding
----------------+----------+---------
#my_temp_table | column_a | lzo
#my_temp_table | column_b | bytedict
(2 rows)

You can then convert the SELECT…INTO statement to the following:

BEGIN;

CREATE TEMPORARY TABLE my_temp_table(
column_a varchar(128) encode lzo,
column_b char(4) encode bytedict)
distkey (column_a) -- Assuming you intend to join this table on column_a
sortkey (column_b) -- Assuming you are sorting or grouping by column_b
;

INSERT INTO my_temp_table SELECT column_a, column_b FROM my_table;

COMMIT;

If you create a temporary staging table by using a CREATE TABLE LIKE statement, the staging table inherits the distribution key, sort keys, and column encodings from the parent target table. In this case, merge operations that join the staging and target tables on the same distribution key perform faster because the joining rows are collocated. To verify that the query uses a collocated join, run the query with EXPLAIN and check for DS_DIST_NONE on all the joins.
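
The following sketch illustrates the pattern (the ods schema, table, and id column are hypothetical, with id assumed to be the distribution key inherited from the target table):

CREATE TEMPORARY TABLE stage_store_sales (LIKE ods.store_sales);

-- load the staging table (for example, with COPY), then confirm the join is collocated
EXPLAIN
SELECT COUNT(*)
FROM ods.store_sales t
JOIN stage_store_sales s ON t.id = s.id;
-- look for DS_DIST_NONE on the join step in the EXPLAIN output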

You may also want to analyze statistics on the temporary table, especially when you use it as a join table for subsequent queries. See the following code:

ANALYZE my_temp_table;

With this trick, you retain the functionality of temporary tables but control data placement on the cluster through distribution key assignment. You also take advantage of the columnar nature of Amazon Redshift by using column encoding.

Tip #7: Using QMR and Amazon CloudWatch metrics to drive additional performance improvements

In addition to the Amazon Redshift Advisor recommendations, you can get performance insights through other channels.

The Amazon Redshift cluster continuously and automatically collects query monitoring rules metrics, whether you institute any rules on the cluster or not. This convenient mechanism lets you view attributes like the following:

  • The CPU time for a SQL statement (query_cpu_time)
  • The amount of temporary space a job might ‘spill to disk’ (query_temp_blocks_to_disk)
  • The ratio of the highest number of blocks read over the average (io_skew)

It also makes Amazon Redshift Spectrum metrics available, such as the number of Amazon Redshift Spectrum rows and MBs scanned by a query (spectrum_scan_row_count and spectrum_scan_size_mb, respectively). The Amazon Redshift system view SVL_QUERY_METRICS_SUMMARY shows the maximum values of metrics for completed queries, and STL_QUERY_METRICS and STV_QUERY_METRICS carry the information at 1-second intervals for the completed and running queries respectively.
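
For example, a quick way to surface statements that spill to disk or show heavy I/O skew is to query the summary view directly; the following is a sketch using documented columns of SVL_QUERY_METRICS_SUMMARY:

SELECT query,
       query_cpu_time,
       query_temp_blocks_to_disk,
       io_skew,
       spectrum_scan_size_mb
FROM svl_query_metrics_summary
ORDER BY query_temp_blocks_to_disk DESC NULLS LAST
LIMIT 20;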

The Amazon Redshift CloudWatch metrics are data points for use with Amazon CloudWatch monitoring. These can be cluster-wide metrics, such as health status or read/write, IOPS, latency, or throughput. It also offers compute node–level data, such as network transmit/receive throughput and read/write latency. At the WLM queue grain, there are the number of queries completed per second, queue length, and others. CloudWatch facilitates monitoring concurrency scaling usage with the metrics ConcurrencyScalingSeconds and ConcurrencyScalingActiveClusters.

It’s recommended to consider the CloudWatch metrics (and the existing notification infrastructure built around them) before investing time in creating something new. Similarly, the QMR metrics cover most metric use cases and likely eliminate the need to write custom metrics.

Tip #8: Federated queries connect the OLAP, OLTP and data lake worlds

The new Federated Query feature in Amazon Redshift allows you to run analytics directly against live data residing on your OLTP source system databases and Amazon S3 data lake, without the overhead of performing ETL and ingesting source data into Amazon Redshift tables. This feature gives you a convenient and efficient option for providing real-time data visibility on operational reports, as an alternative to micro-ETL batch ingestion of real-time data into the data warehouse. By combining historical trend data from the data warehouse with live developing trends from the source systems, you can gather valuable insights to drive real-time business decision making.

For example, consider sales data residing in three different data stores:

  • Live sales order data stored on an Amazon RDS for PostgreSQL database (represented as “ext_postgres” in the following external schema)
  • Historical sales data warehoused in a local Amazon Redshift database (represented as “local_dwh”)
  • Archived, “cold” sales data older than 5 years stored on Amazon S3 (represented as “ext_spectrum”)
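
The ext_postgres schema referenced in the following view is defined with a federated external schema along these lines (the endpoint, database, IAM role, and secret values are placeholders):

CREATE EXTERNAL SCHEMA ext_postgres
FROM POSTGRES
DATABASE 'sales' SCHEMA 'public'
URI '<your-rds-endpoint>.us-east-1.rds.amazonaws.com' PORT 5432
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-federated-role>'
SECRET_ARN 'arn:aws:secretsmanager:us-east-1:<account-id>:secret:<secret-name>';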

We can create a late binding view in Amazon Redshift that allows you to merge and query data from all three sources. See the following code:

CREATE VIEW store_sales_integrated AS 
SELECT * FROM ext_postgres.store_sales_live 
UNION ALL 
SELECT * FROM local_dwh.store_sales_current 
UNION ALL 
SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, 
ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, 
ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, 
ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit 
FROM ext_spectrum.store_sales_historical 
WITH NO SCHEMA BINDING
;

Currently, direct federated querying is supported for data stored in Amazon Aurora PostgreSQL and Amazon RDS for PostgreSQL databases, with support for other major RDS engines coming soon. You can also use the federated query feature to simplify the ETL and data-ingestion process. Instead of staging data on Amazon S3, and performing a COPY operation, federated queries allow you to ingest data directly into an Amazon Redshift table in one step, as part of a federated CTAS/INSERT SQL query.

For example, the following code shows an upsert/merge operation in which the COPY operation from Amazon S3 to Amazon Redshift is replaced with a federated query sourced directly from PostgreSQL:

BEGIN;

CREATE TEMP TABLE staging (LIKE ods.store_sales);

-- replace the following COPY from S3: 
   /*COPY staging FROM 's3://yourETLbucket/daily_store_sales/' 
   IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>' 
   DELIMITER '|' COMPUPDATE OFF; */
      
-- with this federated query to load staging data directly from PostgreSQL source
INSERT INTO staging SELECT * FROM pg.store_sales p
    WHERE p.last_updated_date > (SELECT MAX(last_updated_date) FROM ods.store_sales);

DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id;

INSERT INTO ods.store_sales SELECT * FROM staging;

DROP TABLE staging;

COMMIT;

For more information about setting up the preceding federated queries, see Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query. For additional tips and best practices on federated queries, see Best practices for Amazon Redshift Federated Query.

Tip #9: Maintaining efficient data loads

Amazon Redshift best practices suggest using the COPY command to perform data loads of file-based data. Single-row INSERTs are an anti-pattern. The COPY operation uses all the compute nodes in your cluster to load data in parallel, from sources such as Amazon S3, Amazon DynamoDB, Amazon EMR HDFS file systems, or any SSH connection.

When performing data loads, compress the data files whenever possible. For row-oriented (CSV) data, Amazon Redshift supports both GZIP and LZO compression. It’s more efficient to load a large number of small files than one large one, and the ideal file count is a multiple of the cluster’s total slice count. Columnar data, such as Parquet and ORC, is also supported. You can achieve best performance when the compressed files are between 1MB-1GB each.
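
For example, a load of gzip-compressed, pipe-delimited files might look like the following sketch (the table, bucket, and IAM role are placeholders):

COPY store_sales
FROM 's3://your-etl-bucket/daily_store_sales/'
IAM_ROLE 'arn:aws:iam::<account-id>:role/<redshift-copy-role>'
DELIMITER '|' GZIP
COMPUPDATE OFF STATUPDATE ON;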

The number of slices per node depends on the cluster’s node size (and potentially elastic resize history). By ensuring an equal number of files per slice, you know that the COPY command evenly uses cluster resources and completes as quickly as possible. Query for the cluster’s current slice count with SELECT COUNT(*) AS number_of_slices FROM stv_slices;.

Another script in the amazon-redshift-utils GitHub repo, CopyPerformance, calculates statistics for each load. Amazon Redshift Advisor also warns of missing compression or too few files based on the number of slices (see the following screenshot):

Conducting COPY operations efficiently reduces the time to results for downstream users, and minimizes the cluster resources utilized to perform the load.

Tip #10: Using the latest Amazon Redshift drivers from AWS

Because Amazon Redshift is based on PostgreSQL, we previously recommended using JDBC4 PostgreSQL driver version 8.4.703 and psql ODBC version 9.x drivers. If you’re currently using those drivers, we recommend moving to the new Amazon Redshift–specific drivers. For more information about drivers and configuring connections, see JDBC and ODBC drivers for Amazon Redshift in the Amazon Redshift Cluster Management Guide.

While rarely necessary, the Amazon Redshift drivers do permit some parameter tuning that may be useful in some circumstances. Downstream third-party applications often have their own best practices for driver tuning that may lead to additional performance gains.

For JDBC, consider the following:

  • To avoid client-side out-of-memory errors when retrieving large data sets using JDBC, you can enable your client to fetch data in batches by setting the JDBC fetch size parameter or BlockingRowsMode.
  • Amazon Redshift doesn’t recognize the JDBC maxRows parameter. Instead, specify a LIMIT clause to restrict the result set. You can also use an OFFSET clause to skip to a specific starting point in the result set.

For ODBC, consider the following:

  • A cursor is enabled on the cluster’s leader node when useDeclareFetch is enabled. The cursor fetches up to fetchsize/cursorsize rows and then waits to fetch more when the application requests more rows.
  • The CURSOR command is an explicit directive that the application uses to manipulate cursor behavior on the leader node. Unlike the JDBC driver, the ODBC driver doesn’t have a BlockingRowsMode mechanism.

It’s recommended that you do not undertake driver tuning unless you have a clear need. AWS Support is available to help on this topic as well.

Conclusion

Amazon Redshift is a powerful, fully managed data warehouse that can offer increased performance and lower cost in the cloud. As Amazon Redshift grows based on the feedback from its tens of thousands of active customers world-wide, it continues to become easier to use and extend its price-for-performance value proposition. Staying abreast of these improvements can help you get more value (with less effort) from this core AWS service.

We hope you learned a great deal about making the most of your Amazon Redshift account with the resources in this post.

If you have questions or suggestions, please leave a comment.

 


About the Authors

Matt Scaer is a Principal Data Warehousing Specialist Solution Architect, with over 20 years of data warehousing experience, with 11+ years at both AWS and Amazon.com.

 

 

 

 

 

Manish Vazirani is an Analytics Specialist Solutions Architect at Amazon Web Services.

 

 

 

 

 

 

Tarun Chaudhary is an Analytics Specialist Solutions Architect at AWS.

Configure and optimize performance of Amazon Athena federation with Amazon Redshift

Post Syndicated from Harsha Tadiparthi original https://aws.amazon.com/blogs/big-data/configure-and-optimize-performance-of-amazon-athena-federation-with-amazon-redshift/

This post provides guidance on how to configure Amazon Athena federation with AWS Lambda and Amazon Redshift, while addressing performance considerations to ensure proper use.

If you use data lakes in Amazon Simple Storage Service (Amazon S3) and use Amazon Redshift as your data warehouse, you may want to integrate the two for a lake house approach, which seamlessly integrates the data lake and the data warehouse. When you need to query your data lake from your Amazon Redshift data warehouse, you can use Amazon Redshift Spectrum, which works great in unifying your data lake and data warehouse. However, when you use Athena in the data lake and need to access data in Amazon Redshift, there is no easy approach for the following two common scenarios:

  • Team A has a data lake in Amazon S3 and uses Athena. They need access to the data in an Amazon Redshift cluster owned by Team B.
  • Analysts using Athena to query their data lake for analytics need agility and flexibility to access data in an Amazon Redshift data warehouse without moving the data to Amazon S3 Data Lake.

In these scenarios, Athena federation with Amazon Redshift allows you to seamlessly access the data in your Amazon Redshift data warehouse without having to wait to unload the data to the Amazon S3 data lake, which removes the overhead in managing such jobs.

In this post, you walk through a step-by-step configuration to set up Athena federation using Lambda to access data in Amazon Redshift. You also see a performance benchmark analysis of interactive and ad hoc TPC-DS queries, and learn some key performance considerations and best practices when using federation.

Solution overview

Data federation is the capability to integrate data in another data store using a single interface. The following diagram depicts how Athena federation works by using Lambda to integrate with a federated data source.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.

Lambda lets you run code without provisioning or managing servers. You can run code for virtually any type of application with zero administration, and you pay only for the time your code runs.

Amazon Redshift is a petabyte-scale data warehouse designed from the ground up, natively for the cloud. Amazon Redshift is the most popular and fastest cloud data warehouse. It’s integrated with your data lake, offers performance up to three times faster than any other data warehouse, and costs up to 75% less than any other cloud data warehouse.

The following diagram depicts all the data source connectors available as of this writing in the AWS Serverless Application Repository.

The AWS Serverless Application Repository is a managed repository for serverless applications. It enables you to store and share reusable applications, and easily assemble and deploy serverless architectures in powerful new ways.

You can also create a custom connector for sources that aren’t in the AWS Serverless Application Repository.

Prerequisites

Before you get started, create a secret for the Amazon Redshift login ID and password using AWS Secrets Manager.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Choose credentials for your Amazon Redshift cluster, and set your user name and password.
  4. Choose the cluster you want to use.
  5. For Secret name, enter a name for your secret. Use the prefix AthenaJDBCFederation so it’s easy to find.
  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.

Setting up your S3 bucket

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, use the name myworkspace0009/athenafederation.

Configuring Athena federation with Amazon Redshift

To configure Athena federation with Amazon Redshift, complete the following steps:

  1. On the AWS Serverless Application Repository, choose Available applications.
  2. In the search field, enter athena federation.

  1. Choose the connector application in the search results.
  2. In the Application settings section, provide the following details:
  3. Application name – AthenaRedshiftConnector
  4. SecretNamePrefix – AthenaJdbcFederation
  5. SpillBucket – myworkspace0009/athenafederation
  6. JDBCConnectorConfig – Redshift://jdbc:Redshift://<YourAmazonRedshiftHostname>:5439/<DBName>?user=sample2&password=sample2
  7. DisableSpillEncryption – False
  8. LambdaFunctionName – rstpcds30
  9. SecurityGroupID – Security group ID where Amazon Redshift is deployed
  10. SpillPrefix – Leave default
  11. SubnetIds – The subnets where Amazon Redshift is running, comma-separated
  12. Select the I acknowledge check box.
  13. Choose Deploy.

In the next steps, you configure an Amazon Virtual Private Cloud (Amazon VPC) endpoint for Amazon S3 to allow Lambda to write federated query results to Amazon S3.

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. Choose the VPC for your endpoint.

  1. Make any necessary security changes as per your security requirements.

  1. Choose Create endpoint.

Running federated queries with Athena

To start running federated queries, complete the following steps:

  1. On the Athena console, choose Workgroups.
  2. If you don’t see a workgroup called AmazonAthenaPreviewFunctionality, create one.

When this feature becomes generally available, you won’t need to use this workgroup name.

  1. Run your queries, using lambda:rstpcds30 to run against tables in Amazon Redshift.
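
For example, a federated query against the Amazon Redshift catalog exposed by the connector might look like the following sketch (this assumes the connector exposes the public schema and the standard TPC-DS table and column names):

SELECT c.c_customer_id,
       SUM(ss.ss_net_paid) AS total_net_paid
FROM "lambda:rstpcds30".public.store_sales ss
JOIN "lambda:rstpcds30".public.customer c
  ON ss.ss_customer_sk = c.c_customer_sk
GROUP BY c.c_customer_id
ORDER BY total_net_paid DESC
LIMIT 10;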

Athena query performance comparison

Several customers have asked us for performance insights and prescriptive guidance on how queries in Athena compare against federated queries and how to use them. In this section, we use a TPC-DS 3 TB standard dataset and a select few queries that fall in the category of ad hoc and interactive. The comparison of their performance should give you an idea of what to expect when running federated queries against Amazon Redshift.

For the following tests, we used a 3 TB TPC-DS dataset in Amazon S3 data lake with Parquet compressed, partitioned and served by Athena, and the same 3 TB TPC-DS dataset on Amazon Redshift cluster running four RA3.4XL nodes.

The following table summarizes the dataset sizes:

|Dataset              |Table Size (Records)|
|---------------------|--------------------|
|store_sales          |8.6 billion         |
|customer             |30 million          |
|customer_address     |15 million          |
|customer_demographics|1.92 million        |
|item                 |360,000             |
|date_dim             |73,000              |
|store                |1,350               |

We ran the following four tests:

  • T1 – Queries ran in Athena without federation. All table data is in Amazon S3.
  • T2 – Queries ran in Athena with federation to Amazon Redshift. All table data is in Amazon S3, except the store_sales fact table in Amazon Redshift.
  • T3 – Queries ran in Athena with federation to Amazon Redshift. All tables and data are in Redshift.
  • T4 – Queries ran in Amazon Redshift without federation. All tables and data are in Redshift.

The following graph represents the performance of some of the ad hoc and interactive TPC-DS queries.

In the preceding graph, all T3 queries timed out at 900 seconds, depicted by the pink reference line, due to the Lambda 900-second timeout limit. This is due to overhead from store_sales fact data that needed to be transferred back to Athena.

The following graph removes T3 from the visualization, which gives better visibility when comparing the other tests.

Notice the query performance between T1 and T2 that completed in almost the same time while T4 queries ran significantly faster.

Amazon Redshift outperforms Athena in providing extremely low latency and should be the tool of choice if you need very tight SLAs for analytics queries that Athena can’t meet.

The following graph shows the data scanned in Amazon S3 for T1 and T2, which outlines why there isn’t much difference in query performance when compared to federated queries.

For the T2 federated queries, a small amount of dimension data is filtered in Amazon Redshift and brought back to Athena, instead of scanning the entire dimension tables. This is a typical nature for several ad hoc and interactive queries.

The performance of these TPC-DS queries between T1 and T2 is comparable because very little data is transferred back to Athena. You can see a similar behavior in several ad hoc and interactive query use cases because they use limited dimensions and scan a small subset of dimension data. Due to the 900-second timeout for the Lambda instances that connect to Amazon Redshift, it’s advised to minimize the amount of data the query brings back. Although Athena uses multiple Lambda instances in parallel to run your federated query, it’s also important to make sure the Amazon Redshift WLM queue has enough slots to process it, thereby not leading to queue wait time. For example, in some of the preceding queries, 20 Lambda executions were connecting to Amazon Redshift concurrently.

Key performance best practice considerations

When considering Athena federation with Amazon Redshift, take into account the following best practices:

  • Athena federation works great for queries with predicate filtering because the predicates are pushed down to Amazon Redshift. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from Amazon Redshift to Athena (which could lead to query timeouts or slow performance), unload the large tables in your query from Redshift to your Amazon S3 data lake.
  • Star schema is a commonly used data model in Amazon Redshift. In the star schema model, unload your large fact tables into your data lake and leave the dimension tables in Amazon Redshift. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It’s important to monitor the Amazon Redshift WLM queue slots to ensure there is no queuing. Additionally, you can use concurrency scaling on your Amazon Redshift cluster to absorb spikes in concurrent connections that would otherwise queue up.

Conclusion

In this post, you learned how to configure and use Athena federation with Amazon Redshift using Lambda. Now you don’t need to wait for all the data in your Amazon Redshift data warehouse to be unloaded to Amazon S3 and maintained on a day-to-day basis to run your queries. You can use the best practice considerations outlined in the post to minimize the data transferred from Amazon Redshift for better performance. When queries are well written for federation, the performance penalties are negligible, as observed in the TPC-DS benchmark queries in this post. Happy query federating!

 


About the Author

Harsha Tadiparthi is a Specialist Sr. Solutions Architect, AWS Analytics. He enjoys solving complex customer problems in Databases and Analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.