Tag Archives: AWS Big Data

Keeping your data lake clean and compliant with Amazon Athena

Post Syndicated from David Roberts original https://aws.amazon.com/blogs/big-data/keeping-your-data-lake-clean-and-compliant-with-amazon-athena/

With the introduction of CTAS support for Amazon Athena (see Use CTAS statements with Amazon Athena to reduce cost and improve performance), you can not only query but also create tables using Athena with the associated data objects stored in Amazon Simple Storage Service (Amazon S3). These tables are often temporary in nature and used to filter or aggregate data that already exists in another Athena table. Although this offers great flexibility to perform exploratory analytics, when tables are dropped, the underlying Amazon S3 data remains indefinitely. Over time, the accumulation of these objects can increase Amazon S3 costs, become administratively challenging to manage, and may inadvertently preserve data that should have been deleted for privacy or compliance reasons. Furthermore, the AWS Glue table entry is purged so there is no convenient way to trace back which Amazon S3 path was mapped to a deleted table.

This post shows how you can automate deleting the Amazon S3 objects associated with a table after you drop it using Athena. AWS Glue must be the metadata store for Athena.

Overview of solution

The solution requires that the AWS Glue table record (database, table, Amazon S3 path) history is preserved outside of AWS Glue, because it’s removed immediately after a table is dropped. Without this record, you can’t delete the associated Amazon S3 objects after the fact.

When Athena CTAS statements are issued, AWS Glue generates Amazon CloudWatch events that specify the database and table names. These events are available from Amazon EventBridge and can be used to trigger an AWS Lambda function (autoCleanS3) to fetch the new or updated Amazon S3 path from AWS Glue and write the database, table, and Amazon S3 path into an AWS Glue history table stored in Amazon DynamoDB (GlueHistoryDDB). When Athena drop table queries are detected, CloudWatch events are generated that trigger autoCleanS3 to look up the Amazon S3 path from GlueHistoryDDB and delete all related objects from Amazon S3.

Not all dropped tables should trigger Amazon S3 object deletion. For example, when you create a table using existing Amazon S3 data (not CTAS), it’s not advisable to automatically delete the associated Amazon S3 objects, because other analysts may have other tables referring to the same source data. For this reason, you must include a user-defined comment (--dropstore) in the Athena drop table query to cause autoCleanS3 to purge the Amazon S3 objects.

Lastly, after objects are successfully deleted, the corresponding entry in GlueHistoryDDB is updated for historical and audit purposes. The overall workflow is described in the following diagram.

The workflow contains the following steps:

  1. A user creates a table either via Athena or the AWS Glue console or API.
  2. AWS Glue generates a CloudWatch event, and an EventBridge rule triggers the Lambda function.
  3. The function creates an entry in DynamoDB containing a copy of the AWS Glue record and Amazon S3 path.
  4. The user drops the table from Athena, including the special comment --dropstore.
  5. The Lambda function fetches the dropped table entry from DynamoDB, including the Amazon S3 path.
  6. The function deletes data from the Amazon S3 path, including manifest files, and marks the DynamoDB entry as purged.
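Steps 4 and 5 depend on recognizing a drop table query and checking it for the --dropstore comment. The following is a minimal sketch of that check, not the code shipped in autoCleanS3.zip; the regular expression and the parse_drop helper are hypothetical and assume the comment follows the statement on the same query string.

```python
import re

DROP_PHRASE = "--dropstore"  # same default as the Athena_SQL_Drop_Phrase variable

# Hypothetical pattern: DROP TABLE [IF EXISTS] [db.]table, with optional backticks.
_DROP_RE = re.compile(
    r"^\s*drop\s+table\s+(?:if\s+exists\s+)?(?:`?(\w+)`?\.)?`?(\w+)`?",
    re.IGNORECASE,
)


def parse_drop(query, default_db="default", phrase=DROP_PHRASE):
    """Return (database, table, purge) for a DROP TABLE query, else None."""
    match = _DROP_RE.match(query)
    if match is None:
        return None  # not a drop table statement
    database = match.group(1) or default_db
    # purge is True only when the user opted in with the comment phrase
    return database, match.group(2), phrase in query
```

A query without the phrase still yields the database and table name, so the drop can be recorded in the history table without deleting anything.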

Walkthrough overview

To implement this solution, we complete the following steps:

  1. Create the required AWS Identity and Access Management (IAM) policy and role.
  2. Create the AWS Glue history DynamoDB table.
  3. Create the Lambda autoCleanS3 function.
  4. Create the EventBridge rules.
  5. Test the solution.

If you prefer to use a preconfigured CloudFormation template, launch one of the following stacks depending on your Region.

Region Launch Button
us-east-1 (N. Virginia)
us-west-2 (Oregon)
eu-west-1 (Ireland)

Prerequisites

Before implementing this solution, create an AWS Glue database and table with the data residing in Amazon S3. Be sure your user has the necessary permissions to access Athena and perform CTAS operations that write to a sample Amazon S3 location.

For more information about building a data lake, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

Creating an IAM policy and role

First, create the IAM policy that the Lambda function role uses to query AWS Glue, write to DynamoDB, and delete Amazon S3 objects.

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the following code (update the Region, account ID, and S3 bucket accordingly, and the table name GlueHistoryDDB if you choose to change it):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:DeleteObjectVersion",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<athena-query-results-s3-bucket>",
                    "arn:aws:s3:::<athena-query-results-s3-bucket>/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "dynamodb:PutItem",
                    "dynamodb:Scan",
                    "dynamodb:Query",
                    "dynamodb:UpdateItem"
                ],
                "Resource": [
                    "arn:aws:dynamodb:<region>:<accountId>:table/GlueHistoryDDB"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "glue:GetTable"
                ],
                "Resource": [
                    "arn:aws:glue:<region>:<accountId>:catalog",
                    "arn:aws:glue:<region>:<accountId>:database/*",
                    "arn:aws:glue:<region>:<accountId>:table/*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogGroup"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:*"
                ],
                "Effect": "Allow"
            },
            {
                "Action": [
                    "logs:CreateLogStream",
                    "logs:PutLogEvents",
                    "logs:DescribeLogStreams"
                ],
                "Resource": [
                    "arn:aws:logs:<region>:<accountId>:log-group:/aws/lambda/autoCleanS3:*"
                ],
                "Effect": "Allow"
            }
        ]
    }

  4. Choose Review policy.
  5. For Name, enter autoCleanS3-LambdaPolicy.
  6. For Description, enter Policy used by Lambda role to purge S3 objects when an Amazon Athena table is dropped.
  7. Choose Create policy.

Next, you need to create an IAM role and attach this policy.

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. Choose AWS service.
  4. Choose Lambda.
  5. Choose Next: Permissions.

  6. For Filter policies, enter autoCleanS3-LambdaPolicy and select the policy.
  7. Choose Next: Tags.
  8. Choose Next: Review.
  9. For Role name, enter autoCleanS3-LambdaRole.
  10. For Description, enter Role used by Lambda to purge S3 objects when an Amazon Athena table is dropped.
  11. Choose Create role.

Creating the AWS Glue history DynamoDB table

You use this DynamoDB table to hold the current and historical list of AWS Glue tables and their corresponding Amazon S3 path. Create the table as follows:

  1. On the DynamoDB console, choose Dashboard.
  2. Choose Create table.
  3. For Table name, enter GlueHistoryDDB.
  4. For Partition key, enter database (leave type as String).
  5. Select Add sort key.
  6. Enter table_date (leave type as String).
  7. For Table settings, select Use default settings.
  8. Choose Create.

The following table summarizes the GlueHistoryDDB table attributes that the Lambda function creates.

Column Type Description
database partition key The name of the AWS Glue database.
table_date sort key A composite attribute of AWS Glue table name plus date created. Because the same database and table name can be created again, the date must be used to ensure uniqueness.
created_by attribute The user or Amazon EC2 instance ARN from which the table was created.
owner attribute The owner of the table or account number.
purged attribute A boolean indicating whether the Amazon S3 objects have been deleted (True/False).
s3_path attribute The Amazon S3 path containing objects associated with the table.
table attribute The AWS Glue table name.
update_time attribute The last time the table was updated (the Amazon S3 path changed or objects purged).
view_sql attribute The view DDL if a view was created.
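To make the composite sort key concrete, the following sketch builds one history item as a plain dictionary. The build_history_item helper, the underscore separator, and the timestamp format are assumptions for illustration; the actual function may format table_date differently.

```python
from datetime import datetime, timezone


def build_history_item(database, table, s3_path, created_by, created_at=None):
    """Build a GlueHistoryDDB item; the table_date layout is an assumed format."""
    created_at = created_at or datetime.now(timezone.utc)
    stamp = created_at.strftime("%Y-%m-%dT%H:%M:%SZ")
    return {
        "database": database,               # partition key
        "table_date": f"{table}_{stamp}",   # sort key: table name plus creation date
        "table": table,
        "s3_path": s3_path,
        "created_by": created_by,
        "purged": False,                    # set to True after the S3 objects are deleted
        "update_time": stamp,
    }
```

Because the creation time is part of the sort key, recreating a table with the same name produces a new item instead of overwriting the earlier record.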

Creating the Lambda function autoCleanS3

A CloudWatch event triggers the Lambda function autoCleanS3 when a new table is created, updated, or dropped. If the --dropstore keyword is included in the Athena query comments, the associated Amazon S3 objects are also removed.

  1. On the Lambda console, choose Create function.
  2. Select Author from scratch.
  3. For Function name, enter autoCleanS3.
  4. For Runtime, choose Python 3.8.
  5. Under Permissions, for Execution role, select Use an existing role.
  6. Choose the role you created (service-role/autoCleanS3-LambdaRole).
  7. Choose Create function.
  8. Scroll down to the Function code section.
  9. If using Region us-west-2, on the Actions menu, choose Upload a file from Amazon S3.

  10. Enter the following:
    https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-keep-your-data-lake-clean-and-compliant-with-amazon-athena/autoCleanS3.zip

  11. Choose Save.

If using a Region other than us-west-2, download the Lambda .zip file locally. Then choose Upload a .zip file and choose the file from your computer to upload the Lambda function.

  1. In the Environment variables section, choose Edit.
  2. Choose Add environment variable.
  3. Enter the key-value pairs from the following table (customize as desired):
Key Value Purpose
Athena_SQL_Drop_Phrase --dropstore String to embed in Athena drop table queries to cause associated Amazon S3 objects to be removed
db_list <.*> Comma-separated regex filter that limits which databases may contain tables that autoCleanS3 is allowed to purge
ddb_history_table GlueHistoryDDB The name of the AWS Glue history DynamoDB table
disable_s3_cleanup False If set to True, disables the Amazon S3 purge while still recording attempts in the history table
log_level INFO Set to DEBUG to troubleshoot if needed

You must use standard regular expressions; the value can be a simple comma-separated list of the AWS Glue databases that you want autoCleanS3 to evaluate.

The following table shows example patterns for db_list.

Example Regex Pattern Result
.* Default, includes all databases
clickstream_web, orders_web, default Includes only clickstream_web, orders_web, default
.*_web Includes all databases having names ending in _web
.*stream.* Includes all databases containing stream in their name

For a complete list of supported patterns, see https://docs.python.org/3/library/re.html#re.Pattern.match
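The example patterns can be checked with a few lines of Python. This is a sketch of one possible matching rule, not the function's actual code: the is_db_allowed helper is hypothetical, and it uses re.fullmatch so that the results agree with the table above (with re.match, a pattern such as .*_web would also accept names that merely contain _web).

```python
import re


def is_db_allowed(database, db_list=".*"):
    """Return True if the database name fully matches any pattern in db_list."""
    # Split on commas and ignore surrounding whitespace and empty entries.
    patterns = [p.strip() for p in db_list.split(",") if p.strip()]
    return any(re.fullmatch(p, database) for p in patterns)
```

Splitting on commas means a pattern that itself contains a comma, such as a{1,3}, cannot be expressed in this sketch.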

  1. Choose Save.

Creating EventBridge rules

You need to create EventBridge rules that invoke your Lambda function whenever Athena query events and AWS Glue CreateTable and UpdateTable events are generated.

Creating the Athena event rule

To create the Athena query event rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-AthenaQueryEvent.
  3. For Description, enter Amazon Athena event for any query to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"AWS API Call via CloudTrail"
    	],
    	"source": [
    		"aws.athena"
    	],
    	"detail": {
    		"eventName": [
    			"StartQueryExecution"
    		]
    	}
    }

  7. Choose Save.
  8. For Select targets, choose Lambda function.
  9. For Function, choose autoCleanS3.
  10. Choose Create.

Creating the AWS Glue event rule

To create the AWS Glue table event rule, complete the following steps:

  1. On the EventBridge console, choose Create rule.
  2. For Name, enter autoCleanS3-GlueTableEvent.
  3. For Description, enter AWS Glue event for any table creation or update to trigger autoCleanS3.
  4. For Define pattern, choose Event pattern.
  5. For Event matching pattern, choose Custom pattern.
  6. For Event pattern, enter the following:
    {
    	"detail-type": [
    		"Glue Data Catalog Database State Change"
    	],
    	"source": [
    		"aws.glue"
    	],
    	"detail": {
    		"typeOfChange": [
    			"CreateTable",
    			"UpdateTable"
    		]
    	}
    }

  7. Choose Save.
  8. For Select targets, choose Lambda function.
  9. For Function, choose autoCleanS3.
  10. Choose Create.
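Once the AWS Glue rule is in place, the function receives events whose detail section names the database and the affected tables. The following sketch shows one way to pull those names out. The changed_tables helper is hypothetical, and the databaseName and changedTables field names reflect the usual shape of AWS Glue Data Catalog events; confirm them against a real event in your account.

```python
def changed_tables(event):
    """Return (database, table) pairs from a Glue Data Catalog state change event."""
    detail = event.get("detail", {})
    # Only the change types matched by the EventBridge rule are of interest.
    if detail.get("typeOfChange") not in ("CreateTable", "UpdateTable"):
        return []
    database = detail.get("databaseName")
    return [(database, table) for table in detail.get("changedTables", [])]
```

Each returned pair can then be passed to the AWS Glue GetTable API to look up the current Amazon S3 location.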

You’re finished!

Testing the solution

Make sure you already have a data lake with tables defined in your AWS Glue Data Catalog and permission to access Athena. For this post, we use NYC taxi ride data. For more information, see Build a Data Lake Foundation with AWS Glue and Amazon S3.

  1. Create a new table using Athena CTAS.

Next, verify that the entry appears in the new GlueHistoryDDB table.

  1. On the DynamoDB console, open the GlueHistoryDDB table.
  2. Choose Items.
  3. Confirm the s3_path value for the table.

You can also view the Amazon S3 table path and objects associated with the table.

  1. On the Amazon S3 console, navigate to the s3_path found in GlueHistoryDDB.
  2. Confirm the table and path containing the data folder and associated manifest and metadata objects.

  1. Drop the table using the keyword --dropstore.

  1. Check the Amazon S3 path to verify both the table folder and associated manifest and metadata files have been removed.

You can also see the purged attribute for the entry in GlueHistoryDDB is now set to True, and update_time has been updated, which you can use if you ever need to look back and understand when a purge event occurred.

Considerations

The Lambda timeout may need to be increased for very large tables, because the object deletion operations may not complete in time.
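One reason large tables take time is that Amazon S3 accepts at most 1,000 keys per DeleteObjects request, so a purge has to work through the keys in batches. The following sketch shows the two pieces of bookkeeping involved; the split_s3_path and batches helpers are hypothetical names and are not taken from the autoCleanS3 code.

```python
from urllib.parse import urlparse


def split_s3_path(s3_path):
    """Split 's3://bucket/prefix/' into (bucket, prefix)."""
    parsed = urlparse(s3_path)
    return parsed.netloc, parsed.path.lstrip("/")


def batches(keys, size=1000):
    """Yield slices of at most `size` keys (the DeleteObjects per-request limit)."""
    for start in range(0, len(keys), size):
        yield keys[start:start + size]
```

A table with 2,500 objects therefore needs three delete requests, plus the list calls that gather the keys.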

To prevent accidental data deletion, carefully limit which databases may participate (using the Lambda environment variable db_list), enable versioning on the Athena bucket path, and set up Amazon S3 lifecycle policies to eventually remove older versions. For more information, see Deleting object versions.

Conclusion

In this post, we demonstrated how to automate the process of deleting Amazon S3 objects associated with dropped AWS Glue tables. Deleting Amazon S3 objects that are no longer associated with an AWS Glue table reduces ongoing storage expense, management overhead, and unnecessary exposure of potentially private data no longer needed within the organization, allowing you to meet regulatory requirements.

This serverless solution monitors Athena and AWS Glue table creation and drop events via CloudWatch, and triggers Lambda to perform Amazon S3 object deletion. We use DynamoDB to store the audit history of all AWS Glue tables that have been dropped over time. It’s strongly recommended to enable Amazon S3 bucket versioning to prevent accidental data deletion.

To restore the Amazon S3 objects for the deleted table, first identify the s3_path value for the relevant table entry in GlueHistoryDDB, and then either copy the previous object versions or remove the delete markers from the objects in that path. For more information, see How do I undelete a deleted S3 object?
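On a versioned bucket, removing delete markers can be scripted. The following is a sketch under the assumption that versioning was enabled before the purge; the remove_delete_markers helper is hypothetical, and it expects a boto3 S3 client to be passed in.

```python
def remove_delete_markers(s3_client, bucket, prefix):
    """Delete the current delete marker of each object under prefix.

    Returns the keys that were restored. s3_client is a boto3 S3 client.
    """
    restored = []
    paginator = s3_client.get_paginator("list_object_versions")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for marker in page.get("DeleteMarkers", []):
            if marker["IsLatest"]:
                # Deleting the marker by version ID makes the previous version current again.
                s3_client.delete_object(
                    Bucket=bucket, Key=marker["Key"], VersionId=marker["VersionId"]
                )
                restored.append(marker["Key"])
    return restored
```

You would call it with a client created by boto3.client("s3") and the bucket and prefix taken from the s3_path value. The role running it needs permission to list object versions and delete object versions on that bucket.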


About the Author

David Roberts is a Senior Solutions Architect at AWS. His passion is building efficient and effective solutions on the cloud, especially involving analytics and data lake governance. Besides spending time with his wife and two daughters, he likes drumming and watching movies, and is an avid video gamer.

Auditing, inspecting, and visualizing Amazon Athena usage and cost

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/auditing-inspecting-and-visualizing-amazon-athena-usage-and-cost/

Amazon Athena is an interactive query service that makes it easy to analyze data directly in Amazon Simple Storage Service (Amazon S3) using standard SQL. It’s a serverless platform with no need to set up or manage infrastructure. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets and complex queries. You pay only for the queries that you run and the charges are based on the amount of data scanned by each query. You’re not charged for data definition language (DDL) statements like CREATE, ALTER, or DROP TABLE, statements for managing partitions, or failed queries. Cancelled queries are charged based on the amount of data scanned.

Typically, multiple users within an organization operate under different Athena workgroups and query various datasets in Amazon S3. In such cases, it’s beneficial to monitor Athena usage to ensure cost-effectiveness, avoid any performance bottlenecks, and adhere to security best practices. As a result, it’s desirable to have metrics that provide the following details:

  • Amount of data scanned by individual users
  • Amount of data scanned by different workgroups
  • Repetitive queries run by individual users
  • Slow-running queries
  • Most expensive queries

In this post, we provide a solution that collects detailed statistics of every query run in Athena and stores it in your data lake for auditing. We also demonstrate how to visualize audit data collected for a few key metrics (data scanned per workgroup and data scanned per user) using Amazon QuickSight.

Solution overview

The following diagram illustrates our solution architecture.

The solution consists of the following high-level steps:

  1. We use an Amazon CloudWatch Events rule with the following event pattern to trigger an AWS Lambda function for every Athena query run:
    {
        "detail-type": [
            "Athena Query State Change"
        ],
        "source": [
            "aws.athena"
        ],
        "detail": {
            "currentState": [
                "SUCCEEDED",
                "FAILED",
                "CANCELLED"
            ]
        }
    }

  1. The Lambda function queries the Athena API to get details about the query and publishes them to Amazon Kinesis Data Firehose.
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We use a second CloudWatch Events rule with the following event pattern to trigger another Lambda function for every Athena query being run:
    {
        "detail-type": [
            "AWS API Call via CloudTrail"
        ],
        "source": [
            "aws.athena"
        ],
        "detail": {
            "eventName": [
                "StartQueryExecution"
            ]
        }
    }

  1. The Lambda function extracts AWS Identity and Access Management (IAM) user details from the query and publishes them to Kinesis Data Firehose.
  2. Kinesis Data Firehose batches the data and writes it to Amazon S3.
  3. We create and schedule an AWS Glue crawler to crawl the data written by Kinesis Data Firehose in the previous steps to create and update the Data Catalog tables. The following code shows the table schemas the crawler creates:
    CREATE EXTERNAL TABLE `athena_query_details`(
      `query_execution_id` string, 
      `query` string, 
      `workgroup` string, 
      `state` string, 
      `data_scanned_bytes` bigint, 
      `execution_time_millis` int, 
      `planning_time_millis` int, 
      `queryqueue_time_millis` int, 
      `service_processing_time_millis` int)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-query-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')
    
    CREATE EXTERNAL TABLE `athena_user_access_details`(
      `query_execution_id` string, 
      `account` string, 
      `region` string, 
      `user_detail` string, 
      `source_ip` string, 
      `event_time` string)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://<S3location>/athena-user-access-details/'
    TBLPROPERTIES (
      'parquet.compression'='SNAPPY')

  1. Athena stores a detailed history of the queries run in the last 30 days.
  2. The solution deploys a Lambda function that queries AWS CloudTrail to get the list of Athena queries run in the last 30 days and publishes the details to the Kinesis Data Firehose streams created in the previous steps. The historical event processor function only needs to run one time after deploying the solution using the provided AWS CloudFormation template.
  3. We use the data available in the tables athena_query_details and athena_user_access_details in QuickSight to build insights and create visual dashboards.
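The first Lambda function in the steps above turns an Athena GetQueryExecution response into one row of athena_query_details. The following sketch shows a possible mapping. The to_query_details helper is hypothetical, and the choice of EngineExecutionTimeInMillis for execution_time_millis is an assumption; the deployed function may use a different statistic.

```python
def to_query_details(response):
    """Flatten a GetQueryExecution response into an athena_query_details record."""
    execution = response["QueryExecution"]
    stats = execution.get("Statistics", {})  # may be missing for some failed queries
    return {
        "query_execution_id": execution["QueryExecutionId"],
        "query": execution["Query"],
        "workgroup": execution.get("WorkGroup"),
        "state": execution["Status"]["State"],
        "data_scanned_bytes": stats.get("DataScannedInBytes", 0),
        "execution_time_millis": stats.get("EngineExecutionTimeInMillis", 0),  # assumed mapping
        "planning_time_millis": stats.get("QueryPlanningTimeInMillis", 0),
        "queryqueue_time_millis": stats.get("QueryQueueTimeInMillis", 0),
        "service_processing_time_millis": stats.get("ServiceProcessingTimeInMillis", 0),
    }
```

The resulting dictionary can be serialized as JSON and sent to the Kinesis Data Firehose delivery stream.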

Setting up your resources

Click on the Launch Stack button to deploy the solution in your account.

After you deploy the CloudFormation template, manually invoke the athena_historicalquery_events_processor Lambda function. This step processes the Athena queries that ran before deploying this solution; you only need to perform this step one time. 

Reporting using QuickSight dashboards

In this section, we cover the steps to create QuickSight dashboards based on the athena_query_details and athena_user_access_details Athena tables. The first step is to create a dataset with the athena_audit_db database, which the CloudFormation template created.

  1. On the QuickSight console, choose Datasets.
  2. Choose New dataset.

  1. For Create a Data Set, choose Athena.

  1. For Data source name, enter a name (for example, Athena_Audit).
  2. For Athena workgroup, keep the default, [primary].
  3. Choose Create data source.

  1. In the Choose your table section, choose athena_audit_db.
  2. Choose Use custom SQL.

  1. Enter a name for your custom SQL (for example, Custom-SQL-Athena-Audit).
  2. Enter the following SQL query:
    SELECT a.query_execution_id, a.query, a.state, a.data_scanned_bytes, a.execution_time_millis, b.user_detail
    FROM "athena_audit_db"."athena_query_details" AS a FULL JOIN  "athena_audit_db"."athena_user_access_details" AS b
    ON a.query_execution_id=b.query_execution_id
    WHERE CAST(a.data_scanned_bytes AS DECIMAL) > 0

  1. Choose Confirm query.

This query does a full join between the athena_query_details and athena_user_access_details tables.

  1. Select Import to SPICE for quicker analytics.
  2. Choose Edit/Preview data.

  1. Confirm that the data types of the data_scanned_bytes and execution_time_millis columns are set to Decimal.
  2. Choose Save & visualize.

We’re now ready to create visuals in QuickSight. For this post, we create the following visuals:

  • Data scanned per query
  • Data scanned per user 

Data scanned per query

To configure the chart for our first visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis, choose query_execution_id.
  3. For Value, choose data_scanned_bytes.
  4. For Group/Color, choose query.

If you’re interested in determining the total runtime of the queries, you can use execution_time_millis instead of data_scanned_bytes.

The following screenshot shows our visualization.

 

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, Athena query that ran, and the data scanned by the query in bytes.

Data scanned per user

To configure the chart for our second visual, complete the following steps:

  1. For Visual types, choose Horizontal bar chart.
  2. For Y axis, choose query_execution_id.
  3. For Value, choose data_scanned_bytes.
  4. For Group/Color, choose user_detail.

You can use execution_time_millis instead of data_scanned_bytes if you’re interested in determining the total runtime of the queries.

The following screenshot shows our visualization.

If you hover over one of the rows in the chart, the pop-out detail shows the Athena query execution ID, the IAM principal that ran the query, and the data scanned by the query in bytes. 

Conclusion

This solution queries CloudTrail for Athena query activity in the last 30 days and uses CloudWatch rules to capture statistics for future Athena queries. Furthermore, the solution uses the GetQueryExecution API to provide details about the amount of data scanned, which provides information about per-query costs and how many queries are run per user. This enables you to further understand how your organization is using Athena.
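As a rough illustration of turning data scanned into cost, the following sketch converts bytes into an estimated charge and totals bytes per user. The price of 5.00 USD per TB is an assumption (check the Athena pricing page for your Region), the helper names are hypothetical, and the sketch ignores any rounding or per-query minimum that Athena billing applies.

```python
from collections import defaultdict

BYTES_PER_TB = 1024 ** 4
PRICE_PER_TB_USD = 5.0  # assumed price; verify for your Region


def estimate_cost_usd(data_scanned_bytes, price_per_tb=PRICE_PER_TB_USD):
    """Estimate the query charge from bytes scanned (no rounding or minimums)."""
    return data_scanned_bytes / BYTES_PER_TB * price_per_tb


def scanned_by_user(rows):
    """Total data_scanned_bytes per user_detail from joined audit rows."""
    totals = defaultdict(int)
    for row in rows:
        totals[row["user_detail"]] += row["data_scanned_bytes"]
    return dict(totals)
```

The rows argument corresponds to the output of the custom SQL join between athena_query_details and athena_user_access_details.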

You can further improve upon this architecture by using a partitioning strategy. Instead of writing all query metrics as .csv files to one S3 bucket folder, you can partition the data using year, month, and day partition keys. This allows you to query data by year, month, or day. For more information about partitioning strategies, see Partitioning Data.

For more information about improving Athena performance, see Top 10 Performance Tuning Tips for Amazon Athena.


About the Authors

Kalyan Janaki is a Senior Technical Account Manager with AWS. Kalyan enjoys working with customers and helping them migrate their workloads to the cloud. In his spare time, he tries to keep up with his 2-year-old.

Kapil Shardha is an AWS Solutions Architect and supports enterprise customers with their AWS adoption. He has a background in infrastructure automation and DevOps.

Aravind Singirikonda is a Solutions Architect at Amazon Web Services. He works with AWS Enterprise customers to provide guidance and technical assistance, helping them improve the value of their solutions when using AWS.

Best practices for consuming Amazon Kinesis Data Streams using AWS Lambda

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/best-practices-for-consuming-amazon-kinesis-data-streams-using-aws-lambda/

Many organizations process and analyze clickstream data from customer-facing applications in real time to look for new business opportunities and identify security incidents. A common practice is to consolidate and enrich logs from applications and servers in real time to proactively identify and resolve failure scenarios and significantly reduce application downtime. The Internet of Things (IoT) is also driving more adoption of real-time data processing. For example, connected factories, connected cars, and smart spaces enable seamless sharing of information between people, machines, and sensors.

To help ingest real-time data or streaming data at large scales, you can use Amazon Kinesis Data Streams. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources. The data collected is available in milliseconds, enabling real-time analytics. You can use an AWS Lambda function to process records in a Kinesis data stream.

This post discusses common use cases for Lambda stream processing and describes how to optimize the integration between Kinesis Data Streams and Lambda at high throughput with low system overhead and processing latencies.

Using Lambda to process a Kinesis data stream

Before diving into best practices, we discuss good use cases for Lambda stream processing and anti-patterns.

When to use Lambda for Kinesis data stream processing

Lambda integrates natively with Kinesis Data Streams. The polling, checkpointing, and error handling complexities are abstracted when you use this native integration. This allows the Lambda function code to focus on business logic processing. For example, one application can take in IP addresses from the streaming records and enrich them with geographic fields. Another application can take in all system logs from the stream and filter out non-critical ones. Another common use case is to take in text-based system logs and transform them into JSON format.

One key pattern the previous examples share is that the transformation works on a per-record basis. You can still receive batches of records, but the transformation of the records happens individually.
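The per-record pattern looks like the following sketch, which converts text-based log lines into JSON-style dictionaries. Kinesis delivers each record payload base64-encoded under Records[].kinesis.data; the log line format and the parse_log_line helper are assumptions made for this example.

```python
import base64


def parse_log_line(line):
    """Parse a hypothetical '<timestamp> <LEVEL> <message>' log line."""
    timestamp, level, message = line.split(" ", 2)
    return {"timestamp": timestamp, "level": level, "message": message}


def handler(event, context):
    """Transform each record in the batch independently of the others."""
    results = []
    for record in event["Records"]:
        # Kinesis record payloads arrive base64-encoded.
        payload = base64.b64decode(record["kinesis"]["data"]).decode("utf-8")
        results.append(parse_log_line(payload))
    return results
```

No state is shared between records, so the result is the same regardless of how Lambda groups records into batches.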

When not to use Lambda for Kinesis data stream processing

By default, Lambda invokes one function instance per Kinesis shard. Lambda invokes your function as soon as it has gathered a full batch or when the batch window expires, as shown in the following diagram.

This means each Lambda invocation only holds records from one shard, so each Lambda invocation is ephemeral and there can be arbitrarily small batch windows for any invocation. Therefore, the following use cases are challenging for Lambda stream processing:

  • Correlation of events of different shards
  • Stateful stream processing, such as windowed aggregations
  • Buffering large volumes of streaming data before writing elsewhere

For the first two use cases, consider using Amazon Kinesis Data Analytics. Kinesis Data Analytics allows you to transform and analyze streaming data in real time. You can build sophisticated streaming applications with Apache Flink. Apache Flink is an open-source framework and engine for processing data streams. Kinesis Data Analytics takes care of everything required to run streaming applications continuously, and scales automatically to match the volume and throughput of your incoming data.

For the third use case, consider using Amazon Kinesis Data Firehose. Kinesis Data Firehose is the easiest way to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon Simple Storage Service (Amazon S3), Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), generic HTTP endpoints, and service providers like Datadog, New Relic, MongoDB, and Splunk. Kinesis Data Firehose enables you to transform your data with Lambda before it’s loaded to data stores.

Developing a Lambda consumer with shared throughput or dedicated throughput

You can use Lambda in two different ways to consume data stream records: you can map a Lambda function to a shared-throughput consumer (standard iterator), or to a dedicated-throughput consumer with enhanced fan-out (EFO).

For standard iterators, the Lambda service polls each shard in your stream one time per second for records using the HTTP protocol. By default, Lambda invokes your function as soon as records are available in the stream. The invoked instances share read throughput with other consumers of the shard. Each shard in a data stream provides 2 MB/second of read throughput. You can increase stream throughput by adding more shards. When it comes to latency, the Kinesis Data Streams GetRecords API has a limit of five reads per second per shard. This means you can achieve 200-millisecond data retrieval latency for one consumer. With more consumer applications, propagation delay increases. For example, with five consumer applications, each can only retrieve records one time per second and each can retrieve no more than 400 KB/second.
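The shared-throughput arithmetic can be written out as a short calculation. This sketch assumes the shard limits are split evenly across consumers, which is a simplification; the shared_consumer_limits helper is hypothetical.

```python
SHARD_READ_MB_PER_SEC = 2.0      # read throughput per shard
GET_RECORDS_CALLS_PER_SEC = 5    # GetRecords limit per shard


def shared_consumer_limits(consumers):
    """Per-consumer throughput and polling interval on one shard, split evenly."""
    return {
        "mb_per_sec": SHARD_READ_MB_PER_SEC / consumers,
        "poll_interval_ms": 1000 * consumers / GET_RECORDS_CALLS_PER_SEC,
    }
```

With one consumer this gives 2 MB/second and a 200-millisecond interval; with five consumers it gives 0.4 MB/second (400 KB/second) and a 1,000-millisecond interval.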

To minimize latency and maximize read throughput, you can create a data stream consumer with enhanced fan-out. An EFO consumer gets an isolated connection to the stream that provides a 2 MB/second outbound throughput. It doesn’t impact other applications reading from the stream. Stream consumers use HTTP/2 to push records to Lambda over a long-lived connection. Records can be delivered from producers to consumers in 70 milliseconds or better (a 65% improvement) in typical scenarios.

When to use shared throughput vs. dedicated throughput (EFO)

It’s advisable to use standard consumers when there are fewer than three consuming applications and your use cases aren’t sensitive to latency. EFO is better for use cases that require low latency (70 milliseconds or better) for message delivery to consumers; this is achieved by automatically provisioning an EFO pipe per consumer, which guarantees low latency irrespective of the number of consumers linked to the shard. EFO has additional cost dimensions: an hourly charge per EFO consumer and a charge per GB of data retrieved through EFO.

Monitoring ongoing stream processing

Kinesis Data Streams and Amazon CloudWatch are integrated so you can collect, view, and analyze CloudWatch metrics for your streaming application. It’s a best practice to make monitoring a priority to head off small problems before they become big ones. In this section, we discuss some key metrics to monitor.

Enhanced shard-level metrics

It’s a best practice to enable shard-level metrics with Kinesis Data Streams. As the name suggests, Kinesis Data Streams sends additional shard-level metrics to CloudWatch every minute. This can help you pinpoint failing consumers for specific records or shards and identify hot shards. Enhanced shard-level metrics come with additional cost. For information about pricing, see Amazon CloudWatch pricing.

IteratorAge

Make sure you keep a close eye on the IteratorAge (GetRecords.IteratorAgeMilliseconds) metric. Age is the difference between the current time and when the last record of the GetRecords call was written to the stream. If this value spikes, data processing from the stream is delayed. If the iterator age gets beyond your retention period, the expired records are permanently lost. Use CloudWatch alarms on the Maximum statistic to alert you before this loss is a risk.
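As a minimal sketch of such an alarm, the following Python builds the arguments for the CloudWatch put_metric_alarm API so that the alarm fires when the maximum iterator age exceeds half of the retention period. The stream name, alarm name, evaluation settings, and the 50% fraction are illustrative assumptions; the 24-hour retention is the Kinesis Data Streams default.

```python
def iterator_age_alarm(stream_name, retention_hours=24, fraction=0.5):
    """Build keyword arguments for CloudWatch put_metric_alarm that fire
    when the iterator age exceeds a fraction of the retention period."""
    threshold_ms = retention_hours * 3600 * 1000 * fraction
    return {
        "AlarmName": f"{stream_name}-iterator-age",  # hypothetical naming scheme
        "Namespace": "AWS/Kinesis",
        "MetricName": "GetRecords.IteratorAgeMilliseconds",
        "Dimensions": [{"Name": "StreamName", "Value": stream_name}],
        "Statistic": "Maximum",
        "Period": 60,                 # assumed: evaluate one-minute data points
        "EvaluationPeriods": 5,       # assumed: five consecutive breaches
        "Threshold": threshold_ms,
        "ComparisonOperator": "GreaterThanThreshold",
    }


def create_alarm(stream_name):
    # Needs boto3 and AWS credentials; defined here but not called.
    import boto3
    boto3.client("cloudwatch").put_metric_alarm(**iterator_age_alarm(stream_name))


params = iterator_age_alarm("my-stream")  # "my-stream" is a placeholder name
print(params["MetricName"], params["Threshold"])
```

Choosing a threshold well below the retention period leaves time to react before records expire.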

The following screenshot shows a visualization of GetRecords.IteratorAgeMilliseconds.

In a single-source, multiple-consumer use case, each Lambda consumer reports its own IteratorAge metric. This helps identify the problematic consumer for further analysis.

You can find common causes and resolutions later in this post.

ReadProvisionedThroughputExceeded

The ReadProvisionedThroughputExceeded metric shows the count of GetRecords calls that have been throttled during a given time period. Use this metric to determine if your reads are being throttled due to exceeding your read throughput limits. If the Average statistic has a value other than 0, some of your consumers are throttled. You can add shards to the stream to increase throughput or use an EFO consumer to trigger your Lambda function.

Being aware of poison messages

A Lambda function is invoked for a batch of records from a shard and it checkpoints upon the success of each batch, so either a batch is processed successfully or the entire batch is retried until processing is successful or the records fall off the stream based on the retention period. A poison message causes the failure of a batch process. It can create two possible scenarios: duplicates in the results, or delayed data processing and loss of data.

The following diagram illustrates when a poison message causes duplicates in the results. If there are 300 records in the data stream and the batch size is 200, the Lambda instance is invoked to process the first 200 records. If processing fails at the eighty-third record, the entire batch is tried again, which can cause duplicates in the target for the first 82 records, depending on the target application.
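The duplicate scenario can be reproduced with a small simulation. This is a sketch under stated assumptions, not Lambda's actual implementation: the target is modeled as a plain list that doesn't deduplicate, and the failure is assumed to clear on the first retry.

```python
def run_batch(batch, sink, poison_index=None):
    """Write records to sink one by one; raise at poison_index (0-based)."""
    for i, record in enumerate(batch):
        if i == poison_index:
            raise RuntimeError(f"cannot process record {record}")
        sink.append(record)


records = list(range(1, 301))   # 300 records in the data stream
batch = records[:200]           # batch size of 200
sink = []                       # stands in for a target that doesn't deduplicate

try:
    run_batch(batch, sink, poison_index=82)  # fails at the eighty-third record
except RuntimeError:
    # The whole batch is retried; we assume the failure clears on retry.
    run_batch(batch, sink)

duplicates = len(sink) - len(set(sink))
print(f"{len(sink)} writes, {duplicates} duplicates")
```

An idempotent target, for example one keyed on the record's sequence number, would absorb these repeated writes.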

The following diagram illustrates the problem of delayed data processing and data loss. If there are 300 records in the data stream and the batch size is 200, a Lambda instance is invoked to process the first 200 records until these records expire. This causes these records to be lost, and processing data in the queue is delayed significantly.

Addressing poison messages

There are two ways to handle failures gracefully. The first option is to implement logic in the Lambda function code to catch exceptions and log for offline analysis and return success to process the next batch. Exceptions can be logged to Amazon Simple Queue Service (Amazon SQS), CloudWatch Logs, Amazon S3, or other services.
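A minimal sketch of the first option follows. The process function and its payload format are hypothetical stand-ins for your business logic; the event shape (base64-encoded data under the kinesis key of each record) is the one Lambda passes for Kinesis event sources. Here the failure is only logged; writing it to Amazon SQS or Amazon S3 would follow the same pattern.

```python
import base64
import json
import logging

logger = logging.getLogger(__name__)


def process(payload):
    """Hypothetical business logic: requires a numeric 'value' field."""
    return float(payload["value"])


def handler(event, context):
    failed = 0
    for record in event["Records"]:
        try:
            data = base64.b64decode(record["kinesis"]["data"])
            process(json.loads(data))
        except Exception:
            # Log enough to find the record later, then keep going.
            failed += 1
            logger.exception(
                "Skipping record %s", record["kinesis"]["sequenceNumber"]
            )
    # Returning normally marks the batch as successful, so the next batch is read.
    return {"processed": len(event["Records"]) - failed, "failed": failed}


def _record(seq, body):
    # Helper that builds a test record in the Kinesis event shape.
    return {"kinesis": {"sequenceNumber": seq,
                        "data": base64.b64encode(body).decode()}}


event = {"Records": [_record("1", b'{"value": 3}'), _record("2", b"not json")]}
print(handler(event, None))
```

The trade-off of this approach is that failed records are never retried by Lambda, so the offline analysis has to handle any reprocessing.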

The second (and recommended) option is to configure the following retry and failure behavior settings with Lambda as the consumer for Kinesis Data Streams:

  • On-failure destination – Automatically send records to an SQS queue or Amazon Simple Notification Service (Amazon SNS) topic
  • Retry attempts – Control the maximum retries per batch
  • Maximum age of record – Control the maximum age of records to process
  • Split batch on error – Split a failed batch into two smaller batches and retry each one separately to automatically home in on poison messages
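The four settings above map to parameters of the Lambda CreateEventSourceMapping API. The following sketch builds those parameters in Python; the batch size, retry count, and record age are illustrative values rather than recommendations, and the ARNs and function name in the example call are placeholders.

```python
def kinesis_mapping_settings(stream_arn, function_name, failure_destination_arn):
    """Keyword arguments for the Lambda create_event_source_mapping call."""
    return {
        "EventSourceArn": stream_arn,
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        "BatchSize": 200,
        # On-failure destination: ARN of an SQS queue or SNS topic.
        "DestinationConfig": {"OnFailure": {"Destination": failure_destination_arn}},
        # Retry attempts: maximum retries per batch.
        "MaximumRetryAttempts": 3,
        # Maximum age of record: skip records older than one hour.
        "MaximumRecordAgeInSeconds": 3600,
        # Split batch on error.
        "BisectBatchOnFunctionError": True,
    }


def apply_settings(settings):
    # Needs boto3 and AWS credentials; defined here but not called.
    import boto3
    return boto3.client("lambda").create_event_source_mapping(**settings)


settings = kinesis_mapping_settings(
    "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream",  # placeholder
    "example-function",                                              # placeholder
    "arn:aws:sqs:us-east-1:123456789012:example-dlq",                # placeholder
)
print(sorted(settings))
```

For an existing mapping, the same retry and failure parameters can be passed to update_event_source_mapping together with the mapping's UUID.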

Optimizing for performance

In this section, we discuss common causes for Lambda not being able to keep up with Kinesis Data Streams and how to fix them.

Lambda is hitting concurrency limit

Lambda has reached the maximum number of parallel runs within the account, which means that Lambda can’t instantiate additional instances of the function. To identify this, set up CloudWatch alarms on the Throttles metric exposed by the function. To resolve this issue, consider assigning reserved concurrency to the affected function.

Lambda is throttled on egress throughput of a data stream

This can happen if there are many consumers for a data stream and not enough provisioned read throughput available. To identify this, monitor the ReadProvisionedThroughputExceeded metric and set up a CloudWatch alarm. One or more of the following options can help resolve this issue:

  • Add more shards and scale the data stream
  • Reduce the batch window to process messages more frequently
  • Use a consumer with enhanced fan-out 

Business logic in Lambda is taking too long

To address this issue, consider increasing the memory assigned to the function or adding shards to the data stream to increase parallelism.

Another approach is to enable concurrent Lambda invocations by configuring Parallelization Factor, a feature that allows more than one simultaneous Lambda invocation per shard. Lambda can process up to 10 batches in each shard simultaneously. Each parallelized batch contains messages with the same partition key. This means the record processing order is still maintained at the partition-key level. The following diagram illustrates this architecture.

For more information, see New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.
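When tuning the parallelization factor, it helps to check the resulting upper bound on concurrent invocations against the function's concurrency limit discussed earlier. The following is a small sketch with a function name of our choosing; it assumes one invocation per shard per unit of parallelization factor.

```python
def max_concurrent_batches(shards, parallelization_factor=1):
    """Upper bound on simultaneous invocations for one stream consumer."""
    if not 1 <= parallelization_factor <= 10:
        raise ValueError("parallelization factor must be between 1 and 10")
    return shards * parallelization_factor


for factor in (1, 5, 10):
    print(f"4 shards, factor {factor}: "
          f"up to {max_concurrent_batches(4, factor)} concurrent batches")
```

If this bound is higher than the concurrency available to the function, raising the factor moves the bottleneck to throttling instead of removing it.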

Optimizing for cost

Kinesis Data Streams has the following cost components:

  • Shard hours
  • PUT payload units (charged for 25 KB per PUT into a data stream)
  • Extended data retention
  • Enhanced fan-out

One of the key components you can optimize is PUT payload units. As mentioned earlier, you’re charged for each event you put in a data stream in 25 KB increments, so if you’re sending small messages, it’s advisable to aggregate messages to optimize cost. One of the ways to aggregate multiple small records into a large record is to use Kinesis Producer Library (KPL) aggregation.

The following is an example of a use case with and without record aggregation:

  • Without aggregation:
    • 1,000 records per second, with record size of 512 bytes each
    • Cost is $47.74 per month in us-east-1 Region (with $36.79 PUT payload units)
  • With aggregation:
    • 10 records per second, with record size of 50 KB each
    • Cost is $11.69 per month in us-east-1 Region (with $0.74 PUT payload units)
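The figures in this example can be reproduced with a short calculation. The prices below are assumptions that we inferred from the totals above ($0.014 per million PUT payload units, $0.015 per shard hour, one shard, and a 730-hour month); check current Kinesis Data Streams pricing before relying on them.

```python
import math

PAYLOAD_UNIT_BYTES = 25 * 1024       # one PUT payload unit covers up to 25 KB
PRICE_PER_MILLION_UNITS = 0.014      # assumed us-east-1 price in USD
PRICE_PER_SHARD_HOUR = 0.015         # assumed us-east-1 price in USD
HOURS_PER_MONTH = 730                # assumed length of a billing month


def monthly_put_cost(records_per_second, record_bytes):
    """Monthly PUT payload unit cost in USD."""
    units_per_record = math.ceil(record_bytes / PAYLOAD_UNIT_BYTES)
    units = records_per_second * units_per_record * 3600 * HOURS_PER_MONTH
    return units / 1_000_000 * PRICE_PER_MILLION_UNITS


def monthly_total_cost(records_per_second, record_bytes, shards=1):
    """PUT payload units plus shard hours, in USD."""
    shard_cost = shards * HOURS_PER_MONTH * PRICE_PER_SHARD_HOUR
    return monthly_put_cost(records_per_second, record_bytes) + shard_cost


for label, rate, size in (("without aggregation", 1000, 512),
                          ("with aggregation", 10, 50 * 1024)):
    print(f"{label}: PUT ${monthly_put_cost(rate, size):.2f}, "
          f"total ${monthly_total_cost(rate, size):.2f}")
```

The saving comes from the rounding: a 512-byte record is billed as a full 25 KB unit, whereas a 50 KB aggregated record carries roughly 100 such records in two units.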

Another component to optimize is to increase batch windows, which fine-tunes Lambda invocation for cost-optimization.

Conclusion

In this post, we covered the following aspects of Kinesis Data Streams processing with Lambda:

  • Suitable use cases for Lambda stream processing
  • Shared throughput consumers vs. dedicated-throughput consumers (enhanced fan-out)
  • Monitoring
  • Error handling
  • Performance tuning
  • Cost-optimization

To learn more about Amazon Kinesis, see Getting Started with Amazon Kinesis. If you have questions or suggestions, please leave a comment.


About the Authors

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on Data Analytics, AI/ML and DevOps.

Vishwa Gupta is a Data and ML Engineer with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, and playing badminton.

Optimizing Spark applications with workload partitioning in AWS Glue

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/optimizing-spark-applications-with-workload-partitioning-in-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. This post discusses a new AWS Glue Spark runtime optimization that helps developers of Apache Spark applications and ETL jobs, big data architects, data engineers, and business analysts scale their data processing and batch jobs running on AWS Glue automatically.

Customers use Spark for a wide variety of ETL and analytics workloads on datasets with diverse characteristics. They want to ensure fast and error-free execution of these workloads. Errors in Spark applications commonly arise from inefficient Spark scripts, distributed in-memory execution of large-scale transformations, and dataset abnormalities. Spark’s distributed execution uses a Master/Slave architecture in which driver and executor processes perform parallel computation over partitions of the input dataset. In spite of this data-parallel architecture, Spark applications commonly run into out-of-memory (OOM) exceptions on the driver and executors due to skew in input data, a large number of input files, or large joins and shuffle operations.

In this blog post, we introduce a new Spark runtime optimization on Glue – Workload/Input Partitioning for data lakes built on Amazon S3. Customers on Glue have been able to automatically track the files and partitions processed in a Spark application using Glue job bookmarks. Now, this feature gives them another simple yet powerful construct to bound the execution of their Spark applications. Bounded execution allows customers to partition their workloads by limiting the maximum number of files or dataset size processed incrementally within Glue Spark applications that can be orchestrated sequentially or in parallel.

Specifically, this feature makes it easy for customers to make their complex ETL pipelines significantly more resilient to errors. This is achieved by breaking down the monolithic Spark applications processing a large backlog of tens to hundreds of millions of files into simpler modular Spark applications that can process a bounded number of files or dataset size incrementally.

This Spark runtime optimization also works together with existing Glue features such as push down predicates, AWS Glue S3 lister, grouping, exclusions for S3 paths, and other optimizations.

Setup and Use Cases

One of the common use cases of data warehousing is processing a large number of records from a fact table (employees, sales, or items), joining them with multiple dimension tables (departments, stores, catalog), and loading the output to the final destination. The following diagram illustrates an ETL architecture commonly used by customers.

ETL pipelines using Apache Spark applications for this use case or similar backlog ingestion can encounter three common errors. First, the Spark driver can run out of memory while listing millions of files in S3 for the fact table. Second, the Spark executors can run out of memory if there is skew in the dataset, resulting in imbalanced shuffles or join operations across the different partitions of the fact table. Third, any data abnormality or malformed records can cause the Spark application to fail during any of the three stages: read from S3, application of the join transform, or write to S3. In this blog post, we show how workload partitioning can help you mitigate these errors by bounding the execution of the Spark application, and also detect abnormalities or skews in your data.

Our setup uses a fact table consisting of employee badge access data stored in S3 with 1.34 million objects and files, and a record count of 1.3 billion. This dataset is joined with two other datasets (dimension tables: employee and badge data), which are smaller in size, one with 107 records and another with a record count of 12,249 in 10 files. We use native Spark 2.4 and Python 3. We monitor the memory profile of the Spark driver and executors over time and find that both are prone to OOM exceptions. We use the AWS Glue workload partitioning feature to show how we can automatically mitigate those errors with minimal changes to the Spark application.

We enable AWS Glue job bookmarks with the use of AWS Glue Dynamic Frames because they help to incrementally load unprocessed data from S3. Vanilla Spark applications using Spark Dataframes do not support Glue job bookmarks and therefore cannot incrementally load data out-of-the-box. We find that Spark applications using both Glue Dynamic Frames and Spark Dataframes can run into the preceding three error scenarios while loading tables with a large number of input files or running distributed transformations such as joins that result in large shuffles. The following is the code snippet of the Spark application used for our setup.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
## @params: [JOB_NAME]
## args = getResolvedOptions(sys.argv, ['JOB_NAME', 'year_partition_key'])
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
## @type: DataSource
## @inputs: []
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "spark-oom-test", table_name = "oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx = "datasource0")
##datasource0 schema : |-- BadgeID|-- EmployeeID|-- Date-Month|-- Date-Day|-- Date-Year|-- Hours_Logged|-- partition_2|-- partition_1|-- partition_3|-- partition_0
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "employee-productivity-database", table_name = "lake-formation-workshop_hr_employees", transformation_ctx = "datasource1")
##datasource1 schema: |-- job_id|-- employee_id|-- salary|-- hire_date|-- department_id|-- last_name|-- email|-- phone_number|-- first_name|-- manager_id|-- commission_pct
datasource2 = glueContext.create_dynamic_frame.from_catalog(database = "employee-productivity-database", table_name = "dynamodb", transformation_ctx = "datasource2")
##datasource2 schema:|-- col_dateyear|-- col_dateday|-- employeeid|-- badgeid|-- hours_logged|-- col_datemonth
## ApplyMappings to check and convert the data types to avoid type mismatch during join operation
datasource_0 = ApplyMapping.apply(frame = datasource0, mappings = [("badgeid", "string", "badgeid", "string"), ("employeeid", "long", "employeeid", "long"), ("date-month", "string", "date-month", "string"), ("date-day", "int", "date-day", "int"), ("date-year", "int", "date-year", "int"), ("hours_logged", "int", "hours_logged", "int"), ("partition_0", "string", "partition_0", "string"), ("partition_1", "string", "partition_1", "string"), ("partition_2", "string", "partition_2", "string"), ("partition_3", "string", "partition_3", "string")], transformation_ctx = "datasource_0_mapping")
datasource_1 = ApplyMapping.apply(frame = datasource1, mappings = [("job_id", "string", "job_id", "string"), ("employee_id", "int", "employee_id", "int"), ("salary", "double", "salary", "double"), ("hire_date", "string", "hire_date", "string"), ("department_id", "long", "department_id", "long"), ("last_name", "string", "last_name", "string"), ("email", "string", "email", "string"), ("phone_number", "string", "phone_number", "string"), ("first_name", "string", "first_name", "string"), ("commission_pct", "double", "commission_pct", "double"), ("manager_id", "long", "manager_id", "long")], transformation_ctx = "datasource_1_mapping")
datasource_2 = ApplyMapping.apply(frame = datasource2, mappings = [("col_dateyear", "int", "col_dateyear", "int"), ("col_dateday", "int", "col_dateday", "int"), ("employeeid", "int", "employeeid", "int"), ("badgeid", "string", "badgeid", "string"), ("hours_logged", "int", "hours_logged", "int"), ("col_datemonth", "string", "col_datemonth", "string")], transformation_ctx = "datasource_2_mapping")
## Apply Join and drop fields that we don't need in target dataset
datasource3 = Join.apply(datasource_0, Join.apply(datasource_1, datasource_2, 'employee_id', 'employeeid'), 'badgeid','badgeid').drop_fields(['job_id', 'employee_id', 'salary', 'hire_date', 'department_id', 'last_name', 'email', 'phone_number', 'first_name', 'commission_pct', 'manager_id', 'col_dateyear', 'col_dateday',  'col_datemonth',  'partition_2', 'partition_1', 'partition_3', 'partition_0'])
## @type: ApplyMapping
## @return: applymapping1
## @inputs: [frame = datasource3]
applymapping1 = ApplyMapping.apply(frame = datasource3, mappings = [("badgeid", "decimal(19,0)", "badgeid", "decimal(19,0)"), ("employeeid", "long", "employeeid", "long"), ("date-month", "string", "date-month", "string"), ("date-day", "int", "date-day", "int"), ("date-year", "int", "date-year", "int"), ("hours_logged", "int", "hours_logged", "int")], transformation_ctx = "applymapping1")
## @type: DataSink
## @args: [connection_type = "s3", connection_options = {"path": "s3://agoswami0915-spark-oom-test-oct12/target-tablle"}, format = "json", transformation_ctx = "datasink2"]
## @return: datasink2
## @inputs: [frame = applymapping1]
datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://agoswami0915-spark-oom-test-oct12/target-tablle"}, format = "json", transformation_ctx = "datasink2")
job.commit()

We used AWS Glue crawlers to infer the schema of the datasets and create the AWS Glue Data Catalog objects referred to in the Spark application. The sample Spark code creates DynamicFrames for each dataset in an S3 bucket, joins the three DynamicFrames, and writes the transformed data to a target location in an S3 bucket.

Spark application without bounded execution

When we ran the Spark application to join three datasets with their common keys, it ran for about 4 hours to read and iterate over the large dataset. It eventually failed with a Spark driver OOM error:

Exception in thread "spark-listener-group-appStatus" 
java.lang.OutOfMemoryError: Java heap space

When checking the memory profile of the driver and executors (see the following graph) using Glue job metrics, it’s apparent that the driver memory utilization gradually increases over the 50% threshold as it reads data from a large data source, and finally goes out of memory while trying to join with the two smaller datasets.

Rerunning the Spark application with bounded execution

To overcome this Spark driver OOM, we modified the previous code to use workload partitioning by simply including the boundedFiles parameter in additional_options (see the following code). In this changed code, we used the job to process 100,000 files from datasource0. Bounded execution works in conjunction with job bookmarks. Job bookmarks track processed files and partitions based on timestamp and path hashes. In addition, bounded execution applies filters to track files and partitions with a specified bound on the number of files or the dataset size.

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name =
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx =
"datasource0", additional_options = {"boundedFiles" : "100000"})

After this change, the driver memory utilization stayed consistently low, with a peak utilization of about 26%, as seen in the following graph (blue line). However, the job encountered heavy memory usage by the executors during the join operations resulting from the shuffle (different colored lines showing high executor memory usage). This caused the job to eventually fail after four retries with an executor OOM.

Detecting OOM issues: Data skews and straggler tasks

In many cases, customers’ Spark jobs can run for hours before finally failing with errors. Instead of waiting for the jobs to fail after running for long hours and then analyzing the root cause, we can check the job progress using Glue’s job metrics available through Amazon CloudWatch, or the Spark UI, to identify straggler tasks that could potentially cause failures.

With Spark UI, we examined the Spark execution timeline and found that some of the executors were straggling with long-running tasks, resulting in eventual failures of those executors (Executor IDs 19, 11, 6, and 22 in the following event timeline graph).

Looking into the executor summary details, it was evident that these four executors contributed to many failed tasks during the job.

Diving deep into the executors revealed that the tasks are straggling during the shuffle phase, taking the longest runtime, and contributing to most of the job runtime. The following event timeline shows a consistent pattern of failures for all four executors performing straggler tasks that started with Executor 19.

In this scenario, the job ran for more than 10 hours before finally failing due to an executor OOM. Looking into the trend of the job from Spark UI or memory profiles from CloudWatch shows that executors in this job were involved in straggler tasks and this job was potentially on a path to failure. Instead of waiting for the job to run for hours and waste valuable resources, the job can be cancelled after looking at these trends after Executor 19 failed or automatically after a job-level timeout.
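One simple way to turn this kind of inspection into an automated check is to flag executors whose task time is far above the median. The following sketch uses made-up durations (they are not measurements from this job) and a threshold of three times the median, which is an assumption you would tune for your workload.

```python
from statistics import median


def find_stragglers(task_seconds, factor=3.0):
    """Return executor IDs whose task time exceeds factor times the median."""
    typical = median(task_seconds.values())
    return sorted(
        executor for executor, seconds in task_seconds.items()
        if seconds > factor * typical
    )


# Illustrative task durations in seconds, keyed by executor ID.
task_seconds = {1: 40, 2: 45, 3: 38, 6: 610, 11: 580,
                19: 900, 22: 640, 23: 42, 24: 41}

print(find_stragglers(task_seconds))
```

A check like this could feed the decision to cancel a job early, alongside a job-level timeout.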

The first failed stage from the Spark UI shows Executor 19 was involved in many failed tasks and finally timed out and was replaced by another executor by the Spark driver.

Finally, investigating the details of the final stage of the job that failed showed that Executor 22, like the other three executors (19, 11, and 6), was involved in straggler tasks during the shuffle phase and eventually failed with an OOM error.

Rerunning the job with a tighter bound

Now, we change the boundedFiles parameter value to process 50,000 files:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name = 
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx = 
"datasource0", additional_options = {"boundedFiles" : "50000"})

The job ran successfully without any driver or executor memory issues.

Considering that each input file is about 1 MB in size in our use case, we concluded that we can process about 50 GB of data from the fact dataset and join it with the two other datasets that have 10 additional files.
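The same arithmetic also gives a rough estimate of how many bounded runs are needed to work through the full backlog. This is a back-of-the-envelope sketch that uses only the numbers stated in this post and assumes every run processes a full bound of files.

```python
import math

TOTAL_FILES = 1_340_000   # objects in the fact table, from our setup
FILES_PER_RUN = 50_000    # boundedFiles value that completed successfully
AVG_FILE_MB = 1           # approximate size of each input file

gb_per_run = FILES_PER_RUN * AVG_FILE_MB / 1000
runs_needed = math.ceil(TOTAL_FILES / FILES_PER_RUN)

print(f"about {gb_per_run:.0f} GB per run, {runs_needed} runs for the backlog")
```

Because job bookmarks track the files that are already processed, each subsequent run picks up the next unprocessed set of files.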

You can further convert AWS Glue DynamicFrames to Spark DataFrames and also use additional Spark transformations.

Running jobs in parallel on different partitions with tighter bounds

In production scenarios, data engineering pipelines generally have strict SLAs to complete data processing with ETL. For example, if we need to complete our job in 1.5 hours and process 50,000 files from the input dataset, the previous job would miss the SLA easily because the job takes more than 2 hours to complete. Another scenario could be if we have to process 100,000 input files, which might take more than 4 hours to finish if we run the same job sequentially, with each run processing 50,000 files with bounded execution.

To address these issues, we can optimize the pipeline by creating multiple copies of the job. We can use Glue’s push down predicates to process a subset of the data from different S3 partitions with bounded execution. In the following code, we create two copies of the job that we ran earlier, each with the boundedFiles parameter set to process 50,000 files. In one of the jobs, we pass a push down predicate with an even number as the partition value. In the other job, we process odd-numbered partition values.

The following code shows the job with an even partition value:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name =
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx =
"datasource0", push_down_predicate = "(partition_0 == '2020')",
additional_options = {"boundedFiles" : "50000"})

The following code shows the job with an odd partition value:

datasource0 = glueContext.create_dynamic_frame.from_catalog(database =
"spark-oom-test", table_name =
"oom_test_3_agoswami0915_glue_spark_oom_1", transformation_ctx =
"datasource0", push_down_predicate = "(partition_0 == '2019')",
additional_options = {"boundedFiles" : "50000"})
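If you maintain more than two such copies, it can help to generate the read options for each job from one place. The following is a sketch with a helper name of our choosing; it only builds the push_down_predicate and additional_options arguments shown in the preceding code and doesn't call AWS Glue.

```python
def bounded_read_options(partition_key, partition_value, bounded_files):
    """Build the from_catalog arguments that differ between the job copies."""
    return {
        "push_down_predicate": f"({partition_key} == '{partition_value}')",
        "additional_options": {"boundedFiles": str(bounded_files)},
    }


# One entry per parallel job, keyed by the partition value it reads.
jobs = {
    year: bounded_read_options("partition_0", year, 50000)
    for year in ("2019", "2020")
}

for year, options in jobs.items():
    print(year, options)
```

Inside a Glue job, the resulting dictionary can be unpacked into the call, for example create_dynamic_frame.from_catalog(database = ..., table_name = ..., transformation_ctx = "datasource0", **jobs["2020"]).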

On the AWS Glue console, we can create an AWS Glue Workflow to run both jobs in parallel. Because our input files have unique keys, even when running the jobs in parallel, the output doesn’t have any duplicates. If the input data can have duplicate keys, but the downstream application expects only unique records, we need to create a successor data deduplication job in the workflow to meet the business requirement. The following screenshot shows our workflow running both jobs in parallel.

After running the workflow, we can go to the AWS Glue console and CloudWatch page to check the progress of the jobs triggered by the workflow.

We find that both jobs started and ended at the same time (within 2 hours), and were triggered by the same workflow trigger, bounded-exec-parallel-run-1. Both of them had safe Spark driver and executor memory usage throughout the job execution.

Conclusion

AWS Glue effectively manages Spark memory while running Spark applications. The workload partitioning feature provides the ability to bound the execution of Spark applications and improve the reliability of ETL pipelines that are susceptible to errors arising from large input sources, large-scale transformations, and data skews or abnormalities. Combining this feature with other optimization mechanisms, including push down predicates, can help avoid these issues and meet data pipeline SLAs for your ETL jobs.


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS, helping startup customers become tomorrow’s enterprises using AWS services. He is part of the Analytics Specialist community at AWS. When not at work, Avijit likes to cook, travel, hike, watch sports, and listen to music.

Xiaorun Yu is a Software Development Engineer at AWS Glue who works on Glue Spark runtime. When not at work, Xiaorun enjoys hiking around the Bay Area and trying local restaurants.

Mohit Saxena is a Technical Lead Manager at AWS Glue. His team works on Glue’s Spark runtime to enable new customer use cases for efficiently managing data lakes on AWS and optimize Apache Spark for performance and reliability.

Orchestrating analytics jobs by running Amazon EMR Notebooks programmatically

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-by-running-amazon-emr-notebooks-programmatically/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS in a cost-effective manner. Amazon EMR Notebooks is a managed environment based on Jupyter Notebook that allows data scientists, analysts, and developers to prepare and visualize data, collaborate with peers, build applications, and perform interactive analysis using EMR clusters.

EMR notebook APIs are available on Amazon EMR release version 5.18.0 or later and can be used to run EMR notebooks via a script or command line. The ability to start, stop, list, and describe EMR notebook runs without the Amazon EMR console enables you to programmatically control running an EMR notebook. Using a parameterized notebook cell allows you to pass different parameter values to a notebook without having to create a copy of the notebook for each new set of parameter values. With this feature, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions or Apache Airflow to build pipelines. If you want to use EMR notebooks in a non-interactive manner, this enables you to run ETL workloads, especially in production.

In this post, we show how to orchestrate analytics jobs by running EMR Notebooks programmatically with the following two use cases:

For our data source, we use the open-source, real-time COVID-19 US daily case reports provided by Johns Hopkins University CSSE in the following GitHub repo.

Prerequisites

Before getting started, you must have the following prerequisites:

Record the notebook ID (for example, <e-*************************>); you use this later in our examples. Organize the notebook files in the Jupyter UI as follows:

  • /demo_pyspark.ipynb
  • /experiment/trailing_N_day.ipynb

See Creating a Notebook for more information on how to create an EMR notebook.

Use case 1: Scheduling an EMR notebook to run via crontab and the AWS CLI

We use demo_pyspark.ipynb as the input notebook file, as mentioned in the prerequisites. In this use case, we use the AWS CLI to call the EMR Notebooks Execution API to run a notebook using some parameters that we pass in. We then download the notebook output and visualize it using the local Jupyter server.

First, we use the AWS CLI to run an example notebook using the EMR Notebooks Execution API.

demo_pyspark.ipynb is a Python script. The following parameters are defined in the first cell:

  • DATE – The date used when the notebook job is started.
  • TOP_K – The top k US states with confirmed COVID-19 cases. We use this to plot Graph a.
  • US_STATES – The names of the specific US states being checked for the fatality rates of COVID-19 patients. We use this to plot Graph b.

Running this notebook plots two graphs:

  • Graph a – Visualizes the top k US states with most COVID-19 cases on a given date
  • Graph b – Visualizes the fatality rates among specific US states on a given date

The parameters in the first cell can be passed to the EMR Notebooks StartNotebookExecution API, which you can call via the AWS CLI or SDK. The following code is an example of the EMR notebook first cell, containing parameters with corresponding values in JSON format. It means the notebook uses the date 10-13-2020. For Graph a, we visualize the top five US states with confirmed COVID-19 cases on October 13, 2020. For Graph b, we visualize the fatality rates of COVID-19 patients in Alabama, California, and Arizona on October 13, 2020. See the following code:

{"DATE": "10-13-2020",
 "TOP_K": 5,
"US_STATES": ["Alabama", "California", "Arizona"]}

For this example, the parameters can be any of the Python data types.

Run the notebook using the following new set of parameters:

{"DATE": "10-15-2020",
 "TOP_K": 6,
"US_STATES": ["Wisconsin", "Texas", "Nevada"]}

Running an EMR notebook with the AWS CLI

Run the following command (replace <e-*************************> with the ID of the EMR notebook and <j-*************> with the EMR cluster ID as mentioned in the prerequisites):

% aws emr --region us-west-2 start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole

The start-notebook-execution command returns an output similar to the following JSON document:

{
 "NotebookExecutionId": "ex-*****************************"
}

Record the value of NotebookExecutionId; you use it in the next step.
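If you prefer the SDK to the AWS CLI, the same call can be sketched with the AWS SDK for Python (Boto3). This is a minimal sketch, not a definitive implementation: the wrapper function and its argument names are our own, and the EMR client is passed in so that you control the Region and credentials.

```python
def start_notebook_execution(emr_client, editor_id, cluster_id, relative_path,
                             params_json, execution_name="demo",
                             service_role="EMR_Notebooks_DefaultRole"):
    """Start an EMR notebook execution and return its NotebookExecutionId.

    emr_client is expected to be a Boto3 EMR client, for example
    boto3.client("emr", region_name="us-west-2").
    """
    response = emr_client.start_notebook_execution(
        EditorId=editor_id,
        RelativePath=relative_path,
        NotebookExecutionName=execution_name,
        NotebookParams=params_json,
        ExecutionEngine={"Id": cluster_id, "Type": "EMR"},
        ServiceRole=service_role,
    )
    return response["NotebookExecutionId"]


# Example usage (requires boto3 and AWS credentials; replace the placeholders):
# import boto3
# emr = boto3.client("emr", region_name="us-west-2")
# execution_id = start_notebook_execution(
#     emr, "<e-*************************>", "<j-*************>",
#     "demo_pyspark.ipynb",
#     '{"DATE":"10-15-2020", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}')
```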

Running the describe-notebook-execution command

Run the following command (replace <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws emr --region us-west-2 describe-notebook-execution \
--notebook-execution-id <ex-*****************************>

The describe-notebook-execution command returns an output similar to the following JSON document:

{
  "NotebookExecution": {
    "NotebookExecutionId": "ex-*****************************",
    "EditorId": "e-*************************",
    "ExecutionEngine": {
      "Id": "<j-*************>",
      "Type": "EMR",
      "MasterInstanceSecurityGroupId": "sg-********"
    },
    "NotebookExecutionName": "demo",
    "NotebookParams": "{\"DATE\":\"10-15-2020\", \"TOP_K\": 6, \"US_STATES\": [\"Wisconsin\", \"Texas\", \"Nevada\"]}",
    "Status": "FINISHED",
    "StartTime": "2020-10-18T19:46:01.125000-07:00",
    "EndTime": "2020-10-18T19:47:24.014000-07:00",
    "Arn": "arn:aws:elasticmapreduce:us-west-2:123456789012:notebook-execution/ex-*****************************",
    "OutputNotebookURI": "s3://<notebook_bucket_location>/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb",
    "LastStateChangeReason": "Execution is finished for cluster j-*************.",
    "NotebookInstanceSecurityGroupId": "sg-********",
    "Tags": []
  }
}

You can pass different parameter values to the same notebook without having to create a copy of the notebook for each new set of parameter values or log in to the Jupyter Notebooks UI via the Amazon EMR console.
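The status check can also be automated. The following sketch polls the execution with Boto3 until it reaches a terminal status and then returns the status together with the output notebook location. The function name, the polling interval, and the choice of FINISHED, FAILED, and STOPPED as terminal statuses are our assumptions for illustration.

```python
import time

# Statuses after which the execution no longer changes (assumed set)
TERMINAL_STATUSES = {"FINISHED", "FAILED", "STOPPED"}


def wait_for_execution(emr_client, execution_id, poll_seconds=15, sleep=time.sleep):
    """Poll describe_notebook_execution until a terminal status is reached.

    Returns a (status, output_notebook_uri) tuple. The sleep function is
    injectable so the loop can be exercised without real waiting.
    """
    while True:
        execution = emr_client.describe_notebook_execution(
            NotebookExecutionId=execution_id
        )["NotebookExecution"]
        status = execution["Status"]
        if status in TERMINAL_STATUSES:
            return status, execution.get("OutputNotebookURI")
        sleep(poll_seconds)


# Example usage (requires boto3 and AWS credentials):
# import boto3
# emr = boto3.client("emr", region_name="us-west-2")
# status, uri = wait_for_execution(emr, "<ex-*****************************>")
```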

Downloading the output file and visualizing the output with a local Jupyter server

EMR notebooks use Papermill to run the notebook. When it runs, a new notebook file is created with input parameters so as not to overwrite the existing file. The notebook is then started, and the output notebook can be found in s3://<Notebook bucket location>/<editor id>/executions/<Execution id>/<input file name>.
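The output location pattern above can be expressed as a small helper. This is a sketch under the assumption that the notebook bucket location is given as a full s3:// prefix; both function names and the sample values are hypothetical.

```python
from urllib.parse import urlparse


def output_notebook_uri(notebook_location, editor_id, execution_id, input_file_name):
    """Build the S3 URI where the executed copy of the notebook is written."""
    return "/".join([
        notebook_location.rstrip("/"),
        editor_id,
        "executions",
        execution_id,
        input_file_name,
    ])


def split_s3_uri(uri):
    """Split s3://bucket/key into (bucket, key), for example for an SDK download."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError("not an S3 URI: " + uri)
    return parsed.netloc, parsed.path.lstrip("/")


# Hypothetical values for illustration only
uri = output_notebook_uri("s3://my-bucket/notebooks/", "e-EXAMPLE", "ex-EXAMPLE",
                          "demo_pyspark.ipynb")
print(uri)
print(split_s3_uri(uri))
```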

We run the following s3 cp command to download the EMR notebook output file to a local directory (replace <notebook_bucket_location> with the S3 location specified for the notebook during creation, <e-*************************> with the EMR Notebook ID, and <ex-*****************************> with the value of NotebookExecutionId from the previous step):

% aws s3 cp s3://<notebook_bucket_location>/<e-*************************>/executions/<ex-*****************************>/demo_pyspark.ipynb .

In the same directory where we downloaded the EMR notebook output file, run the following command to start a local Jupyter server:

% jupyter lab

The URL http://localhost:8888/lab automatically opens in your web browser, as shown in the following screenshot.

Choose demo_pyspark.ipynb to view the output file. In the output, it plots two graphs. Graph a shows the top six US states with confirmed COVID-19 cases on a given date.

Graph b shows the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada on a given date.

Scheduling to run a notebook daily using crontab

We have completed running the EMR notebook using the AWS CLI. Now, we demonstrate how to schedule running a notebook daily using crontab. We use the same notebook input file with the same parameters as the previous example. On a daily basis, it generates Graph a with the top six US states with confirmed COVID-19 cases, and Graph b with the fatality rates of COVID-19 patients in Texas, Wisconsin, and Nevada.

We start by creating a bash script named run_notebook_daily.sh. The script starts an EMR notebook, waits for the notebook to either finish running or fail, and copies the output file to the local directory ~/daily_reports/.

The following code is the content of run_notebook_daily.sh (replace <e-*************************> with the ID of EMR Notebook and <j-*************> with the EMR cluster ID):

#!/bin/bash
# Generate a report for the day before yesterday.
# Note: -v-2d is the BSD/macOS date syntax; with GNU date, use: date -d '2 days ago' +'%m-%d-%Y'
day_before_yesterday=$(date -v-2d +'%m-%d-%Y')

# Start an execution and capture its ID
execution_id=$(aws emr start-notebook-execution \
--editor-id <e-*************************> \
--notebook-params '{"DATE":"'"$day_before_yesterday"'", "TOP_K": 6, "US_STATES": ["Wisconsin", "Texas", "Nevada"]}' \
--relative-path demo_pyspark.ipynb \
--notebook-execution-name demo \
--execution-engine '{"Id" : "<j-*************>"}' \
--service-role EMR_Notebooks_DefaultRole | jq -r '.NotebookExecutionId')

echo "Started an execution for the date $day_before_yesterday. Execution id: $execution_id"

# Poll until the execution finishes or fails
while true; do
    execution_status=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.Status')
    echo "Execution Status: $execution_status"

    if [ "$execution_status" == "FINISHED" ] || [ "$execution_status" == "FAILED" ]; then
        # Copy the output file to the local directory ~/daily_reports/
        output_file=$(aws emr describe-notebook-execution --notebook-execution-id "$execution_id" | jq -r '.NotebookExecution.OutputNotebookURI')
        mkdir -p ~/daily_reports
        aws s3 cp "$output_file" ~/daily_reports/
        break
    fi
    sleep 15
done
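The date -v-2d form used in the script is BSD/macOS syntax; GNU date expects date -d '2 days ago' instead. If the job needs to run on both, the date can be computed in a portable way. The following sketch shows the same calculation in Python; the function name report_date is hypothetical.

```python
from datetime import date, timedelta


def report_date(today=None, days_back=2):
    """Return the date days_back days before today, formatted as MM-DD-YYYY."""
    if today is None:
        today = date.today()
    return (today - timedelta(days=days_back)).strftime("%m-%d-%Y")


# Prints the value to pass as the DATE notebook parameter
print(report_date())
```

Passing an explicit today value makes the calculation easy to check, including across month boundaries.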

Next, we add this script to a crontab to run our EMR notebook job daily at 9:00 AM:

% crontab -e
0 9 * * * bash /folder/path/run_notebook_daily.sh >/tmp/stdout.log 2>/tmp/stderr.log

This is a simple example of how to schedule running an EMR notebook with a crontab.

Use case 2: Chaining EMR notebooks with Step Functions triggered by CloudWatch Events

We use demo_pyspark.ipynb and trailing_N_day.ipynb as the input notebook files for this use case. We also provide a CloudFormation template as a general guide. Please review and customize it as needed. Be aware that some of the resources deployed by this stack incur costs when they remain in use.

The following diagram illustrates the resources that the CloudFormation template creates.

The template first creates a step function to run a chain of EMR notebooks, which takes care of the following tasks:

  • Runs notebook demo_pyspark.ipynb with given parameters and waits until it’s complete. It plots a graph of the top k US states with most COVID-19 cases yesterday.
  • Runs notebook input trailing_N_day.ipynb using the output from the first task. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input, and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

The template also creates a CloudWatch event that periodically triggers the step function according to the given schedule expression.
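The CloudFormation template creates the scheduled rule for you. For readers who want to see what such a rule amounts to, the following sketch makes roughly equivalent calls with the Boto3 events client. It is an illustration under assumptions, not the template's actual definition: the function name and the target Id are hypothetical, and the state machine and role ARNs must come from your own account.

```python
def schedule_state_machine(events_client, rule_name, schedule_expression,
                           state_machine_arn, role_arn):
    """Create a scheduled rule and attach a Step Functions state machine to it.

    events_client is expected to be a Boto3 client created with
    boto3.client("events"). Returns the ARN of the rule.
    """
    rule = events_client.put_rule(
        Name=rule_name,
        ScheduleExpression=schedule_expression,  # for example, "rate(1 day)"
        State="ENABLED",
    )
    events_client.put_targets(
        Rule=rule_name,
        Targets=[{
            "Id": "notebook-chain",    # hypothetical target ID
            "Arn": state_machine_arn,  # state machine to start
            "RoleArn": role_arn,       # role allowed to start the execution
        }],
    )
    return rule["RuleArn"]
```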

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It may prompt you to sign in as needed. You can view the template on the AWS CloudFormation console as required. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters:

Parameter | Description | Default Value
Stack name | Enter a meaningful name for the stack, for example, emrRunnableNotebookDemo. | None
ClusterId | The unique ID of the EMR cluster that runs the notebook (j-*************). | None
NotebookARelativePath | The path and file name of the notebook input file A (demo_pyspark.ipynb), relative to the path specified for the EMR notebook. For more information, see Notebook execution CLI command samples. | demo_pyspark.ipynb
NotebookBRelativePath | The path and file name of the notebook input file B (trailing_N_day.ipynb), relative to the path specified for the EMR notebook. | experiment/trailing_N_day.ipynb
NotebookId | The unique ID of the EMR notebook to use for running the notebook (e-*****************************). | None
ScheduleExpression | How the notebook is scheduled to run. For more information, see Schedule Expressions for Rules. | rate(1 day)
StorageLocation | The Amazon S3 path where the EMR notebook is stored (s3://aws-emr-resources-************-us-west-2/notebooks/e-*************************). | None
TopK | The value of one of the parameters used to run notebook A. In this example, it checks the top k US states with confirmed COVID-19 cases and plots a graph for it. | 20

 

  2. Enter the parameter values from the preceding table.
  3. Review the Capabilities section and select the check boxes confirming AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation only takes a few minutes. When the stack is complete, on the Resources tab, you can find the resources created as shown in the following screenshot.

Checking the notebook output files

When a step function execution is complete, you can find the notebook execution IDs in the step function output.

We run the following command to view the output files (replace <notebook_bucket_location> with the Amazon S3 location specified for the notebook during creation and <e-*************************> with the EMR notebook ID):

% aws s3 ls --recursive s3://<notebook_bucket_location>/<e-*************************>/executions/

The aws s3 ls --recursive command returns an output similar to the following:

2020-10-16 16:39:02     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:44:14     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 17:00:37      18600 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:49:08     267781 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
2020-10-16 16:59:01     267780 notebooks/e-*************************/executions/ex-*****************************/demo_pyspark.ipynb
2020-10-16 16:54:06     267780 notebooks/e-*************************/executions/ex-*****************************/trailing_N_day.ipynb.ipynb
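When many executions accumulate, it can help to group the listing by notebook file name. The following sketch parses lines in the format shown above; the function names are hypothetical, and the sample keys are shortened placeholders rather than real IDs.

```python
import posixpath
from collections import defaultdict


def parse_ls_line(line):
    """Parse one 'aws s3 ls --recursive' line into a dict."""
    modified_date, modified_time, size, key = line.split(None, 3)
    return {
        "modified": modified_date + " " + modified_time,
        "size": int(size),
        "key": key.strip(),
    }


def group_by_notebook(lines):
    """Group parsed entries by the file name at the end of each key."""
    groups = defaultdict(list)
    for line in lines:
        if line.strip():
            entry = parse_ls_line(line)
            groups[posixpath.basename(entry["key"])].append(entry)
    return dict(groups)


# Placeholder listing for illustration only
sample = [
    "2020-10-16 16:39:02     267780 notebooks/e-EXAMPLE/executions/ex-ONE/demo_pyspark.ipynb",
    "2020-10-16 17:00:37      18600 notebooks/e-EXAMPLE/executions/ex-TWO/demo_pyspark.ipynb",
]
for name, entries in group_by_notebook(sample).items():
    print(name, [e["modified"] for e in entries])
```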

Downloading and visualizing the results

Follow the same steps in the first use case to download and visualize the results.

The following screenshot shows the graph plotted in the output file of notebook input file A (demo_pyspark.ipynb). It shows the top 20 US states with confirmed COVID-19 cases yesterday.

The output of input file B (trailing_N_day.ipynb) plots the graph as shown in the following screenshot. It takes the US state with the most confirmed COVID-19 cases nationally yesterday as the input and plots a 30-day confirmed COVID-19 case number graph, showing the case growth trend of that state until yesterday.

This example step function is the orchestration for running two notebook input files: the second notebook uses the result from the first. It also monitors the first notebook until it is complete, and populates the Amazon S3 file location in the outputs. You can achieve more sophisticated orchestration by adding more states in the step function.

Cleaning up

To avoid ongoing charges, delete the CloudFormation stack, the EMR cluster, and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how you can schedule running an EMR notebook using crontab and the AWS CLI, and how to chain EMR notebooks with Step Functions triggered by CloudWatch events. The EMR Notebooks Execution API enables the parameterization for EMR notebooks. With this feature, you can also use orchestration services such as Apache Airflow to build ETL pipelines.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Palaniappan Nagarajan is a Software Development Engineer at Amazon EMR working mainly on EMR Notebooks. In his spare time, he likes to hike, try out different cuisines, and scan the night sky with his telescope.

Shuang Li is a senior product manager for Amazon EMR at AWS. She holds a doctoral degree in Computer Science and Engineering from Ohio State University.

Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/creating-a-source-to-lakehouse-data-replication-pipe-using-apache-hudi-aws-glue-aws-dms-and-amazon-redshift/

Most customers have their applications backed by various SQL and NoSQL systems, both on premises and in the cloud. Because the data resides in independent systems, customers struggle to derive meaningful information by combining data from all of these sources. Hence, customers create data lakes to bring their data into a single place.

Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, customers process it based on their requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. Currently, you can use Hudi on Amazon EMR to create Hudi tables.

In this post, we use Apache Hudi to create tables in the AWS Glue Data Catalog using AWS Glue jobs. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. This post enables you to take advantage of the serverless architecture of AWS Glue while upserting data in your data lake, hassle-free.

To write to Hudi tables using AWS Glue jobs, we use a JAR file created using open-source Apache Hudi. This JAR file is used as a dependency in the AWS Glue jobs created through the AWS CloudFormation template provided in this post. Steps to create the JAR file are included in the appendix.

The following diagram illustrates the architecture the CloudFormation template implements.

Prerequisites

The CloudFormation template requires you to select an Amazon Elastic Compute Cloud (Amazon EC2) key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to get to the Aurora cluster that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

Solution overview

The following are the high-level implementation steps:

  1. Create a CloudFormation stack using the provided template.
  2. Connect to the Amazon Aurora cluster used as a source for this post.
  3. Run InitLoad_TestStep1.sql, in the source Amazon Aurora cluster, to create a schema and a table.

AWS DMS replicates the data from the Aurora cluster to the raw S3 bucket. AWS DMS supports a variety of sources.

The CloudFormation stack creates an AWS Glue job (HudiJob) that is scheduled to run at a frequency set in the ScheduleToRunGlueJob parameter of the CloudFormation stack. This job reads the data from the raw S3 bucket, writes to the Curated S3 bucket, and creates a Hudi table in the Data Catalog. The job also creates an Amazon Redshift external schema in the Amazon Redshift cluster created by the CloudFormation stack.

  4. You can now query the Hudi table in Amazon Athena or Amazon Redshift. Visit Creating external tables for data managed in Apache Hudi or Considerations and Limitations to query Apache Hudi datasets in Amazon Athena for details.
  5. Run IncrementalUpdatesAndInserts_TestStep2.sql on the source Aurora cluster.

This incremental data is also replicated to the raw S3 bucket through AWS DMS. HudiJob picks up the incremental data, using AWS Glue bookmarks, and applies it to the Hudi table created earlier.

  6. You can now query the changed data.

Creating your CloudFormation stack

Click on the Launch Stack button to get started and provide the following parameters:

Parameter | Description
VpcCIDR | CIDR range for the VPC.
PrivateSubnet1CIDR | CIDR range for the first private subnet.
PrivateSubnet2CIDR | CIDR range for the second private subnet.
PublicSubnetCIDR | CIDR range for the public subnet.
AuroraDBMasterUserPassword | Primary user password for the Aurora cluster.
RedshiftDWMasterUserPassword | Primary user password for the Amazon Redshift data warehouse.
KeyName | The EC2 key pair to be configured in the EC2 instance on the public subnet. This EC2 instance is used to get to the Aurora cluster in the private subnet. Select the value from the dropdown.
ClientIPCIDR | Your IP address in CIDR notation. The CloudFormation template creates a security group rule that grants ingress on port 22 to this IP address. On a Mac, you can run the following command to get your IP address: curl ipecho.net/plain ; echo /32
EC2ImageId | The image ID used to create the EC2 instance in the public subnet to be a jump box to connect to the source Aurora cluster. If you supply your image ID, the template uses it to create the EC2 instance.
HudiStorageType | This is used by the AWS Glue job to determine if you want to create a CoW or MoR storage type table. Enter MoR if you want to create MoR storage type tables.
ScheduleToRunGlueJob | The AWS Glue job runs on a schedule to pick the new files and load to the curated bucket. This parameter sets the schedule of the job.
DMSBatchUnloadIntervalInSecs | AWS DMS batches the inputs from the source and loads the output to the raw bucket. This parameter defines the frequency at which the data is loaded to the raw bucket.
GlueJobDPUs | The number of DPUs that are assigned to the two AWS Glue jobs.

To simplify running the template, your account is given permissions on the key used to encrypt the resources in the CloudFormation template. You can restrict that to the role if desired.

Granting Lake Formation permissions

AWS Lake Formation enables customers to set up fine-grained access control for their data lake. Detailed steps to set up AWS Lake Formation can be found here.

Setting up AWS Lake Formation is out of scope for this post. However, if you have Lake Formation configured in the Region where you’re deploying this template, grant Create database permission to the LakeHouseExecuteGlueHudiJobRole role after the CloudFormation stack is successfully created.

This will ensure that you don’t get the following error while running your AWS Glue job.

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp

Similarly, grant the Describe permission to the LakeHouseExecuteGlueHudiJobRole role on the default database.

This will ensure that you don’t get the following error while running your AWS Glue job.

AnalysisException: 'java.lang.RuntimeException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException;
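The two grants described in this section can also be scripted. The following is a minimal sketch using the Boto3 Lake Formation client, assuming you already have the ARN of the LakeHouseExecuteGlueHudiJobRole role; the function name is hypothetical.

```python
def grant_hudi_job_permissions(lf_client, role_arn):
    """Grant the Lake Formation permissions that the AWS Glue job role needs.

    lf_client is expected to be a Boto3 client created with
    boto3.client("lakeformation").
    """
    principal = {"DataLakePrincipalIdentifier": role_arn}

    # Create database permission, granted at the Data Catalog level
    lf_client.grant_permissions(
        Principal=principal,
        Resource={"Catalog": {}},
        Permissions=["CREATE_DATABASE"],
    )

    # Describe permission on the default database
    lf_client.grant_permissions(
        Principal=principal,
        Resource={"Database": {"Name": "default"}},
        Permissions=["DESCRIBE"],
    )


# Example usage (requires boto3 and AWS credentials; the ARN is a placeholder):
# import boto3
# grant_hudi_job_permissions(
#     boto3.client("lakeformation"),
#     "arn:aws:iam::123456789012:role/LakeHouseExecuteGlueHudiJobRole")
```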

Connecting to source Aurora cluster

To connect to the source Aurora cluster using SQL Workbench, complete the following steps:

  1. On SQL Workbench, under File, choose Connect window.
  2. Choose Manage Drivers.
  3. Choose PostgreSQL.
  4. For Library, use the driver JAR file.
  5. For Classname, enter org.postgresql.Driver.
  6. For Sample URL, enter jdbc:postgresql://host:port/name_of_database.
  7. Click the Create a new connection profile button.
  8. For Driver, choose your new PostgreSQL driver.
  9. For URL, enter lakehouse_source_db after port/.
  10. For Username, enter postgres.
  11. For Password, enter the same password that you used for the AuroraDBMasterUserPassword parameter while creating the CloudFormation stack.
  12. Choose SSH.
  13. On the Outputs tab of your CloudFormation stack, copy the IP address next to PublicIPOfEC2InstanceForTunnel and enter it for SSH hostname.
  14. For SSH port, enter 22.
  15. For Username, enter ec2-user.
  16. For Private key file, enter the private key for the public key chosen in the KeyName parameter of the CloudFormation stack.
  17. For Local port, enter any available local port number.
  18. On the Outputs tab of your stack, copy the value next to EndpointOfAuroraCluster and enter it for DB hostname.
  19. For DB port, enter 5432.
  20. Select Rewrite JDBC URL.

Selecting the Rewrite JDBC URL check box automatically fills in the host and port values in the URL text box, as shown below.

  21. Test the connection and make sure that you get a message that the connection was successful.

 

Troubleshooting

Complete the following steps if you receive this message: Could not initialize SSH tunnel: java.net.ConnectException: Operation timed out (Connection timed out)

  1. Go to your CloudFormation stack and search for LakeHouseSecurityGroup under Resources.
  2. Choose the link in the Physical ID.
  3. Select your security group.
  4. From the Actions menu, choose Edit inbound rules.
  5. Look for the rule with the description: Rule to allow connection from the SQL client to the EC2 instance used as jump box for SSH tunnel
  6. From the Source menu, choose My IP.
  7. Choose Save rules.
  8. Test the connection from your SQL Workbench again and make sure that you get a successful message.

Running the initial load script

You’re now ready to run the InitLoad_TestStep1.sql script to create some test data.

  1. Open InitLoad_TestStep1.sql in your SQL client and run it.

The output shows that 11 statements have been run.

AWS DMS replicates these inserts to your raw S3 bucket at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of your CloudFormation stack.

  1. On the AWS DMS console, choose the lakehouse-aurora-src-to-raw-s3-tgt task.
  2. On the Table statistics tab, you should see the seven full load rows of employee_details have been replicated.

The lakehouse-aurora-src-to-raw-s3-tgt replication task has the following table mapping with transformation to add a schema name and a table name as additional columns:

{
   "rules":[
      {
         "rule-type":"selection",
         "rule-id":"1",
         "rule-name":"1",
         "object-locator":{
            "schema-name":"human_resources",
            "table-name":"%"
         },
         "rule-action":"include",
         "filters":[
            
         ]
      },
      {
         "rule-type":"transformation",
         "rule-id":"2",
         "rule-name":"2",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"schema_name",
         "expression":"$SCHEMA_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      },
      {
         "rule-type":"transformation",
         "rule-id":"3",
         "rule-name":"3",
         "rule-target":"column",
         "object-locator":{
            "schema-name":"%",
            "table-name":"%"
         },
         "rule-action":"add-column",
         "value":"table_name",
         "expression":"$TABLE_NAME_VAR",
         "data-type":{
            "type":"string",
            "length":50
         }
      }
   ]
}

These settings put the name of the source schema and table as two additional columns in the output Parquet file of AWS DMS.
These columns are used in the AWS Glue HudiJob to find out the tables that have new inserts, updates, or deletes.
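If you maintain several replication tasks, generating the mapping document in code avoids copy-and-paste errors between the near-identical transformation rules. The following sketch rebuilds the same three rules shown above; the helper names are hypothetical, and the rule fields are copied from the mapping in this post.

```python
import json


def add_column_rule(rule_id, column_name, expression, length=50):
    """Build a transformation rule that adds a string column to every table."""
    return {
        "rule-type": "transformation",
        "rule-id": str(rule_id),
        "rule-name": str(rule_id),
        "rule-target": "column",
        "object-locator": {"schema-name": "%", "table-name": "%"},
        "rule-action": "add-column",
        "value": column_name,
        "expression": expression,
        "data-type": {"type": "string", "length": length},
    }


def build_table_mappings(schema_name):
    """Select all tables of one schema and tag each row with its origin."""
    selection = {
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "1",
        "object-locator": {"schema-name": schema_name, "table-name": "%"},
        "rule-action": "include",
        "filters": [],
    }
    return {
        "rules": [
            selection,
            add_column_rule(2, "schema_name", "$SCHEMA_NAME_VAR"),
            add_column_rule(3, "table_name", "$TABLE_NAME_VAR"),
        ]
    }


mappings = build_table_mappings("human_resources")
print(json.dumps(mappings, indent=3))
```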

  1. On the Resources tab of the CloudFormation stack, locate RawS3Bucket.
  2. Choose the Physical ID link.
  3. Navigate to human_resources/employee_details.

The LOAD00000001.parquet file is created under human_resources/employee_details. (The name of your raw bucket is different from the one in the following screenshot.)

You can also see the time of creation of this file. You should have at least one successful run of the AWS Glue job (HudiJob) after this time for the Hudi table to be created. The AWS Glue job is configured to load this data into the curated bucket at the frequency set in the ScheduleToRunGlueJob parameter of your CloudFormation stack. The default is 5 minutes.

AWS Glue job HudiJob

The following code is the script for HudiJob:

import sys
import os
import json

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import concat, col, lit, to_timestamp

from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame

import boto3
from botocore.exceptions import ClientError

args = getResolvedOptions(sys.argv, ['JOB_NAME'])

spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
glueContext = GlueContext(spark.sparkContext)
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
logger = glueContext.get_logger()

logger.info('Initialization.')
glueClient = boto3.client('glue')
ssmClient = boto3.client('ssm')
redshiftDataClient = boto3.client('redshift-data')

logger.info('Fetching configuration.')
region = os.environ['AWS_DEFAULT_REGION']

curatedS3BucketName = ssmClient.get_parameter(Name='lakehouse-curated-s3-bucket-name')['Parameter']['Value']
rawS3BucketName = ssmClient.get_parameter(Name='lakehouse-raw-s3-bucket-name')['Parameter']['Value']
hudiStorageType = ssmClient.get_parameter(Name='lakehouse-hudi-storage-type')['Parameter']['Value']

dropColumnList = ['db','table_name','Op']

logger.info('Getting list of schema.tables that have changed.')
changeTableListDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://'+rawS3BucketName], 'groupFiles': 'inPartition', 'recurse':True}, format = 'parquet', format_options={}, transformation_ctx = 'changeTableListDyf')

logger.info('Processing starts.')
if(changeTableListDyf.count() > 0):
    logger.info('Got new files to process.')
    changeTableList = changeTableListDyf.toDF().select('schema_name','table_name').distinct().rdd.map(lambda row : row.asDict()).collect()

    for dbName in set([d['schema_name'] for d in changeTableList]):
        spark.sql('CREATE DATABASE IF NOT EXISTS ' + dbName)
        redshiftDataClient.execute_statement(ClusterIdentifier='lakehouse-redshift-cluster', Database='lakehouse_dw', DbUser='rs_admin', Sql='CREATE EXTERNAL SCHEMA IF NOT EXISTS ' + dbName + ' FROM DATA CATALOG DATABASE \'' + dbName + '\' REGION \'' + region + '\' IAM_ROLE \'' + boto3.client('iam').get_role(RoleName='LakeHouseRedshiftGlueAccessRole')['Role']['Arn'] + '\'')

    for i in changeTableList:
        logger.info('Looping for ' + i['schema_name'] + '.' + i['table_name'])
        dbName = i['schema_name']
        tableNameCatalogCheck = ''
        tableName = i['table_name']
        if(hudiStorageType == 'MoR'):
            tableNameCatalogCheck = i['table_name'] + '_ro' #Assumption is that if _ro table exists then _rt table will also exist. Hence we are checking only for _ro.
        else:
            tableNameCatalogCheck = i['table_name'] #The default config in the CF template is CoW. So assumption is that if the user hasn't explicitly requested to create MoR storage type table then we will create CoW tables. Again, if the user overwrites the config with any value other than 'MoR' we will create CoW storage type tables.
        isTableExists = False
        isPrimaryKey = False
        isPartitionKey = False
        primaryKey = ''
        partitionKey = ''
        try:
            glueClient.get_table(DatabaseName=dbName,Name=tableNameCatalogCheck)
            isTableExists = True
            logger.info(dbName + '.' + tableNameCatalogCheck + ' exists.')
        except ClientError as e:
            if e.response['Error']['Code'] == 'EntityNotFoundException':
                isTableExists = False
                logger.info(dbName + '.' + tableNameCatalogCheck + ' does not exist. Table will be created.')
        try:
            table_config = json.loads(ssmClient.get_parameter(Name='lakehouse-table-' + dbName + '.' + tableName)['Parameter']['Value'])
            try:
                primaryKey = table_config['primaryKey']
                isPrimaryKey = True
                logger.info('Primary key:' + primaryKey)
            except KeyError as e:
                isPrimaryKey = False
                logger.info('Primary key not found. An append only glueparquet table will be created.')
            try:
                partitionKey = table_config['partitionKey']
                isPartitionKey = True
                logger.info('Partition key:' + partitionKey)
            except KeyError as e:
                isPartitionKey = False
                logger.info('Partition key not found. Partitions will not be created.')
        except ClientError as e:    
            if e.response['Error']['Code'] == 'ParameterNotFound':
                isPrimaryKey = False
                isPartitionKey = False
                logger.info('Config for ' + dbName + '.' + tableName + ' not found in parameter store. Non partitioned append only table will be created.')

        inputDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://' + rawS3BucketName + '/' + dbName + '/' + tableName], 'groupFiles': 'none', 'recurse':True}, format = 'parquet',transformation_ctx = tableName)
        
        inputDf = inputDyf.toDF().withColumn('update_ts_dms',to_timestamp(col('update_ts_dms')))
        
        targetPath = 's3://' + curatedS3BucketName + '/' + dbName + '/' + tableName

        morConfig = {'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', 'hoodie.compact.inline': 'false', 'hoodie.compact.inline.max.delta.commits': 20, 'hoodie.parquet.small.file.limit': 0}

        commonConfig = {'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName, 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': dbName, 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.enable': 'true'}

        partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
                     
        unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
        
        incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 20, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
        
        initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 3, 'hoodie.datasource.write.operation': 'bulk_insert'}
        
        deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}

        if(hudiStorageType == 'MoR'):
            commonConfig = {**commonConfig, **morConfig}
            logger.info('MoR config appended to commonConfig.')
        
        combinedConf = {}

        if(isPrimaryKey):
            logger.info('Going the Hudi way.')
            if(isTableExists):
                logger.info('Incremental load.')
                outputDf = inputDf.filter("Op != 'D'").drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Upserting data.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                outputDf_deleted = inputDf.filter("Op = 'D'").drop(*dropColumnList)
                if outputDf_deleted.count() > 0:
                    logger.info('Some data got deleted.')
                    if (isPartitionKey):
                        logger.info('Deleting from partitioned Hudi table.')
                        outputDf_deleted = outputDf_deleted.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    else:
                        logger.info('Deleting from unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
                        outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
            else:
                outputDf = inputDf.drop(*dropColumnList)
                if outputDf.count() > 0:
                    logger.info('Initial load.')
                    if (isPartitionKey):
                        logger.info('Writing to partitioned Hudi table.')
                        outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                        combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
                    else:
                        logger.info('Writing to unpartitioned Hudi table.')
                        combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
                        outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
        else:
            if (isPartitionKey):
                logger.info('Writing to partitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE', partitionKeys=[partitionKey])
            else:
                logger.info('Writing to unpartitioned glueparquet table.')
                sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE')
            sink.setFormat('glueparquet')
            sink.setCatalogInfo(catalogDatabase = dbName, catalogTableName = tableName)
            outputDyf = DynamicFrame.fromDF(inputDf.drop(*dropColumnList), glueContext, 'outputDyf')
            sink.writeFrame(outputDyf)

job.commit()

Hudi tables need a primary key to perform upserts. Hudi tables can also be partitioned based on a certain key. We get the names of the primary key and the partition key from AWS Systems Manager Parameter Store.

The HudiJob script looks for an AWS Systems Manager Parameter with the naming format lakehouse-table-<schema_name>.<table_name>. It compares the name of the parameter with the name of the schema and table columns, added by AWS DMS, to get the primary key and the partition key for the Hudi table.

The CloudFormation template creates the lakehouse-table-human_resources.employee_details AWS Systems Manager parameter, as shown on the Resources tab.

If you choose the Physical ID link, you can locate the value of the parameter, which is {"primaryKey": "emp_no", "partitionKey": "department"}.

Because of the value in the lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, the AWS Glue script creates a human_resources.employee_details Hudi table partitioned on the department column for the employee_details table created in the source using the InitLoad_TestStep1.sql script. The HudiJob also uses the emp_no column as the primary key for upserts.

If you reuse this CloudFormation template and create your own table, you have to create an associated AWS Systems Manager Parameter with the naming convention lakehouse-table-<schema_name>.<table_name>. Keep in mind the following:

  • If you don’t create a parameter, the script creates an unpartitioned glueparquet append-only table.
  • If you create a parameter that only has the primaryKey part in the value, the script creates an unpartitioned Hudi table.
  • If you create a parameter that only has the partitionKey part in the value, the script creates a partitioned glueparquet append-only table.
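
The cases above, together with the case where both keys are present, reduce to a small decision. The following is a minimal sketch of that decision, not the HudiJob code itself. The function name resolve_table_layout and the returned labels are hypothetical, and the input is assumed to be the JSON string stored in the parameter (or None when no parameter exists).

```python
import json
from typing import Optional, Tuple


def resolve_table_layout(param_value: Optional[str]) -> Tuple[str, bool]:
    """Return (table_format, is_partitioned) for a parameter value.

    param_value is the JSON string stored in the parameter, or None when
    no parameter exists for the table (hypothetical helper for illustration).
    """
    config = json.loads(param_value) if param_value else {}
    has_primary_key = bool(config.get("primaryKey"))
    has_partition_key = bool(config.get("partitionKey"))
    # A primary key is what enables Hudi upserts; without one the data is
    # written as an append-only glueparquet table.
    table_format = "hudi" if has_primary_key else "glueparquet"
    return table_format, has_partition_key
```

For the sample parameter value shown earlier, this sketch returns ("hudi", True), which corresponds to the partitioned Hudi table that the post creates.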

If you have too many tables to replicate, you can also store the primary key and partition key configuration in Amazon DynamoDB or Amazon S3 and change the code accordingly.

In the InitLoad_TestStep1.sql script, the replica identity for the human_resources.employee_details table is set to full. This makes sure that AWS DMS transfers the full delete record to Amazon S3. Having this delete record is important for the HudiJob script to delete the record from the Hudi table. A full delete record from AWS DMS for the human_resources.employee_details table looks like the following:

{ "Op": "D", "update_ts_dms": "2020-10-25 07:57:48.589284", "emp_no": 3, "name": "Jeff", "department": "Finance", "city": "Tokyo", "salary": 55000, "schema_name": "human_resources", "table_name": "employee_details"}

The schema_name and table_name columns are added by AWS DMS because of the task configuration shared previously. update_ts_dms has been set as the value for the TimestampColumnName S3 setting in the AWS DMS S3 endpoint. Op is added by AWS DMS for CDC and indicates the source database operation in the migrated S3 data.
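
To make the role of the Op and update_ts_dms columns concrete, the following sketch applies DMS-style change records to an in-memory table keyed on emp_no. It only illustrates the intended end state (the latest change per key wins, and a record with Op set to D removes the row). It is not how Hudi or the HudiJob script implements upserts and deletes, and the function name apply_cdc is hypothetical.

```python
from typing import Dict, Iterable


def apply_cdc(table: Dict[int, dict], records: Iterable[dict],
              key: str = "emp_no", ts: str = "update_ts_dms") -> Dict[int, dict]:
    """Apply DMS-style change records to an in-memory table keyed by `key`."""
    result = dict(table)  # copy so the caller's table is left untouched
    # Process changes in timestamp order so the latest change per key wins.
    # Timestamps in 'YYYY-MM-DD HH:MM:SS.ffffff' form sort correctly as strings.
    for rec in sorted(records, key=lambda r: r[ts]):
        if rec["Op"] == "D":
            # A delete record removes the row with the same key, if present.
            result.pop(rec[key], None)
        else:
            # Inserts and updates both become an upsert of the full row.
            result[rec[key]] = {k: v for k, v in rec.items() if k != "Op"}
    return result
```

Applied to the delete record shown above, the row with emp_no 3 disappears from the result while all other rows are kept.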

We also set spark.serializer in the script. This setting is required for Hudi.

In the HudiJob script, you can also find a few Python dicts that store various Hudi configuration properties. These configurations are just for demo purposes; you have to adjust them based on your workload. For more information about Hudi configurations, see Configurations.
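
The script combines these dicts with Python dict unpacking, so the order of the merge decides which value wins when a key appears more than once. The following sketch shows the pattern with abbreviated dicts; the helper name build_hudi_options is hypothetical, and the dicts contain only a few of the properties from the script.

```python
from typing import Optional


def build_hudi_options(common: dict, layout: dict, write_mode: dict,
                       extra: Optional[dict] = None) -> dict:
    """Merge Hudi option dicts; on duplicate keys the right-most dict wins."""
    return {**common, **layout, **write_mode, **(extra or {})}


# Abbreviated versions of the dicts in the HudiJob script.
common = {"hoodie.table.name": "employee_details",
          "hoodie.datasource.write.recordkey.field": "emp_no"}
partitioned = {"hoodie.datasource.write.partitionpath.field": "department"}
incremental = {"hoodie.datasource.write.operation": "upsert"}
delete = {"hoodie.datasource.write.payload.class":
          "org.apache.hudi.common.model.EmptyHoodieRecordPayload"}

upsert_options = build_hudi_options(common, partitioned, incremental)
delete_options = build_hudi_options(common, partitioned, incremental, delete)
```

Keeping the workload-specific values in separate dicts makes it easier to tune one group (for example, the incremental settings) without touching the rest.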

HudiJob is scheduled to run every 5 minutes by default. The frequency is set by the ScheduleToRunGlueJob parameter of the CloudFormation template. Make sure that you successfully run HudiJob at least one time after the source data lands in the raw S3 bucket. The screenshot in Step 6 of the Running the initial load script section confirms that AWS DMS put the LOAD00000001.parquet file in the raw bucket at 11:54:41 AM, and the following screenshot confirms that the job execution started at 11:55 AM.

The job creates a Hudi table in the AWS Glue Data Catalog (see the following screenshot). The table is partitioned on the department column.

Granting AWS Lake Formation permissions

If you have AWS Lake Formation enabled, make sure that you grant Select permission on the human_resources.employee_details table to the role or user used to run the Athena query. Similarly, you also have to grant Select permission on the human_resources.employee_details table to the LakeHouseRedshiftGlueAccessRole role so you can query human_resources.employee_details in Amazon Redshift.

Grant Drop permission on the human_resources database to LakeHouseExecuteLambdaFnsRole so that the template can delete the database when you delete the template. Also, the CloudFormation template does not roll back any AWS Lake Formation grants or changes that are manually applied.

Granting access to KMS key

The curated S3 bucket is encrypted by lakehouse-key, which is an AWS Key Management Service (AWS KMS) customer managed key created by the AWS CloudFormation template.

To run the query in Athena, you have to add the ARN of the role or user used to run the Athena query to the Allow use of the key section in the key policy.

This will ensure that you don’t get com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; error while running your Athena query.

You might not have to make this KMS policy change if you have kept the default of granting access to the AWS account and the role or user used to run the Athena query has the necessary KMS-related policies attached to it.

Confirming job completion

When HudiJob is complete, you can see the files in the curated bucket.

  1. On the Resources tab, search for CuratedS3Bucket.
  2. Choose the Physical ID link.

The following screenshot shows the timestamp on the initial load.

  3. Navigate to the department=Finance prefix and select the Parquet file.
  4. Choose Select from.
  5. For File format, select Parquet.
  6. Choose Show file preview.

You can see the value of the timestamp in the update_ts_dms column.

Querying the Hudi table

You can now query your data in Amazon Athena or Amazon Redshift.

Querying in Amazon Athena

Query the human_resources.employee_details table in Amazon Athena with the following code:

SELECT emp_no,
         name,
         city,
         salary,
         department,
         from_unixtime(update_ts_dms/1000000,'America/Los_Angeles') update_ts_dms_LA,
         from_unixtime(update_ts_dms/1000000,'UTC') update_ts_dms_UTC         
FROM "human_resources"."employee_details"
ORDER BY emp_no

The timestamp for all the records matches the timestamp in the update_ts_dms column in the earlier screenshot.
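
The division by 1000000 in the query implies that update_ts_dms is read as microseconds since the Unix epoch. As a rough cross-check outside Athena, the following sketch performs the same conversion in Python. The helper names are hypothetical, and the microsecond unit is an assumption taken from the query above.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros: int) -> datetime:
    """Convert microseconds since the Unix epoch to an aware UTC datetime."""
    return EPOCH + timedelta(microseconds=micros)


def utc_to_micros(moment: datetime) -> int:
    """Inverse of micros_to_utc, using integer arithmetic to avoid float rounding."""
    return (moment - EPOCH) // timedelta(microseconds=1)
```

On Python 3.9 or later, calling astimezone(ZoneInfo("America/Los_Angeles")) on the result, with ZoneInfo imported from the standard zoneinfo module, gives the local time in the same way as the time zone argument of from_unixtime.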

Querying in Redshift Spectrum

Query your table through Redshift Spectrum, using the Apache Hudi support in Amazon Redshift.

  1. On the Amazon Redshift console, locate lakehouse-redshift-cluster.
  2. Choose Query cluster.

  3. For Database name, enter lakehouse_dw.
  4. For Database user, enter rs_admin.
  5. For Database password, enter the password that you used for the RedshiftDWMasterUserPassword parameter in the CloudFormation template.

  6. Enter the following query for the human_resources.employee_details table:
    SELECT emp_no,
             name,
             city,
             salary,
             department,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' AT TIME ZONE 'america/los_angeles' update_ts_dms_LA,
             (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' update_ts_dms_UTC
    FROM human_resources.employee_details
    ORDER BY emp_no 

The following screenshot shows the query output.

Running the incremental load script

We now run the IncrementalUpdatesAndInserts_TestStep2.sql script. The output shows that 6 statements were run.

AWS DMS now shows that it has replicated the new incremental changes. The changes are replicated at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of the CloudFormation stack.

This creates another Parquet file in the raw S3 bucket.

The incremental updates are loaded into the Hudi table according to the chosen frequency to run the job (the ScheduleToRunGlueJob parameter). The HudiJob script uses job bookmarks to identify the incremental load, so it only processes the new files brought in through AWS DMS.

Confirming job completion

Make sure that HudiJob runs successfully at least one time after the incremental file arrives in the raw bucket. The previous screenshot shows that the incremental file arrived in the raw bucket at 1:18:38 PM and the following screenshot shows that the job started at 1:20 PM.

Querying the changed data

You can now check the table in Athena and Amazon Redshift. Both results show that emp_no 3 is deleted, 8 and 9 have been added, and 2 and 5 have been updated.

The following screenshot shows the results in Athena.

The following screenshot shows the results in Redshift Spectrum.

AWS Glue Job HudiMoRCompactionJob

The CloudFormation template also deploys the AWS Glue job HudiMoRCompactionJob. This job is not scheduled; you only use it if you choose the MoR storage type. To run the pipeline for the MoR storage type instead of the CoW storage type, delete the CloudFormation stack and create it again. After creation, replace CoW with MoR in the lakehouse-hudi-storage-type AWS Systems Manager parameter.

If you use the MoR storage type, the incremental updates are stored in log files. You can’t see the updates in the _ro (read optimized) view, but you can see them in the _rt view. The Amazon Athena documentation and Amazon Redshift documentation give more details about support and considerations for Apache Hudi.

To see the incremental data in the _ro view, run the HudiMoRCompactionJob job. For more information about Hudi storage types and views, see Hudi Dataset Storage Types and Storage Types & Views. The following code is an example of the CLI command used to run the HudiMoRCompactionJob job:

aws glue start-job-run --job-name HudiMoRCompactionJob --arguments="--DB_NAME=human_resources","--TABLE_NAME=employee_details","--IS_PARTITIONED=true"

You can decide on the frequency of running this job. You don’t have to run the job immediately after the HudiJob. You should run this job when you want the data to be available in the _ro view. You have to pass the schema name and the table name to this script so it knows the table to compact.
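
If you prefer to start the compaction job from Python rather than the CLI, the following sketch wraps the Glue start_job_run call with the same three arguments. The function name start_compaction is hypothetical, the client is passed in so the function can be exercised without AWS credentials, and passing the string false for an unpartitioned table is an assumption about how the job reads --IS_PARTITIONED.

```python
def start_compaction(glue_client, db_name: str, table_name: str,
                     is_partitioned: bool) -> str:
    """Start HudiMoRCompactionJob with the same arguments as the CLI example."""
    response = glue_client.start_job_run(
        JobName="HudiMoRCompactionJob",
        Arguments={
            "--DB_NAME": db_name,
            "--TABLE_NAME": table_name,
            # Assumption: the job expects the literal strings 'true' or 'false'.
            "--IS_PARTITIONED": "true" if is_partitioned else "false",
        },
    )
    # Glue returns the ID of the run that was started.
    return response["JobRunId"]
```

With boto3 installed and credentials configured, a call such as start_compaction(boto3.client("glue"), "human_resources", "employee_details", True) is the equivalent of the CLI command above.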

Additional considerations

The JAR file we use in this post has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the HudiJob script. These options are set for the sample table that we create for this post. Update the options based on your workload. 

Conclusion

In this post, we created AWS Glue 2.0 jobs that moved the source upserts and deletes into Hudi tables. The code creates tables in the AWS Glue Data Catalog and updates partitions so you don’t have to run the crawlers to update them.

This post simplified your LakeHouse code base by giving you the benefits of Apache Hudi along with serverless AWS Glue. We also showed how to create a source to LakeHouse replication system using AWS Glue, AWS DMS, and Amazon Redshift with minimum overhead.


Appendix

We can write to Hudi tables because of the hudi-spark.jar file that we downloaded to our DependentJarsAndTempS3Bucket S3 bucket with the CloudFormation template. The path to this file is added as a dependency in both the AWS Glue jobs. This file is based on open-source Hudi. To create the JAR file, complete the following steps:

  1. Get Hudi 0.5.3 and unzip it using the following code:
    wget https://github.com/apache/hudi/archive/release-0.5.3.zip
    unzip release-0.5.3.zip

  2. Edit Hudi pom.xml:
    vi hudi-release-0.5.3/pom.xml

    1. Remove the following code to make the build process faster:
      <module>packaging/hudi-hadoop-mr-bundle</module>
      <module>packaging/hudi-hive-bundle</module>
      <module>packaging/hudi-presto-bundle</module>
      <module>packaging/hudi-utilities-bundle</module>
      <module>packaging/hudi-timeline-server-bundle</module>
      <module>docker/hoodie/hadoop</module>
      <module>hudi-integ-test</module>

    2. Change the versions of all three dependencies of httpcomponents to 4.4.1. The following is the original code:
      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.3.2</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.3.6</version>
            </dependency>

      The following is the replacement code:

      <!-- Httpcomponents -->
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>fluent-hc</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpcore</artifactId>
              <version>4.4.1</version>
            </dependency>
            <dependency>
              <groupId>org.apache.httpcomponents</groupId>
              <artifactId>httpclient</artifactId>
              <version>4.4.1</version>
            </dependency>

  3. Build the JAR file:
    mvn clean package -DskipTests -DskipITs -f <Full path of the hudi-release-0.5.3 dir>

  4. You can now get the JAR from the following location:
hudi-release-0.5.3/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar

The other JAR dependency used in the AWS Glue jobs is spark-avro_2.11-2.4.4.jar.


About the Author

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.

Normalize data with Amazon Elasticsearch Service ingest pipelines

Post Syndicated from Vijay Injam original https://aws.amazon.com/blogs/big-data/normalize-data-with-amazon-elasticsearch-service-ingest-pipelines/

Amazon Elasticsearch Service (Amazon ES) is a fully managed service that makes it easy for you to deploy, secure, and run Elasticsearch cost-effectively at scale. Search and log analytics are the two most popular use cases for Amazon ES. In log analytics at scale, a common pattern is to create indexes from multiple sources. In these use cases, how can you ensure that all the incoming data follows a specific, predefined format if it’s operationally not feasible to apply checks in each data source? You can use Elasticsearch ingest pipelines to normalize all the incoming data and create indexes with the predefined format.

What’s an ingest pipeline?

An ingest pipeline lets you use some of your Amazon ES domain’s processing power to apply a set of processors to documents during indexing. The pipeline applies processors in order, with the output of one processor passed to the next. You define a pipeline with the Elasticsearch _ingest API. The following screenshot illustrates this architecture.

To list the ingest pipelines defined in your Amazon ES domain, enter the following code:

GET _ingest/pipeline/

Solution overview

In this post, we discuss three log analytics use cases where data normalization is a common technique.

We create three pipelines and normalize the data for each use case. The following diagram illustrates this architecture.

Use case 1

In this first use case, your Amazon ES domain has three sources: Logstash, Fluentd, and AWS Lambda. Your Logstash source sends the data to an index with the name index-YYYY.MM.DD.HH (hours at the end). When you have an error in the Fluentd source, it creates an index named index-YYYY.MM.DD (missing the hours). Your domain creates indexes for both formats, which is not what you intended.

One way to correct the index name is to calculate the hours of the ingested data and append the value to the index name. If you can’t identify any pattern, or you identify further issues with the index name, you need to segregate the data to a different index (for example, format_error) for further analysis.

Use case 2

If your application uses time-series data and analyzes data from fixed time windows, your data sources can sometimes send data from a prior time window. In this use case, you need to check for the incoming data and discard data that doesn’t fit in the current time window.

Use case 3

In some use cases, the value for a key can contain large strings with common prefixes. End users typically use wildcard characters (*) with the prefix to search on these fields. If your application or Kibana dashboards contain several wildcard queries, they can increase CPU utilization and overall search latency. You can address this by identifying the prefixes from the values and creating a new field with the keyword data type. You can then use term queries on the keyword field to improve search performance.

Pipeline 1: pipeline_normalize_index

The default pipeline for incoming data is pipeline_normalize_index. This pipeline performs the following actions:

  • Checks if the incoming data belongs to the current date.
  • Checks if the data has any errors in the index name.
  • Segregates the data:
    • If it doesn’t find any errors, it pushes the data to pipeline_normalize_data.
    • If it finds errors, it pushes the data to pipeline_fix_index_name.

Checking the index date

In this step, you can create an index pipeline using a script processor, which lets you write a script and execute it within the pipeline.

Use the set processor to add _ingest.timestamp to doc_received_date, and compare the index date to the document received date. The script processor lets you write the script in Painless and access the ingest document through the ctx variable. The following script checks whether the index date matches doc_received_date:

      {
         "set":{
            "field":"doc_received_date",
            "value":"{{_ingest.timestamp}}"
         }
      },
      {
         "script":{
            "lang":"painless",
            "source": """
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd");
                    String dateandhour = ctx._index.substring(ctx._index.indexOf('-') + 1);
                    LocalDate indexdate = LocalDate.parse(dateandhour.substring(0, 10), formatter);
                    ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
                    LocalDate doc_received_date = zonedDateTime.toLocalDate();
                    if (doc_received_date.isEqual(indexdate)) {
                        ctx.index_purge = "N";
                    } else {
                        ctx.index_purge = "Y";
                    }
                    if (dateandhour.length() > 10) {
                        ctx.indexformat_error = "N";
                    } else {
                        ctx.indexformat_error = "Y";
                    }
        """
         }
      }

Checking for index name errors

You can use the same script processor from the previous step to check if the index name matches the format index-YYYY.MM.DD.HH or index-YYYY.MM.DD. See the following code:

if (dateandhour.length() > 10) {
    ctx.indexformat_error = "N";
} else {
    ctx.indexformat_error = "Y";
}

Segregating the data

If the index date doesn’t match the date in _ingest.timestamp (index_purge is Y), you can drop the document using the drop processor. If the index name is missing the hour (indexformat_error is Y), you can send the document to the pipeline pipeline_fix_index_name, which corrects the name. If the script processor itself fails, its on_failure handler assigns the default index indexing_errors so you can analyze those documents separately. If no issues are found, the temporary fields are removed and the document proceeds to the pipeline pipeline_normalize_data. See the following code:

      {
         "drop":{
            "if":"ctx.index_purge == 'Y'"
         }
      },
      {
         "pipeline":{
            "if":"ctx.indexformat_error == 'Y'",
            "name":"pipeline_fix_index_name"
         }
      },
      {
         "remove":{
            "field":[
               "doc_received_date",
               "index_purge",
               "indexformat_error"
            ],
            "ignore_missing":true
         }
      },
      {
         "pipeline":{
            "name":"pipeline_normalize_data"
         }
      }

The following code is an example pipeline:

PUT _ingest/pipeline/pipeline_normalize_index
{
   "description":"pipeline_normalize_index",
   "processors":[
      {
         "set":{
            "field":"doc_received_date",
            "value":"{{_ingest.timestamp}}"
         }
      },
      {
         "script":{
            "lang":"painless",
            "source": """
                    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy.MM.dd");
                    String dateandhour = ctx._index.substring(ctx._index.indexOf('-') + 1);
                    LocalDate indexdate = LocalDate.parse(dateandhour.substring(0, 10), formatter);
                    ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
                    LocalDate doc_received_date = zonedDateTime.toLocalDate();
                    if (doc_received_date.isEqual(indexdate)) {
                        ctx.index_purge = "N";
                    } else {
                        ctx.index_purge = "Y";
                    }
                    if (dateandhour.length() > 10) {
                        ctx.indexformat_error = "N";
                    } else {
                        ctx.indexformat_error = "Y";
                    }
        """,
            "on_failure":[
               {
                  "set":{
                     "field":"Amazon_es_PipelineError",
                     "value":"at Script processor - Purge older Index or Index date error"
                  }
               },
               {
                  "set":{
                     "field":"_index",
                     "value":"indexing_errors"
                  }
               }
            ]
         }
      },
      {
         "drop":{
            "if":"ctx.index_purge == 'Y'"
         }
      },
      {
         "pipeline":{
            "if":"ctx.indexformat_error == 'Y'",
            "name":"pipeline_fix_index_name"
         }
      },
      {
         "remove":{
            "field":[
               "doc_received_date",
               "index_purge",
               "indexformat_error"
            ],
            "ignore_missing":true
         }
      },
      {
         "pipeline":{
            "name":"pipeline_normalize_data"
         }
      }
   ]
}
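
To experiment with the two checks outside the domain, the following is a rough Python port of the Painless script above. It is a sketch for local testing only; the function name classify is hypothetical, and the receive time is passed in as a datetime instead of being read from _ingest.timestamp.

```python
from datetime import datetime


def classify(index_name: str, received: datetime) -> dict:
    """Mirror the Painless checks: flag stale documents and hour-less index names."""
    # Everything after the first '-' is the date, optionally followed by '.HH'.
    date_and_hour = index_name[index_name.find("-") + 1:]
    # Raises ValueError for a malformed date, which corresponds to the
    # on_failure branch of the script processor.
    index_date = datetime.strptime(date_and_hour[:10], "%Y.%m.%d").date()
    return {
        "index_purge": "N" if index_date == received.date() else "Y",
        "indexformat_error": "N" if len(date_and_hour) > 10 else "Y",
    }
```

A document sent to index-2020.10.25 on that same day yields index_purge N and indexformat_error Y, so in the pipeline it would be routed to pipeline_fix_index_name rather than dropped.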

Pipeline 2: pipeline_normalize_data

The pipeline pipeline_normalize_data fixes index data. It extracts the prefix from the defined field and creates a new field. You can use the new field for Term queries.

In this step, you can use a grok processor to extract prefixes from the existing fields and create a new field that you can use for term queries. The output of this pipeline creates the index. See the following code of an example pipeline:

PUT _ingest/pipeline/pipeline_normalize_data
{
  "description":"pipeline_normalize_data",
  "version":1,
  "processors":[
     {
        "grok":{
           "field":"application_type",
           "patterns":[
              "%{WORD:application_group}"
           ],
           "ignore_missing":true,
           "on_failure":[
              {
                 "set":{
                    "field":"Amazon_es_PipelineError",
                    "value":"application_type error"
                 }
              }
           ]
        }
     }
  ]
}
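
The grok WORD pattern corresponds to the regular expression \b\w+\b, so the processor above captures the first run of word characters in application_type. The following sketch reproduces that behavior in Python for local testing. The function name and the sample value payments-api-v2 are hypothetical.

```python
import re

WORD = re.compile(r"\b\w+\b")  # same expression as the grok WORD pattern


def extract_application_group(doc: dict) -> dict:
    """Return a copy of doc with application_group set from application_type."""
    value = doc.get("application_type")
    if value is None:
        # Mirrors "ignore_missing": true, which leaves the document unchanged.
        return doc
    match = WORD.search(value)
    if match is None:
        # Mirrors the on_failure handler of the grok processor.
        return {**doc, "Amazon_es_PipelineError": "application_type error"}
    return {**doc, "application_group": match.group(0)}


sample = extract_application_group({"application_type": "payments-api-v2"})
```

In this example, sample["application_group"] is payments, which can then be matched with a term query instead of a wildcard query on application_type.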

Pipeline 3: pipeline_fix_index_name

This pipeline fixes the index name. The documents flagged with index name errors in pipeline_normalize_index (Pipeline 1) are the input for this pipeline.

You can use the script processor to write a Painless script that extracts the hour (HH) from _ingest.timestamp and appends it to _index. See the following code of the example pipeline:

PUT _ingest/pipeline/pipeline_fix_index_name
    {
      "description":"pipeline_fix_index_name",
      "processors":[
         {
            "set":{
               "field":"doc_received_date",
               "value":"{{_ingest.timestamp}}"
            }
         },
         {
            "script":{
               "lang":"painless",
               "source": 
               """
               ZonedDateTime zonedDateTime = ZonedDateTime.parse(ctx.doc_received_date, DateTimeFormatter.ISO_DATE_TIME);
               LocalDate doc_received_date = zonedDateTime.toLocalDate();
               String receiveddatehour = zonedDateTime.getHour().toString();
               if (zonedDateTime.getHour() < 10) {
                    receiveddatehour = "0" + zonedDateTime.getHour();
               }
               ctx._index = ctx._index + "." + receiveddatehour;
               """,
               "on_failure":[
                  {
                     "set":{
                        "field":"Amazon_es_PipelineError",
                        "value":"at Script processor - Purge older Index or Index date error"
                     }
                  },
                  {
                     "set":{
                        "field":"_index",
                        "value":"indexformat_errors"
                     }
                  }
               ]
            }
         },
         {
            "remove":{
               "field":[
                  "doc_received_date"
               ],
               "ignore_missing":true
            }
         },
         {
            "pipeline":{
               "name":"pipeline_normalize_data"
            }
         }
      ]
   }
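
The core of this script is a zero-padded hour appended to the index name. A minimal Python equivalent, useful for checking the expected index names, could look like the following; the function name fix_index_name is hypothetical.

```python
from datetime import datetime


def fix_index_name(index_name: str, received: datetime) -> str:
    """Append the zero-padded hour of the receive time to the index name."""
    return f"{index_name}.{received.hour:02d}"
```

For example, a document received at 07:57 for index-2020.10.25 ends up in index-2020.10.25.07, which matches the index-YYYY.MM.DD.HH format used by the other sources.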

Adding the default pipeline to the index template

After creating all the pipelines, add the default pipeline to the index template. See the following code:

"default_pipeline" : "pipeline_normalize_index"

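As a sketch of where this setting could live, the following Python snippet builds an index template body that sets index.default_pipeline for every index matching a pattern. The index pattern index-* and the helper name are assumptions for illustration; adapt them to your own template.

```python
import json


def build_index_template(pattern: str, pipeline: str) -> dict:
    """Build a template body that applies a default ingest pipeline."""
    return {
        "index_patterns": [pattern],
        "settings": {"index.default_pipeline": pipeline},
    }


# Hypothetical pattern that covers the index-YYYY.MM.DD.HH indexes.
template_body = build_index_template("index-*", "pipeline_normalize_index")
print(json.dumps(template_body, indent=2))
```

You can send the printed JSON as the body of an index template request so that new indexes matching the pattern pick up the pipeline automatically.
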
Summary

You can normalize data, fix indexing errors, and segregate operational data and anomalies by using ingest pipelines. Although you can use one pipeline with several processors (depending on the use case), ingest pipelines provide an efficient way to use compute and operational resources by eliminating unwanted indexes.

About the Author

Vijay Injam is a Data Architect with Amazon Web Services.

Kevin Fallis is an AWS specialist search solutions architect. His passion at AWS is to help customers leverage the correct mix of AWS services to achieve success for their business goals. His after-work activities include family, DIY projects, carpentry, playing drums, and all things music.

Best practices using AWS SCT and AWS Snowball to migrate from Teradata to Amazon Redshift

Post Syndicated from Ajinkya Puranik original https://aws.amazon.com/blogs/big-data/best-practices-using-aws-sct-and-aws-snowball-to-migrate-from-teradata-to-amazon-redshift/

This is a guest post from ZS. In their own words, “ZS is a professional services firm that works closely with companies to help develop and deliver products and solutions that drive customer value and company results. ZS engagements involve a blend of technology, consulting, analytics, and operations, and are targeted toward improving the commercial experience for clients.”

This post describes the approaches ZS evaluated, and the one we eventually chose, for our cloud transformation journey, specifically the move from our earlier Teradata-based data warehousing solution to Amazon Redshift.

ZS is a professional services firm that works side by side with companies to help develop and deliver products that drive customer value and company results. We leverage our extensive industry expertise, leading-edge analytics, technology and strategies to create solutions that work in the real world. With more than 35 years of experience and over 7,500 ZS employees in 24 offices worldwide, we are passionately committed to helping companies and their customers thrive.

ZS used Teradata as the primary data warehouse solution for several years. Partly because of the high ownership and operating costs, we started looking for an optimal solution that could provide scaling flexibility, lower maintenance liability, and access to accelerated innovation in the industry. This was achievable through solutions hosted on a cloud platform like AWS, which ZS has already been using for numerous business workloads over the years.

Considerations for migration

The following three areas were critical to planning our Teradata to Amazon Redshift migration.

Table structures

The process included migrating the database schema first and then migrating the actual data from the databases. The schema on Amazon Redshift needed to be ready before loading the data from Amazon Simple Storage Service (Amazon S3).

The AWS Schema Conversion Tool (AWS SCT) helped migrate table structures to Amazon Redshift by converting the data types used for columns in Teradata tables into the corresponding Amazon Redshift data types. AWS SCT also helped convert the table definitions from Teradata to Amazon Redshift to include the appropriate keys, such as the distribution key and sort key. Later sections of this post explain how to use AWS SCT.

Database objects and data types

Teradata databases can hold a variety of database objects besides tables, such as views, stored procedures, macros, and user-defined functions (UDFs). The data types of the columns used in Teradata tables needed to be converted into the appropriate data types on Amazon Redshift. For other objects, such as views and stored procedures, the definitions were exported from Teradata and fresh objects were created in Amazon Redshift with appropriate changes in the new definitions. AWS SCT can help identify the objects that need rework while migrating to Amazon Redshift.

Transferring data to AWS

The third major consideration was migrating the actual data to AWS. ZS’s use cases and isolation requirements were such that AWS Direct Connect was not used in general, and not all AWS VPCs were connected to the corporate on-premises network via VPN tunnels. Data exported out of Teradata is uncompressed and expands approximately 4x, which requires data storage on local staging servers. Each ZS client workload had its own warehouse on the source and destination, which varied in size and had its own isolated change management timeline. Given these considerations, we designed two use case-specific approaches for transferring the exported data from the Teradata database to Amazon S3:

  • AWS Snowball – For databases larger than 4 TB, we chose to transfer the data using AWS Snowball. Once the data was exported out of Teradata, it was pushed to AWS Snowball periodically in batches, which resulted in optimal use of the storage space on the staging servers.
  • AWS CLI upload – For databases smaller than 4 TB, data sets were exported from Teradata to staging servers and uploaded to Amazon S3 over the internet using the AWS Command Line Interface (AWS CLI). These data sets were uploaded during non-business hours to minimize the impact on the ZS on-premises data center network bandwidth.

The following diagram illustrates this architecture.

Challenges and constraints

Exporting the data

The amount of data that had to be exported from the Teradata systems was 100+ TB (compressed). When exported, this would potentially expand to 500+ TB. We needed a solution that could export at this scale efficiently. Staging such large data volumes before migrating to AWS was a challenge due to limited on-premises SAN storage capacity. The mitigation we chose was to export in batches so that the exported data could be moved off the staging server in a rolling fashion, keeping space available throughout the migration. For certain large datasets, we also re-compressed the exported data before migrating it to Amazon S3.
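As a rough illustration of the rolling-batch idea, the following Python sketch groups tables into export batches whose expanded size fits within the staging capacity. It is not the process ZS used: the function name, the table names, and the sizes are hypothetical, and the default expansion factor of 4 is only the approximate figure quoted earlier in this post.

```python
def plan_export_batches(table_sizes_gb, staging_capacity_gb, expansion_factor=4.0):
    """Group tables into batches whose exported (expanded) size fits on staging.

    table_sizes_gb maps a table name to its compressed size in GB.
    expansion_factor is an assumed estimate of how much data grows on export.
    """
    batches, current, used = [], [], 0.0
    for name, size_gb in table_sizes_gb.items():
        expanded = size_gb * expansion_factor
        if expanded > staging_capacity_gb:
            raise ValueError(f"{name} needs {expanded} GB, more than staging can hold")
        # Close the current batch when the next table would overflow staging.
        if used + expanded > staging_capacity_gb:
            batches.append(current)
            current, used = [], 0.0
        current.append(name)
        used += expanded
    if current:
        batches.append(current)
    return batches


# Hypothetical table names and compressed sizes in GB.
sizes = {"db1.sales": 100, "db1.orders": 150, "db2.clicks": 200}
batches = plan_export_batches(sizes, staging_capacity_gb=1000)
print(batches)
```

Each batch would be exported, transferred, and removed from staging before the next one starts, which is what keeps space available throughout the migration.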

Transferring the data

ZS had 150+ databases within our Teradata systems, used across numerous ZS client initiatives. For certain projects, the data even had to be transferred to the client’s AWS account, which required a unique process for each such project, although the technology foundation was reusable. As mentioned earlier, because dataset sizes varied per client workload, we designed an approach suited to each.

Initial approach for the solution

A cross-functional team with expertise across data warehousing, storage, network, cloud-native technologies, and business was formed at ZS, supported by AWS experts brought in via ZS’s AWS partnership.

The primary focus at the beginning was on finalizing the data migration approaches. One method that we tried was to use AWS SCT to copy the schema onto Amazon Redshift and transfer the data to Amazon S3 using the SCT migration mode to extract and upload. We also looked at the file interface of AWS Snowball Edge to eliminate the need for local storage during migration by writing the Teradata exports directly to AWS Snowball Edge.

Approach constraints

While choosing a final approach, we came across the following challenges:

  1. Data export speeds were a major factor, considering the huge amount of data to migrate. We adopted the Teradata Parallel Transporter (TPT) approach because it showed better runtimes.
  2. Teradata holds data compressed up to 4x, which is uncompressed on export. Holding such large datasets on a staging server was not feasible due to storage constraints.
  3. We evaluated AWS Snowball Edge instead of AWS Snowball to test the advantages of attaching it to the staging servers as a direct NFS mount. However, because the maximum file size supported by the Snowball Edge NFS interface is 150 GB, we decided to continue with AWS Snowball.

TPT scripts method

Teradata Parallel Transporter (TPT) scripts were leveraged to export the data because they provided faster export speeds from Teradata servers compared to alternatives. We prepared the TPT scripts and launched them from Linux servers. Before starting the export, we had to ensure that enough free space was available on the server(s) to accommodate the export dumps.

The advantages of using TPT scripts to export data from Teradata tables were as follows:

  • Parallel processing to export data, which provided faster runtimes
  • Exporting varied data types into text format, which could be loaded into Amazon Redshift

Then the data was exported on the same servers where the TPT scripts were run. From here the data was copied either to the Amazon S3 bucket through the AWS CLI that was installed on the same server or to the Snowball device.

Final architecture

The hybrid cloud architecture we settled on is depicted in the following diagram. It comprises ZS’s on-premises data center hosting the Teradata appliance, the AWS destination environments, and the intermediary staging, shipping, and data transfer networks. AWS SCT was leveraged for schema migration and TPT exports for the data migration. The TPT export scripts were executed on the staging servers, and the data was exported onto shared storage attached to the staging servers. After the exports were completed, the data was either copied to Amazon S3 using the AWS CLI or pushed to AWS Snowball, depending on the data size. The Snowball device was configured within the same network as the staging servers to ensure optimal transfer latency. Once the data was copied completely onto AWS Snowball, the device was shipped to AWS, where the data was transferred into the corresponding S3 bucket. On the AWS side, each Amazon Redshift cluster had a corresponding S3 bucket that held the data before it was loaded into the cluster.

Exporting the data

The TPT script is very effective when exporting huge amounts of data from the proprietary Teradata systems. You can prepare and deploy export scripts on a server within the same network as the Teradata appliance, which enables high export speeds.

The TPT export script is a combination of two parts: a declaration section and a loop with built-in commands. It generates the export dump and logs as outputs.

Declaration section

The declaration section is where we initialize all the parameters, like the system identifier known as the tdpid, login user name, and delimiter that are used in the output files. See the following code that sets up shell variables:

#!/bin/ksh
# Values in angle brackets are placeholders to replace before running
split_file_no=3
SourceTdpId="<cop alias entries from hosts file or IP>"
SourceUserName="<user id having read access on the DB tables>"
SourceUserPassword=
# SourceLogonMech is referenced by the export loop; its value is not given in this listing
SourceLogonMech=
DDLPrivateLogName=ddlprivate.log
ExportPrivateLogName=exportprivate.log
TargetErrorList=3807
TargetFormat=delimited
# The delimiter can be decided based on the column values
TargetTextDelimiter='^'
TargetOpenMode=write
SpoolMode=NoSpool
MaxDecimalDigits=31

Loop with built-in commands

The values for the required variables were passed from three input files:

  • <databasename>.<tablename>
  • Definition of the TPT export operator
  • Job variables file (this file gets removed at the end of export)

See the following shell script that uses shell and TPT utility commands:

fn_read_table_schema()
{
  # Each line of the input file "tablename" supplies a database name and a table name
  while read database table
  do
    SourceWorkingDatabase=${database}
    TargetFileName=${database}.${table}
    # Write the TPT job variables file as comma-separated name = 'value' pairs
    cat > jobvar.txt <<EOF
SourceTdpId = '${SourceTdpId}'
,SourceLogonMech = '${SourceLogonMech}'
,SourceUserName = '${SourceUserName}'
,SourceUserPassword = '${SourceUserPassword}'
,SourceWorkingDatabase = '${SourceWorkingDatabase}'
,DDLPrivateLogName = '${DDLPrivateLogName}'
,ExportPrivateLogName = '${ExportPrivateLogName}'
,TargetErrorList = ['${TargetErrorList}']
,TargetFileName = '${TargetFileName}.dat'
,TargetFormat = '${TargetFormat}'
,TargetTextDelimiter = '${TargetTextDelimiter}'
,TargetOpenMode = '${TargetOpenMode}'
,SpoolMode = '${SpoolMode}'
,SelectStmt = 'select * from ${database}.${table}'
EOF
    # The file contains the password, so restrict it to the owner
    chmod 600 jobvar.txt
    # Run the TPT export job for this table
    tbuild -j ${table} -f tpt2test_2.tpt -v jobvar.txt
    rm -f jobvar.txt
    # log_total_records is a helper defined elsewhere in the script (not shown)
    log_total_records "${TargetFileName}"
  done < tablename
}

Export dump and logs

The data exported from the Teradata system through the TPT scripts was placed on the staging server. To ensure the quality of the exported data, we verified that the record counts in the log file, created during the TPT export, matched with the table row counts.

Table row count in Teradata

TPT exported dataset row count

The TPT scripts generated one file for every Teradata table. The file format of these files was text with the .dat extension. See the following screenshot.

You can optimize data loading into Amazon Redshift tables by splitting the corresponding file (dataset) into subsets of equal sizes. The number of such subsets should ideally be equal to or a multiple of the number of slices for the Amazon Redshift node type configured in the cluster. We chose to split the TPT output files using the Linux split command on the TPT server:

split -C 20m --numeric-suffixes input_filename output_prefix
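To make the sizing rule concrete, here is a minimal Python sketch that picks a part count that is a multiple of the slice count and then divides whole records into parts of nearly equal size. It is an illustration only, not the method ZS used: the function names are hypothetical, the 20 MB target merely mirrors the split command above, and this sketch balances parts by record count rather than by bytes.

```python
import math


def choose_part_count(file_size_mb, slices, target_part_mb=20):
    """Return the smallest multiple of `slices` that keeps parts at or below the target size."""
    parts_needed = max(1, math.ceil(file_size_mb / target_part_mb))
    return math.ceil(parts_needed / slices) * slices


def split_records(records, part_count):
    """Divide records into part_count contiguous parts whose lengths differ by at most one."""
    base, extra = divmod(len(records), part_count)
    parts, start = [], 0
    for i in range(part_count):
        end = start + base + (1 if i < extra else 0)
        parts.append(records[start:end])
        start = end
    return parts


# A hypothetical 130 MB export loaded into a cluster with 4 slices.
part_count = choose_part_count(130, slices=4)
parts = split_records(list(range(10)), 4)
print(part_count, parts)
```

Keeping whole records together matters because a record that is cut across two files cannot be loaded correctly.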

For more information about efficiently loading Amazon Redshift tables, see Top 8 Best Practices for High-Performance ETL Processing Using Amazon Redshift and Best Practices for Micro-Batch Loading on Amazon Redshift.

Transferring data to S3 buckets

ZS leveraged AWS account level isolation for many of our client solutions to align with respective compliance controls. AWS Snowball is associated with a single AWS account, and to achieve full client data isolation, separate devices were shipped for each large use case. As indicated above, we adopted two methods to transfer the data based on the export size for each client workload:

  • AWS CLI – Use when databases are smaller than 4TB.
  • Snowball – Use when databases are bigger than 4TB or when data needed to be loaded to a ZS owned Client Dedicated account.
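The selection rule in the preceding list can be written as a small Python function. This is only a sketch: the function and parameter names are hypothetical, and because the post does not say how a database of exactly 4 TB was handled, the sketch assumes it goes through the AWS CLI.

```python
def choose_transfer_method(export_size_tb, client_dedicated_account=False, threshold_tb=4):
    """Pick a transfer method from the database size and the target account type."""
    # Snowball for large databases or for ZS-owned client-dedicated accounts.
    if client_dedicated_account or export_size_tb > threshold_tb:
        return "snowball"
    # Assumption: sizes at or below the threshold are uploaded with the AWS CLI.
    return "aws-cli"


print(choose_transfer_method(2.5))
print(choose_transfer_method(12))
print(choose_transfer_method(1, client_dedicated_account=True))
```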

Transferring data through the AWS CLI

Transferring the data via the AWS CLI includes the following steps:

  1. Install and configure the AWS CLI utility on the ZS on-premises Linux (staging) server.
  2. Export datasets out from Teradata on the staging server.
  3. Copy the exported datasets to Amazon S3 using the AWS CLI:

aws s3 cp filename.txt s3://aws-s3-bucket-name/foldername/

Transferring data through Snowball

To transfer the data with Snowball, complete the following steps:

  1. Create a Snowball job on the AWS Management Console and order the Snowball device.
  2. Configure the Snowball on ZS’s on-premises data center network and install the Snowball client on the staging server.
  3. Unlock the Snowball device by downloading the manifest file and an unlock code from the console, as shown in the following code:

snowball start -i XX.XX.XX.XX -m /home/abcd/XXXXXXXXX_manifest.bin -u XXXXXXXXXXX

  4. Use the Snowball CLI to list the S3 bucket associated with Snowball:

snowball s3 ls

  5. Copy the files to Snowball:

snowball cp /location/of/the/exported/files s3://Bucket_name/Target/

Transferring the table structure to Amazon Redshift

There are a few differences in the table definition format between Amazon Redshift and Teradata. The AWS SCT tool helps convert the Teradata table structure into an appropriate Amazon Redshift table structure.

To transfer the Teradata table structure to Amazon Redshift, complete the following steps:

  1. Connect to the on-premises Teradata systems and the Amazon Redshift cluster endpoint.
  2. Select the specific table from Teradata, right-click it, and choose Convert schema. This converts the table definition into the Amazon Redshift equivalent.
  3. In the Amazon Redshift section of the AWS SCT console, choose Apply to database when the table conversion is complete to create the table structure on Amazon Redshift.

Pushing the data to the tables

After you migrate the required data to the appropriate S3 bucket, convert the tables as per usability, and apply the tables to Amazon Redshift, you can push the data to these tables via the COPY command:

-- The delimiter must match the one used for the TPT export (TargetTextDelimiter)
copy AXXXX_MAIN.table1
from 's3://aws-s3-bucket-name/AXXXX_MAIN.table1.dat'
iam_role 'arn:aws:iam::XXXXXXX:role/aws-iam-role'
delimiter '|'
region 'us-XXXX-1';

The naming convention we used for the exported datasets was <databasename>.<tablename>. The table structures (DDLs) were migrated through AWS SCT and the table names matched the dataset names. Therefore, when we created the COPY commands, we simply had to match the target table name in Amazon Redshift with that of the datasets on Amazon S3. For more information about this process, see Using the COPY command to load from Amazon S3.
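Because the dataset name encodes the target table, the COPY statement can be derived from the object key. The following Python sketch shows one way to do that. The function name is hypothetical, the bucket, role, and Region values are the placeholders from the example above, and the sketch assumes every dataset key ends in .dat.

```python
def build_copy_command(dataset_key, bucket, iam_role_arn, delimiter, region):
    """Build a Redshift COPY statement for a dataset named <databasename>.<tablename>.dat."""
    suffix = ".dat"
    if not dataset_key.endswith(suffix):
        raise ValueError(f"unexpected dataset name: {dataset_key}")
    # The part before the first dot is the database (schema); the rest is the table.
    database, _, table = dataset_key[: -len(suffix)].partition(".")
    if not database or not table:
        raise ValueError(f"expected <databasename>.<tablename>.dat, got {dataset_key}")
    return (
        f"copy {database}.{table}\n"
        f"from 's3://{bucket}/{dataset_key}'\n"
        f"iam_role '{iam_role_arn}'\n"
        f"delimiter '{delimiter}'\n"
        f"region '{region}';"
    )


command = build_copy_command(
    "AXXXX_MAIN.table1.dat",
    bucket="aws-s3-bucket-name",
    iam_role_arn="arn:aws:iam::XXXXXXX:role/aws-iam-role",
    delimiter="|",
    region="us-XXXX-1",
)
print(command)
```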

Conclusion

In this post, we described our journey and the options we evaluated before settling on an approach to move on-premises Teradata data warehouse workloads onto Amazon Redshift at scale. A process built around multiple tools, including AWS SCT, Teradata Parallel Transporter, and AWS Snowball, facilitated our transformation.

For more information about AWS SCT, see Introducing AWS Schema Conversion Tool Version 1.0.502. For more information about Snowball, see AWS Import/Export Snowball – Transfer 1 Petabyte Per Week Using Amazon-Owned Storage Appliances.

Disclaimer: The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors

Ajinkya Puranik is a Cloud Database Lead within Cloud Centre of Excellence at ZS Associates. He has years of experience managing, administrating, optimizing and adopting evolving data warehousing solutions. He played an instrumental role in ZS’s Teradata to Redshift transformation journey. His personal interests involve cricket and traveling.

Sushant Jadhav is a Senior Cloud Administrator within Cloud Center of Excellence at ZS Associates. He is a results-oriented professional with technology experience predominantly in the storage and backup industry. He has worked on many migration projects where he helped customers migrate from on-premises to AWS. Sushant enjoys working on all the AWS services and tries to bridge the gap between technology and business. He is always keen on learning new technologies and is always evolving in his role. Apart from work, he enjoys playing football.

How to delete user data in an AWS data lake

Post Syndicated from George Komninos original https://aws.amazon.com/blogs/big-data/how-to-delete-user-data-in-an-aws-data-lake/

General Data Protection Regulation (GDPR) is an important aspect of today’s technology world, and processing data in compliance with GDPR is a necessity for those who implement solutions within the AWS public cloud. One article of GDPR is the “right to erasure” or “right to be forgotten” which may require you to implement a solution to delete specific users’ personal data.

In the context of the AWS big data and analytics ecosystem, every architecture, regardless of the problem it targets, uses Amazon Simple Storage Service (Amazon S3) as the core storage service. Despite its versatility and feature completeness, Amazon S3 doesn’t come with an out-of-the-box way to map a user identifier to S3 keys of objects that contain user’s data.

This post walks you through a framework that helps you purge individual user data within your organization’s AWS hosted data lake, and an analytics solution that uses different AWS storage layers, along with sample code targeting Amazon S3.

Reference architecture

To address the challenge of implementing a data purge framework, we reduced the problem to the straightforward use case of deleting a user’s data from a platform that uses AWS for its data pipeline. The following diagram illustrates this use case.

We’re introducing the idea of building and maintaining an index metastore that keeps track of the location of each user’s records and allows us to locate them efficiently, reducing the search space.

You can use the following architecture diagram to delete a specific user’s data within your organization’s AWS data lake.

For this initial version, we created three user flows that map each task to a fitting AWS service:

Flow 1: Real-time metastore update

The S3 ObjectCreated or ObjectRemoved events trigger an AWS Lambda function that parses the object and performs an add/update/delete operation to keep the metadata index up to date. You can implement a simple workflow for any other storage layer, such as Amazon Relational Database Service (RDS), Amazon Aurora, or Amazon Elasticsearch Service (ES). We use Amazon DynamoDB and Amazon RDS for PostgreSQL as the index metadata storage options, but our approach is flexible to any other technology.

Flow 2: Purge data

When a user asks for their data to be deleted, we trigger an AWS Step Functions state machine through Amazon CloudWatch to orchestrate the workflow. Its first step triggers a Lambda function that queries the metadata index to identify the storage layers that contain user records and generates a report that’s saved to an S3 report bucket. A Step Functions activity is created and picked up by a Node.js-based Lambda worker that sends an email to the approver through Amazon Simple Email Service (SES) with approve and reject links.

The following diagram shows a graphical representation of the Step Functions state machine as seen on the AWS Management Console.

The approver selects one of the two links, which then calls an Amazon API Gateway endpoint that invokes Step Functions to resume the workflow. If the approver chooses the approve link, Step Functions triggers a Lambda function that takes the report stored in the bucket as input, deletes the objects or records from the storage layer, and updates the index metastore. When the purging job is complete, Amazon Simple Notification Service (SNS) sends a success or fail email to the user.

The following diagram represents the Step Functions flow on the console if the purge flow completed successfully.

For the complete code base, see step-function-definition.json in the GitHub repo.

Flow 3: Batch metastore update

This flow refers to the use case of an existing data lake for which an index metastore needs to be created. You can orchestrate the flow through AWS Step Functions, which takes historical data as input and updates the metastore through a batch job. Our current implementation doesn’t include a sample script for this user flow.

Our framework

We now walk you through the two use cases we followed for our implementation:

  • You have multiple user records stored in each Amazon S3 file
  • A user has records stored in homogenous AWS storage layers

Within these two approaches, we demonstrate alternatives that you can use to store your index metastore.

Indexing by S3 URI and row number

For this use case, we use a free tier RDS Postgres instance to store our index. We created a simple table with the following code:

CREATE UNLOGGED TABLE IF NOT EXISTS user_objects (
    userid TEXT,
    s3path TEXT,
    recordline INTEGER
);

You can index on userid to optimize query performance. On object upload, for each row, you need to insert into the user_objects table a row that indicates the user ID, the URI of the target Amazon S3 object, and the row that corresponds to the record. For instance, consider uploading the following JSON input:

{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"…"}
{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"…"}
{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body":"…"}

The object is stored in the Amazon S3 location s3://gdpr-demo/year=2018/month=2/day=26/input.json, so we insert the following tuples into user_objects:

("V34qejxNsCbcgD8C0HVk-Q", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 0)
("ofKDkJKXSKZXu5xJNGiiBQ", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 1)
("UgMW8bLE0QMJDCkQ1Ax5Mg", "s3://gdpr-demo/year=2018/month=2/day=26/input.json", 2)

You can implement the index update operation by using a Lambda function triggered on any Amazon S3 ObjectCreated event.
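The core of such an index update can be sketched in plain Python, independent of Lambda and the database driver. The sketch below is not the code from the GitHub repo. It assumes newline-delimited JSON with a user_id field, as in the sample input, and zero-based line numbers, as in the sample tuples; the function name is hypothetical.

```python
import json


def build_index_rows(s3_uri, body_text):
    """Return one (userid, s3path, recordline) tuple per JSON record in the object."""
    rows = []
    for line_number, line in enumerate(body_text.splitlines()):
        if not line.strip():
            continue  # Skip blank lines but keep counting physical line numbers.
        record = json.loads(line)
        rows.append((record["user_id"], s3_uri, line_number))
    return rows


# Sample object body; the "..." values stand in for the elided record bodies.
body = (
    '{"user_id":"V34qejxNsCbcgD8C0HVk-Q","body":"..."}\n'
    '{"user_id":"ofKDkJKXSKZXu5xJNGiiBQ","body":"..."}\n'
    '{"user_id":"UgMW8bLE0QMJDCkQ1Ax5Mg","body":"..."}\n'
)
uri = "s3://gdpr-demo/year=2018/month=2/day=26/input.json"
rows = build_index_rows(uri, body)
for row in rows:
    print(row)
```

In a Lambda function, the returned tuples would then be inserted into user_objects with whichever PostgreSQL client the function uses.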

When we get a delete request from a user, we need to query our index to get some information about where we have stored the data to delete. See the following code:

SELECT s3path,
       ARRAY_AGG(recordline)
FROM user_objects
WHERE userid = 'V34qejxNsCbcgD8C0HVk-Q'
GROUP BY s3path;

The preceding example SQL query returns rows like the following:

("s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json", {2102,529})

The output indicates that lines 529 and 2102 of S3 object s3://gdpr-review/year=2015/month=12/day=21/review-part-0.json contain the requested user’s data and need to be purged. We then need to download the object, remove those rows, and overwrite the object. For a Python implementation of the Lambda function that implements this functionality, see deleteUserRecords.py in the GitHub repo.

Having the record line available allows you to perform the deletion efficiently in byte format. For implementation simplicity, we purge the rows by replacing the deleted rows with an empty JSON object. You pay a slight storage overhead, but you don’t need to update subsequent row metadata in your index, which would be costly. To eliminate empty JSON objects, we can implement an offline vacuum and index update process.
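The replace-with-empty-object approach can be illustrated with a few lines of Python. This is a simplified sketch rather than the deleteUserRecords.py implementation: it works on the object body as text, assumes zero-based line numbers, and leaves downloading and re-uploading the object out of scope. The function name is hypothetical.

```python
def purge_lines(body_text, record_lines):
    """Replace the given zero-based lines with an empty JSON object, keeping the line count."""
    lines = body_text.split("\n")
    for line_number in set(record_lines):
        if not 0 <= line_number < len(lines):
            raise IndexError(f"line {line_number} is outside the object")
        lines[line_number] = "{}"
    return "\n".join(lines)


body = (
    '{"user_id":"a","body":"x"}\n'
    '{"user_id":"b","body":"y"}\n'
    '{"user_id":"a","body":"z"}\n'
)
purged = purge_lines(body, [0, 2])
print(purged)
```

Because every remaining record keeps its original line number, the index entries for other users stay valid without any update.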

Indexing by file name and grouping by index key

For this use case, we created a DynamoDB table to store our index. We chose DynamoDB because of its ease of use and scalability; you can use its on-demand pricing model so you don’t need to guess how many capacity units you might need. When files are uploaded to the data lake, a Lambda function parses the file name (for example, 1001-.csv) to identify the user identifier and populates the DynamoDB metadata table. Userid is the partition key, and each different storage layer has its own attribute. For example, if user 1001 had data in Amazon S3 and Amazon RDS, their records look like the following code:

{"userid": 1001, "s3":{"s3://path1", "s3://path2"}, "RDS":{"db1.table1.column1"}}

For a sample Python implementation of this functionality, see update-dynamo-metadata.py in the GitHub repo.
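To show the shape of the index without calling DynamoDB, the following Python sketch keeps the metadata in an in-memory dictionary. It is not the update-dynamo-metadata.py code. The function names are hypothetical, the user ID is assumed to be the part of the file name before the first hyphen, and the user ID is kept as a string here.

```python
def user_id_from_file_name(file_name):
    """Extract the user ID, assumed to be the text before the first '-' in the file name."""
    user_id, separator, _ = file_name.partition("-")
    if not separator or not user_id:
        raise ValueError(f"no user ID prefix in file name: {file_name}")
    return user_id


def record_location(index, user_id, layer, location):
    """Add a location to the set kept for this user and storage layer."""
    item = index.setdefault(user_id, {"userid": user_id})
    item.setdefault(layer, set()).add(location)
    return item


index = {}
uid = user_id_from_file_name("1001-.csv")
record_location(index, uid, "s3", "s3://path1")
record_location(index, uid, "s3", "s3://path2")
record_location(index, uid, "RDS", "db1.table1.column1")
print(index[uid])
```

With DynamoDB, the same effect is typically achieved by adding the location to a string set attribute on the item whose partition key is the user ID.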

On delete request, we query the metastore table, which is DynamoDB, and generate a purge report that contains details on what storage layers contain user records, and storage layer specifics that can speed up locating the records. We store the purge report to Amazon S3. For a sample Lambda function that implements this logic, see generate-purge-report.py in the GitHub repo.
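As a rough sketch of what a purge report might contain, the following Python function turns one index item into a JSON document that lists the locations per storage layer. The report layout and the function name are assumptions made for illustration; the actual format is defined by generate-purge-report.py in the GitHub repo.

```python
import json


def generate_purge_report(index, user_id):
    """Build a JSON report of the storage layers and locations holding a user's records."""
    item = index.get(user_id, {})
    layers = {
        layer: sorted(locations)
        for layer, locations in item.items()
        if layer != "userid"
    }
    return json.dumps({"userid": user_id, "layers": layers}, sort_keys=True)


# Hypothetical in-memory copy of the metadata index.
sample_index = {
    "1001": {
        "userid": "1001",
        "s3": {"s3://path1", "s3://path2"},
        "RDS": {"db1.table1.column1"},
    }
}
report = generate_purge_report(sample_index, "1001")
print(report)
```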

After the purging is approved, we use the report as input to delete the required resources. For a sample Lambda function implementation, see gdpr-purge-data.py in the GitHub repo.

Implementation and technology alternatives

We explored and evaluated multiple implementation options, all of which present tradeoffs, such as implementation simplicity, efficiency, critical data compliance, and feature completeness:

  • Scan every record of the data file to create an index – Whenever a file is uploaded, we iterate through its records and generate tuples (userid, s3Uri, row_number) that are then inserted to our metadata storing layer. On delete request, we fetch the metadata records for requested user IDs, download the corresponding S3 objects, perform the delete in place, and re-upload the updated objects, overwriting the existing object. This is the most flexible approach because it supports a single object to store multiple users’ data, which is a very common practice. The flexibility comes at a cost because it requires downloading and re-uploading the object, which introduces a network bottleneck in delete operations. User activity datasets such as customer product reviews are a good fit for this approach, because it’s unexpected to have multiple records for the same user within each partition (such as a date partition), and it’s preferable to combine multiple users’ activity in a single file. It’s similar to what was described in the section “Indexing by S3 URI and row number” and sample code is available in the GitHub repo.
  • Store metadata as file name prefix – Adding the user ID as the prefix of the uploaded object under the different partitions that are defined based on query pattern enables you to reduce the required search operations on delete request. The metadata handling utility finds the user ID from the file name and maintains the index accordingly. This approach is efficient in locating the resources to purge but assumes a single user per object, and requires you to store user IDs within the filename, which might require InfoSec considerations. Clickstream data, where you would expect to have multiple click events for a single customer on a single date partition during a session, is a good fit. We covered this approach in the section “Indexing by file name and grouping by index key” and you can download the codebase from the GitHub repo.
  • Use a metadata file – Along with uploading a new object, we also upload a metadata file that’s picked up by an indexing utility to create and maintain the index up to date. On delete request, we query the index, which points us to the records to purge. A good fit for this approach is a use case that already involves uploading a metadata file whenever a new object is uploaded, such as uploading multimedia data, along with their metadata. Otherwise, uploading a metadata file on every object upload might introduce too much of an overhead.
  • Use the tagging feature of AWS services – Whenever a new file is uploaded to Amazon S3, we use the Put Object Tagging Amazon S3 operation to add a key-value pair for the user identifier. Whenever there is a user data delete request, it fetches objects with that tag and deletes them. This option is straightforward to implement using the existing Amazon S3 API and can therefore be a very initial version of your implementation. However, it involves significant limitations. It assumes a 1:1 cardinality between Amazon S3 objects and users (each object only contains data for a single user), searching objects based on a tag is limited and inefficient, and storing user identifiers as tags might not be compliant with your organization’s InfoSec policy.
  • Use Apache Hudi – Apache Hudi is becoming a very popular option to perform record-level data deletion on Amazon S3. Its current version is restricted to Amazon EMR, and you can use it if you start to build your data lake from scratch, because you need to store your data as Hudi datasets. Hudi is a very active project and additional features and integrations with more AWS services are expected.

The key implementation decision of our approach is separating the storage layer we use for our data and the one we use for our metadata. As a result, our design is versatile and can be plugged in any existing data pipeline. Similar to deciding what storage layer to use for your data, there are many factors to consider when deciding how to store your index:

  • Concurrency of requests – If you don’t expect too many simultaneous inserts, even something as simple as Amazon S3 could be a starting point for your index. However, if you get multiple concurrent writes for multiple users, you need to look into a service that copes better with transactions.
  • Existing team knowledge and infrastructure – In this post, we demonstrated using DynamoDB and RDS Postgres for storing and querying the metadata index. If your team has no experience with either of those but is comfortable with Amazon ES, Amazon DocumentDB (with MongoDB compatibility), or any other storage layer, use those. Furthermore, if you’re already running (and paying for) a MySQL database that’s not used to capacity, you could use that for your index for no additional cost.
  • Size of index – The volume of your metadata is orders of magnitude lower than your actual data. However, if your dataset grows significantly, you might need to consider going for a scalable, distributed storage solution rather than, for instance, a relational database management system.

Conclusion

GDPR has transformed best practices and introduced several extra technical challenges in designing and implementing a data lake. The reference architecture and scripts in this post may help you delete data in a manner that’s compliant with GDPR.

Let us know your feedback in the comments and how you implemented this solution in your organization, so that others can learn from it.

 


About the Authors

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Sakti Mishra is a Data Lab Solutions Architect at AWS. He helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives. Outside of work, Sakti enjoys learning new technologies, watching movies, and travel.

Stream CDC into an Amazon S3 data lake in Parquet format with AWS DMS

Post Syndicated from Viral Shah original https://aws.amazon.com/blogs/big-data/stream-cdc-into-an-amazon-s3-data-lake-in-parquet-format-with-aws-dms/

Most organizations generate data in real time and in ever-increasing volumes. Data is captured from a variety of sources, such as transactional and reporting databases, application logs, customer-facing websites, and external feeds. Companies want to capture, transform, and analyze this time-sensitive data to improve customer experiences, increase efficiency, and drive innovations. With increased data volume and velocity, it’s imperative to capture data from source systems as soon as it’s generated and store it on a secure, scalable, and cost-efficient platform.

AWS Database Migration Service (AWS DMS) performs continuous data replication using change data capture (CDC). Using CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume and act on. Most database management systems manage a transaction log that records changes made to the database contents and metadata. AWS DMS reads the transaction log by using engine-specific API operations and functions and captures the changes made to the database in a nonintrusive manner.

Amazon Simple Storage Service (Amazon S3) is the largest and most performant object storage service for structured and unstructured data and the storage service of choice to build a data lake. With Amazon S3, you can cost-effectively build and scale a data lake of any size in a secure environment where data is protected by a service designed for 99.999999999% (11 nines) durability.

AWS DMS offers many options to capture data changes from relational databases and store the data in columnar format (Apache Parquet) into Amazon S3:

The second option helps you build a flexible data pipeline to ingest data into an Amazon S3 data lake from several relational and non-relational data sources, whereas the first option supports only relational data sources. Kinesis Data Firehose provides pre-built AWS Lambda blueprints for converting common data sources such as Apache logs and system logs to JSON and CSV formats, or you can write your own custom functions. It can also convert the format of incoming data from JSON to Parquet or Apache ORC before storing the data in Amazon S3. Data stored in columnar format gives you faster and lower-cost queries with downstream analytics services like Amazon Athena.

In this post, we focus on the technical challenges outlined in the second option and how to address them.

As shown in the following reference architecture, data is ingested from a database into Parquet format in Amazon S3 via AWS DMS integrating with Kinesis Data Streams and Kinesis Data Firehose.

Our solution provides flexibility to ingest data from several sources using Kinesis Data Streams and Kinesis Data Firehose with built-in data format conversion and integrated data transformation capabilities before storing data in a data lake. For more information about data ingestion into Kinesis Data Streams, see Writing Data into Amazon Kinesis Data Streams. You can then query Parquet data in Amazon S3 efficiently with Athena.

Implementing the architecture

AWS DMS can migrate data to and from most widely used commercial and open-source databases. You can migrate and replicate data directly to Amazon S3 in CSV and Parquet formats, and store data in Amazon S3 in Parquet because it offers efficient compression and encoding schemes. Parquet format allows compression schemes on a per-column level, and is future-proofed to allow adding more encodings as they are invented and implemented.

AWS DMS supports Kinesis Data Streams as a target. Kinesis Data Streams is a massively scalable and durable real-time data streaming service that can collect and process large streams of data records in real time. AWS DMS publishes records to a data stream using JSON. For more information about configuration details, see Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams.

Kinesis Data Firehose can pull data from Kinesis Data Streams. It’s a fully managed service that delivers real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), and Splunk. Kinesis Data Firehose can convert the format of input data from JSON to Parquet or ORC before sending it to Amazon S3. It needs a reference schema to interpret the AWS DMS streaming data in JSON and convert it into Parquet. In this post, we use AWS Glue, a fully managed ETL service, to create a schema in the AWS Glue Data Catalog for Kinesis Data Firehose to reference.

When AWS DMS migrates records, it creates additional fields (metadata) for each migrated record. The metadata provides additional information about the record being migrated, such as source table name, schema name, and type of operation. Most metadata field names contain a hyphen (-), for example record-type, schema-name, table-name, and transaction-id. See the following code:

{
        "data": {
            "MEET_CODE": 5189459,
            "MEET_DATE": "2020-02-21T19:20:04Z",
            "RACE_CODE": 5189459,
            "LAST_MODIFIED_DATE": "2020-02-24T19:20:04Z",
            "RACE_ENTRY_CODE": 11671651,
            "HORSE_CODE": 5042811
        },
        "metadata": {
            "transaction-id": 917505,
            "schema-name": "SH",
            "operation": "insert",
            "table-name": "RACE_ENTRY",
            "record-type": "data",
            "timestamp": "2020-02-26T00:20:07.482592Z",
            "partition-key-type": "schema-table"
        }
    }

Additional metadata added by AWS DMS leads to an error during the data format conversion phase in Kinesis Data Firehose. Kinesis Data Firehose follows Hive-style formatting and therefore doesn’t recognize the hyphen (-) in the metadata field names during data conversion from JSON into Parquet. It returns an error message stating that ':' was expected at position 30 of the struct definition but '-' was found. For example, see the following code:

{
	"deliveryStreamARN": "arn:aws:firehose:us-east-1:1234567890:deliverystream/abc-def-KDF",
	"destination": "arn:aws:s3:::abc-streaming-bucket",
	"deliveryStreamVersionId": 13,
	"message": "The schema is invalid. Error parsing the schema:
	 Error: : expected at the position 30 of 'struct<timestamp:string,record-type:string,operation:string,partition-key-type:string,schema-name:string,table-name:string,transaction-id:int>' but '-' is found.",
	"errorCode": "DataFormatConversion.InvalidSchema"
}

You can resolve the issue by making the following changes: specifying JSON key mappings and creating a reference table in AWS Glue before configuring Kinesis Data Firehose.

Specifying JSON key mappings

In your Kinesis Data Firehose configuration, specify JSON key mappings for fields with a hyphen (-) in their names. The mapping replaces the hyphen in these metadata field names with an underscore (for example, record-type changes to record_type).

Use the AWS Command Line Interface (AWS CLI) to create the Kinesis Data Firehose delivery stream with the JSON key mappings. Modify the parameters to meet your specific requirements.

You can configure Kinesis Data Firehose key mappings only through the AWS CLI or API, not through the AWS Management Console.

The following code configures Kinesis Data Firehose with five columns whose hyphenated field names are mapped to new field names with underscores:

"S3BackupMode": "Disabled",
"DataFormatConversionConfiguration": {
    "SchemaConfiguration": {
        "RoleARN": "arn:aws:iam::123456789012:role/sample-firehose-delivery-role",
        "DatabaseName": "sample-db",
        "TableName": "sample-table",
        "Region": "us-east-1",
        "VersionId": "LATEST"
    },
    "InputFormatConfiguration": {
        "Deserializer": {
            "OpenXJsonSerDe": {
                "ColumnToJsonKeyMappings": {
                    "record_type": "record-type",
                    "partition_key_type": "partition-key-type",
                    "schema_name": "schema-name",
                    "table_name": "table-name",
                    "transaction_id": "transaction-id"
                }
            }
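
The mapping in this configuration can also be derived from the metadata field names instead of being typed by hand. The following Python snippet is a minimal sketch of that idea; the helper name build_column_to_json_key_mappings is hypothetical, and the field list is taken from the sample record and error message shown earlier.

```python
# Metadata field names that AWS DMS adds to each record, as listed in the
# struct definition of the error message shown earlier in this post.
DMS_METADATA_FIELDS = [
    "timestamp",
    "record-type",
    "operation",
    "partition-key-type",
    "schema-name",
    "table-name",
    "transaction-id",
]


def build_column_to_json_key_mappings(field_names):
    """Map underscore column names to the original hyphenated JSON keys.

    Hypothetical helper: only fields that contain a hyphen need an entry,
    because the other field names already match their column names.
    """
    return {
        name.replace("-", "_"): name
        for name in field_names
        if "-" in name
    }


mappings = build_column_to_json_key_mappings(DMS_METADATA_FIELDS)
```

The resulting dictionary holds the same five entries as the ColumnToJsonKeyMappings block and can be serialized with json.dumps before it is passed to the AWS CLI.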

Creating a reference table in AWS Glue

Because Kinesis Data Firehose uses the Data Catalog to reference schema for Parquet format conversion, you must first create a reference table in AWS Glue before configuring Kinesis Data Firehose. Use Athena to create a Data Catalog table. For instructions, see CREATE TABLE. In the table, make sure that the column names use _ instead of -, and if needed, modify them in advance through the Edit schema option for the referenced table in AWS Glue.
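
If you already have the metadata struct definition from the error message, you can rewrite its field names before using it in the reference table. The following Python snippet is a sketch only: hive_safe_struct is a hypothetical helper, and it assumes a struct whose field names consist of letters, digits, hyphens, and underscores.

```python
import re


def hive_safe_struct(struct_type):
    """Replace hyphens with underscores in the field names of a struct type string.

    A field name is the run of name characters immediately before a ':'.
    Type names (string, int, and so on) are never followed by ':' and are
    therefore left untouched.
    """
    return re.sub(
        r"([A-Za-z0-9_-]+):",
        lambda match: match.group(1).replace("-", "_") + ":",
        struct_type,
    )


# Struct definition copied from the error message earlier in this post.
original = (
    "struct<timestamp:string,record-type:string,operation:string,"
    "partition-key-type:string,schema-name:string,table-name:string,"
    "transaction-id:int>"
)
fixed = hive_safe_struct(original)
```

The value of fixed can then serve as the type of the metadata column in the CREATE TABLE statement.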

Use Athena to query the results of data ingested by Kinesis Data Firehose into Amazon S3.

This solution is only applicable in the following use cases:

  • Capturing data changes from your source with AWS DMS
  • Converting data into Parquet with Kinesis Data Firehose

If you want to store data in non-Parquet format (such as CSV or JSON) or ingest into Kinesis through other routes, then you don’t need to modify your Kinesis Data Firehose configuration.

Conclusion

This post demonstrated how to convert AWS DMS data into Parquet format and specific configurations to make sure metadata follows the expected format of Kinesis Data Streams and Kinesis Data Firehose. We encourage you to try this solution and take advantage of all the benefits of using AWS DMS with Kinesis Data Streams and Kinesis Data Firehose. For more information, see Getting started with AWS Database Migration Service and Setting up Amazon Kinesis Firehose.

If you have questions or suggestions, please leave a comment.

 


About the Author

Viral Shah is a Data Lab Architect with Amazon Web Services. Viral helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab. He has over 20 years of experience working with enterprise customers and startups primarily in the Data and Database space.

Fast and predictable performance with serverless compilation using Amazon Redshift

Post Syndicated from Kiran Chinta original https://aws.amazon.com/blogs/big-data/fast-and-predictable-performance-with-serverless-compilation-using-amazon-redshift/

Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Customers tell us that they want extremely fast query response times so they can make equally fast decisions.

This post presents the recently launched, massively scalable serverless compilation capability for Amazon Redshift, which can now concurrently compile query segments with additional compute resources at no extra cost. We also share how our customers have enjoyed faster performance (in several cases, twice as fast) because of this new capability.

Amazon Redshift query compilation

When a query is sent to Amazon Redshift, the query processing engine parses it into multiple segments and compiles these segments to produce optimized object files that are processed during query execution. When the same or similar queries are sent to Amazon Redshift again, the corresponding segments are already present in the cluster code compilation cache. Query segments that use already compiled code in the cache run faster because there’s no overhead of query compilation.

You can also accelerate your workloads of one-time and first-time queries, which don’t have query segments compiled in the cache. Depending on the query’s complexity, Amazon Redshift usually compiles those queries within seconds. However, some mission-critical workloads require even faster response time. This is where the massively scalable serverless compilation capability in Amazon Redshift makes a big difference.

Amazon Redshift serverless query compilation

Amazon Redshift breaks down a query into a set of segments, and each segment is a set of operations, such as SCAN or BUILD HASH TABLE. With the launch of the massively scalable serverless compilation capability, Amazon Redshift can now compile the query segments faster and in parallel because the compilation isn’t limited by the specific cluster being used and its available CPU and memory resources.

The Amazon Redshift compilation capability is managed with an external resource that your Amazon Redshift cluster uses based on your workload. During query processing, Amazon Redshift generates query segments and sends the segments that aren’t present in the cluster’s local cache to the external compilation farm to be compiled with massive parallelism. At the time of running the query, the segments are quickly fetched from the compilation service and saved in the cluster’s local cache for future processing. This makes sure that one-time and first-time queries are processed with high performance in a transparent way, without any additional cost.
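
The lookup order described above (local cache first, then the external compilation service, then save locally) can be illustrated with a toy model. The following Python sketch is not how Amazon Redshift is implemented; the class names CompilationService and Cluster are hypothetical, compilation is reduced to building a string, and the parallel compilation of segments is not modeled.

```python
class CompilationService:
    """Stand-in for the external compilation farm (hypothetical)."""

    def __init__(self):
        self.shared_cache = {}
        self.compilations = 0

    def get_or_compile(self, segment):
        # Compile only when no compiled object exists for this segment yet.
        if segment not in self.shared_cache:
            self.compilations += 1
            self.shared_cache[segment] = f"compiled({segment})"
        return self.shared_cache[segment]


class Cluster:
    """Stand-in for a cluster with its own local compilation cache."""

    def __init__(self, service):
        self.service = service
        self.local_cache = {}

    def run_query(self, segments):
        """Return the compiled object for each segment, filling the local cache on a miss."""
        objects = []
        for segment in segments:
            if segment not in self.local_cache:
                self.local_cache[segment] = self.service.get_or_compile(segment)
            objects.append(self.local_cache[segment])
        return objects

    def flush_local_cache(self):
        """Simulate a maintenance patch that flushes the local cache."""
        self.local_cache.clear()


service = CompilationService()
cluster = Cluster(service)
first_run = cluster.run_query(["SCAN", "BUILD HASH TABLE"])
cluster.flush_local_cache()
second_run = cluster.run_query(["SCAN", "BUILD HASH TABLE"])
```

In this model, service.compilations stays at 2 after the flush, because the second run fetches both segments from the external cache instead of compiling them again.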

Design and usage

The massively scalable serverless compilation capabilities benefit you whenever you need query compilation, especially with complex and highly concurrent workloads. The following are some specific use cases where this capability helps:

  • Dashboard applications that require fast query performance experience lower query compilation time, leading to improved user experience.
  • Dynamic one-time queries with new query segments that aren’t present in the code cache can be processed faster.
  • Scheduled ETL or reporting jobs with a strict SLA benefit from lower query compilation times.
  • Highly complex and concurrent workloads run with high performance without impacting the overall cluster performance.
  • Clusters that are resized, upgraded, or paused and resumed use the external code cache. No warmup is needed.

The following diagram illustrates the architecture of the Amazon Redshift serverless compilation.

Compilation improvements

Although serverless compilation has significantly improved query performance since its launch, the Amazon Redshift team is working to further improve its effectiveness and performance. More recently, we announced an unlimited cache size for storing compiled objects, which increased cache hits across the Amazon Redshift fleet from 99.60% to 99.95%.

The following graph shows how the cache hit percentage has improved beyond the local cache across releases.
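
A change from 99.60% to 99.95% looks small as a hit rate but is large as a miss rate. The following Python snippet is a short worked calculation using only those two figures; exact fractions are used to avoid floating-point rounding.

```python
from fractions import Fraction


def miss_rate(hit_rate_percent):
    """Return the cache miss rate in percent for a hit rate given as a decimal string."""
    return 100 - Fraction(hit_rate_percent)


before = miss_rate("99.60")  # 0.40% of segment lookups miss the cache
after = miss_rate("99.95")   # 0.05% of segment lookups miss the cache
reduction_factor = before / after
```

The miss rate falls from 0.40% to 0.05%, so reduction_factor is 8: roughly one eighth as many segment lookups still require compilation.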

Faster performance

During a standard maintenance window, an Amazon Redshift patch flushes the compilation cache. Before we launched the new compilation capabilities, your cluster’s performance was impacted after being patched during maintenance periods. Now, that performance impact is almost unnoticeable with this feature.

Many Amazon Redshift customers are benefiting from these performance improvements and saving time and cost for their Amazon Redshift environments. In this section, we share the stories of three organizations.

Aptos

Aptos is the largest provider of enterprise software focused exclusively on retail. They use Amazon Redshift to power the analytics solution for retail clients. Jonathan Strohl, a cloud engineer on the Aptos team, shared this anecdote with us:

“Prior to last week’s Redshift maintenance, we sent our clients the typical notification letting them know to expect performance delays the following morning due to the object cache being flushed during the maintenance. However, the morning after the maintenance, a couple of our clients emailed back asking whether the maintenance had actually occurred, because there had been no noticeable delay. The performance delays they had previously noticed were now eliminated due to the serverless compilation recently released by Amazon Redshift. This is the best result we could have hoped for—our clients were unable to tell that a cache-flushing maintenance had even occurred!”

Manthan

Manthan delivers BI, analytics, and artificial intelligence solutions to more than 200 leading retailers across 22 countries. Vijay Chidambaram, Head of Cloud Engineering at Manthan, shared the following with us:

“The normal ETL runtimes are around 90–100 minutes. The ETL runtime would go to around 290 minutes post an upgrade without the serverless compilation feature. That value has come down to about 150 minutes, which is a 2X improvement. Across the clusters, there is no increase in the ETL wall clock runtime compared to normal runtimes on day two and beyond.”

Intentwise

Intentwise is an Amazon Advertising optimization platform that empowers brands, sellers, and agencies with insights, automation, and expertise. They use Amazon Redshift to power the analytics for their SaaS offering. Raghavendra, a Software Architect at Intentwise, shared the following with us:

“The new serverless compilation feature improves the query compilation time by 3x. This makes Amazon Redshift an even more powerful data warehouse for our analytical platform because it continues to innovate to offer better performance and lower costs, all with no efforts on our end.”

Summary

This post explained how the massively scalable serverless compilation capability for Amazon Redshift works and gave examples of the benefits you can expect from the performance improvements. The capability is free and automatically enabled on all new and existing Amazon Redshift clusters.

For more information about Amazon Redshift query planning and workflow, see Query planning and execution workflow. For more information about improving query performance, see Factors affecting query performance.


About the Authors

Kiran Chinta is a Senior Software Development Engineer at Amazon Redshift. He has been working on distributed databases for over 13 years and has focused on high availability, disaster recovery, SQL language features and performance features for on-prem and cloud databases. In his spare time, he enjoys reading and playing various sports.

Naresh Chainani is a Senior Software Development Manager at Amazon Redshift. He leads Query Processing, Query Performance, Distributed Systems and Workload Management with a strong team. Naresh is passionate about building high-performance databases to enable customers to gain timely insights and make critical business decisions. In his spare time, Naresh enjoys reading and playing tennis.

Maor Kleider is a product and database engineering leader for Amazon Redshift. Maor is passionate about collaborating with customers and partners, learning about their unique big data use cases and making their experience even better. In his spare time, Maor enjoys traveling and exploring new restaurants with his family.

Quan Li is a Senior Database Engineer at Amazon Redshift. His focus is enabling customers to deliver maximum business value. Quan is passionate about optimizing high performance analytical databases. During his spare time, he enjoys traveling and experiencing different types of cuisines with his family.

How Aruba Networks built a cost analysis solution using AWS Glue, Amazon Redshift, and Amazon QuickSight

Post Syndicated from Siddharth Thacker original https://aws.amazon.com/blogs/big-data/how-aruba-networks-built-a-cost-analysis-solution-using-aws-glue-amazon-redshift-and-amazon-quicksight/

This is a guest post co-written by Siddharth Thacker and Swatishree Sahu from Aruba Networks.

Aruba Networks is a Silicon Valley company based in Santa Clara that was founded in 2002 by Keerti Melkote and Pankaj Manglik. Aruba is the industry leader in wired, wireless, and network security solutions. Hewlett-Packard acquired Aruba in 2015, making it a wireless networking subsidiary with a wide range of next-generation network access solutions.

Aruba Networks provides a cloud-based platform called Aruba Central for network management and AI Ops. The Aruba cloud platform runs thousands of workloads that support the customer-facing production environment, as well as a separate development platform for Aruba engineering.

The motivation to build the solution presented in this post was to understand the unit economics of the AWS resources used by multiple product lines across different organization pillars. Aruba wanted a fast, effective, and reliable way to analyze cost and usage data and visualize it in a dashboard. This solution has helped Aruba in multiple ways, including:

  • Visibility into costs – Multiple Aruba teams can now analyze the cost of their application via data surfaced with this solution
  • Cost optimization – The solution helps teams identify new cost-optimization opportunities by making them aware of the higher-cost resources with low utilization so they can optimize accordingly
  • Cost management – The Cloud DevOps organization, the group who built this solution, can effectively plan at the application level and have a direct positive impact on gross margins
  • Cost savings – With daily cost data available, engineers can see the monetary impact of right-sizing compute and other AWS resources almost immediately
  • Big picture as well as granular – Users can visualize cost data from the top down and track cost at a business level and a specific resource level

Overview of the solution

This post describes how Aruba Networks automated the solution, from generating the AWS Cost & Usage Report (AWS CUR) to its final visualization on Amazon QuickSight. In this solution, they start by configuring the CUR on their primary payer account, which publishes the billing reports to an Amazon Simple Storage Service (Amazon S3) bucket. Then they use an AWS Glue crawler to define and catalog the CUR data. As the new CUR data is delivered daily, the data catalog is updated, and the data is loaded into an Amazon Redshift database using Amazon Redshift Spectrum and SQL. The reporting and visualization layer is built using QuickSight. Finally, the entire pipeline is automated by using AWS Data Pipeline.

The following diagram illustrates this architecture.

Aruba prefers AWS CUR to AWS Cost Explorer because AWS Cost Explorer provides usage information at a high level, without enough granularity for detailed analysis of items such as data transfer cost. AWS CUR provides the most detailed information available about your AWS costs and usage at an hourly granularity. This allows the Aruba team to drill down the costs by the hour or day, product or product resource, or custom tags, enabling them to achieve their goals.

Aruba implemented the solution with the following steps:

  1. Set up the CUR delivery to a primary S3 bucket from the billing dashboard.
  2. Use Amazon S3 replication to copy the primary payer S3 bucket to the analytics bucket. Having a separate analytics account helps prevent direct access to the primary account.
  3. Create and schedule the crawler to crawl the CUR data. This is required to make the metadata available in the Data Catalog and update it quickly when new data arrives.
  4. Create respective Amazon Redshift schema and tables.
  5. Orchestrate an ETL flow to load data to Amazon Redshift using Data Pipeline.
  6. Create and publish dashboards using QuickSight for executives and stakeholders.

Insights generated

The Aruba DevOps team built various reports that provide the cost classifications on AWS services, weekly cost by applications, cost by product, infrastructure, resource type, and much more using the detailed CUR data as shown by the following screenshot.

For example, using the following screenshot, Aruba can conveniently figure out that compute cost is the biggest contributor compared to other costs. To reduce the cost, they can consider using various cost-optimization methods like buying reserved instances, savings plans, or Spot Instances wherever applicable.

Similarly, the following screenshot highlights that the cost doubled compared to the first week of April. This helps Aruba identify anomalies quickly and make informed decisions.

Setting up the CUR delivery

For instructions on setting up a CUR, see Creating Cost and Usage Reports.

To reduce complexity in the workflow, Aruba chose to create resources in the same region with hourly granularity, mainly to see metrics more frequently.

To lower the storage costs for data files and maximize the effectiveness of querying data with serverless technologies like Amazon Athena, Amazon Redshift Spectrum, and Amazon S3 data lake, save the CUR in Parquet format. The following screenshot shows the configuration for delivery options.

The following table shows some example CUR data.

bill_payer_account_id | line_item_usage_account_id | line_item_usage_start_date | line_item_usage_end_date | line_item_product_code | line_item_usage_type             | line_item_operation
123456789             | 111222333444               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-EBS:VolumeP-IOPS.piops      | CreateVolume-P-IOPS
123456789             | 111222333444               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-APN1-AWS-In-Bytes           | LoadBalancing-PublicIP-In
123456789             | 111222333444               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-DataProcessing-Bytes        | LoadBalancing
123456789             | 111222333444               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-EBS:SnapshotUsage           | CreateSnapshot
123456789             | 555666777888               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-EBS:SnapshotUsage           | CreateSnapshot
123456789             | 555666777888               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-EBS:SnapshotUsage           | CreateSnapshot
123456789             | 555666777888               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-DataTransfer-Regional-Bytes | InterZone-In
123456789             | 555666777888               | 00:00.0                    | 00:00.0                  | AmazonS3               | USW2-Requests-Tier2              | ReadLocation
123456789             | 555666777888               | 00:00.0                    | 00:00.0                  | AmazonEC2              | USW2-DataTransfer-Regional-Bytes | InterZone-In

Replicating the CUR data to your analytics account

For security purposes, other teams aren’t allowed to access the primary (payer) account, and therefore can’t access CUR data generated from that account. Aruba replicated the data to their analytics account and built the cost analysis solution there. Other teams can access the cost data without getting access permission for the primary account. The data is replicated across accounts by adding an Amazon S3 replication rule in the bucket. For more information, see Adding a replication rule when the destination bucket is in a different AWS account.

Cataloging the data with a crawler and scheduling it to run daily

Because AWS delivers all daily reports for a billing period to a folder named for the report date range (report-prefix/report-name/yyyymmdd-yyyymmdd), Aruba uses AWS Glue crawlers to crawl through the data and update the catalog.
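
To make the folder naming concrete, the following Python snippet builds the folder path for a given day. It is a sketch under a stated assumption that is not confirmed by this post: the date range is taken to run from the first day of the billing month to the first day of the following month. The helper names are hypothetical, so check the result against the folders in your own bucket.

```python
from datetime import date


def billing_period_folder(day):
    """Return the yyyymmdd-yyyymmdd folder name for the month that contains `day`.

    Assumption: the range starts on the first day of the month and ends on
    the first day of the next month.
    """
    start = day.replace(day=1)
    if start.month == 12:
        end = date(start.year + 1, 1, 1)
    else:
        end = date(start.year, start.month + 1, 1)
    return f"{start:%Y%m%d}-{end:%Y%m%d}"


def report_folder(prefix, name, day):
    """Join the report prefix, report name, and date range folder."""
    return f"{prefix}/{name}/{billing_period_folder(day)}"


folder = report_folder("report-prefix", "report-name", date(2020, 2, 15))
```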

AWS Glue is a fully managed ETL service that makes it easy to prepare and load the data for analytics. Once AWS Glue is pointed to the data stored on AWS, it discovers the data and stores the associated metadata (such as table definition and schema) in the Data Catalog. After the data is cataloged, it is immediately searchable, queryable, and available for ETL. For more information, see Populating the AWS Glue Data Catalog.

The following screenshot shows the crawler created on the Amazon S3 location of the CUR data.

The following code is an example table definition populated by the crawler:

CREATE EXTERNAL TABLE `cur_parquet`(
  `identity_line_item_id` string, 
  `identity_time_interval` string, 
  `bill_invoice_id` string, 
………
………
  `resource_tags_user_infra_role` string)

PARTITIONED BY ( 
  `year` string, 
  `month` string )

ROW FORMAT SERDE  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://curS3bucket/Parquet/'

Transforming and loading using Amazon Redshift

For the analytics service, Aruba chose Amazon Redshift over Athena. Aruba needs to combine cost data with other tables already present in Amazon Redshift, so using the same service makes that integration easy. To filter and transform data at the same time, and to simplify the multi-step ETL, Aruba chose Amazon Redshift Spectrum, which efficiently queries and loads CUR data from Amazon S3. For more information, see Getting started with Amazon Redshift Spectrum.

Use the following query to create an external schema and map it to the AWS Glue database created earlier in the Data Catalog:

--Choose a schema name of your choice; cur_redshift_spectrum_external_schema is just an example--
 create external schema cur_redshift_spectrum_external_schema from data catalog database 
 'aruba_curr_db' iam_role 'arn:aws:iam::xxxxxxxxxxxxx:role/redshiftclusterrole' 
 create external database if not exists;

The table created in the Data Catalog appears under the Amazon Redshift Spectrum schema. The schema, table, and records created can be verified with the following SQL code:

SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE>; 

--Query the right partition; year=2020 and month=2 are used as an example.
--The partition columns are defined as strings, so the values are quoted.
SELECT Count(*) 
FROM   cur_redshift_spectrum_external_schema.<TABLE> 
WHERE  year='2020' 
AND    month='2';

Next, transform and load the data into the Amazon Redshift table. Aruba started by creating an Amazon Redshift table to contain the data. The following SQL code can be used to create the production table with the desired columns:

CREATE TABLE redshift_schema.redshift_table 
  ( 
     usage_start_date TIMESTAMP, 
     usage_end_date   TIMESTAMP, 
     service_region   VARCHAR (256), 
     service_az       VARCHAR (256), 
     aws_resource_id  VARCHAR (256), 
     usage_amount     FLOAT (17), 
     charge_currency  VARCHAR (256), 
     aws_product_name VARCHAR (256), 
     instance_family  VARCHAR (256), 
     instance_type    VARCHAR (256), 
     unblended_cost   FLOAT (17), 
     usage_cost       FLOAT (17)
  ); 

CUR is dynamic in nature, which means that some columns may appear or disappear with each update. When creating the table, we take static columns only. For more information, see Line item details.
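
One way to decide which columns count as static is to keep only the columns that appear in every report version you have seen. The following Python snippet is a minimal sketch of that approach, not necessarily the method Aruba used; the helper name static_columns and the two sample column lists are hypothetical.

```python
def static_columns(report_versions):
    """Return the columns present in every report version, in first-seen order."""
    if not report_versions:
        return []
    common = set(report_versions[0])
    for columns in report_versions[1:]:
        common &= set(columns)
    # Preserve the column order of the first version.
    return [column for column in report_versions[0] if column in common]


# Hypothetical column lists from two report versions; the tag column is
# missing from the second one.
versions = [
    ["line_item_usage_start_date", "line_item_unblended_cost", "resource_tags_user_infra_role"],
    ["line_item_usage_start_date", "line_item_unblended_cost"],
]
stable = static_columns(versions)
```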

Next, ingest the data from Amazon S3 into the Amazon Redshift table with insert and update statements. Each CUR update is cumulative, which means that each version of the CUR includes all the line items and information from the previous version.

The reports generated throughout the month are estimated and subject to change during the rest of the month. AWS finalizes the report at the end of each month. Finalized reports have the calculations for the blended and unblended costs, and cover all the usage for the month. For this use case, Aruba updates the last 45 days of data to make sure the finalized cost is captured. The following sample query loads the last 45 days of data from the external table:

-- Insert statement
 INSERT INTO redshift_schema.redshift_table
            (usage_start_date, 
             usage_end_date, 
             service_region, 
             service_az, 
             aws_resource_id, 
             usage_amount, 
             charge_currency, 
             aws_product_name, 
             instance_family, 
             instance_type, 
             unblended_cost,
             Usage_Cost ) 
 SELECT line_item_usage_start_date, 
       line_item_usage_end_date, 
       line_item_operation, 
       line_item_availability_zone, 
       line_item_resource_id, 
       line_item_usage_amount, 
       line_item_currency_code, 
       product_product_name, 
       product_instance_family, 
       product_instance_type, 
       line_item_unblended_cost,
       case when line_item_type='Usage' then line_item_unblended_cost
            else 0
            end as usage_cost 
 FROM   cur_redshift_spectrum_external_schema.cur_parquet
 WHERE  line_item_usage_start_date >= date_add('day', -45, getdate()) 
       AND line_item_usage_start_date < date_add('day', 1, getdate()); 
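
Because the external table is partitioned by year and month, a 45-day window touches more than one partition. The following Python snippet is a sketch that lists those partitions; the helper name partitions_for_window is hypothetical, and it assumes partition values are unpadded strings (for example, month 2 rather than 02), as in the partition query shown earlier.

```python
from datetime import date, timedelta


def partitions_for_window(today, days_back=45):
    """Return the (year, month) partitions touched by the window that ends on `today`."""
    start = today - timedelta(days=days_back)
    partitions = []
    year, month = start.year, start.month
    # Walk month by month from the start of the window to the current month.
    while (year, month) <= (today.year, today.month):
        partitions.append((str(year), str(month)))
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return partitions


window = partitions_for_window(date(2020, 3, 10))
```

For March 10, 2020, the window starts on January 25, 2020, so it covers the January, February, and March partitions. Adding matching year and month predicates to the query lets Amazon Redshift Spectrum skip the other partitions.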

Using Data Pipeline to orchestrate the ETL workflow

To automate this ETL workflow, Aruba chose Data Pipeline. Data Pipeline helps to reliably process and move data between different AWS compute and storage services, as well as on-premises data sources. With Data Pipeline, Aruba can regularly access their data where it’s stored, transform and process it at scale, and efficiently transfer the results to AWS services such as Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon DynamoDB, and Amazon EMR. Although the detailed steps of setting up this pipeline are out of scope for this blog, there is a sample workflow definition JSON file, which can be imported after making the necessary changes.

Data Pipeline workflow

The following screenshot shows the multi-step ETL workflow using Data Pipeline. Data Pipeline is used to run the INSERT query daily, which inserts and updates the latest CUR data into our Amazon Redshift table from the external table.

To copy data to Amazon Redshift, you can use RedshiftDataNode and RedshiftCopyActivity, and then schedule them to run periodically.

Sharing metrics and creating visuals with QuickSight

To share the cost and usage with other teams, Aruba chose QuickSight using Amazon Redshift as the data source. QuickSight is a native AWS service that seamlessly integrates with other AWS services such as Amazon Redshift, Athena, Amazon S3, and many other data sources.

As a fully managed service, QuickSight lets Aruba easily create and publish interactive dashboards that include ML Insights. In addition to building powerful visualizations, QuickSight provides data preparation tools that make it easy to filter and transform the data into the exact needed dataset. As a cloud-native service, dashboards can be accessed from any device and embedded into applications and portals, allowing other teams to monitor their resource usage easily. For more information about creating a dataset, see Creating a Dataset from a Database. QuickSight visuals can then be created from this dataset.

The following screenshot shows a visual comparison of device cost and count to help find the cost per device. This visual helped Aruba quickly identify the cost per device increase in April and take necessary actions.

Similarly, the following visualization helped Aruba identify an increase in data transfer cost and helped them decide to invest in rearchitecting their application.

The following visualization classifies the cost spend per resource.

Conclusion

In this post, we discussed how Aruba Networks was able to successfully achieve the following:

  • Generate CUR and use AWS Glue to define data, catalog the data, and update the metadata
  • Use Amazon Redshift Spectrum to transform and load the data to Amazon Redshift tables
  • Query, visualize, and share the data stored using QuickSight
  • Automate and orchestrate the entire solution using Data Pipeline

Aruba uses this solution to automatically generate a daily cost report and share it with their stakeholders, including executives and the cloud operations team.

 


About the Authors

Siddharth Thacker works in Business & Finance Strategy in the Cloud Software division at Aruba Networks. Siddharth has a Master’s in Finance with experience in industries like banking, investment management, and cloud software, and focuses on business analytics, margin improvement, and strategic partnerships at Aruba. In his spare time, he likes exploring the outdoors and participating in team sports.

Swatishree Sahu is a Technical Data Analyst at Aruba Networks. She lived and worked in India for 7 years as an SME for SOA-based integration tools before coming to the US to pursue her master’s in Business Analytics from UT Dallas. Breaking down and analyzing data is her passion. She is a Star Wars geek, and in her free time, she loves gardening, painting, and traveling.

Ritesh Chaman is a Technical Account Manager at Amazon Web Services. With 10 years of experience in the IT industry, Ritesh has a strong background in Data Analytics, Data Management, and Big Data systems. In his spare time, he loves cooking (spicy Indian food), watching sci-fi movies, and playing sports.

Kunal Ghosh is a Solutions Architect at AWS. His passion is to build efficient and effective solutions on the cloud, especially involving Analytics, AI, Data Science, and Machine Learning. Besides family time, he likes reading and watching movies, and is a foodie.

Build a self-service environment for each line of business using Amazon EMR and AWS Service Catalog

Post Syndicated from Tanzir Musabbir original https://aws.amazon.com/blogs/big-data/build-a-self-service-environment-for-each-line-of-business-using-amazon-emr-and-aws-service-catalog/

Enterprises often want to centralize governance and compliance requirements, and provide a common set of policies on how Amazon EMR instances should be set up. You can use AWS Service Catalog to centrally manage commonly deployed Amazon EMR cluster configurations, and this helps you achieve consistent governance and meet your compliance requirements, while at the same time enabling your end users to quickly deploy only the approved EMR cluster configurations on a self-service basis.

In this post, we demonstrate how enterprise administrators can use AWS Service Catalog to create and manage catalogs that data engineers and data scientists use to quickly discover and deploy clusters in a self-service environment. With AWS Service Catalog, you can control which EMR release versions are available, the cluster configuration, and permission access by individual, group, department, or cost center.

The following are a few key AWS Service Catalog concepts:

  • An AWS Service Catalog product is a blueprint for building the AWS resources that you want available for deployment. You create your products by importing AWS CloudFormation templates.
  • A portfolio is a collection of products. With AWS Service Catalog, you can create a customized portfolio for each type of user in your organization and selectively grant access to the appropriate portfolio.
  • A provisioned product is a collection of resources that result from instantiating an AWS CloudFormation template.

Use cases

You can use AWS Service Catalog to provide Amazon EMR as a self-serve Extract, Transform, Load (ETL) platform at scale while hiding all the security and network configurations from end users.

As an administrator in AWS Service Catalog, you can create one or more Service Catalog products that define different configurations to be used for EMR clusters. In those Service Catalog products, you can define the security and network configurations, auto-scaling rules, instance configurations, and purchase options for the EMR cluster, or you can preconfigure EMR to run different EMR step jobs. As a user in Service Catalog, you can browse the different EMR templates offered as Service Catalog products and provision the product that fits your requirements. By following this approach, you can make your EMR usage self-serviceable, reduce the EMR learning curve for your users, and ensure adherence to security standards and best practices.

The following image illustrates how the interactions look between Amazon EMR administrators and end-users when using AWS Service Catalog to provision EMR clusters.

The use cases in this post have three AWS Identity and Access Management (IAM) users with different access permissions:

  • emr-admin: This user is the administrator and has access to all the resources. This user creates EMR clusters for their end-users based on their requirements.
  • emr-data-engineer: The data engineer uses Spark and Hive most of the time. They run different ETL scripts on Hive and Spark to process, transform, and enrich their datasets.
  • emr-data-analyst: This user is very familiar with SQL and mostly uses Hue to submit queries to Hive.

You can solve several Amazon EMR operational use cases using AWS Service Catalog. The following sections discuss three different use cases. Later in this post, you walk through each of the use cases with a solution.

Use case 1: Ensuring least privilege and appropriate access

The administrator wants to enforce a few organizational standards. The first is that no EMR cluster uses the default EMR_EC2_ROLE. Instead, the administrator wants a role that has limited access to Amazon Simple Storage Service (Amazon S3) and is assigned automatically every time an EMR cluster is launched. Second, end-users sometimes forget to add appropriate tags to their resources. Because of that, it is often hard for the administrator to identify their resources and allocate cost appropriately. So, the administrator wants a mechanism that assigns tags to EMR clusters automatically when they launch.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

Data engineers use Spark and Hive applications, and they prefer to have a platform where they just submit their jobs without spending time creating the cluster. They also want to try out different Amazon EMR versions to see how their jobs run on different Spark or Hive versions. They don’t want to spend time learning AWS or Amazon EMR. Additionally, the administrator doesn’t want to give full Amazon EMR access to all users.

Use case 3: Automatically scaling the Hive cluster for analysts

Data analysts have strong SQL backgrounds, so they typically use Hue to submit their Hive queries. They run queries against a large dataset, so they want to have an EMR cluster that can scale when needed. They also don’t have access to the Amazon EMR console and don’t know how to configure automatic scaling for Amazon EMR.

Solution overview

With AWS Service Catalog, you can self-serve your Amazon EMR users, enforce best practices and compliance, and speed up the adoption process.

At a high level, the solution includes the following steps:

  1. Configuring the AWS environment to run this solution.
  2. Creating a CloudFormation template.
  3. Setting up AWS Service Catalog products and portfolios.
  4. Managing access to AWS Service Catalog and provisioning products.
  5. Demonstrating the self-service Amazon EMR platform for users.
  6. Enforcing best practices and compliance through AWS Service Catalog.
  7. Executing ETL workloads on Amazon EMR using AWS Service Catalog.
  8. Optionally, setting up AWS Service Catalog and launching Amazon EMR products through the AWS Command Line Interface (AWS CLI).

The following section looks at the CloudFormation template, which you use to set up the AWS environment to run this solution.

Setting up the AWS environment

To set up this solution, you need to create a few AWS resources. The CloudFormation template provided in this post creates all the required AWS resources. This template requires you to pass the following parameters during the launch:

  • A password for your test users.
  • An Amazon Elastic Compute Cloud (Amazon EC2) key pair.
  • The latest AMI ID for the EC2 helper instance. This instance configures the environment and sets up the required files and templates for this solution.

This template is designed only to show how you can use Amazon EMR with AWS Service Catalog. This setup isn’t intended for production use without modification.

To launch the CloudFormation stack, choose Launch Stack:

Launching this stack creates several AWS resources. The following resources shown in the AWS CloudFormation output are the ones you need in the next step:

Key                       Description
ConsoleLoginURL           URL you use to switch between multiple users
EMRSCBlogBucket           Name of the S3 bucket to store blog-related files
UserPassword              Password to use for all the test users
DataAdminUsername         IAM user name for the administrator user
DataEngineerUsername      IAM user name for the data engineer user
DataAnalystUsername       IAM user name for the data analyst user
HiveScriptURL             Amazon S3 path for the Hive script
HiveETLInputParameter     Path for the Hive input parameter
HiveETLOutputParameter    Path for the Hive output parameter
SparkScriptURL            Amazon S3 path for the Spark script
SparkETLInputParameter    Path for the Spark input parameter
SparkETLOutputParameter   Path for the Spark output parameter

When the CloudFormation stack creation is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console. See the following screenshot.
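If you prefer to collect the outputs programmatically instead of copying them from the console, a minimal sketch follows. It assumes the response shape returned by the boto3 CloudFormation client's describe_stacks call; the helper name stack_outputs and the sample values are hypothetical, and the sample dictionary stands in for a real response so the code runs without AWS access.

```python
def stack_outputs(describe_stacks_response: dict) -> dict:
    """Flatten the Outputs list of the first stack into a key -> value dict."""
    stacks = describe_stacks_response.get("Stacks", [])
    if not stacks:
        raise ValueError("response contains no stacks")
    return {
        output["OutputKey"]: output["OutputValue"]
        for output in stacks[0].get("Outputs", [])
    }


# With AWS credentials configured, the response would come from:
#   boto3.client("cloudformation").describe_stacks(StackName="Blog-EMR-Service-Catalog")
# The values below are placeholders, not real account data.
sample_response = {
    "Stacks": [
        {
            "StackName": "Blog-EMR-Service-Catalog",
            "Outputs": [
                {"OutputKey": "EMRSCBlogBucket", "OutputValue": "blog-emr-sc-111122223333"},
                {"OutputKey": "DataAdminUsername", "OutputValue": "emr-admin"},
            ],
        }
    ]
}

outputs = stack_outputs(sample_response)
print(outputs["EMRSCBlogBucket"])
```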

(Optional) Configuring the AWS CLI

The AWS CLI is a unified tool to manage your AWS services. In the optional step, you use the AWS CLI to create AWS Service Catalog products and portfolios. Installation of AWS CLI isn’t required for this solution. For instructions on configuring the AWS CLI in your environment, see Configuring the AWS CLI.

Provisioning EMR clusters through AWS Service Catalog

You can create AWS Service Catalog products from the existing CloudFormation template and use those products to provision a variety of EMR clusters. You can create an EMR cluster and consume the cluster’s services without having access to the cluster, which improves the Amazon EMR adoption process.

The following CloudFormation template creates an EMR cluster. This template takes two parameters:

  • Cluster size – You select how many core nodes you want to have in the EMR cluster
  • Compute type – Based on the compute type you choose, the template selects the respective EC2 instance type

As an account administrator, you can define the internal configuration for the EMR cluster. End users are not required to know the security groups, subnet ID, key pair, and other details. They also don’t need to access the EMR cluster or spend time setting it up. As an administrator, you define a template for the cluster; enforce compliance, versions, applications, and automatic scaling rules through the CloudFormation template; and expose this template as a product through AWS Service Catalog.
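To illustrate how a template can hide instance details behind two simple parameters, the following Python sketch mimics the lookup. The compute type names other than CPU and all of the instance types are assumptions made for illustration; the actual template's mapping is not shown in this post.

```python
# Hypothetical mapping from a user-facing compute type to an EC2 instance type.
# In a CloudFormation template this would typically live in a Mappings section.
INSTANCE_TYPE_BY_COMPUTE = {
    "Generic": "m5.xlarge",
    "CPU": "c5.xlarge",
    "Memory": "r5.xlarge",
}


def resolve_cluster_config(compute_requirements: str, cluster_size: int) -> dict:
    """Translate the two end-user parameters into concrete core node settings."""
    if compute_requirements not in INSTANCE_TYPE_BY_COMPUTE:
        allowed = ", ".join(sorted(INSTANCE_TYPE_BY_COMPUTE))
        raise ValueError(f"compute type must be one of: {allowed}")
    if cluster_size < 1:
        raise ValueError("cluster size must be at least 1 core node")
    return {
        "CoreInstanceType": INSTANCE_TYPE_BY_COMPUTE[compute_requirements],
        "CoreInstanceCount": cluster_size,
    }


print(resolve_cluster_config("CPU", 2))
```

The end user only chooses from a short list of allowed values; everything else stays under the administrator's control.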

The following section walks you through the solution for each use case.

Use cases walkthrough

The CloudFormation template already configured AWS Service Catalog portfolios and products. You can review these on the AWS Service Catalog console.

  1. Use the ConsoleLoginURL from the AWS CloudFormation console Outputs tab and sign in as an emr-admin user.
  2. On the AWS Service Catalog console, you can see two portfolios for engineers and analysts. In each of those portfolios, you can see two products.

The Data Analysts Stack contains products for the analyst and is assigned to the user emr-data-analyst. The Data Engineering Stack contains products for engineers and is assigned to the emr-data-engineer user. Upon logging in, they can see their respective products and portfolios.

Use case 1: Ensuring least privilege and appropriate access

The cluster administrator creates the least privilege IAM role for their users and associates that role through the Service Catalog product. Similarly, the administrator also assigns appropriate tags for each product. When data engineers or analysts launch an EMR cluster using any of their assigned products, the cluster has the least privilege access and resources are tagged automatically. To confirm this access is in place, complete the following steps:

  1. Sign in to the AWS Management Console as either the emr-data-engineer user or the emr-data-analyst user.

Your console looks slightly different because the end-user does not manage the products; they just use the products to launch clusters or run jobs on a cluster.

  1. Choose Default EMR and provision this product by choosing Launch Product.
  2. For the name of the provisioned product, enter SampleEMR.

The next screen shows a list of allowed parameters your administrator thinks you may need.

  1. Leave all parameters as default.
  2. For the cluster name, enter Sample EMR.
  3. Review all the information and launch the product.

It takes a few minutes to spin up the cluster. When the cluster is ready, the status changes to Succeeded. The provisioned product page also shows you a list of outputs your product owner wants you to see. For example, using output values, your product owner can share the Master DNS Address, Resource Manager URL, and Hue URL as shown in the following figure.

To verify that the launched EMR cluster has the expected IAM role and tags, sign in as the emr-admin user and go to the Amazon EMR console to review the service role for EC2 instances and tags.

Use case 2: Providing Amazon EMR as a self-serve ETL platform with Spark and Hive

For this use case, data engineers have two different ETL scripts:

  • A Spark script that reads Amazon reviews stored in Amazon S3 and converts them into Parquet before writing back to Amazon S3
  • A Hive script that reads Amazon reviews data from Amazon S3 and finds out the top toys based on customer ratings.

The administrator creates a product to self-serve these users; the product defines the job type and the job parameters. End users select the job type and pass the script, input, and output locations.

  1. Sign in as emr-data-engineer.
  2. Select the EMR ETL Engine product.
  3. Choose Launch.

The next page shows if the product has multiple versions. Because the engineer wants to try out two different Amazon EMR versions, the administrator provided both options through the product version. You can launch the EMR cluster with the required version by selecting your preferred product version.

  1. Enter the name of the product.
  2. For this post, select EMR 5.29.0.
  3. Choose Next.

  1. For JobType, choose Spark.
  2. For JobArtifacts, enter the following value (you can get these values from the AWS CloudFormation output):
s3://blog-emr-sc-<account-id>/scripts/spark_converter.py s3://amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz s3://blog-emr-sc-<account-id>/spark/
  1. Choose Next.

Based on your configuration, an EMR cluster launches. When the cluster is ready, the Spark job runs.
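The Spark JobArtifacts value is three space-separated Amazon S3 locations: the script, the input, and the output. How the product's template turns that string into a job is not shown in this post. The following sketch shows one plausible approach, assuming the job is submitted as an EMR step through command-runner.jar with spark-submit; the function name and step name are hypothetical.

```python
import shlex


def spark_step_from_artifacts(job_artifacts: str, name: str = "Spark ETL") -> dict:
    """Split '<script> <input> <output>' and build an EMR step definition."""
    parts = shlex.split(job_artifacts)
    if len(parts) != 3:
        raise ValueError("expected '<script> <input> <output>'")
    for part in parts:
        if not part.startswith("s3://"):
            raise ValueError(f"not an S3 URI: {part}")
    script, input_path, output_path = parts
    return {
        "Name": name,
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                script, input_path, output_path,
            ],
        },
    }


# The account ID below is a placeholder.
artifacts = (
    "s3://blog-emr-sc-111122223333/scripts/spark_converter.py "
    "s3://amazon-reviews-pds/tsv/amazon_reviews_us_Toys_v1_00.tsv.gz "
    "s3://blog-emr-sc-111122223333/spark/"
)
step = spark_step_from_artifacts(artifacts)
print(step["HadoopJarStep"]["Args"])
```

Validating the value before the cluster launches surfaces a mistyped path immediately rather than after the cluster has started.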

  1. In a different browser, sign in as emr-admin using the ConsoleLoginURL (from the AWS CloudFormation output).

You can see the cluster status, job status, and output path from the Amazon EMR console.

Now, go to Amazon S3 console to check the output path:

The Parquet files are written inside the Spark folder.

  1. To test the Hive job, go back to the first browser where you already signed in as emr-data-engineer.
  2. Choose Provisioned products list.
  3. Choose the product options menu (right-click) and choose Update provisioned product.

  1. On the next page, you can select a different version or the same version.
  2. In the Parameters section, choose Hive.
  3. In the JobArtifacts field, enter the following Hive parameters:
s3://blog-emr-sc-<account-id>/scripts/hive_converter.sql -d INPUT=s3://amazon-reviews-pds/tsv/ -d OUTPUT=s3://blog-emr-sc-<account-id>/hive/
  1. Choose Update.

If you select the same version, AWS Service Catalog compares the old provisioned product with the updated product and only runs the portion that you changed. For this post, I chose the same Amazon EMR version and only updated the job type and parameters. You can see that the same EMR cluster is still there, but on the Steps tab, a new step is executed for Hive.

  1. On the Amazon S3 console using the second browser, verify that a new folder hive is created with data that represents top toys based on Amazon reviews.

To recap, you saw how to use AWS Service Catalog to provide a product to run your ETL jobs. Your data engineers can focus on their ETL scripts and your platform can self-serve them to run their ETL jobs on the EMR cluster.

Use case 3: Automatically scaling the Hive cluster for data analysts

To automatically scale the Hive cluster for data analysts, complete the following steps:

  1. Using the ConsoleLoginURL from the AWS CloudFormation output, sign in as emr-data-analyst and go to the AWS Service Catalog console.

You can see a different set of products for this user.

For this use case, your data analysts want an automatically scaling EMR cluster with the Hive application. The administrator set up the Auto-scaling EMR product with preconfigured rules.

  1. Choose Auto-scaling EMR.
  2. Enter a provisioned product name.
  3. Select Hive Auto-scaling.
  4. Choose Next.
  5. In the Parameters section, leave the options at their default and enter a cluster name.
  6. Launch the product.

The product owner also provided a client URL (for example, Hue URL) through the product output so business analysts can connect to it.

  1. Sign in as emr-admin and validate if this new cluster is configured with the expected automatic scaling rules.
  2. On the Amazon EMR console, choose the cluster.

You can see the configuration on the Hardware tab.

In this use case, you learned how to use AWS Service Catalog to provide business analyst users a preconfigured, automatically scaled EMR cluster.

(Optional) Setting up AWS Service Catalog for Amazon EMR using AWS CLI

In the previous section, I demonstrated the solution using the AWS Service Catalog console. In the following section, I show you how to use AWS Service Catalog through the AWS CLI. You can create AWS Service Catalog products and portfolios, assign IAM principals, and launch products.

  1. Create a portfolio named CLI - Stack for the user emr-admin. See the following command:
aws --region us-east-1 servicecatalog create-portfolio --display-name "CLI - Stack" --provider-name "@emr-admin" --description "Sample stack for pre-defined EMR clusters"

You receive a JSON output.

  1. Record the portfolio id port-xxxxxxxx from the output to use later.

The emr-admin user is the provider for this portfolio. The user is created with power user access, so the user can see the full AWS Service Catalog console and can manage products and portfolios.
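The same portfolio can be created from Python. The following is a minimal sketch, assuming a boto3 Service Catalog client (boto3.client("servicecatalog")) is passed in; the helper name create_cli_portfolio is hypothetical, and the stub class only stands in for the real client so the example runs without AWS access.

```python
def create_cli_portfolio(sc_client) -> str:
    """Create the 'CLI - Stack' portfolio and return its ID (port-...)."""
    response = sc_client.create_portfolio(
        DisplayName="CLI - Stack",
        ProviderName="@emr-admin",
        Description="Sample stack for pre-defined EMR clusters",
    )
    # The portfolio ID is needed by every later association call.
    return response["PortfolioDetail"]["Id"]


class StubServiceCatalog:
    """Offline stand-in that mimics the shape of the create_portfolio response."""

    def create_portfolio(self, **kwargs):
        self.last_request = kwargs
        return {"PortfolioDetail": {"Id": "port-example", "DisplayName": kwargs["DisplayName"]}}


# With credentials configured, pass boto3.client("servicecatalog", region_name="us-east-1").
stub = StubServiceCatalog()
portfolio_id = create_cli_portfolio(stub)
print(portfolio_id)
```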

You can associate this portfolio with multiple users. By assigning them to a portfolio, they can use the portfolio, browse through its products, and provision new products. For this use case, you associate a portfolio to emr-admin and the AWS CLI user name (the name of the user that you used to configure your AWS CLI). Make sure to update the portfolio and AWS account ID.

  1. Enter the following code:
aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/emr-admin

aws --region us-east-1 servicecatalog associate-principal-with-portfolio --portfolio-id port-xxxxxxxxxx --principal-type IAM --principal-arn arn:aws:iam::xxxxx:user/<aws-cli-user-name>
  1. To verify the portfolio-to-user association, enter the following command with the portfolio ID:
aws --region us-east-1 servicecatalog list-principals-for-portfolio --portfolio-id port-xxxxxxxxx

The command lists the principals associated with the portfolio, as shown in the following figure:

The CloudFormation template already copied the Amazon EMR template into your S3 bucket at the path s3://blog-emr-sc-<account-id>/products.

  1. To create the product CLI - Sample EMR using that template from Amazon S3, enter the following command:
aws --region us-east-1 servicecatalog create-product --name "CLI - Sample EMR" --owner "@emr-admin" --description "Sample EMR cluster with default" --product-type CLOUD_FORMATION_TEMPLATE --provisioning-artifact-parameters '{"Name": "Initial revision", "Description": "", "Info":{"LoadTemplateFromURL":"https://s3.amazonaws.com/blog-emr-sc-<account-id>/products/sample-cluster.template"},"Type":"CLOUD_FORMATION_TEMPLATE"}'

  1. Record the product ID and provisioning artifact ID from the JSON output.

You now have a product and a portfolio. A portfolio can have one to many products, and each product can have multiple versions.
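The inline JSON passed to --provisioning-artifact-parameters in the create-product command is easy to misquote in a shell. One way to avoid this, sketched below, is to build the JSON with json.dumps and pass the command as an argument list. The helper names are hypothetical, and the account ID is a placeholder.

```python
import json


def artifact_parameters(template_url: str, name: str = "Initial revision") -> dict:
    """Build the provisioning artifact definition for a CloudFormation template."""
    return {
        "Name": name,
        "Description": "",
        "Info": {"LoadTemplateFromURL": template_url},
        "Type": "CLOUD_FORMATION_TEMPLATE",
    }


def create_product_command(account_id: str, region: str = "us-east-1") -> list:
    """Return the create-product command as an argument list (no shell quoting needed)."""
    template_url = (
        f"https://s3.amazonaws.com/blog-emr-sc-{account_id}/products/sample-cluster.template"
    )
    return [
        "aws", "--region", region, "servicecatalog", "create-product",
        "--name", "CLI - Sample EMR",
        "--owner", "@emr-admin",
        "--description", "Sample EMR cluster with default",
        "--product-type", "CLOUD_FORMATION_TEMPLATE",
        "--provisioning-artifact-parameters", json.dumps(artifact_parameters(template_url)),
    ]


command = create_product_command("111122223333")
print(command[-1])
```

With the AWS CLI installed and configured, the list can be passed directly to subprocess.run(command, check=True).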

  1. To assign the CLI - Sample EMR product to the portfolio you created in Step 1, enter the following command:
aws --region us-east-1 servicecatalog associate-product-with-portfolio --product-id prod-xxxxxx --portfolio-id port-xxxxxx

A launch constraint specifies the IAM role that AWS Service Catalog assumes when an end-user launches a product. With a launch constraint, you can control end-user access to your AWS resources and limit usage.

The CloudFormation template already created the role Blog-SCLaunchRole; create a launch constraint using that IAM role. Use the portfolio and product IDs that you collected from the previous step and your AWS account ID.

  1. To create the launch constraint, enter the following command:
aws --region us-east-1 servicecatalog create-constraint --type LAUNCH --portfolio-id port-xxxxxx --product-id prod-xxxxxx --parameters '{"RoleArn" : "arn:aws:iam::<account-id>:role/Blog-SCLaunchRole"}'

  1. Record the launch constraint ID to use later.

You now have an AWS Service Catalog product that you can use to provision an EMR cluster. The CloudFormation template that you used to create the CLI - Sample EMR product takes three parameters (ClusterName, ComputeRequirements, ClusterSize).

  1. To pass those three parameters as key-value pairs, enter the following command (use the product ID and provisioning artifact ID that you recorded earlier):
aws --region us-east-1 servicecatalog provision-product --product-id prod-xxxxxx --provisioning-artifact-id pa-xxxxx --provisioned-product-name cli-emr --provisioning-parameters Key=ClusterName,Value=cli-emr-cluster Key=ComputeRequirements,Value=CPU Key=ClusterSize,Value=2

  1. Check the provisioned product’s status by using the provisioned product ID:
aws --region us-east-1 servicecatalog describe-provisioned-product --id pp-xxxxx
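Because provisioning takes a few minutes, you may want to poll the status rather than rerun the command by hand. The following sketch assumes a boto3 Service Catalog client and its describe_provisioned_product call; the helper name, polling interval, and stub client are hypothetical, and the stub lets the example run without AWS access.

```python
import time

# Statuses after which the provisioned product is no longer changing.
SETTLED_STATUSES = {"AVAILABLE", "TAINTED", "ERROR"}


def wait_for_provisioned_product(sc_client, provisioned_product_id,
                                 poll_seconds=30, max_attempts=60, sleep=time.sleep):
    """Poll until the provisioned product settles, then return its final status."""
    for _ in range(max_attempts):
        detail = sc_client.describe_provisioned_product(
            Id=provisioned_product_id
        )["ProvisionedProductDetail"]
        if detail["Status"] in SETTLED_STATUSES:
            return detail["Status"]
        sleep(poll_seconds)
    raise TimeoutError(f"{provisioned_product_id} did not settle in time")


class StubServiceCatalog:
    """Offline stand-in: reports UNDER_CHANGE twice, then AVAILABLE."""

    def __init__(self):
        self._statuses = iter(["UNDER_CHANGE", "UNDER_CHANGE", "AVAILABLE"])

    def describe_provisioned_product(self, Id):
        return {"ProvisionedProductDetail": {"Id": Id, "Status": next(self._statuses)}}


# The sleep function is replaced so the demonstration finishes immediately.
final_status = wait_for_provisioned_product(
    StubServiceCatalog(), "pp-example", sleep=lambda seconds: None
)
print(final_status)
```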

To recap, in this section you learned how to use the AWS CLI to configure AWS Service Catalog products and portfolios, and how to provision an EMR cluster through an AWS Service Catalog product.

Cleaning up

To clean up the resources you created, complete the following steps:

  1. Terminate the product that you provisioned in the previous step:
aws --region us-east-1 servicecatalog terminate-provisioned-product --provisioned-product-id pp-xxxxx
  1. Disassociate the product CLI - Sample EMR from the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog disassociate-product-from-portfolio --product-id prod-xxxxx --portfolio-id port-xxxxx
  1. Disassociate IAM principals from the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/emr-admin

aws --region us-east-1 servicecatalog disassociate-principal-from-portfolio --portfolio-id port-xxxxx --principal-arn arn:aws:iam::xxxxxx:user/<aws-cli-user-name> 
  1. Delete the launch constraint created in the previous step:
aws --region us-east-1 servicecatalog delete-constraint --id cons-xxxxx
  1. Delete the product CLI - Sample EMR:
aws --region us-east-1 servicecatalog delete-product --id prod-xxxxx
  1. Delete the portfolio CLI - Stack:
aws --region us-east-1 servicecatalog delete-portfolio --id port-xxxxx
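The cleanup commands above must run in a specific order. The following sketch captures that order in one function, assuming a boto3 Service Catalog client is passed in. The function name and the recording stub are hypothetical. Termination is asynchronous, so with a real client the later calls may need to wait until the provisioned product is gone.

```python
def cleanup_cli_resources(sc_client, *, provisioned_product_id, product_id,
                          portfolio_id, constraint_id, principal_arns):
    """Remove the CLI-created resources in the same order as the steps above."""
    sc_client.terminate_provisioned_product(ProvisionedProductId=provisioned_product_id)
    sc_client.disassociate_product_from_portfolio(ProductId=product_id, PortfolioId=portfolio_id)
    for arn in principal_arns:
        sc_client.disassociate_principal_from_portfolio(PortfolioId=portfolio_id, PrincipalARN=arn)
    sc_client.delete_constraint(Id=constraint_id)
    sc_client.delete_product(Id=product_id)
    sc_client.delete_portfolio(Id=portfolio_id)


class RecordingClient:
    """Offline stand-in that records which client methods were called, in order."""

    def __init__(self):
        self.calls = []

    def __getattr__(self, name):
        def call(**kwargs):
            self.calls.append(name)
            return {}
        return call


# All IDs and ARNs below are placeholders.
recorder = RecordingClient()
cleanup_cli_resources(
    recorder,
    provisioned_product_id="pp-example",
    product_id="prod-example",
    portfolio_id="port-example",
    constraint_id="cons-example",
    principal_arns=["arn:aws:iam::111122223333:user/emr-admin"],
)
print(recorder.calls)
```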

Cleaning up additional resources

You must also clean up the resources you created with the CloudFormation template.

  1. On the AWS Service Catalog console, choose Provisioned products list.
  2. Terminate each product that you provisioned for these use cases.
  3. Check each of the users and their provisioned products to make sure they’re terminated.
  4. On the Amazon S3 console, empty the bucket blog-emr-sc-<account-id>.
  5. If you are using the AWS CLI, delete the objects in the blog-emr-sc-<account-id> bucket with the following command (make sure you’re running this command on the correct bucket):
aws s3 rm s3://blog-emr-sc-<account-id> --recursive
  1. If you ran the optional AWS CLI section, make sure you follow the cleanup process mentioned in that section.
  2. On the AWS CloudFormation console or AWS CLI, delete the stack named Blog-EMR-Service-Catalog.

Next steps

To enhance this solution, you can explore the following options:

  • In this post, I enforced resource tagging through AWS CloudFormation. You can also use the AWS Service Catalog TagOptions library to provide a consistent taxonomy and tagging of AWS Service Catalog resources. During a product launch (provisioning), AWS Service Catalog aggregates the associated portfolio and product TagOptions and applies them to the provisioned product.
  • This solution demonstrates the usage of launch constraints and how you can provide limited access to your AWS resources to your users. You can also use template constraints to manage parameters. Template constraints make sure that end-users only have options that you allow them when launching products. This can help you maintain your organization’s compliance requirements.
  • You can integrate AWS Budgets with AWS Service Catalog. By associating AWS Budgets with your products and portfolios, you can track your usage and service costs. You can set a custom budget for each of the portfolios and trigger alerts when your costs exceed your threshold.

Summary

In this post, I showed you how you can simplify your Amazon EMR provisioning process using AWS Service Catalog, how to make Amazon EMR a self-service platform for your end-users, and how you can enforce best practices and compliance on your EMR clusters. You also walked through three different use cases and implemented solutions with AWS Service Catalog. Give this solution a try and share your experience with us!

 


About the Author

Tanzir Musabbir is a Data & Analytics Architect with AWS. At AWS, he works with our customers to provide them architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena & AWS Glue. Tanzir is a big Real Madrid fan and he loves to travel in his free time.

Top 10 performance tuning techniques for Amazon Redshift

Post Syndicated from Matt Scaer original https://aws.amazon.com/blogs/big-data/top-10-performance-tuning-techniques-for-amazon-redshift/

Customers use Amazon Redshift for everything from accelerating existing database environments, to ingesting weblogs for big data analytics. Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. Amazon Redshift provides an open standard JDBC/ODBC driver interface, which allows you to connect your existing business intelligence (BI) tools and reuse existing analytics queries.

Amazon Redshift can run any type of data model, from a production transaction system third-normal-form model to star and snowflake schemas, data vault, or simple flat tables.

This post takes you through the most common performance-related opportunities when adopting Amazon Redshift and gives you concrete guidance on how to optimize each one.

What’s new

This post refreshes the Top 10 post from early 2019. We’re pleased to share the advances we’ve made since then, and want to highlight a few key points.

Query throughput is more important than query concurrency.

Configuring concurrency, like memory management, can be relegated to Amazon Redshift’s internal ML models through Automatic WLM with Query Priorities. On production clusters across the fleet, we see the automated process assigning a much higher number of active statements for certain workloads and a lower number for other types of use cases. This is done to maximize throughput, a measure of how much work the Amazon Redshift cluster can do over a period of time. Examples are 300 queries a minute, or 1,500 SQL statements an hour. It’s recommended to focus on increasing throughput over concurrency, because throughput is the metric with a much more direct impact on the cluster’s users.

In addition to the optimized Automatic WLM settings to maximize throughput, the concurrency scaling functionality in Amazon Redshift extends the throughput capability of the cluster to up to 10 times greater than what’s delivered with the original cluster. The tenfold increase is a current soft limit; you can reach out to your account team to increase it.
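To make the distinction between throughput and concurrency concrete, the following sketch computes throughput two ways: from observed completions, and from Little's law (throughput equals concurrency divided by average time per query). The function names and the workload numbers in the second half are made up for illustration and do not describe any real cluster.

```python
def throughput_per_minute(completed: int, elapsed_seconds: float) -> float:
    """Observed throughput: completed statements normalized to one minute."""
    if elapsed_seconds <= 0:
        raise ValueError("elapsed_seconds must be positive")
    return completed * 60.0 / elapsed_seconds


def steady_state_throughput_per_minute(concurrency: int, avg_query_seconds: float) -> float:
    """Little's law: throughput = concurrency / average time per query."""
    if avg_query_seconds <= 0:
        raise ValueError("avg_query_seconds must be positive")
    return concurrency / avg_query_seconds * 60.0


# 1,500 SQL statements an hour is 25 statements a minute.
print(throughput_per_minute(1500, 3600))

# Illustrative numbers: fewer concurrent queries can still mean more work done.
low_concurrency = steady_state_throughput_per_minute(concurrency=5, avg_query_seconds=1.0)
high_concurrency = steady_state_throughput_per_minute(concurrency=50, avg_query_seconds=20.0)
print(low_concurrency, high_concurrency)
```

In this made-up comparison, 5 concurrent queries that each finish in 1 second complete more work per minute than 50 concurrent queries that each take 20 seconds, which is why a high concurrency number alone says little about what users experience.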

Investing in the Amazon Redshift driver.

AWS now recommends the Amazon Redshift JDBC or ODBC driver for improved performance. Each driver has optional configurations to further tune it for a higher or lower number of statements, with either fewer or greater row counts in the result set.

Ease of use by automating all the common DBA tasks.

In 2018, the SET DW “backronym” summarized the key considerations to drive performance (sort key, encoding, table maintenance, distribution, and workload management). Since then, Amazon Redshift has added automation to inform 100% of SET DW, absorbed table maintenance into the service’s (and no longer the user’s) responsibility, and enhanced out-of-the-box performance with smarter default settings. Amazon Redshift Advisor continuously monitors the cluster for additional optimization opportunities, even if the mission of a table changes over time. AWS publishes the benchmark used to quantify Amazon Redshift performance, so anyone can reproduce the results.

Scaling compute separately from storage with RA3 nodes and Amazon Redshift Spectrum.

Although the convenient cluster building blocks of the Dense Compute and Dense Storage nodes continue to be available, you now have a variety of tools to further scale compute and storage separately. Amazon Redshift Managed Storage (the RA3 node family) allows for focusing on using the right amount of compute, without worrying about sizing for storage. Concurrency scaling lets you specify entire additional clusters of compute to be applied dynamically as-needed. Amazon Redshift Spectrum uses the functionally-infinite capacity of Amazon Simple Storage Service (Amazon S3) to support an on-demand compute layer up to 10 times the power of the main cluster, and is now bolstered with materialized view support.

Pause and resume feature to optimize cost of environments.

All Amazon Redshift clusters can use the pause and resume feature. For clusters created using On Demand, the per-second grain billing is stopped when the cluster is paused. Reserved Instance clusters can use the pause and resume feature to define access times or freeze a dataset at a point in time.

Tip #1: Precomputing results with Amazon Redshift materialized views

Materialized views can significantly boost query performance for repeated and predictable analytical workloads such as dashboarding, queries from BI tools, and extract, load, transform (ELT) data processing. Data engineers can easily create and maintain efficient data-processing pipelines with materialized views while seamlessly extending the performance benefits to data analysts and BI tools.

Materialized views are especially useful for queries that are predictable and repeated over and over. Instead of performing resource-intensive queries on large tables, applications can query the pre-computed data stored in the materialized view.

When the data in the base tables changes, you refresh the materialized view by issuing the Amazon Redshift SQL statement REFRESH MATERIALIZED VIEW. After a refresh, the materialized view contains the same data that an equivalent regular view would return. Refreshes can be incremental or full (recompute). When possible, Amazon Redshift incrementally refreshes data that changed in the base tables since the materialized view was last refreshed.

To demonstrate how it works, we can create an example schema to store sales information: each sale transaction, and details about the store where the sale took place.
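
The table definitions are not reproduced in this post. A minimal sketch that is consistent with the statements used in the rest of this tip might look like the following. The column types, and the assumption that the store table holds only an id and a city, are illustrative guesses rather than the original demo's DDL.

```sql
-- Hypothetical DDL; only the table and column names come from the
-- statements used in this tip. Types are assumptions.
CREATE TABLE store (
  id   INT,
  city VARCHAR(50)
);

CREATE TABLE sales (
  id          INT,
  item        VARCHAR(50),
  store_id    INT,          -- references store.id
  customer_id INT,
  amount      INT           -- whole currency units, to match the sample output
);
```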

To view the total amount of sales per city, we create a materialized view with the create materialized view SQL statement (city_sales) joining records from two tables and aggregating sales amount (sum(sales.amount)) per city (group by city):

CREATE MATERIALIZED VIEW city_sales AS 
  (
  SELECT st.city, SUM(sa.amount) as total_sales
  FROM sales sa, store st
  WHERE sa.store_id = st.id
  GROUP BY st.city
  );

Now we can query the materialized view just like a regular view or table and issue statements like “SELECT city, total_sales FROM city_sales”. The join between the two tables and the aggregate (sum and group by) are already computed, resulting in significantly less data to scan.

When the data in the underlying base tables changes, the materialized view doesn’t automatically reflect those changes. You can refresh the data stored in the materialized view on demand with the latest changes from the base tables using the SQL refresh materialized view command. For example, see the following code:

-- let's add a row in the sales base table

INSERT INTO sales (id, item, store_id, customer_id, amount) 
VALUES(8, 'Gaming PC Super ProXXL', 1, 1, 3000);

SELECT city, total_sales FROM city_sales WHERE city = 'Paris';

|city |total_sales|
|-----|-----------|
|Paris|        690|

-- the new sale is not taken into account!
-- let's refresh the materialized view
REFRESH MATERIALIZED VIEW city_sales;

SELECT city, total_sales FROM city_sales WHERE city = 'Paris';

|city |total_sales|
|-----|-----------|
|Paris|       3690|

-- now the view has the latest sales data

The full code for this use case is available as a very simple demo in a GitHub gist.
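
To check whether a materialized view has fallen behind its base tables before deciding to refresh it, you can look at the STV_MV_INFO system table. The following sketch assumes the city_sales view from the preceding example and that STV_MV_INFO is available on your cluster version.

```sql
-- is_stale = 't' means the base tables changed since the last refresh.
SELECT "schema", name, is_stale
FROM stv_mv_info
WHERE name = 'city_sales';
```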

You can also extend the benefits of materialized views to external data in your Amazon S3 data lake and federated data sources. With materialized views, you can easily store and manage the pre-computed results of a SELECT statement referencing both external tables and Amazon Redshift tables. Subsequent queries referencing the materialized views run much faster because they use the pre-computed results stored in Amazon Redshift, instead of accessing the external tables. This also helps you reduce the associated costs of repeatedly accessing the external data sources, because you access them only when you explicitly refresh the materialized views.
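
As a rough sketch of this pattern, the following statement builds a materialized view over an external table. The external schema and table names (ext_spectrum.store_sales_historical) are borrowed from the federated query example in Tip #8 and are assumed to already exist; the view name is hypothetical.

```sql
-- Pre-aggregate external (Amazon S3) data once and store the result locally.
CREATE MATERIALIZED VIEW historical_sales_by_store AS
SELECT ss_store_sk,
       SUM(ss_net_paid) AS total_net_paid
FROM ext_spectrum.store_sales_historical
GROUP BY ss_store_sk;

-- The external data is scanned again only when you refresh explicitly.
REFRESH MATERIALIZED VIEW historical_sales_by_store;
```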

Tip #2: Handling bursts of workload with concurrency scaling and elastic resize

The legacy, on-premises model requires you to estimate what the system will need 3-4 years in the future to make sure you’re leasing enough horsepower at the time of purchase. But the ability to resize a cluster allows for right-sizing your resources as you go. Amazon Redshift extends this ability with elastic resize and concurrency scaling.

Elastic resize lets you quickly increase or decrease the number of compute nodes, doubling or halving the original cluster’s node count, or even change the node type. You can expand the cluster to provide additional processing power to accommodate an expected increase in workload, such as Black Friday for internet shopping, or a championship game for a team’s web business. Choose classic resize when you’re resizing to a configuration that isn’t available through elastic resize. Classic resize is slower but allows you to change the node type or expand beyond the doubling or halving size limitations of an elastic resize. 

Elastic resize completes in minutes and doesn’t require a cluster restart. For anticipated workload spikes that occur on a predictable schedule, you can automate the resize operation using the elastic resize scheduler feature on the Amazon Redshift console, the AWS Command Line Interface (AWS CLI), or API.

Concurrency scaling allows your Amazon Redshift cluster to add capacity dynamically in response to the workload arriving at the cluster.

By default, concurrency scaling is disabled, and you can enable it for any workload management (WLM) queue to scale to a virtually unlimited number of concurrent queries, with consistently fast query performance. You can control the maximum number of concurrency scaling clusters allowed by setting the “max_concurrency_scaling_clusters” parameter value from 1 (default) to 10 (contact support to raise this soft limit). The free billing credits provided for concurrency scaling are often enough, and the majority of customers using this feature don’t end up paying extra for it. For more information about the concurrency scaling billing model, see Concurrency Scaling pricing.

You can monitor and control concurrency scaling usage and cost by creating daily, weekly, or monthly usage limits and instructing Amazon Redshift to automatically take action (such as logging, alerting, or disabling further usage) if those limits are reached. For more information, see Managing usage limits in Amazon Redshift.

Together, these options open up new ways to right-size the platform to meet demand. Before these options, you needed to size your WLM queue, or even an entire Amazon Redshift cluster, beforehand in anticipation of upcoming peaks.

Tip #3: Using the Amazon Redshift Advisor to minimize administrative work

Amazon Redshift Advisor offers recommendations specific to your Amazon Redshift cluster to help you improve its performance and decrease operating costs.

Advisor bases its recommendations on observations regarding performance statistics or operations data. Advisor develops observations by running tests on your clusters to determine if a test value is within a specified range. If the test result is outside of that range, Advisor generates an observation for your cluster. At the same time, Advisor creates a recommendation about how to bring the observed value back into the best-practice range. Advisor only displays recommendations that can have a significant impact on performance and operations. When Advisor determines that a recommendation has been addressed, it removes it from your recommendation list. In this section, we share some examples of Advisor recommendations:

Distribution key recommendation

Advisor analyzes your cluster’s workload to identify the most appropriate distribution key for the tables that can significantly benefit from a KEY distribution style. Advisor provides ALTER TABLE statements that alter the DISTSTYLE and DISTKEY of a table based on its analysis. To realize a significant performance benefit, make sure to implement all SQL statements within a recommendation group.

The following screenshot shows recommendations regarding distribution keys.

If you don’t see a recommendation, that doesn’t necessarily mean that the current distribution styles are the most appropriate. Advisor doesn’t provide recommendations when there isn’t enough data or the expected benefit of redistribution is small.
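
Advisor supplies the exact statements for your tables. For orientation only, a statement of this kind generally has the following shape. The sales table and store_id column are borrowed from the Tip #1 example and are purely illustrative, not an Advisor recommendation.

```sql
-- Change the table to KEY distribution on a frequently joined column.
ALTER TABLE sales ALTER DISTSTYLE KEY DISTKEY store_id;

-- Afterwards, check the resulting distribution style and row skew.
SELECT "table", diststyle, skew_rows
FROM svv_table_info
WHERE "table" = 'sales';
```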

Sort key recommendation

Sorting a table on an appropriate sort key can accelerate query performance, especially queries with range-restricted predicates, by requiring fewer table blocks to be read from disk.

Advisor analyzes your cluster’s workload over several days to identify a beneficial sort key for your tables. See the following screenshot.

If you don’t see a recommendation for a table, that doesn’t necessarily mean that the current configuration is the best. Advisor doesn’t provide recommendations when there isn’t enough data or the expected benefit of sorting is small.

Table compression recommendation

Amazon Redshift is optimized to reduce your storage footprint and improve query performance by using compression encodings. When you don’t use compression, data consumes additional space and requires additional disk I/O. Applying compression to large uncompressed columns can have a big impact on your cluster.

The compression analysis in Advisor tracks uncompressed storage allocated to permanent user tables. It reviews storage metadata associated with large uncompressed columns that aren’t sort key columns.

The following screenshot shows an example of table compression recommendation.

Table statistics recommendation

Maintaining current statistics helps complex queries run in the shortest possible time. The Advisor analysis tracks tables whose statistics are out-of-date or missing. It reviews table access metadata associated with complex queries. If tables that are frequently accessed with complex patterns are missing statistics, Amazon Redshift Advisor creates a critical recommendation to run ANALYZE. If tables that are frequently accessed with complex patterns have out-of-date statistics, Advisor creates a suggested recommendation to run ANALYZE.

The following screenshot shows a table statistics recommendation.
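
If you want to check statistics yourself between Advisor reviews, the SVV_TABLE_INFO system view exposes a stats_off value (0 means current, 100 means fully out of date). The following is a sketch; the threshold of 10 and the sales table name are arbitrary assumptions.

```sql
-- List tables whose statistics look stale (threshold is an assumption).
SELECT "schema", "table", stats_off
FROM svv_table_info
WHERE stats_off > 10
ORDER BY stats_off DESC;

-- Refresh statistics for one of the tables returned.
ANALYZE sales;
```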

Tip #4: Using Auto WLM with priorities to increase throughput

Auto WLM simplifies workload management and maximizes query throughput by using ML to dynamically manage memory and concurrency, which ensures optimal utilization of the cluster resources.

Amazon Redshift runs queries using the queuing system (WLM). You can define up to eight queues to separate workloads from each other.

Amazon Redshift Advisor automatically analyzes the current WLM usage and can make recommendations to get more throughput from your cluster. Periodically reviewing the suggestions from Advisor helps you get the best performance.

Query priorities is a feature of Auto WLM that lets you assign priority ranks to different user groups or query groups, to ensure that higher priority workloads get more resources for consistent query performance, even during busy times. It is a good practice to set up query monitoring rules (QMR) to monitor and manage resource intensive or runaway queries. QMR also enables you to dynamically change a query’s priority based on its runtime performance and metrics-based rules you define.

For more information on migrating from manual to automatic WLM with query priorities, see Modifying the WLM configuration.

It’s recommended to take advantage of Amazon Redshift’s short query acceleration (SQA). SQA uses ML to run short-running jobs in their own queue. This keeps small jobs processing, rather than waiting behind longer-running SQL statements. SQA is enabled by default in the default parameter group and for all new parameter groups. You can enable and disable SQA via a check box on the Amazon Redshift console, or by using the Amazon Redshift CLI.

If you enable concurrency scaling, Amazon Redshift can automatically and quickly provision additional clusters should your workload begin to back up. This is an important consideration when deciding the cluster’s WLM configuration.

A common pattern is to optimize the WLM configuration to run most SQL statements without the assistance of supplemental memory, reserving additional processing power for short jobs. Some queueing is acceptable because additional clusters spin up if your needs suddenly expand. To enable concurrency scaling on a WLM queue, set the concurrency scaling mode value to AUTO. You can best inform your decisions by reviewing the concurrency scaling billing model. You can also monitor and control the concurrency scaling usage and cost by using the Amazon Redshift usage limit feature.

In some cases, unless you enable concurrency scaling for the queue, the user or query’s assigned queue may be busy, and you must wait for a queue slot to open. During this time, the system isn’t running the query at all. If this becomes a frequent problem, you may have to increase concurrency.

First, determine if any queries are queuing, using the queuing_queries.sql admin script. Review the maximum concurrency that your cluster needed in the past with wlm_apex.sql, or get an hour-by-hour historical analysis with wlm_apex_hourly.sql. Keep in mind that increasing concurrency allows more queries to run, but each query gets a smaller share of the memory. You may find that by increasing concurrency, some queries must use temporary disk storage to complete, which is also sub-optimal.
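
For a quick look at what is queued right now, without the admin scripts, you can query the STV_WLM_QUERY_STATE system table directly. This is a simplified sketch and not a replacement for queuing_queries.sql; it assumes that matching state values beginning with "Queued" is sufficient for your purposes.

```sql
-- Queries currently waiting in a WLM queue; queue_time is in microseconds.
SELECT query,
       service_class,
       state,
       queue_time / 1000000.0 AS queue_seconds
FROM stv_wlm_query_state
WHERE state LIKE 'Queued%'
ORDER BY queue_time DESC;
```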

Tip #5: Taking advantage of Amazon Redshift data lake integration

Amazon Redshift is tightly integrated with other AWS-native services such as Amazon S3, which lets the Amazon Redshift cluster interact with the data lake in several useful ways.

Amazon Redshift Spectrum lets you query data directly from files on Amazon S3 through an independent, elastically sized compute layer. Use these patterns independently or apply them together to offload work to the Amazon Redshift Spectrum compute layer, quickly create a transformed or aggregated dataset, or eliminate entire steps in a traditional ETL process.

  • Use the Amazon Redshift Spectrum compute layer to offload workloads from the main cluster, and apply more processing power to the specific SQL statement. Amazon Redshift Spectrum automatically assigns compute power up to approximately 10 times the processing power of the main cluster. This may be an effective way to quickly process large transform or aggregate jobs.
  • Skip the load in an ELT process and run the transform directly against data on Amazon S3. You can run transform logic against partitioned, columnar data on Amazon S3 with an INSERT … SELECT statement. It’s easier than going through the extra work of loading a staging dataset, joining it to other tables, and running a transform against it.
  • Use Amazon Redshift Spectrum to run queries as the data lands in Amazon S3, rather than adding a step to load the data onto the main cluster. This allows for real-time analytics.
  • Land the output of a staging or transformation cluster on Amazon S3 in a partitioned, columnar format. The main or reporting cluster can either query from that Amazon S3 dataset directly or load it via an INSERT … SELECT statement.
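
The "skip the load" pattern from the preceding list can be sketched as a single INSERT … SELECT that reads from an external table and writes the transformed result to a local table. The external table name is borrowed from Tip #8; the target table local_dwh.store_sales_daily and the date-key range are hypothetical.

```sql
-- Transform directly against data on Amazon S3; no staging load step.
INSERT INTO local_dwh.store_sales_daily (sold_date_sk, store_sk, total_sales)
SELECT ss_sold_date_sk,
       ss_store_sk,
       SUM(ss_ext_sales_price)
FROM ext_spectrum.store_sales_historical
WHERE ss_sold_date_sk BETWEEN 2451545 AND 2451910  -- illustrative range
GROUP BY ss_sold_date_sk, ss_store_sk;
```

Filtering on a partition column, where the external table has one, lets Amazon Redshift Spectrum skip files that can't match.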

Within Amazon Redshift itself, you can export the data into the data lake with the UNLOAD command, or by writing to external tables. Both options export SQL statement output to Amazon S3 in a massively parallel fashion. You can do the following:

  • Using familiar CREATE EXTERNAL TABLE AS SELECT and INSERT INTO SQL commands, create and populate external tables on Amazon S3 for subsequent use by Amazon Redshift or other services participating in the data lake without the need to manually maintain partitions. Materialized views can also cover external tables, further enhancing the accessibility and utility of the data lake.
  • Using the UNLOAD command, Amazon Redshift can export SQL statement output to Amazon S3 in a massively parallel fashion. This technique greatly improves the export performance and lessens the impact of running the data through the leader node. You can compress the exported data on its way off the Amazon Redshift cluster. As the size of the output grows, so does the benefit of using this feature. For writing columnar data to the data lake, UNLOAD can write partition-aware Parquet data.
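
Both export options can be sketched as follows. The bucket, IAM role, and target table names are placeholders, and the source table is assumed to have the store_sales columns used in Tip #8. Note that in CREATE EXTERNAL TABLE AS, the partition columns must come last in the SELECT list.

```sql
-- Option 1: write query output to a new partitioned external table.
CREATE EXTERNAL TABLE ext_spectrum.store_sales_summary
PARTITIONED BY (ss_sold_date_sk)
STORED AS PARQUET
LOCATION 's3://<your-bucket>/store_sales_summary/'
AS
SELECT ss_store_sk,
       SUM(ss_net_paid) AS total_net_paid,
       ss_sold_date_sk
FROM local_dwh.store_sales_current
GROUP BY ss_store_sk, ss_sold_date_sk;

-- Option 2: UNLOAD partition-aware Parquet files to Amazon S3.
UNLOAD ('SELECT ss_store_sk, ss_net_paid, ss_sold_date_sk FROM local_dwh.store_sales_current')
TO 's3://<your-bucket>/store_sales_export/'
IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_writer_role>'
FORMAT AS PARQUET
PARTITION BY (ss_sold_date_sk);
```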

Tip #6: Improving the efficiency of temporary tables

Amazon Redshift provides temporary tables, which act like normal tables but have a lifetime of a single SQL session. The proper use of temporary tables can significantly improve performance of some ETL operations. Unlike regular permanent tables, data changes made to temporary tables don’t trigger automatic incremental backups to Amazon S3, and they don’t require synchronous block mirroring to store a redundant copy of data on a different compute node. Due to these reasons, data ingestion on temporary tables involves reduced overhead and performs much faster. For transient storage needs like staging tables, temporary tables are ideal.

You can create temporary tables using the CREATE TEMPORARY TABLE syntax, or by issuing a SELECT … INTO #TEMP_TABLE query. The CREATE TABLE statement gives you complete control over the definition of the temporary table. The SELECT … INTO and C(T)TAS commands use the input data to determine column names, sizes and data types, and use default storage properties. Consider default storage properties carefully, because they may cause problems. By default, for temporary tables, Amazon Redshift applies EVEN table distribution with no column encoding (that is, RAW compression) for all columns. This data structure is sub-optimal for many types of queries.

If you employ the SELECT…INTO syntax, you can’t set the column encoding, column distribution, or sort keys. The CREATE TABLE AS (CTAS) syntax instead lets you specify a distribution style and sort keys, and Amazon Redshift automatically applies LZO encoding for everything other than sort keys, Booleans, reals, and doubles. You can exert additional control by using the CREATE TABLE syntax rather than CTAS.
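
As a sketch of the CTAS middle ground described above, the following statement creates a temporary table with an explicit distribution key and sort key while leaving column encodings to Amazon Redshift. The table and column names are placeholders, and the key choices are assumptions about how the table will be joined and sorted.

```sql
-- CTAS: distribution and sort keys are specified, encodings are automatic.
CREATE TEMP TABLE my_temp_table
DISTKEY (column_a)   -- assuming later joins on column_a
SORTKEY (column_b)   -- assuming later sorts or groups on column_b
AS
SELECT column_a, column_b
FROM my_table;
```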

If you create temporary tables, remember to convert all SELECT…INTO syntax into the CREATE statement. This ensures that your temporary tables have column encodings and don’t cause distribution errors within your workflow. For example, you may want to convert a statement using this syntax:

SELECT column_a, column_b INTO #my_temp_table FROM my_table;

You need to analyze the temporary table for optimal column encoding:

Master=# analyze compression #my_temp_table;
Table          | Column   | Encoding
---------------+----------+---------
#my_temp_table | column_a | lzo
#my_temp_table | column_b | bytedict
(2 rows)

You can then convert the SELECT…INTO statement to the following:

BEGIN;

CREATE TEMPORARY TABLE my_temp_table(
column_a varchar(128) encode lzo,
column_b char(4) encode bytedict)
distkey (column_a) -- Assuming you intend to join this table on column_a
sortkey (column_b) -- Assuming you are sorting or grouping by column_b
;

INSERT INTO my_temp_table SELECT column_a, column_b FROM my_table;

COMMIT;

If you create a temporary staging table by using a CREATE TABLE LIKE statement, the staging table inherits the distribution key, sort keys, and column encodings from the parent target table. In this case, merge operations that join the staging and target tables on the same distribution key perform faster because the joining rows are collocated. To verify that the query uses a collocated join, run the query with EXPLAIN and check for DS_DIST_NONE on all the joins.
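
The check can be sketched as follows. The target table ods.store_sales is borrowed from the merge example in Tip #8, and the sketch assumes it is distributed on the id column; substitute your own table and join key.

```sql
-- The staging table inherits distkey, sort keys, and encodings.
CREATE TEMP TABLE staging (LIKE ods.store_sales);

-- ... load the staging table here ...

-- Inspect the plan: DS_DIST_NONE on the join step means no rows are
-- redistributed, because matching rows already sit on the same slice.
EXPLAIN
SELECT COUNT(*)
FROM ods.store_sales t
JOIN staging s ON t.id = s.id;
```

If the plan shows a different DS_ attribute on the join (for example, DS_BCAST_INNER), the join is not collocated and the distribution keys are worth revisiting.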

You may also want to analyze statistics on the temporary table, especially when you use it as a join table for subsequent queries. See the following code:

ANALYZE my_temp_table;

With this trick, you retain the functionality of temporary tables but control data placement on the cluster through distribution key assignment. You also take advantage of the columnar nature of Amazon Redshift by using column encoding.

Tip #7: Using QMR and Amazon CloudWatch metrics to drive additional performance improvements

In addition to the Amazon Redshift Advisor recommendations, you can get performance insights through other channels.

The Amazon Redshift cluster continuously and automatically collects query monitoring rules metrics, whether you institute any rules on the cluster or not. This convenient mechanism lets you view attributes like the following:

  • The CPU time for a SQL statement (query_cpu_time)
  • The amount of temporary space a job might ‘spill to disk’ (query_temp_blocks_to_disk)
  • The ratio of the highest number of blocks read over the average (io_skew)

It also makes Amazon Redshift Spectrum metrics available, such as the number of Amazon Redshift Spectrum rows and MBs scanned by a query (spectrum_scan_row_count and spectrum_scan_size_mb, respectively). The Amazon Redshift system view SVL_QUERY_METRICS_SUMMARY shows the maximum values of metrics for completed queries, and STL_QUERY_METRICS and STV_QUERY_METRICS carry the information at 1-second intervals for the completed and running queries respectively.
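
For example, to find completed queries that spilled to disk, you might query SVL_QUERY_METRICS_SUMMARY as in the following sketch. The filter and the limit of 20 rows are arbitrary choices for illustration.

```sql
-- Completed queries that spilled to disk, worst offenders first.
SELECT query,
       query_cpu_time,
       query_temp_blocks_to_disk,
       io_skew,
       spectrum_scan_row_count,
       spectrum_scan_size_mb
FROM svl_query_metrics_summary
WHERE query_temp_blocks_to_disk > 0
ORDER BY query_temp_blocks_to_disk DESC
LIMIT 20;
```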

The Amazon Redshift CloudWatch metrics are data points for use with Amazon CloudWatch monitoring. These can be cluster-wide metrics, such as health status or read/write IOPS, latency, or throughput. CloudWatch also offers compute node-level data, such as network transmit/receive throughput and read/write latency. At the WLM queue grain, there are the number of queries completed per second, queue length, and others. CloudWatch facilitates monitoring concurrency scaling usage with the metrics ConcurrencyScalingSeconds and ConcurrencyScalingActiveClusters.

It’s recommended to consider the CloudWatch metrics (and the existing notification infrastructure built around them) before investing time in creating something new. Similarly, the QMR metrics cover most metric use cases and likely eliminate the need to write custom metrics.

Tip #8: Federated queries connect the OLAP, OLTP and data lake worlds

The new Federated Query feature in Amazon Redshift allows you to run analytics directly against live data residing on your OLTP source system databases and Amazon S3 data lake, without the overhead of performing ETL and ingesting source data into Amazon Redshift tables. This feature gives you a convenient and efficient option for providing real-time data visibility on operational reports, as an alternative to micro-ETL batch ingestion of real-time data into the data warehouse. By combining historical trend data from the data warehouse with live developing trends from the source systems, you can gather valuable insights to drive real-time business decision making.

For example, consider sales data residing in three different data stores:

  • Live sales order data stored on an Amazon RDS for PostgreSQL database (represented as “ext_postgres” in the following external schema)
  • Historical sales data warehoused in a local Amazon Redshift database (represented as “local_dwh”)
  • Archived, “cold” sales data older than 5 years stored on Amazon S3 (represented as “ext_spectrum”)

We can create a late binding view in Amazon Redshift that allows you to merge and query data from all three sources. See the following code:

CREATE VIEW store_sales_integrated AS 
SELECT * FROM ext_postgres.store_sales_live 
UNION ALL 
SELECT * FROM local_dwh.store_sales_current 
UNION ALL 
SELECT ss_sold_date_sk, ss_sold_time_sk, ss_item_sk, ss_customer_sk, ss_cdemo_sk, 
ss_hdemo_sk, ss_addr_sk, ss_store_sk, ss_promo_sk, ss_ticket_number, ss_quantity, 
ss_wholesale_cost, ss_list_price, ss_sales_price, ss_ext_discount_amt, 
ss_ext_sales_price, ss_ext_wholesale_cost, ss_ext_list_price, ss_ext_tax, 
ss_coupon_amt, ss_net_paid, ss_net_paid_inc_tax, ss_net_profit 
FROM ext_spectrum.store_sales_historical 
WITH NO SCHEMA BINDING
;
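
The view assumes that the external schemas already exist. As a sketch of how the PostgreSQL one might be defined, the following statement maps a remote database to the ext_postgres schema. The database name, endpoint, IAM role, and AWS Secrets Manager secret are all placeholders that you would replace with your own values.

```sql
-- Hypothetical connection details; credentials are read from the secret.
CREATE EXTERNAL SCHEMA IF NOT EXISTS ext_postgres
FROM POSTGRES
DATABASE '<database_name>' SCHEMA 'public'
URI '<rds_or_aurora_endpoint>' PORT 5432
IAM_ROLE 'arn:aws:iam::<account_id>:role/<federated_query_role>'
SECRET_ARN 'arn:aws:secretsmanager:<region>:<account_id>:secret:<secret_name>';
```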

Currently, direct federated querying is supported for data stored in Amazon Aurora PostgreSQL and Amazon RDS for PostgreSQL databases, with support for other major RDS engines coming soon. You can also use the federated query feature to simplify the ETL and data-ingestion process. Instead of staging data on Amazon S3, and performing a COPY operation, federated queries allow you to ingest data directly into an Amazon Redshift table in one step, as part of a federated CTAS/INSERT SQL query.

For example, the following code shows an upsert/merge operation in which the COPY operation from Amazon S3 to Amazon Redshift is replaced with a federated query sourced directly from PostgreSQL:

BEGIN;

CREATE TEMP TABLE staging (LIKE ods.store_sales);

-- replace the following COPY from S3: 
   /*COPY staging FROM 's3://yourETLbucket/daily_store_sales/' 
   IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>' 
   DELIMITER '|' COMPUPDATE OFF; */
      
-- with this federated query to load staging data directly from PostgreSQL source
INSERT INTO staging SELECT * FROM pg.store_sales p
    WHERE p.last_updated_date > (SELECT MAX(last_updated_date) FROM ods.store_sales);

DELETE FROM ods.store_sales USING staging s WHERE ods.store_sales.id = s.id;

INSERT INTO ods.store_sales SELECT * FROM staging;

DROP TABLE staging;

COMMIT;

For more information about setting up the preceding federated queries, see Build a Simplified ETL and Live Data Query Solution using Redshift Federated Query. For additional tips and best practices on federated queries, see Best practices for Amazon Redshift Federated Query.

Tip #9: Maintaining efficient data loads

Amazon Redshift best practices suggest using the COPY command to perform data loads of file-based data. Single-row INSERTs are an anti-pattern. The COPY operation uses all the compute nodes in your cluster to load data in parallel, from sources such as Amazon S3, Amazon DynamoDB, Amazon EMR HDFS file systems, or any SSH connection.

When performing data loads, compress the data files whenever possible. For row-oriented (CSV) data, Amazon Redshift supports both GZIP and LZO compression. It’s more efficient to load a large number of small files than one large one, and the ideal file count is a multiple of the cluster’s total slice count. Columnar data, such as Parquet and ORC, is also supported. You can achieve the best performance when the compressed files are between 1 MB and 1 GB each.

The number of slices per node depends on the cluster’s node size (and potentially elastic resize history). By ensuring an equal number of files per slice, you know that the COPY command evenly uses cluster resources and completes as quickly as possible. Query for the cluster’s current slice count with SELECT COUNT(*) AS number_of_slices FROM stv_slices;.
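
As a worked example, if the slice count query returns 16, splitting the input into 16, 32, or 48 compressed files gives every slice the same number of files. A COPY over such a set of files might look like the following sketch. The bucket and role reuse the placeholders from the Tip #8 example, and the part- key prefix and staging table are assumptions.

```sql
-- Loads every object whose key starts with the given prefix, in parallel.
COPY staging
FROM 's3://yourETLbucket/daily_store_sales/part-'
IAM_ROLE 'arn:aws:iam::<account_id>:role/<s3_reader_role>'
DELIMITER '|'
GZIP;
```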

Another script in the amazon-redshift-utils GitHub repo, CopyPerformance, calculates statistics for each load. Amazon Redshift Advisor also warns of missing compression or too few files based on the number of slices (see the following screenshot):

Conducting COPY operations efficiently reduces the time to results for downstream users, and minimizes the cluster resources utilized to perform the load.

Tip #10: Using the latest Amazon Redshift drivers from AWS

Because Amazon Redshift is based on PostgreSQL, we previously recommended using JDBC4 PostgreSQL driver version 8.4.703 and psql ODBC version 9.x drivers. If you’re currently using those drivers, we recommend moving to the new Amazon Redshift–specific drivers. For more information about drivers and configuring connections, see JDBC and ODBC drivers for Amazon Redshift in the Amazon Redshift Cluster Management Guide.

While rarely necessary, the Amazon Redshift drivers do permit some parameter tuning that may be useful in some circumstances. Downstream third-party applications often have their own best practices for driver tuning that may lead to additional performance gains.

For JDBC, consider the following:

  • To avoid client-side out-of-memory errors when retrieving large data sets using JDBC, you can enable your client to fetch data in batches by setting the JDBC fetch size parameter or BlockingRowsMode.
  • Amazon Redshift doesn’t recognize the JDBC maxRows parameter. Instead, specify a LIMIT clause to restrict the result set. You can also use an OFFSET clause to skip to a specific starting point in the result set.
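
For instance, instead of relying on maxRows, a client that pages through results 100 rows at a time could issue statements like the following sketch. The sales table comes from the Tip #1 example; the page size and offset are arbitrary.

```sql
-- Third page of 100 rows; ORDER BY keeps the paging deterministic.
SELECT id, item, amount
FROM sales
ORDER BY id
LIMIT 100 OFFSET 200;
```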

For ODBC, consider the following:

  • A cursor is enabled on the cluster’s leader node when UseDeclareFetch is enabled. The cursor fetches up to fetchsize/cursorsize and then waits to fetch more rows when the application requests more rows.
  • The CURSOR command is an explicit directive that the application uses to manipulate cursor behavior on the leader node. Unlike the JDBC driver, the ODBC driver doesn’t have a BlockingRowsMode mechanism.
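
When an application manages the cursor explicitly, the SQL it sends looks roughly like the following sketch. The cursor name, the sales table from Tip #1, and the batch size of 1,000 rows are illustrative assumptions.

```sql
-- Cursors must be declared inside a transaction block.
BEGIN;

DECLARE sales_cursor CURSOR FOR
  SELECT id, item, amount FROM sales ORDER BY id;

-- Each FETCH returns the next batch; repeat until no rows come back.
FETCH FORWARD 1000 FROM sales_cursor;
FETCH FORWARD 1000 FROM sales_cursor;

CLOSE sales_cursor;
COMMIT;
```

Because the result set is materialized on the leader node, cursors over very large result sets can affect other workloads, which is one more reason to tune this only when there is a clear need.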

It’s recommended that you do not undertake driver tuning unless you have a clear need. AWS Support is available to help on this topic as well.

Conclusion

Amazon Redshift is a powerful, fully managed data warehouse that can offer increased performance and lower cost in the cloud. As Amazon Redshift grows based on the feedback from its tens of thousands of active customers world-wide, it continues to become easier to use and to extend its price-for-performance value proposition. Staying abreast of these improvements can help you get more value (with less effort) from this core AWS service.

We hope you learned a great deal about making the most of your Amazon Redshift account with the resources in this post.

If you have questions or suggestions, please leave a comment.

About the Authors

Matt Scaer is a Principal Data Warehousing Specialist Solution Architect, with over 20 years of data warehousing experience, with 11+ years at both AWS and Amazon.com.

Manish Vazirani is an Analytics Specialist Solutions Architect at Amazon Web Services.

Tarun Chaudhary is an Analytics Specialist Solutions Architect at AWS.

Configure and optimize performance of Amazon Athena federation with Amazon Redshift

Post Syndicated from Harsha Tadiparthi original https://aws.amazon.com/blogs/big-data/configure-and-optimize-performance-of-amazon-athena-federation-with-amazon-redshift/

This post provides guidance on how to configure Amazon Athena federation with AWS Lambda and Amazon Redshift, while addressing performance considerations to ensure proper use.

If you use data lakes in Amazon Simple Storage Service (Amazon S3) and use Amazon Redshift as your data warehouse, you may want to integrate the two for a lake house approach. A lake house is the ability to integrate a data lake and a data warehouse seamlessly. When you need to query your data lake from your Amazon Redshift data warehouse, you can use Amazon Redshift Spectrum, which works well for unifying the two. However, when you use Athena in the data lake and need to access data in Amazon Redshift, there is no easy approach for the following two common scenarios:

  • Team A has a data lake in Amazon S3 and uses Athena. They need access to the data in an Amazon Redshift cluster owned by Team B.
  • Analysts using Athena to query their data lake for analytics need the agility and flexibility to access data in an Amazon Redshift data warehouse without moving the data to the Amazon S3 data lake.

In these scenarios, Athena federation with Amazon Redshift allows you to seamlessly access the data in your Amazon Redshift data warehouse without having to wait to unload the data to the Amazon S3 data lake, which removes the overhead in managing such jobs.

In this post, you walk through a step-by-step configuration to set up Athena federation using Lambda to access data in Amazon Redshift. You also see a performance benchmark analysis of interactive and ad hoc TPC-DS queries, and learn some key performance considerations and best practices when using federation.

Solution overview

Data federation is the capability to integrate data in another data store using a single interface. The following diagram depicts how Athena federation works by using Lambda to integrate with a federated data source.

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Simply point to your data in Amazon S3, define the schema, and start querying using standard SQL.

Lambda lets you run code without provisioning or managing servers. You can run code for virtually any type of application with zero administration and pay only for the time your code is running.

Amazon Redshift is a petabyte-scale data warehouse designed from the ground up, natively for the cloud. Amazon Redshift is the most popular and fastest cloud data warehouse. It’s integrated with your data lake, offers performance up to three times faster than any other data warehouse, and costs up to 75% less than any other cloud data warehouse.

The following diagram depicts all the data source connectors available as of this writing in the AWS Serverless Application Repository.

The AWS Serverless Application Repository is a managed repository for serverless applications. It enables you to store and share reusable applications, and easily assemble and deploy serverless architectures in powerful new ways.

You can also create a custom connector for sources that aren’t in the AWS Serverless Application Repository.

Prerequisites

Before you get started, create a secret for the Amazon Redshift login ID and password using AWS Secrets Manager.

  1. On the Secrets Manager console, choose Secrets.
  2. Choose Store a new secret.
  3. Choose credentials for your Amazon Redshift cluster, and set your user name and password.
  4. Choose the cluster you want to use.
  5. For Secret name, enter a name for your secret. Use the prefix AthenaJDBCFederation so it’s easy to find.
  6. Leave the remaining fields at their defaults and choose Next.
  7. Complete your secret creation.

Setting up your S3 bucket

On the Amazon S3 console, create a new S3 bucket and subfolder for Lambda to use. For this post, use the name myworkspace0009/athenafederation.

Configuring Athena federation with Amazon Redshift

To configure Athena federation with Amazon Redshift, complete the following steps:

  1. On the AWS Serverless Application Repository, choose Available applications.
  2. In the search field, enter athena federation.

  3. Choose the connector application in the search results.
  4. In the Application settings section, provide the following details:
    • Application name – AthenaRedshiftConnector
    • SecretNamePrefix – AthenaJdbcFederation
    • SpillBucket – myworkspace0009/athenafederation
    • JDBCConnectorConfig – redshift://jdbc:redshift://<YourAmazonRedshiftHostname>:5439/<DBName>?user=sample2&password=sample2
    • DisableSpillEncryption – False
    • LambdaFunctionName – rstpcds30
    • SecurityGroupID – Security group ID where Amazon Redshift is deployed
    • SpillPrefix – Leave default
    • SubnetIds – Use the subnets where Amazon Redshift is running with comma separation
  5. Select the I acknowledge check box.
  6. Choose Deploy.

In the next steps, you configure an Amazon Virtual Private Cloud (Amazon VPC) endpoint for Amazon S3 to allow Lambda to write federated query results to Amazon S3.

  1. On the Amazon VPC console, choose Endpoints.
  2. Choose Create endpoint.
  3. Choose the VPC for your endpoint.
  4. Make any necessary security changes as per your security requirements.
  5. Choose Create endpoint.

Running federated queries with Athena

To start running federated queries, complete the following steps:

  1. On the Athena console, choose Workgroups.
  2. If you don’t see a workgroup called AmazonAthenaPreviewFunctionality, create one.

When this feature becomes generally available, you won’t need to use this workgroup name.

  3. Run your queries, using lambda:rstpcds30 to run against tables in Amazon Redshift.
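As a rough illustration of the last step, the following sketch submits a federated query from Python with boto3 instead of the console. The catalog name comes from the LambdaFunctionName configured earlier and the workgroup name from the step above; the schema, table, and output location are hypothetical placeholders, not values from this post.

```python
# Assumptions: "lambda:rstpcds30" is the connector deployed above; the schema,
# table, and S3 output location passed by the caller are illustrative only.
CATALOG = "lambda:rstpcds30"
WORKGROUP = "AmazonAthenaPreviewFunctionality"


def build_federated_query(schema, table, limit=10):
    """Return a SQL string that reads an Amazon Redshift table through the connector."""
    return f'SELECT * FROM "{CATALOG}".{schema}.{table} LIMIT {int(limit)}'


def run_federated_query(sql, output_location, athena=None):
    """Submit the query to Athena and return the query execution ID."""
    if athena is None:
        import boto3  # imported lazily so the query builder works without AWS access
        athena = boto3.client("athena")
    response = athena.start_query_execution(
        QueryString=sql,
        WorkGroup=WORKGROUP,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```

For example, run_federated_query(build_federated_query("public", "store_sales"), "s3://your-results-bucket/prefix/") would start the query; keeping a LIMIT or a selective predicate in the SQL helps keep the data returned through Lambda small.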

Athena query performance comparison

Several customers have asked us for performance insights and prescriptive guidance on how queries in Athena compare against federated queries and how to use them. In this section, we use a TPC-DS 3 TB standard dataset and a select few queries that fall in the category of ad hoc and interactive. The comparison of their performance should give you an idea of what to expect when running federated queries against Amazon Redshift.

For the following tests, we used a 3 TB TPC-DS dataset stored in the Amazon S3 data lake as compressed, partitioned Parquet files served by Athena, and the same 3 TB TPC-DS dataset on an Amazon Redshift cluster running four RA3.4XL nodes.

The following table summarizes the dataset sizes:

Dataset Table            Size (Records)
store_sales              8.6 billion
customer                 30 million
customer_address         15 million
customer_demographics    1.92 million
item                     360,000
date_dim                 73,000
store                    1,350

We ran the following four tests:

  • T1 – Queries run in Athena without federation. All table data is in Amazon S3.
  • T2 – Queries run in Athena with federation to Amazon Redshift. All table data is in Amazon S3, except the store_sales fact table, which is in Amazon Redshift.
  • T3 – Queries run in Athena with federation to Amazon Redshift. All tables and data are in Amazon Redshift.
  • T4 – Queries run in Amazon Redshift without federation. All tables and data are in Amazon Redshift.

The following graph represents the performance of some of the ad hoc and interactive TPC-DS queries.

In the preceding graph, all T3 queries timed out at 900 seconds, depicted by the pink reference line, due to the Lambda 900-second timeout limit. This is due to overhead from store_sales fact data that needed to be transferred back to Athena.

The following graph removes T3 from the visualization, which gives better visibility when comparing the other tests.

Notice that the T1 and T2 queries completed in almost the same time, while the T4 queries ran significantly faster.

Amazon Redshift outperforms Athena on latency and should be the tool of choice if your analytics queries have very low latency SLAs that Athena can’t achieve.

The following graph shows the data scanned in Amazon S3 for T1 and T2, which outlines why there isn’t much difference in query performance when compared to federated queries.

For the T2 federated queries, a small amount of dimension data is filtered in Amazon Redshift and brought back to Athena, instead of scanning the entire dimension tables. This is typical of many ad hoc and interactive queries.

The performance of these TPC-DS queries between T1 and T2 is comparable because very little data is transferred back to Athena. You can see a similar behavior in several ad hoc and interactive query use cases because they use limited dimensions and scan a small subset of dimension data. Due to the 900-second timeout for the Lambda instances that connect to Amazon Redshift, it’s advised to minimize the amount of data the query brings back. Although Athena uses multiple Lambda instances in parallel to run your federated query, it’s also important to make sure the Amazon Redshift WLM queue has enough slots to process it, thereby not leading to queue wait time. For example, in some of the preceding queries, 20 Lambda executions were connecting to Amazon Redshift concurrently.

Key performance best practice considerations

When considering Athena federation with Amazon Redshift, you could take into account the following best practices:

  • Athena federation works great for queries with predicate filtering because the predicates are pushed down to Amazon Redshift. Use filter and limited-range scans in your queries to avoid full table scans.
  • If your SQL query requires returning a large volume of data from Amazon Redshift to Athena (which could lead to query timeouts or slow performance), unload the large tables in your query from Redshift to your Amazon S3 data lake.
  • Star schema is a commonly used data model in Amazon Redshift. In the star schema model, unload your large fact tables into your data lake and leave the dimension tables in Amazon Redshift. If large dimension tables are contributing to slow performance or query timeouts, unload those tables to your data lake.
  • When you run federated queries, Athena spins up multiple Lambda functions, which causes a spike in database connections. It’s important to monitor the Amazon Redshift WLM queue slots to ensure there is no queuing. Additionally, you can use concurrency scaling on your Amazon Redshift cluster so that bursts of concurrent queries are served by additional capacity instead of queuing up.

Conclusion

In this post, you learned how to configure and use Athena federation with Amazon Redshift using Lambda. Now you don’t need to wait for all the data in your Amazon Redshift data warehouse to be unloaded to Amazon S3 and maintained on a day-to-day basis to run your queries. You can use the best practice considerations outlined in the post to minimize the data transferred from Amazon Redshift for better performance. When queries are well written for federation, the performance penalties are negligible, as observed in the TPC-DS benchmark queries in this post. Happy query federating!

 


About the Author

Harsha Tadiparthi is a Specialist Sr. Solutions Architect, AWS Analytics. He enjoys solving complex customer problems in Databases and Analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

Automate dataset monitoring in Amazon QuickSight

Post Syndicated from Ginni Malik original https://aws.amazon.com/blogs/big-data/automate-dataset-monitoring-in-amazon-quicksight/

Amazon QuickSight is an analytics service that you can use to create datasets, perform one-time analyses, and build visualizations and dashboards. In an enterprise deployment of QuickSight, you can have multiple dashboards, and each dashboard can have multiple visualizations based on multiple datasets. This can quickly become a management overhead to view all the datasets’ status with their latest refresh timestamp.

This post demonstrates how to visualize datasets associated with all the dashboards in your account, with their latest refresh status and refresh time.

Solution overview

The following diagram illustrates the architecture of the solution.

The architecture includes the following steps:

  1. You create the datasets and tag them via an AWS Lambda function.
  2. A second function gets the refresh status from the tagged datasets.
  3. The function stores the refresh status in Amazon Simple Storage Service (Amazon S3).
  4. You query the refresh status in Amazon Athena.
  5. You visualize the refresh status in QuickSight.

A QuickSight deployment can have multiple dashboards, and each dashboard can have multiple datasets associated with it. You can end up having hundreds of datasets. It’s difficult to know if all the underlying datasets are refreshing as required unless you check them manually. Although QuickSight sends an email notification to the dataset owner when a dataset refresh fails, that covers only one dataset at a time. This solution provides a holistic view of all datasets’ refreshes.

The aim is to create a dashboard that monitors the refresh of the existing datasets and provides refresh status for the datasets.

To implement the solution, you must create the following:

  • A Lambda execution role for QuickSight.
  • A scheduled Lambda function to tag the datasets.
  • A scheduled Lambda function to get the last refresh status of the datasets and store it in Amazon S3.
  • An external table in Athena on top of the S3 bucket.
  • A QuickSight dashboard using Athena as the data source, which provides the datasets’ last refresh status.

This post assumes that you have existing analyses and dashboards with numerous datasets.

Creating a Lambda execution role for QuickSight

Your first step is to create a Lambda execution role that allows you to perform tagging and create QuickSight analysis, datasets, and data sources. The role should be able to describe and update them. The following code is an example role policy (replace the bucket name with the bucket for storing the QuickSight ingestion results):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "quicksight:CreateDashboard",
                "quicksight:List*",
                "quicksight:Describe*",
                "quicksight:Tag*",
                "tag:GetResources"
            ],
            "Resource": "*"
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": [
                "arn:aws:s3:::bucket-name-to-store-ingestion/*"
            ]
        }
    ]
}

Creating a scheduled Lambda function to tag the datasets

The next step is to identify all the datasets required for your dashboard and tag them. It’s easier to do this right after you create the dataset. Complete the following steps:

  1. On the QuickSight console, choose Manage data.
  2. Choose your dataset and choose Edit dataset.
  3. Record the dataset ID from the URL (data-sets/<dataset ID>/prepare).

Alternatively, you can use a Lambda function to find the dataset name and ID. See the following code (replace the AwsAccountID with your ID):

import boto3

client = boto3.client('quicksight')

def lambda_handler(event, context):
    # Replace the value with your 12-digit AWS account ID
    response = client.list_data_sets(
        AwsAccountId='123456789012',
        MaxResults=100  # a single call returns at most this many datasets
    )
    # Print each dataset as name:id
    for r in response['DataSetSummaries']:
        print(r['Name'] + ':' + r['DataSetId'])

The function provides all the datasets in your account. Make sure to record the dataset IDs specific to your dashboard.
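Because a single list_data_sets call returns at most MaxResults entries, an account with hundreds of datasets needs several calls. The following is a minimal sketch of paging with the NextToken field of the response; the helper name list_all_datasets and its parameters are illustrative and not part of the original function.

```python
def list_all_datasets(quicksight, account_id, page_size=100):
    """Return a dict of dataset ID -> dataset name across all result pages.

    quicksight is a boto3 QuickSight client, for example boto3.client('quicksight').
    """
    datasets = {}
    kwargs = {"AwsAccountId": account_id, "MaxResults": page_size}
    while True:
        page = quicksight.list_data_sets(**kwargs)
        for summary in page.get("DataSetSummaries", []):
            datasets[summary["DataSetId"]] = summary["Name"]
        token = page.get("NextToken")
        if not token:
            # No NextToken means this was the last page
            return datasets
        kwargs["NextToken"] = token
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real account.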

  4. Create your Lambda function.
  5. Tag the datasets per your individual dashboards. See the following code (use the target dashboard name and ID to create the tagging key, and replace the dataset_ids and account number with your own):

import boto3
client = boto3.client('quicksight')
acct_id = '123456678788'  # replace with your AWS account ID
def lambda_handler(event, context):
    # IDs of the datasets that feed the dashboard being tagged
    dataset_ids=['0e994f54-8d08-4b64-98ca-195cf7b46077','16d5bf20-4415-42d1-b54c-9aba95b13d67','5c5fd93a-0bb6-468f-a0c4-ff1c15597d20']
    for i in dataset_ids:
        # Attach the DashboardName tag to each dataset
        response = client.tag_resource(
            ResourceArn='arn:aws:quicksight:us-east-1:{}:dataset/{}'.format(acct_id, i),
            Tags=[
                {
                    'Key' : 'DashboardName',
                    'Value' : 'QuickSight_refresh_status_demo'
                }
            ]
        )

You can do this for all your dashboards. The only limitation is that a tag key holds a single value per resource, so each dataset can be tagged with only one DashboardName value.

If you tag the datasets with a wrong key, you can remove them using an untag call and replace the ResourceArn with the specific dataset ARN. See the following code:

response = client.untag_resource(
    ResourceArn='arn:aws:quicksight:us-east-1:123456678788:dataset/794e28ae-2b89-49ef-b885-196c95bfd4f8',
    TagKeys=[
        'DashboardName'
    ]
)

Creating a Lambda function to get the last refresh status

The next step is to configure a Lambda function that gets the last refresh status of the tagged datasets and loads it into Amazon S3. You use resourcegroupstaggingapi to get back all the resources with a particular key. For this post, the key is the DashboardName. From the response of the ResourceTagMappingList, you filter out the dataset ID and dataset ARN. You also get the data source ARN and name for each dataset associated with the particular key value. Finally, you list the ingestions for all the datasets and classify them as one of the following:

  • Failed – The last refresh failed.
  • Did not run within last 24 hours – No ingestion ID in the last 24 hours (the time is configurable). You explicitly use this status even if the previous run before the last 24 hours succeeded or failed. This makes sure the datasets adhere to a certain refresh schedule. For this post, you want the datasets to refresh one time a day.
  • Error – No ingestion ID for more than 90 days.
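The classification rules above can be sketched in isolation as a small pure function, which makes the time-window logic easy to check before wiring it into Lambda. The function name classify_refresh and its parameters are illustrative, and mapping a missing ingestion to "Error" is an assumption based on the list above rather than something the Lambda function is confirmed to do.

```python
from datetime import datetime, timedelta, timezone


def classify_refresh(created_time, ingestion_status, now=None, window=timedelta(hours=24)):
    """Classify the latest ingestion of a dataset.

    created_time: timezone-aware datetime of the latest ingestion, or None if none exists.
    ingestion_status: the IngestionStatus string reported by QuickSight.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    if created_time is None:
        # Assumption: no ingestion on record is reported as an error
        return "Error"
    if now - created_time >= window:
        # The latest run is older than the allowed window, whatever its outcome was
        return "Did not run within last 24 hrs"
    # Otherwise pass through the status QuickSight reported
    return ingestion_status
```

If your refresh schedule is not daily, pass a different window and adjust the status text to match.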

See the following code (replace the placeholder text with your specific values):

import boto3
import jmespath
from botocore.exceptions import ClientError
from datetime import datetime
from datetime import timezone

s3 = boto3.client('s3')
client = boto3.client('resourcegroupstaggingapi')  # finds resources by tag
client1 = boto3.client('quicksight')
AwsAccountId='123456678788'  # replace with your AWS account ID
def lambda_handler(event, context):
    items=[]
    tagfilters=[
        {
            'Key': 'DashboardName',
            'Values': [
                'QuickSight_refresh_status_demo'
                            #add dashboard name
            ]
        },
    ]


    response = client.get_resources(
    TagFilters=tagfilters
    )
    
    # Get the response back for each of the above listed Key Values
    resources = response['ResourceTagMappingList']
    for resource in resources:
        perm=""
        data={}
        permission=[]
        data['resource_ARN']=resource['ResourceARN']
        dataset_id_arn=data['resource_ARN'].split('/')
        data['dataset_id']=dataset_id_arn[1]
        # For each of the above dataset , describe the dataset to get the data source
        response = client1.describe_data_set(
                 AwsAccountId=AwsAccountId,
                 DataSetId=data['dataset_id']
        )

        # The search returns a nested list of ARNs; take the first one and keep the ID after the slash
        datasourcearn = jmespath.search('DataSet.PhysicalTableMap.*.*.DataSourceArn',response)
        datasourcearnid = datasourcearn[0][0].split('/')[1]
        data['DatasetName']=response.get('DataSet').get('Name')
        response = client1.describe_data_source(
            AwsAccountId=AwsAccountId,
            DataSourceId=datasourcearnid
        )
        datasourcename=response.get('DataSource').get('Name')
        data['DataSourceName']=datasourcename
        resource_tags = resource['Tags']
        for tag in resource_tags:

                if tag['Key'] == 'DashboardName':
                    data['dashboard_name'] = tag['Value']

        response1= client1.list_ingestions(
            DataSetId=data['dataset_id'],
            AwsAccountId=AwsAccountId,
            MaxResults=1  # To get the latest ingestion, if you want history you can change this number
        )

        if response1.get('Ingestions'):
            for i in response1['Ingestions']:
                data['IngestionId']=i['IngestionId']
                data['CreatedTime']=i['CreatedTime']
                try:
                    response = client1.describe_ingestion(DataSetId=data['dataset_id'],IngestionId=data['IngestionId'],AwsAccountId=AwsAccountId)

                    if  response:
                        data['Time']=str(response['Ingestion']['CreatedTime'])
                        # Compare timezone-aware timestamps; boto3 returns CreatedTime with tzinfo set
                        age_seconds = (datetime.now(timezone.utc) - response['Ingestion']['CreatedTime']).total_seconds()
                        if age_seconds >= (24*60*60):  #Check the refresh status within last 24 hours, you can change this per your requirement
                            data['Status']='Did not run within last 24 hrs'
                        else:
                            data['Status']=response['Ingestion']['IngestionStatus']
                except ClientError as e:
                    data['Time']='Failed, Check if dataset is being used'
                    data['Status']=e.response['Error']['Message']

            items.append(data)

    # Build the CSV: header row first, then one row per dataset
    row=['DashboardName,DatasetName,Status,Time,DataSourceName']
    csv_key='quicksight-dashboard-metada/report.csv'

    for data in items:
        row.append(data['dashboard_name'] +','+data['DatasetName']+','+data['Status']+','+data['Time']+','+data['DataSourceName'])
    values = '\n'.join(str(v) for v in row)

    response = s3.put_object(
        Body=values,
        Bucket='bucketname',
        Key=csv_key
    )

The last status run is now stored in a .csv file in the specific bucket mentioned in the Lambda function (see the following screenshot).
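One caveat with joining fields by hand is that a dataset or data source name containing a comma shifts the columns in the file. As a hedged alternative, the following sketch builds the same rows with Python's csv module, which quotes such fields. The helper name rows_to_csv is illustrative; the dictionary keys match those used in the function above.

```python
import csv
import io

HEADER = ["DashboardName", "DatasetName", "Status", "Time", "DataSourceName"]


def rows_to_csv(items):
    """Serialize the collected dataset records to CSV text, quoting fields when needed."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(HEADER)
    for data in items:
        writer.writerow([
            data["dashboard_name"],
            data["DatasetName"],
            data["Status"],
            data["Time"],
            data["DataSourceName"],
        ])
    return buffer.getvalue()
```

The returned string could be passed as the Body of the put_object call in place of the hand-joined values.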

You can also schedule your function to run at a certain frequency, depending on when you want to check the status.
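Scheduling can be set up on the console; as an alternative, the following sketch shows one way to do it with boto3 by creating an Amazon EventBridge rule that invokes the function once a day. The helper name schedule_lambda, the rule naming scheme, and the default rate are assumptions for illustration.

```python
def schedule_lambda(function_name, function_arn, events, lambda_client, rate="rate(1 day)"):
    """Create a scheduled rule that invokes the given Lambda function.

    events and lambda_client are boto3 clients, for example
    boto3.client('events') and boto3.client('lambda').
    """
    rule_name = f"{function_name}-schedule"  # hypothetical naming scheme
    rule_arn = events.put_rule(
        Name=rule_name,
        ScheduleExpression=rate,
        State="ENABLED",
    )["RuleArn"]
    # Allow EventBridge to invoke the function on behalf of this rule
    lambda_client.add_permission(
        FunctionName=function_name,
        StatementId=f"{rule_name}-invoke",
        Action="lambda:InvokeFunction",
        Principal="events.amazonaws.com",
        SourceArn=rule_arn,
    )
    # Point the rule at the function
    events.put_targets(Rule=rule_name, Targets=[{"Id": "1", "Arn": function_arn}])
    return rule_arn
```

Running it a second time for the same function fails on add_permission because the statement ID already exists, so treat it as a one-time setup step.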

Creating an external table in Athena on top of Amazon S3

Now you can create an external Athena table on top of the .csv file you stored in Amazon S3 and query it. Use the following table definition for reference (replace the location with the location of your S3 bucket):

CREATE EXTERNAL TABLE IF NOT EXISTS qs_meta_table
( `DashboardName` string, 
 `DatasetName` string, 
 `Status` string, 
 `LastRefreshTime` string, 
 `DataSourceName` string ) 
 ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES 
 ( "separatorChar" = "," )
 LOCATION 's3://bucketname/quicksight-dashboard-metada/' 
 TBLPROPERTIES ( "skip.header.line.count"="1")

You can get the latest status of the dataset refreshes by querying the table with SQL in Athena.

Creating a QuickSight dashboard using Athena as the data source

To visualize this data and share it with others, build a dashboard on top of the data in QuickSight. The following screenshot shows the listed dashboards.

You first create a dataset for the Athena table.

  1. On the QuickSight console, choose Manage data.
  2. Choose Create dataset.

You use Athena as the source for your dataset. If you don’t have an existing Athena data source, you can create a new one. For instructions, see Creating a Data Source.

  3. Choose the table you just created.
  4. Select Import to SPICE for quicker analysis.

Depending on the size of your dataset and expected latency, you can choose Directly query your data instead. If you use SPICE, remember to add a refresh schedule for the dataset.

  5. Create an analysis from the dataset.

For this post, choose a table visual type and drag all the columns to the Value field well.

You can create the visualization as in the following screenshot, with conditional formatting to highlight failed and successful loads.

  6. To publish the dashboard, choose Share on the application bar of the analysis.
  7. Choose Publish dashboard.
  8. For Publish new dashboard as, enter a name for your dashboard.

You can now share the dashboard with end-users.

Conclusion

In this post, we described how to create a QuickSight dashboard that can track the last refresh status of all the datasets in your account. The dashboard provides a single pane view of the status of all the datasets and avoids the manual effort of opening and checking each individual dataset.

About the authors

Ginni Malik is an Associate Cloud Developer with AWS.

Rohan Jamadagni is a Solutions Architect with AWS.