Tag Archives: Amazon Simple Storage Service (S3)

Serverless architecture for optimizing Amazon Connect call-recording archival costs

Post Syndicated from Brian Maguire original https://aws.amazon.com/blogs/architecture/serverless-architecture-for-optimizing-amazon-connect-call-recording-archival-costs/

In this post, we provide a serverless solution to cost-optimize the storage of contact-center call recordings. The solution automates the scheduling, storage-tiering, and resampling of call-recording files, resulting in immediate cost savings. The solution is an asynchronous architecture built using AWS Step Functions, Amazon Simple Queue Service (Amazon SQS), and AWS Lambda.

Amazon Connect provides an omnichannel cloud contact center with the ability to maintain call recordings for compliance and to gain actionable insights using Contact Lens for Amazon Connect and AWS Contact Center Intelligence Partners. The storage required for call recordings can quickly increase as customers meet compliance retention requirements, often spanning six or more years. This can lead to hundreds of terabytes in long-term storage.

Solution overview

When an agent completes a customer call, Amazon Connect sends the call recording to an Amazon Simple Storage Service (Amazon S3) bucket under a date and contact ID prefix. The file is stored in the .WAV format, encoded as pcm_s16le at 8000 Hz with two channels, for a bitrate of 256 kb/s. At approximately 2 MB/minute, the call-recording files are optimized for high-quality processing, such as machine learning analysis (see Figure 1).


Figure 1. Asynchronous architecture for batch resampling for call-recording files on Amazon S3

When a call recording is sent to Amazon S3, downstream post-processing is often performed to generate analytics reports for agents and quality auditors. The downstream processing can include services that provide transcriptions, quality-of-service metrics, and sentiment analysis to create reports and trigger actionable events.

While this processing is often completed within minutes, the downstream applications could require processing retries. As audio resampling reduces the quality of the audio files, it is essential to delay resampling until after processing is completed. As processed call recordings are infrequently accessed days after a call is completed, with only a small percentage accessed by agents and call quality auditors, call recordings can benefit from resampling and transitioning to long-term Amazon S3 storage tiers.

In Figure 2, multiple AWS services work together to provide an end-to-end cost-optimization solution for your contact center call recordings.


Figure 2. AWS Step Function orchestrates the batch resampling of call recordings

An Amazon EventBridge schedule rule triggers the step function to perform the batch resampling process for all call recordings from the previous 7 days.

In the first step function task, the Lambda function task iterates the S3 bucket using the ListObjectsV2 API, obtaining the call recordings (1000 objects per iteration) with the date prefix from 7 days ago.
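The listing step can be sketched roughly as follows. This is a minimal illustration rather than the repository's code: the function names, the YYYY/MM/DD key layout, and the base_prefix value are assumptions. Only list_objects_v2 and its ContinuationToken pagination come from the S3 API. The client is passed in as a parameter, so a boto3 S3 client or a test stub can be used.

```python
from datetime import date, timedelta


def date_prefix(today, days_ago=7, base_prefix="connect/my-instance/CallRecordings/"):
    """Build the key prefix for recordings made `days_ago` days before `today`.

    The YYYY/MM/DD layout and base_prefix are assumptions for illustration.
    """
    target = today - timedelta(days=days_ago)
    return f"{base_prefix}{target:%Y/%m/%d}/"


def iter_recording_keys(s3_client, bucket, prefix, page_size=1000):
    """Yield every object key under `prefix`, one ListObjectsV2 page at a time."""
    kwargs = {"Bucket": bucket, "Prefix": prefix, "MaxKeys": page_size}
    while True:
        page = s3_client.list_objects_v2(**kwargs)
        for obj in page.get("Contents", []):
            yield obj["Key"]
        if not page.get("IsTruncated"):
            break
        # Ask for the next page, starting where this one stopped.
        kwargs["ContinuationToken"] = page["NextContinuationToken"]
```

With boto3 installed, the same function could be called as iter_recording_keys(boto3.client("s3"), "my-bucket", date_prefix(date.today())), where the bucket name is a placeholder.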

The next task invokes a Lambda function inserting the call recording objects into the Amazon SQS queue. The audio-conversion Lambda function receives the Amazon SQS queue events via the event source mapping Lambda integration. Each concurrent Lambda invocation downloads a stored call recording from Amazon S3, resampling the .WAV with ffmpeg and tagging the S3 object with a “converted=True” tag.
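The post does not show the exact ffmpeg options the conversion function uses. As a sketch, the following builds an ffmpeg command line from standard ffmpeg options (-ac, -ar, -c:a) that would produce the mono, 8000 Hz, pcm_alaw target format. The function name and file paths are hypothetical.

```python
import shlex


def ffmpeg_resample_args(src_path, dst_path):
    """Return an ffmpeg argument list that writes a mono, 8 kHz, A-law WAV file."""
    return [
        "ffmpeg",
        "-y",                # overwrite dst_path if it already exists
        "-i", src_path,      # input: the original two-channel pcm_s16le recording
        "-ac", "1",          # downmix to one channel
        "-ar", "8000",       # keep the 8000 Hz sample rate
        "-c:a", "pcm_alaw",  # encode as 8-bit A-law
        dst_path,
    ]


if __name__ == "__main__":
    args = ffmpeg_resample_args("/tmp/in.wav", "/tmp/out.wav")
    print(shlex.join(args))
    # To run the conversion: subprocess.run(args, check=True)
```

Passing the arguments as a list rather than a single shell string avoids quoting problems when object keys contain spaces.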

Finally, the conversion function uploads the resampled file to Amazon S3, overwriting the original call recording with the resampled recording using a cost-optimized storage class, such as S3 Glacier Instant Retrieval. S3 Glacier Instant Retrieval provides the lowest cost for long-lived data that is rarely accessed and requires millisecond retrieval, such as for contact-center call-recording playback. By default, Amazon Connect stores call recordings with S3 Versioning enabled, maintaining the original file as a version. You can use lifecycle policies to permanently delete the original version from the version-enabled bucket, which minimizes the storage consumed by the original call recording.
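A lifecycle rule of this kind could look like the following sketch. The rule ID, the one-day default, and the function name are assumptions for illustration. The dictionary follows the shape that the S3 PutBucketLifecycleConfiguration API expects.

```python
def noncurrent_version_expiration_rule(days=1, prefix=""):
    """Build a lifecycle configuration that expires noncurrent object versions."""
    return {
        "Rules": [
            {
                "ID": "expire-original-recordings",  # hypothetical rule name
                "Status": "Enabled",
                "Filter": {"Prefix": prefix},         # empty prefix = whole bucket
                "NoncurrentVersionExpiration": {"NoncurrentDays": days},
            }
        ]
    }
```

With boto3, the result could be passed as the LifecycleConfiguration argument of put_bucket_lifecycle_configuration. Choose the number of days so that originals are only removed once downstream processing can no longer need them.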

This solution captures failures within the step function workflow, such as an error when resampling a recording file, with logging and a dead-letter queue. A Step Functions task monitors the Amazon SQS queue through the AWS SDK service integration for Amazon SQS and ends the workflow when the queue is empty. Table 1 shows the default and resampled formats.
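In the solution, the state machine performs the queue check through the SDK integration, not through Python. The following sketch only illustrates the equivalent logic: the function name is hypothetical, while get_queue_attributes and the two attribute names are part of the SQS API. Because SQS reports approximate counts, a workflow would typically repeat the check after a wait rather than rely on a single reading.

```python
def queue_is_empty(sqs_client, queue_url):
    """Return True when SQS reports no visible and no in-flight messages."""
    names = (
        "ApproximateNumberOfMessages",            # waiting to be received
        "ApproximateNumberOfMessagesNotVisible",  # received but not yet deleted
    )
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=list(names),
    )
    attributes = response["Attributes"]  # values are returned as strings
    return all(int(attributes[name]) == 0 for name in names)
```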


Figure 3. Detailed AWS Step Functions state machine diagram

Resampling

Table 1. Default and resampled call recording audio formats

| Audio sampling format | File size/minute | Notes |
|---|---|---|
| pcm_s16le, 8000 Hz, 2 channels, 256 kb/s | 2 MB | The default for Amazon Connect call recordings. Sampled for audio quality and call analytics processing. |
| pcm_alaw, 8000 Hz, 1 channel, 64 kb/s | 0.5 MB | Resampled to mono channel 8 bit. This resampling is not reversible and should only be performed after all call analytics processing has been completed. |
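The file sizes in Table 1 follow from the audio parameters. The following sketch shows the arithmetic. The function names are ours, and the table rounds the results (1.92 and 0.48) to 2 MB and 0.5 MB.

```python
def pcm_bitrate_kbps(sample_rate_hz, bits_per_sample, channels):
    """Bitrate of uncompressed audio in kilobits per second."""
    return sample_rate_hz * bits_per_sample * channels / 1000


def megabytes_per_minute(bitrate_kbps):
    """Convert a bitrate in kilobits per second to megabytes per minute."""
    bits_per_minute = bitrate_kbps * 1000 * 60
    return bits_per_minute / 8 / 1_000_000


if __name__ == "__main__":
    default = pcm_bitrate_kbps(8000, 16, 2)    # pcm_s16le, two channels
    resampled = pcm_bitrate_kbps(8000, 8, 1)   # pcm_alaw, one channel
    for label, kbps in (("default", default), ("resampled", resampled)):
        print(f"{label}: {kbps:.0f} kb/s, {megabytes_per_minute(kbps):.2f} MB/minute")
```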

Cost assessment

For pricing information for the primary services used in the solution, visit:

The costs incurred by the solution are based on usage and are AWS Free Tier eligible. After the AWS Free Tier allowance is consumed, usage costs are approximately $0.11 per 1000 minutes of call recordings. S3 Standard starts at $0.023 per GB/month; and S3 Glacier Instant Retrieval is $0.004 per GB/month, with $0.003 per GB of data retrieval. During a 6-year compliance retention term, the schedule-based resampling and storage tiering results in significant cost savings.

In the 6-year example detailed in Table 2, the S3 Standard storage costs would be approximately $356,664 for 3 million call-recording minutes/month. The audio resampling and S3 Glacier Instant Retrieval tiering reduces the 6-year cost to approximately $41,838.

Table 2. Multi-year costs savings scenario (3 million minutes/month) in USD

| Year | Total minutes (3 million/month) | Total storage (TB) | Cost of storage, S3 Standard (USD) | Cost of running the resampling (USD) | Cost of resampling solution with S3 Glacier Instant Retrieval (USD) |
|---|---|---|---|---|---|
| 1 | 36,000,000 | 72 | 10,764 | 3,960 | 4,813 |
| 2 | 72,000,000 | 108 | 30,636 | 3,960 | 5,677 |
| 3 | 108,000,000 | 144 | 50,508 | 3,960 | 6,541 |
| 4 | 144,000,000 | 180 | 70,380 | 3,960 | 7,405 |
| 5 | 180,000,000 | 216 | 90,252 | 3,960 | 8,269 |
| 6 | 216,000,000 | 252 | 110,124 | 3,960 | 9,133 |
| Total | 1,008,000,000 | 972 | 356,664 | 23,760 | 41,838 |
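The post does not spell out how the yearly figures are derived. One model that reproduces the year-1 S3 Standard cost is sketched below. It assumes that storage grows by about 6 TB each month (3 million minutes at roughly 2 MB/minute), that every month is billed on the cumulative total, and that 1 TB equals 1,000 GB. These are our assumptions, not statements from the post.

```python
def cumulative_storage_cost(tb_added_per_month, price_per_gb_month, months):
    """Cost of storage that grows by a fixed amount every month.

    Assumes month k is billed for k * tb_added_per_month and 1 TB = 1,000 GB.
    """
    tb_months = sum(tb_added_per_month * k for k in range(1, months + 1))
    return tb_months * 1000 * price_per_gb_month


if __name__ == "__main__":
    year_one = cumulative_storage_cost(6, 0.023, 12)
    print(f"Year 1, S3 Standard: ${year_one:,.0f}")
```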

To explore the costs for yourself, use AWS Cost Explorer or choose Bill Details on the AWS Billing Dashboard to see your month-to-date spend by service.

Deploying the solution

The code and documentation for this solution are available by cloning the git repository and can be deployed with AWS Cloud Development Kit (AWS CDK).

Bash
# clone repository
git clone https://github.com/aws-samples/amazon-connect-call-recording-cost-optimizer.git
# navigate the project directory
cd amazon-connect-call-recording-cost-optimizer

Modify the cdk.context.json with your environment’s configuration setting, such as the bucket_name. Next, install the AWS CDK dependencies and deploy the solution:

# ensure you are in the root directory of the repository

./cdk-deploy.sh

Once deployed, you can test the resampling solution by waiting for the EventBridge schedule rule to execute based on the num_days_age setting that is applied. You can also manually run the AWS Step Function with a specified date, for example {"specific_date":"01/01/2022"}.

The AWS CDK deployment creates the following resources:

  • AWS Step Function
  • AWS Lambda function
  • Amazon SQS queues
  • Amazon EventBridge rule

The solution handles the automation of transitioning a storage tier, such as S3 Glacier Instant Retrieval. In addition, Amazon S3 Lifecycles can be set manually to transition the call recordings after resampling to alternative Amazon S3 Storage Classes.

Cleanup

When you are finished experimenting with this solution, clean up your resources by running the command:

cdk destroy

This command deletes the AWS CDK-deployed resources. However, the S3 bucket containing your call recordings and CloudWatch log groups are retained.

Conclusion

This call recording resampling solution offers an automated, cost-optimized, and scalable architecture to reduce long-term compliance call recording archival costs.

Optimize Federated Query Performance using EXPLAIN and EXPLAIN ANALYZE in Amazon Athena

Post Syndicated from Nishchai JM original https://aws.amazon.com/blogs/big-data/optimize-federated-query-performance-using-explain-and-explain-analyze-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. In 2019, Athena added support for federated queries to run SQL queries across data stored in relational, non-relational, object, and custom data sources.

In 2021, Athena added support for the EXPLAIN statement, which can help you understand and improve the efficiency of your queries. The EXPLAIN statement provides a detailed breakdown of a query’s run plan. You can analyze the plan to identify and reduce query complexity and improve its runtime. You can also use EXPLAIN to validate SQL syntax prior to running the query. Doing so helps prevent errors that would have occurred while running the query.

Athena also added EXPLAIN ANALYZE, which displays the computational cost of your queries alongside their run plans. Administrators can benefit from using EXPLAIN ANALYZE because it provides a scanned data count, which helps you reduce financial impact due to user queries and apply optimizations for better cost control.

In this post, we demonstrate how to use and interpret EXPLAIN and EXPLAIN ANALYZE statements to improve Athena query performance when querying multiple data sources.

Solution overview

To demonstrate using EXPLAIN and EXPLAIN ANALYZE statements, we use the following services and resources:

Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in your AWS account. The table metadata lets the Athena query engine know how to find, read, and process the data that you want to query. We use Athena data source connectors to connect to data sources external to Amazon S3.

Prerequisites

To deploy the CloudFormation template, you must have the following:

Provision resources with AWS CloudFormation

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

  2. Follow the prompts on the AWS CloudFormation console to create the stack.
  3. Note the key-value pairs on the stack’s Outputs tab.

You use these values when configuring the Athena data source connectors.

The CloudFormation template creates the following resources:

  • S3 buckets to store data and act as temporary spill buckets for Lambda
  • AWS Glue Data Catalog tables for the data in the S3 buckets
  • A DynamoDB table and Amazon RDS for MySQL tables, which are used to join multiple tables from different sources
  • A VPC, subnets, and endpoints, which are needed for Amazon RDS for MySQL and DynamoDB

The following figure shows the high-level data model for the data load.

Create the DynamoDB data source connector

To create the DynamoDB connector for Athena, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select Amazon DynamoDB.
  4. Choose Next.

  5. For Data source name, enter DDB.

  6. For Lambda function, choose Create Lambda function.

This opens a new tab in your browser.

  7. For Application name, enter AthenaDynamoDBConnector.
  8. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  9. For AthenaCatalogName, enter dynamodb-lambda-func.
  10. Leave the remaining values at their defaults.
  11. Select I acknowledge that this app creates custom IAM roles and resource policies.
  12. Choose Deploy.

You’re returned to the Connect data sources section on the Athena console.

  13. Choose the refresh icon next to Lambda function.
  14. Choose the Lambda function you just created (dynamodb-lambda-func).

  15. Choose Next.
  16. Review the settings and choose Create data source.
  17. If you haven’t already set up the Athena query results location, choose View settings on the Athena query editor page.

  18. Choose Manage.
  19. For Location of query result, browse to the S3 bucket specified for the Athena spill bucket in the CloudFormation template.
  20. Add Athena-query to the S3 path.
  21. Choose Save.

  22. In the Athena query editor, for Data source, choose DDB.
  23. For Database, choose default.

You can now explore the schema for the sportseventinfo table; the data is the same in DynamoDB.

  24. Choose the options icon for the sportseventinfo table and choose Preview Table.

Create the Amazon RDS for MySQL data source connector

Now let’s create the connector for Amazon RDS for MySQL.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL.
  4. Choose Next.

  5. For Data source name, enter MySQL.

  6. For Lambda function, choose Create Lambda function.

  7. For Application name, enter AthenaMySQLConnector.
  8. For SecretNamePrefix, enter AthenaMySQLFederation.
  9. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  10. For DefaultConnectionString, enter the value from the CloudFormation stack for MySQLConnection.
  11. For LambdaFunctionName, enter mysql-lambda-func.
  12. For SecurityGroupIds, enter the value from the CloudFormation stack for RDSSecurityGroup.
  13. For SubnetIds, enter the value from the CloudFormation stack for RDSSubnets.
  14. Select I acknowledge that this app creates custom IAM roles and resource policies.
  15. Choose Deploy.

  16. On the Lambda console, open the function you created (mysql-lambda-func).
  17. On the Configuration tab, under Environment variables, choose Edit.

  18. Choose Add environment variable.
  19. Enter a new key-value pair:
    • For Key, enter MYSQL_connection_string.
    • For Value, enter the value from the CloudFormation stack for MySQLConnection.
  20. Choose Save.

  21. Return to the Connect data sources section on the Athena console.
  22. Choose the refresh icon next to Lambda function.
  23. Choose the Lambda function you created (mysql-lambda-func).

  24. Choose Next.
  25. Review the settings and choose Create data source.
  26. In the Athena query editor, for Data Source, choose MYSQL.
  27. For Database, choose sportsdata.

  28. Choose the options icon by the tables and choose Preview Table to examine the data and schema.

In the following sections, we demonstrate different ways to optimize our queries.

Optimal join order using EXPLAIN plan

A join is a basic SQL operation to query data on multiple tables using relations on matching columns. Join operations affect how much data is read from a table, how much data is transferred to the intermediate stages through networks, and how much memory is needed to build up a hash table to facilitate a join.

If you have multiple join operations and the joined tables aren’t in the correct order, you may experience performance issues. To demonstrate this, we use the following tables from different sources and join them in a certain order. Then we observe the query runtime and improve performance by using the EXPLAIN feature from Athena, which provides some suggestions for optimizing the query.

The CloudFormation template you ran earlier loaded data into the following services:

| AWS Storage | Table Name | Number of Rows |
|---|---|---|
| Amazon DynamoDB | sportseventinfo | 657 |
| Amazon S3 | person | 7,025,585 |
| Amazon S3 | ticketinfo | 2,488 |

Let’s construct a query to find everyone who participated in an event, by type of ticket. With the following join order, the query took approximately 7 minutes to complete:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

Now let’s use EXPLAIN on the query to see its run plan. We use the same query as before, but add EXPLAIN (TYPE DISTRIBUTED):

EXPLAIN (TYPE DISTRIBUTED)
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

Notice the cross-join in Fragment 1. The joins are converted to a Cartesian product for each table, where every record in a table is compared to every record in another table. Therefore, this query takes a significant amount of time to complete.
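To get a feel for the size of such a Cartesian product, the following sketch multiplies the row counts from the table above. It assumes that the cross-join pairs sportseventinfo with person, the first two tables in the FROM clause. The helper name is ours.

```python
ROW_COUNTS = {"sportseventinfo": 657, "person": 7_025_585, "ticketinfo": 2_488}


def cross_join_rows(left, right):
    """Number of row pairs a Cartesian product of two tables produces."""
    return ROW_COUNTS[left] * ROW_COUNTS[right]


if __name__ == "__main__":
    # Pairing every event with every person: billions of candidate rows.
    print(f"{cross_join_rows('sportseventinfo', 'person'):,}")
    # Pairing every event with every ticket is far smaller.
    print(f"{cross_join_rows('sportseventinfo', 'ticketinfo'):,}")
```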

To optimize our query, we can rewrite it by reordering the joined tables as sportseventinfo first, ticketinfo second, and person last. The WHERE clause, which is converted to a JOIN ON clause during the query plan stage, doesn’t define a join relationship between the person table and the sportseventinfo table. Therefore, the query plan generator converted the join type to a cross-join (a Cartesian product), which is less efficient. Reordering the tables aligns the WHERE clause with the INNER JOIN type, which satisfies the JOIN ON clause, and the runtime is reduced from 7 minutes to 10 seconds.

The code for our optimized query is as follows:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following is the EXPLAIN output of our query after reordering the join clause:

EXPLAIN (TYPE DISTRIBUTED) 
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

The cross-join changed to INNER JOIN with join on columns (eventid, id, ticketholder_id), which results in the query running faster. The join between the ticketinfo and person tables is converted to the PARTITIONED distribution type, where both left and right tables are hash-partitioned across all worker nodes due to the size of the person table. The join between the sportseventinfo and ticketinfo tables is converted to the REPLICATED distribution type, where one table is hash-partitioned across all worker nodes and the other table is replicated to all worker nodes to perform the join operation.

For more information about how to analyze these results, refer to Understanding Athena EXPLAIN statement results.
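If you build queries programmatically, a small helper can add the EXPLAIN prefix for you. The following is a sketch with a hypothetical function name. The TYPE values it accepts (LOGICAL, DISTRIBUTED, VALIDATE, IO) are the ones documented for the Athena EXPLAIN statement, and it rejects TYPE for EXPLAIN ANALYZE, which doesn’t take that option.

```python
EXPLAIN_TYPES = {"LOGICAL", "DISTRIBUTED", "VALIDATE", "IO"}


def explain_statement(query, analyze=False, plan_type=None):
    """Prefix a query with EXPLAIN or EXPLAIN ANALYZE."""
    if analyze:
        if plan_type is not None:
            raise ValueError("TYPE is an EXPLAIN option, not an EXPLAIN ANALYZE option")
        return f"EXPLAIN ANALYZE\n{query}"
    if plan_type is None:
        return f"EXPLAIN\n{query}"
    plan_type = plan_type.upper()
    if plan_type not in EXPLAIN_TYPES:
        raise ValueError(f"unsupported EXPLAIN type: {plan_type}")
    return f"EXPLAIN (TYPE {plan_type})\n{query}"
```

The returned string can be run like any other statement, for example in the Athena query editor or as the QueryString of the Athena StartQueryExecution API.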

As a best practice, we recommend having a JOIN statement along with an ON clause, as shown in the following code:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"AwsDataCatalog"."athenablog"."person" p 
JOIN "AwsDataCatalog"."athenablog"."ticketinfo" t ON t.ticketholder_id = p.id 
JOIN "DDB"."default"."sportseventinfo" e ON t.sporting_event_id = cast(e.eventid as double)

Also as a best practice when you join two tables, specify the larger table on the left side of join and the smaller table on the right side of the join. Athena distributes the table on the right to worker nodes, and then streams the table on the left to do the join. If the table on the right is smaller, then less memory is used and the query runs faster.
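The reason the smaller table belongs on the right can be illustrated with a simplified, single-process hash join. This is not Athena’s implementation, only a sketch of the general technique: the right-hand table is loaded into an in-memory hash table (the build side) and the left-hand table is streamed past it (the probe side), so memory use grows with the size of the right-hand table. The function name and the sample column names are illustrative.

```python
from collections import defaultdict


def hash_join(left_rows, right_rows, left_key, right_key):
    """Join two lists of dicts: build a hash table on the right, stream the left."""
    build = defaultdict(list)
    for row in right_rows:                  # build side: held in memory
        build[row[right_key]].append(row)
    for row in left_rows:                   # probe side: streamed row by row
        for match in build.get(row[left_key], []):
            yield {**row, **match}


if __name__ == "__main__":
    tickets = [
        {"ticket_id": 1, "sporting_event_id": 10},
        {"ticket_id": 2, "sporting_event_id": 11},
    ]
    events = [{"eventid": 10, "sport": "baseball"}]
    for joined in hash_join(tickets, events, "sporting_event_id", "eventid"):
        print(joined)
```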

In the following sections, we present examples of how to optimize pushdowns for filter predicates and projection filter operations for the Athena data source using EXPLAIN ANALYZE.

Pushdown optimization for the Athena connector for Amazon RDS for MySQL

A pushdown is an optimization to improve the performance of a SQL query by moving its processing as close to the data as possible. Pushdowns can drastically reduce SQL statement processing time by filtering data before transferring it over the network and filtering data before loading it into memory. The Athena connector for Amazon RDS for MySQL supports pushdowns for filter predicates and projection pushdowns.
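The effect can be illustrated with a toy model that counts how many rows leave the data source. This is a conceptual sketch, not connector code. Both functions return the same result, but only the second one filters and projects before the transfer. All names are hypothetical.

```python
def scan_without_pushdown(source_rows, predicate, columns):
    """Transfer every full row, then filter and project on the query-engine side."""
    transferred = [dict(row) for row in source_rows]
    result = [{c: row[c] for c in columns} for row in transferred if predicate(row)]
    return result, len(transferred)


def scan_with_pushdown(source_rows, predicate, columns):
    """Filter and project at the source, so only matching values are transferred."""
    transferred = [{c: row[c] for c in columns} for row in source_rows if predicate(row)]
    return transferred, len(transferred)


if __name__ == "__main__":
    players = [{"id": i, "full_name": f"player {i}", "sport_team_id": i % 3} for i in range(5157)]
    wanted = lambda row: row["id"] == 1
    print(scan_without_pushdown(players, wanted, ["full_name"])[1])  # rows transferred
    print(scan_with_pushdown(players, wanted, ["full_name"])[1])     # rows transferred
```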

The following table summarizes the tables we use to demonstrate a pushdown using Amazon RDS for MySQL.

| Table Name | Number of Rows | Size in KB |
|---|---|---|
| player_partitioned | 5,157 | 318.86 |
| sport_team_partitioned | 62 | 5.32 |

We use the following query as an example of a filtering predicate and projection filter:

SELECT full_name,
name 
FROM "sportsdata"."player_partitioned" a 
JOIN "sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

This query selects the players and their team based on their ID. It serves as an example of both filter operations in the WHERE clause and projection because it selects only two columns.

We use EXPLAIN ANALYZE to get the cost of running this query:

EXPLAIN ANALYZE 
SELECT full_name,
name 
FROM "MYSQL"."sportsdata"."player_partitioned" a 
JOIN "MYSQL"."sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

The following screenshot shows the output in Fragment 2 for the table player_partitioned. The connector successfully pushes the filter down to the source, so it scans only one record out of the 5,157 records in the table. The output also shows that the query scans only two columns (full_name as the projection column and sport_team_id as the join column) and uses SELECT and JOIN, which indicates that the projection pushdown is successful. This helps reduce the data scan when using Athena data source connectors.

Now let’s look at the conditions in which a filter predicate pushdown doesn’t work with Athena connectors.

LIKE statement in filter predicates

We start with the following example query to demonstrate using the LIKE statement in filter predicates:

SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

We then add EXPLAIN ANALYZE:

EXPLAIN ANALYZE 
SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

The EXPLAIN ANALYZE output shows that the query scans all 5,157 records in the player_partitioned table, even though only 30 records match the condition %Aar% in the WHERE clause. Therefore, the data scan shows the complete table size even with the WHERE clause.

We can optimize the same query by selecting only the required columns:

EXPLAIN ANALYZE 
SELECT sport_team_id,
full_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

From the EXPLAIN ANALYZE output, we can observe that the connector supports the projection filter pushdown, because we select only two columns. This brought the data scan size down to half of the table size.

OR statement in filter predicates

We start with the following query to demonstrate using the OR statement in filter predicates:

SELECT id,
first_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

We use EXPLAIN ANALYZE with the preceding query as follows:

EXPLAIN ANALYZE 
SELECT * 
FROM 
"MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

Similar to the LIKE statement, the following output shows that the query scanned the whole table instead of pushing the filter down so that only the records matching the WHERE clause are read. This query outputs only 16 records, but the data scan indicates a complete scan.

Pushdown optimization for the Athena connector for DynamoDB

For our example using the DynamoDB connector, we use the following data:

| Table | Number of Rows | Size in KB |
|---|---|---|
| sportseventinfo | 657 | 85.75 |

Let’s test the filter predicate and projection filter operations for our DynamoDB table using the following query. This query tries to get all the events and sports for a given location. We use EXPLAIN ANALYZE for the query as follows:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE Location = 'Chase Field'

The output of EXPLAIN ANALYZE shows that the filter predicate retrieved only 21 records, and the projection filter selected only two columns to push down to the source. Therefore, the data scan for this query is less than the table size.

Now let’s see where filter predicate pushdown doesn’t work. In the WHERE clause, if you apply the TRIM() function to the Location column and then filter, predicate pushdown optimization doesn’t apply, but we still see the projection filter optimization, which does apply. See the following code:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE trim(Location) = 'Chase Field'

The output of EXPLAIN ANALYZE for this query shows that the query scans all the rows but is still limited to only two columns, which shows that the filter predicate doesn’t work when the TRIM function is applied.

We’ve seen from the preceding examples that the Athena data source connectors for Amazon RDS for MySQL and DynamoDB support filter predicate and projection pushdown optimizations, but we also saw that operations such as LIKE, OR, and TRIM, when used in the filter predicate, aren’t pushed down to the source. Therefore, if you encounter unexplained charges in your federated Athena query, we recommend using EXPLAIN ANALYZE with the query to determine whether your Athena connector supports the pushdown operation.

Please note that running EXPLAIN ANALYZE incurs cost because it scans the data.

Conclusion

In this post, we showed how to use EXPLAIN and EXPLAIN ANALYZE to analyze Athena SQL queries for data sources on Amazon S3 and Athena federated SQL queries for data sources such as DynamoDB and Amazon RDS for MySQL. You can use this as an example to optimize your queries, which can also result in cost savings.


About the Authors

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web Services. He specializes in building big data applications and helping customers modernize their applications on the cloud. He thinks data is the new oil and spends most of his time deriving insights from data.

Varad Ram is a Senior Solutions Architect at Amazon Web Services. He likes to help customers adopt cloud technologies and is particularly interested in artificial intelligence. He believes deep learning will power future technology growth. In his spare time, he likes to be outdoors with his daughter and son.

Extending PowerShell on AWS Lambda with other services

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/extending-powershell-on-aws-lambda-with-other-services/

This post expands on the functionality introduced with the PowerShell custom runtime for AWS Lambda. The previous blog post explains how the custom runtime approach makes it easier to run Lambda functions written in PowerShell.

You can add additional functionality to your PowerShell serverless applications by importing PowerShell modules, which are shareable packages of code. Build your own modules or import from the wide variety of existing vendor modules to manage your infrastructure and applications.

You can also take advantage of the event-driven nature of Lambda, which allows you to run Lambda functions in response to events. Events can include an object being uploaded to Amazon S3, a message placed on an Amazon SQS queue, a scheduled task using Amazon EventBridge, or an HTTP request from Amazon API Gateway. Lambda functions support event triggers from over 200 AWS services and software as a service (SaaS) applications.

Adding PowerShell modules

You can add PowerShell modules from a number of locations. These can include modules from the AWS Tools for PowerShell, from the PowerShell Gallery, or your own custom modules. Lambda functions access these PowerShell modules within specific folders within the Lambda runtime environment.

You can include PowerShell modules via Lambda layers, within your function code package, or container image. When using .zip archive functions, you can use layers to package and share modules to use with your functions. Layers reduce the size of uploaded deployment archives and can make it faster to deploy your code. You can attach up to five layers to your function, one of which must be the PowerShell custom runtime layer. You can include multiple modules per layer.

The custom runtime configures PowerShell’s PSModulePath environment variable, which contains the list of folder locations to search to find modules. The runtime searches the folders in the following order:

1. User supplied modules as part of function package

You can include PowerShell modules inside the published Lambda function package in a /modules subfolder.

2. User supplied modules as part of Lambda layers

You can publish Lambda layers that include PowerShell modules in a /modules subfolder. This allows you to share modules across functions and accounts. Lambda extracts layers to /opt within the Lambda runtime environment so the modules are located in /opt/modules. This is the preferred solution to use modules with multiple functions.

3. Default/user supplied modules supplied with PowerShell

You can also include additional default modules and add them within a /modules folder within the PowerShell custom runtime layer.
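The effect of this search order is that the first location containing a module wins. The following sketch, written in Python purely as an illustration, models that resolution. The folder labels in angle brackets are placeholders rather than real paths (only /opt/modules is named in this post), and the function and variable names are hypothetical.

```python
SEARCH_ORDER = [
    "<function package>/modules",  # 1. modules shipped with the function code
    "/opt/modules",                # 2. modules from Lambda layers
    "<runtime layer>/modules",     # 3. modules supplied with the custom runtime
]


def resolve_module(name, search_order, installed):
    """Return the first folder in `search_order` that contains module `name`."""
    for folder in search_order:
        if name in installed.get(folder, ()):
            return folder
    return None


if __name__ == "__main__":
    installed = {
        "<function package>/modules": {"MyModule"},
        "/opt/modules": {"AWS.Tools.Common", "AWS.Tools.S3", "MyModule"},
    }
    print(resolve_module("MyModule", SEARCH_ORDER, installed))
    print(resolve_module("AWS.Tools.S3", SEARCH_ORDER, installed))
```

In this model, a copy of a module in the function package takes precedence over a copy of the same module in a layer.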

For example, the following function includes four Lambda layers. One layer includes the custom runtime. Three additional layers include further PowerShell modules: the AWS Tools for PowerShell, your own custom modules, and third-party modules. You can also include additional modules with your function code.

Lambda layers


Within your PowerShell code, you can load modules during the function initialization (init) phase. This initializes the modules before the handler function runs, which speeds up subsequent warm-start invocations.

Adding modules from the AWS Tools for PowerShell

This post shows how to use the AWS Tools for PowerShell to manage your AWS services and resources. The tools are packaged as a set of PowerShell modules that are built on the functionality exposed by the AWS SDK for .NET. You can follow similar packaging steps to add other modules to your functions.

The AWS Tools for PowerShell are available as three distinct packages:

The AWS.Tools package is the preferred modularized version, which allows you to load only the modules for the services you want to use. This reduces package size and function memory usage. The AWS.Tools cmdlets support auto-importing modules without having to call Import-Module first. However, specifically importing the modules during the function init phase is more efficient and can reduce subsequent invoke duration. The AWS.Tools.Common module is required and provides cmdlets for configuration and authentication that are not service specific.

The accompanying GitHub repository contains the code for the custom runtime, along with a number of example applications. There are also module build instructions for adding a number of common PowerShell modules as Lambda layers, including AWS.Tools.

Building an event-driven PowerShell function

The repository contains an example of an event-driven demo application that you can build using serverless services.

A clothing printing company must manage its t-shirt size and color inventory. The printers store t-shirt orders for each day in a CSV file. The inventory service is one service that must receive the CSV file. It parses the file and, for each order, records the details to manage stock deliveries.

The stores upload the files to S3. This automatically invokes a PowerShell Lambda function, which is configured to respond to the S3 ObjectCreated event. The Lambda function receives the S3 object location as part of the $LambdaInput event object. It uses the AWS Tools for PowerShell to download the file from S3. It parses the contents and, for each line in the CSV file, sends the individual order details as an event to an EventBridge event bus.
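The parse-and-publish step can be sketched independently of the runtime. The following Python sketch is illustrative only and is not the demo's PowerShell code: the column names, bus name, source, and detail type are hypothetical. It turns each CSV row into one entry in the shape that the EventBridge PutEvents API accepts, and groups entries into batches of 10, the maximum that PutEvents accepts per request.

```python
import csv
import io
import json

MAX_ENTRIES_PER_CALL = 10  # PutEvents accepts at most 10 entries per request


def csv_to_entries(csv_text, bus_name="demo-bus", source="demo.orders"):
    """Turn each CSV row into one EventBridge PutEvents entry.

    bus_name, source, and the "order" detail type are hypothetical values.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {
            "EventBusName": bus_name,
            "Source": source,
            "DetailType": "order",
            "Detail": json.dumps(row),  # Detail must be a JSON string
        }
        for row in reader
    ]


def batches(entries, size=MAX_ENTRIES_PER_CALL):
    """Split entries into chunks that fit in a single PutEvents call."""
    return [entries[i:i + size] for i in range(0, len(entries), size)]


# Hypothetical sample file contents; the real CSV layout may differ.
sample = "order_id,size,color\n1,M,red\n2,L,blue\n"
entries = csv_to_entries(sample)
```

Each batch could then be passed to an EventBridge client, for example as the `Entries` argument of boto3's `put_events`.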

In this example, there is a single rule to log the event to Amazon CloudWatch Logs to show the received event. However, you could route each order, depending on the order details, to different targets. For example, you can send different color combinations to SQS queues, which the dyeing service can use to order dyes. You could send particular size combinations to another Lambda function that manages cloth orders.

Example event-driven application

The previous blog post shows how to use the AWS Serverless Application Model (AWS SAM) to build a Lambda layer, which includes only the AWS.Tools.Common module to run Get-AWSRegion. To build a PowerShell application to process objects from S3 and send events to EventBridge, you can extend this functionality by also including the AWS.Tools.S3 and AWS.Tools.EventBridge modules in a Lambda layer.

Lambda layers, including S3 and EventBridge

Building the AWS Tools for PowerShell layer

You could choose to add these modules and rebuild the existing layer. However, the example in this post creates a new Lambda layer to show how you can have different layers for different module combinations of AWS.Tools. The example also adds the Lambda layer Amazon Resource Name (ARN) to AWS Systems Manager Parameter Store to track deployed layers. This allows you to reference them more easily in infrastructure as code tools.

The repository includes build scripts for both Windows and non-Windows developers. Windows does not natively support Makefiles. When using Windows, you can use Windows Subsystem for Linux (WSL), Docker Desktop, or native PowerShell.

When using Linux, macOS, WSL, or Docker, the Makefile builds the Lambda layers. After downloading the modules, it also extracts the additional AWS.Tools.S3 and AWS.Tools.EventBridge modules.

# Download AWSToolsLayer module binaries
curl -L -o $(ARTIFACTS_DIR)/AWS.Tools.zip https://sdk-for-net.amazonwebservices.com/ps/v4/latest/AWS.Tools.zip
mkdir -p $(ARTIFACTS_DIR)/modules

# Extract select AWS.Tools modules (AWS.Tools.Common required)
unzip $(ARTIFACTS_DIR)/AWS.Tools.zip 'AWS.Tools.Common/**/*' -d $(ARTIFACTS_DIR)/modules/
unzip $(ARTIFACTS_DIR)/AWS.Tools.zip 'AWS.Tools.S3/**/*' -d $(ARTIFACTS_DIR)/modules/
unzip $(ARTIFACTS_DIR)/AWS.Tools.zip 'AWS.Tools.EventBridge/**/*' -d $(ARTIFACTS_DIR)/modules/

When using native PowerShell on Windows to build the layer, the build-AWSToolsLayer.ps1 script performs the same file copy functionality as the Makefile. You can use this option for Windows without WSL or Docker.

### Extract entire AWS.Tools modules to stage area but only move over select modules
…
Move-Item "$PSScriptRoot\stage\AWS.Tools.Common" "$PSScriptRoot\modules\" -Force
Move-Item "$PSScriptRoot\stage\AWS.Tools.S3" "$PSScriptRoot\modules\" -Force
Move-Item "$PSScriptRoot\stage\AWS.Tools.EventBridge" "$PSScriptRoot\modules\" -Force

The Lambda function code imports the required modules in the function init phase.

Import-Module "AWS.Tools.Common"
Import-Module "AWS.Tools.S3"
Import-Module "AWS.Tools.EventBridge"

For other combinations of AWS.Tools, amend the example build-AWSToolsLayer.ps1 scripts to add the modules you require. You can use a similar download and copy process, or PowerShell’s Save-Module to build layers for modules from other locations.

Building and deploying the event-driven serverless application

Follow the instructions in the GitHub repository to build and deploy the application.

The demo application uses AWS SAM to deploy the following resources:

  1. PowerShell custom runtime.
  2. Additional Lambda layer containing the AWS.Tools.Common, AWS.Tools.S3, and AWS.Tools.EventBridge modules from AWS Tools for PowerShell. The layer ARN is stored in Parameter Store.
  3. S3 bucket to store CSV files.
  4. Lambda function triggered by S3 upload.
  5. Custom EventBridge event bus and rule to send events to CloudWatch Logs.

Testing the event-driven application

Use the AWS CLI or AWS Tools for PowerShell to copy the sample CSV file to S3. Replace BUCKET_NAME with your S3 SourceBucket Name from the AWS SAM outputs.

AWS CLI

aws s3 cp .\test.csv s3://BUCKET_NAME

AWS Tools for PowerShell

Write-S3Object -BucketName BUCKET_NAME -File .\test.csv

The S3 file copy action generates an S3 notification event. This invokes the PowerShell Lambda function, passing the S3 file location details as part of the function $LambdaInput event object.
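To make the event shape concrete, here is a minimal Python sketch that reads the bucket name and object key from an S3 event notification. The demo itself does this in PowerShell through $LambdaInput; the helper name and sample event below are hypothetical, while the `Records[].s3.bucket.name` and `Records[].s3.object.key` paths follow the S3 notification format.

```python
from urllib.parse import unquote_plus


def s3_locations(event):
    """Return (bucket, key) pairs from an S3 event notification."""
    locations = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 notifications (spaces become '+').
        key = unquote_plus(record["s3"]["object"]["key"])
        locations.append((bucket, key))
    return locations


# Trimmed, hypothetical event; real notifications carry more fields.
sample_event = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "BUCKET_NAME"},
                "object": {"key": "orders/test+file.csv"},
            },
        }
    ]
}
```

Decoding the key matters when file names contain spaces or special characters; otherwise the download request can target a key that does not exist.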

The function downloads the S3 CSV file, parses the contents, and sends the individual lines to EventBridge, which logs the events to CloudWatch Logs.

Navigate to the CloudWatch Logs group /aws/events/demo-s3-lambda-eventbridge.

You can see the individual orders logged from the CSV file.

EventBridge logs showing CSV lines

Conclusion

You can extend PowerShell Lambda applications to provide additional functionality.

This post shows how to import your own or vendor PowerShell modules and explains how to build Lambda layers for the AWS Tools for PowerShell.

You can also take advantage of the event-driven nature of Lambda to run Lambda functions in response to events. The demo application shows how a clothing printing company builds a PowerShell serverless application to manage its t-shirt size and color inventory.

See the accompanying GitHub repository, which contains the code for the custom runtime, along with additional installation options and additional examples.

Start running PowerShell on Lambda today.

For more serverless learning resources, visit Serverless Land.

Build a big data Lambda architecture for batch and real-time analytics using Amazon Redshift

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/build-a-big-data-lambda-architecture-for-batch-and-real-time-analytics-using-amazon-redshift/

With real-time information about customers, products, and applications in hand, organizations can take action as events happen in their business application. For example, you can prevent financial fraud, deliver personalized offers, and identify and prevent failures before they occur in near real time. Batch analytics lets you analyze trends and process data at scale in time intervals (such as daily sales aggregations by individual store), whereas real-time analytics is optimized for low latency, ensuring that data is available for querying in seconds. The two paradigms often operate in silos, which results in data redundancy and operational overhead to maintain them. A big data Lambda architecture is a reference architecture pattern that allows for the seamless coexistence of the batch and near-real-time paradigms for large-scale data for analytics.

Amazon Redshift allows you to easily analyze all data types across your data warehouse, operational database, and data lake using standard SQL. In this post, we collect, process, and analyze data streams in real time. With data sharing, you can share live data across Amazon Redshift clusters for read purposes with relative security and ease out of the box. In this post, we discuss how we can harness the data sharing ability of Amazon Redshift to set up a big data Lambda architecture to allow both batch and near-real-time analytics.

Solution overview

Example Corp. is a leading electric automotive company that revolutionized the automotive industry. Example Corp. operationalizes the connected vehicle data and improves the effectiveness of various connected vehicle and fleet use cases, including predictive maintenance, in-vehicle service monetization, usage-based insurance, and delivering exceptional driver experiences. In this post, we explore the real-time and trend analytics using the connected vehicle data to illustrate the following use cases:

  • Usage-based insurance – Usage-based insurance (UBI) relies on analysis of near-real-time data from the driver’s vehicle to assess the risk profile of the driver. It also relies on the historical analysis (batch) of metrics (such as the number of miles driven in a year). The better the driver, the lower the premium.
  • Fleet performance trends – The performance of a fleet (such as a taxi fleet) relies on the analysis of historical trends of data across the fleet (batch) as well as the ability to drill down to a single vehicle within the fleet for near-real-time analysis of metrics like fuel consumption or driver distraction.

Architecture overview

In this section, we discuss the overall architectural setup for the Lambda architecture solution.

The following diagram shows the implementation architecture and the different computational layers:

  • Data ingestion from AWS IoT Core
  • Batch layer
  • Speed layer
  • Serving layer

Data ingestion

Vehicle telemetry data is ingested into the cloud through AWS IoT Core and routed to Amazon Kinesis Data Streams. The Kinesis Data Streams layer acts as a separation layer for the speed layer and batch layer, where the incoming telemetry is consumed by the speed layer’s Amazon Redshift cluster and Amazon Kinesis Data Firehose, respectively.

Batch layer

Amazon Kinesis Data Firehose is a fully managed service that can batch, compress, transform, and encrypt your data streams before loading them into your Amazon Simple Storage Service (Amazon S3) data lake. Kinesis Data Firehose also allows you to specify a custom expression for the Amazon S3 prefix where data records are delivered. This provides the ability to filter the partitioned data and control the amount of data scanned by each query, thereby improving performance and reducing cost.
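As an illustration of why a partitioned prefix limits the data scanned, the following Python sketch builds a date-partitioned prefix and selects only the keys for one year. This is a sketch under assumptions: the `year=/month=/day=` layout mirrors the prefix used later in this post, but the zero-padding, helper names, and sample keys are hypothetical.

```python
from datetime import datetime, timezone


def partition_prefix(ts, root="vehicle_telematics_raw"):
    """Build a Hive-style date-partitioned prefix for a record timestamp."""
    return f"{root}/year={ts:%Y}/month={ts:%m}/day={ts:%d}/"


def keys_for_year(keys, year, root="vehicle_telematics_raw"):
    """Keep only the keys under one year partition, as a partition filter would."""
    wanted = f"{root}/year={year}/"
    return [k for k in keys if k.startswith(wanted)]


# Hypothetical object keys in the data lake.
keys = [
    "vehicle_telematics_raw/year=2021/month=12/day=31/part-0.parquet",
    "vehicle_telematics_raw/year=2022/month=03/day=07/part-0.parquet",
]
prefix = partition_prefix(datetime(2022, 3, 7, tzinfo=timezone.utc))
```

A query that filters on `year = 2022` only needs to read objects under the matching prefix, rather than every object in the bucket.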

The batch layer persists data in Amazon S3 and is accessed directly by an Amazon Redshift Serverless endpoint (serving layer). With Amazon Redshift Serverless, you can efficiently query and retrieve structured and semistructured data from files in Amazon S3 without having to load the data into Amazon Redshift tables.

The batch layer can also optionally precompute results as batch views from the immutable Amazon S3 data lake and persist them as either native tables or materialized views for very high-performant use cases. You can create these precomputed batch views using AWS Glue, Amazon Redshift stored procedures, Amazon Redshift materialized views, or other options.

The batch views can be calculated as:

batch view = function (all data)

In this solution, we build a batch layer for Example Corp. for two types of queries:

  • rapid_acceleration_by_year – The number of rapid accelerations by each driver aggregated per year
  • total_miles_driven_by_year – The total number of miles driven by the fleet aggregated per year

For demonstration purposes, we use Amazon Redshift stored procedures to create the batch views as Amazon Redshift native tables from external tables using Amazon Redshift Spectrum.

Speed layer

The speed layer processes data streams in real time and aims to minimize latency by providing real-time views into the most recent data.

Amazon Redshift Streaming Ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. The native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams and enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

The speed cluster uses materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The real-time views are computed using this layer, which provide a near-real-time view of the incoming telemetry stream.

The speed views can be calculated as a function of recent data unaccounted for in the batch views:

speed view = function (recent data)

We calculate the speed views for these batch views as follows:

  • rapid_acceleration_realtime – The number of rapid accelerations by each driver for recent data not accounted for in the batch view rapid_acceleration_by_year
  • miles_driven_realtime – The number of miles driven by each driver for recent data not in total_miles_driven_by_year

Serving layer

The serving layer comprises an Amazon Redshift Serverless endpoint and any consumption services such as Amazon QuickSight or Amazon SageMaker.

Amazon Redshift Serverless (preview) is a serverless option of Amazon Redshift that makes it easy to run and scale analytics in seconds without the need to set up and manage data warehouse infrastructure. With Amazon Redshift Serverless, any user—including data analysts, developers, business professionals, and data scientists—can get insights from data by simply loading and querying data in the data warehouse.

Amazon Redshift data sharing enables instant, granular, and fast data access across Amazon Redshift clusters without the need to maintain redundant copies of data.

The speed cluster provides outbound data shares of the real-time materialized views to the Amazon Redshift Serverless endpoint (serving cluster).

The serving cluster joins data from the batch layer and speed layer to get near-real-time and historical data for a particular function with minimal latency. The consumption layer (such as Amazon API Gateway or QuickSight) is only aware of the serving cluster, and all the batch and stream processing is abstracted from the consumption layer.

We can express the queries from the data consumption layer to the serving layer as follows:

query = function (batch views, speed views)
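The three formulas (batch view, speed view, and query) can be sketched in a few lines of Python. This is an in-memory illustration, not the Amazon Redshift implementation; the record fields and sample values are hypothetical.

```python
from collections import defaultdict


def batch_view(all_events):
    """batch view = function(all data): rapid accelerations per (vin, year)."""
    totals = defaultdict(int)
    for e in all_events:
        totals[(e["vin"], e["year"])] += e["rapid_acceleration"]
    return dict(totals)


def speed_view(recent_events):
    """speed view = function(recent data): same aggregation, recent data only."""
    return batch_view(recent_events)


def query(batch, speed):
    """query = function(batch views, speed views): merge and re-aggregate."""
    merged = defaultdict(int)
    for view in (batch, speed):
        for key, value in view.items():
            merged[key] += value
    return dict(merged)


# Hypothetical telemetry aggregates.
historical = [
    {"vin": "V1", "year": 2021, "rapid_acceleration": 3},
    {"vin": "V1", "year": 2022, "rapid_acceleration": 2},
    {"vin": "V2", "year": 2022, "rapid_acceleration": 1},
]
recent = [{"vin": "V1", "year": 2022, "rapid_acceleration": 4}]

result = query(batch_view(historical), speed_view(recent))
```

The merge assumes the recent data is not already counted in the batch view; if the two overlap, the result double counts.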

Deploy the CloudFormation template

We have provided an AWS CloudFormation template to demonstrate the solution. You can download and use this template to easily deploy the required AWS resources. This template has been tested in the us-east-1 Region.

The template requires you to provide the following parameters:

  • DatabaseName – The name of the first database to be created for speed cluster
  • NumberOfNodes – The number of compute nodes in the cluster
  • NodeType – The type of node to be provisioned
  • MasterUserName – The user name that is associated with the master user account for the cluster that is being created
  • MasterUserPassword – The password that is associated with the master user account
  • InboundTraffic – The CIDR range to allow inbound traffic to the cluster
  • PortNumber – The port number on which the cluster accepts incoming connections
  • SQLForData – The source query to extract from AWS IoT Core topic

Prerequisites

When setting up this solution and using your own application data to push to Kinesis Data Streams, you can skip setting up the IoT Device Simulator and start creating your Amazon Redshift Serverless endpoint. This post uses the simulator to create related database objects and assumes use of the simulator in the solution walkthrough.

Set up the IoT Device Simulator

We use the IoT Device Simulator to generate and simulate vehicle IoT data. The solution allows you to create and simulate hundreds of connected devices, without having to configure and manage physical devices or develop time-consuming scripts.

Use the following CloudFormation template to create the IoT Device Simulator in your account for trying out this solution.

Configure devices and simulations

To configure your devices and simulations, complete the following steps:

  1. Use the login information sent to the email address you provided to log in to the IoT Device Simulator.
  2. Choose Device Types and Add Device Type.
  3. Choose Automotive Demo.
  4. For Device type name, enter testVehicles.
  5. For Topic, enter the topic where the sensor data is sent to AWS IoT Core.
  6. Save your settings.
  7. Choose Simulations and Add simulation.
  8. For Simulation name, enter testSimulation.
  9. For Simulation type, choose Automotive Demo.
  10. For Select a device type, choose the device type you created (testVehicles).
  11. For Number of devices, enter 15.

You can choose up to 100 devices per simulation. You can configure a higher number of devices to simulate large data.

  12. For Data transmission interval, enter 1.
  13. For Data transmission duration, enter 300.

This configuration runs the simulation for 5 minutes.

  14. Choose Save.
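As a rough sizing check for the settings above, the following Python sketch estimates how many messages the simulation produces. It assumes the transmission interval is in seconds and that each device sends exactly one message per interval; both are assumptions, and the function name is hypothetical.

```python
def expected_messages(devices, interval_s, duration_s):
    """Estimate total messages, assuming one message per device per interval."""
    per_device = duration_s // interval_s
    return devices * per_device


# 15 devices, one message every second, for 300 seconds (5 minutes).
total = expected_messages(devices=15, interval_s=1, duration_s=300)
```

Under these assumptions, the configured simulation sends about 4,500 messages.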

Now you’re ready to simulate vehicle telemetry data to AWS IoT Core.

Create an Amazon Redshift Serverless endpoint

The solution uses an Amazon Redshift Serverless endpoint as the serving layer cluster. You can set up Amazon Redshift Serverless in your account.

Set up Amazon Redshift Query Editor V2

To query data, you can use Amazon Redshift Query Editor V2. For more information, refer to Introducing Amazon Redshift Query Editor V2, a Free Web-based Query Authoring Tool for Data Analysts.

Get namespaces for the provisioned speed layer cluster and Amazon Redshift Serverless

Connect to speed-cluster-iot (the speed layer cluster) through Query Editor V2 and run the following SQL:

select current_namespace; -- (Save as <producer_namespace>)

Similarly, connect to the Amazon Redshift Serverless endpoint and get the namespace:

select current_namespace; -- (Save as <consumer_namespace>)

You can also get this information via the Amazon Redshift console.

Now that we have all the prerequisites set up, let’s go through the solution walkthrough.

Implement the solution

The workflow includes the following steps:

  1. Start the IoT simulation created in the previous section.

The vehicle IoT is simulated and ingested through IoT Device Simulator for the configured number of vehicles. The raw telemetry payload is sent to AWS IoT Core, which routes the data to Kinesis Data Streams.

At the batch layer, data is put directly from Kinesis Data Streams to Kinesis Data Firehose, which converts the data to Parquet and delivers it to Amazon S3 with the prefix s3://<Bucketname>/vehicle_telematics_raw/year=<>/month=<>/day=<>/.

  2. When the simulation is complete, run the pre-created AWS Glue crawler vehicle_iot_crawler on the AWS Glue console.

The serving layer Amazon Redshift Serverless endpoint can directly access data from the Amazon S3 data lake through Redshift Spectrum external tables. In this demo, we compute batch views through Redshift Spectrum and store them as Amazon Redshift tables using Amazon Redshift stored procedures.

  3. Connect to the Amazon Redshift Serverless endpoint through Query Editor V2 and create the stored procedures using the following SQL script.
  4. Run the two stored procedures to create the batch views:
call rapid_acceleration_by_year_sp();
call total_miles_driven_by_year_sp();

The two stored procedures create batch views as Amazon Redshift native tables:

    • batchlayer_rapid_acceleration_by_year
    • batchlayer_total_miles_by_year

You can also schedule these stored procedures as batch jobs. For more information, refer to Scheduling SQL queries on your Amazon Redshift data warehouse.

At the speed layer, the incoming data stream is read and materialized by the speed layer Amazon Redshift cluster in the materialized view vehicleiotstream_mv.

  5. Connect to the provisioned speed-cluster-iot and run the following SQL script to create the required objects.

Two real-time views are created from this materialized view:

    • speedlayer_rapid_acceleration_by_year
    • speedlayer_total_miles_by_year
  6. Refresh the materialized view vehicleiotstream_mv at the required interval, which triggers Amazon Redshift to read from the stream and load data into the materialized view.
    REFRESH MATERIALIZED VIEW vehicleiotstream_mv;

Refreshes are currently manual, but can be automated using the query scheduler.

The real-time views are shared as an outbound data share by the speed cluster to the serving cluster.

  7. Connect to speed-cluster-iot and create an outbound data share (producer) with the following SQL:
    -- Create Datashare from Primary (Producer) to Serverless (Consumer)
    CREATE DATASHARE speedlayer_datashare SET PUBLICACCESSIBLE TRUE;
    ALTER DATASHARE speedlayer_datashare ADD SCHEMA public;
    ALTER DATASHARE speedlayer_datashare ADD ALL TABLES IN SCHEMA public;
    GRANT USAGE ON DATASHARE speedlayer_datashare TO NAMESPACE '<consumer_namespace>'; -- (replace with consumer namespace created in prerequisites 5)

  8. Connect to the Amazon Redshift Serverless endpoint and create an inbound data share (consumer) with the following SQL:
    CREATE DATABASE speedlayer_shareddb FROM DATASHARE speedlayer_datashare OF NAMESPACE '<producer_namespace>'; -- (replace with producer namespace created in prerequisites 5)

Now that the real-time views are available for the Amazon Redshift Serverless endpoint, we can run queries to get real-time metrics or historical trends with up-to-date data by accessing the batch and speed layers and joining them using the following queries.

For example, to calculate total rapid acceleration by year with up-to-the-minute data, you can run the following query:

-- Rapid Acceleration By Year

select SUM(rapid_acceleration) rapid_acceleration, vin, year from 
(
select rapid_acceleration, vin,year
  from public.batchlayer_rapid_acceleration_by_year batch
union all
select rapid_acceleration, vin,year
from speedlayer_shareddb.public.speedlayer_rapid_acceleration_by_year speed)
group by VIN, year;

Similarly, to calculate total miles driven by year with up-to-the-minute data, run the following query:

-- Total Miles Driven By Year

select SUM(total_miles) total_miles_driven , year from 
(
select total_miles, year
  from public.batchlayer_total_miles_by_year batch
union all
select total_miles, year
from speedlayer_shareddb.public.speedlayer_total_miles_by_year speed)
group by year;

If you only need real-time data, for example to power daily dashboards, you can run queries against the real-time views shared to your Amazon Redshift Serverless endpoint.

For example, to calculate the average speed per trip of your fleet, you can run the following SQL:

select CAST(measuretime as DATE) "date",
vin,
trip_id,
avg(vehicleSpeed)
from speedlayer_shareddb.public.vehicleiotstream_mv 
group by vin, date, trip_id;

Because this demo uses the same data as a quick start, there are duplicates in this demonstration. In actual implementations, the serving cluster manages the data redundancy and duplication by creating views with date predicates that consume non-overlapping data from batch and real-time views and provide overall metrics to the consumption layer.
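The non-overlapping date predicate can be illustrated with a small Python sketch. It assumes a single cutoff date: the batch view contributes rows before the cutoff and the speed view contributes rows from the cutoff onward. The row layout and values are hypothetical.

```python
from datetime import date


def combined_miles(batch_rows, speed_rows, cutoff):
    """Sum miles using batch rows before the cutoff and speed rows from it onward."""
    batch_total = sum(r["miles"] for r in batch_rows if r["day"] < cutoff)
    speed_total = sum(r["miles"] for r in speed_rows if r["day"] >= cutoff)
    return batch_total + speed_total


batch_rows = [
    {"day": date(2022, 5, 1), "miles": 100},
    {"day": date(2022, 5, 2), "miles": 80},
]
# The speed layer still holds 2 May, which the batch layer has already absorbed.
speed_rows = [
    {"day": date(2022, 5, 2), "miles": 80},
    {"day": date(2022, 5, 3), "miles": 40},
]

naive_total = sum(r["miles"] for r in batch_rows + speed_rows)
deduplicated_total = combined_miles(batch_rows, speed_rows, cutoff=date(2022, 5, 3))
```

Without the predicate, the overlapping day is counted twice (300 miles instead of 220 in this example).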

You can consume the data with QuickSight for dashboards, with API Gateway for API-based access, or via the Amazon Redshift Data API or SageMaker for AI and machine learning (ML) workloads. This is not included as part of the provided CloudFormation template.

Best practices

In this section, we discuss some best practices and lessons learned when using this solution.

Provisioned vs. serverless

The speed layer is a continuous ingestion layer that reads data from the IoT streams, often as a 24/7 workload. Because there is little idle time or variability in the workload, it is advantageous to use a provisioned cluster that supports persistent workloads and can scale elastically.

The serving layer can be provisioned (for 24/7 workloads) or use Amazon Redshift Serverless (for sporadic or ad hoc workloads). In this post, we assumed sporadic workloads, so serverless is the best fit. In addition, the serving layer can house multiple Amazon Redshift clusters, each consuming its own data share and serving downstream applications.

RA3 instances for data sharing

Amazon Redshift RA3 instances enable data sharing to allow you to securely and easily share live data across Amazon Redshift clusters for reads. You can combine the data that is ingested in near-real time with the historical data using the data share to provide personalized driving characteristics to determine the insurance recommendation.

You can also grant fine-grained access control to the underlying data in the producer to the consumer cluster as needed. Amazon Redshift offers comprehensive auditing capabilities using system tables and AWS CloudTrail to allow you to monitor the data sharing permissions and usage across all the consumers and revoke access instantly when necessary. The permissions are granted by the superusers from both the producer and the consumer clusters to define who gets access to what objects, similar to the grant commands used in the earlier section. You can use the following commands to audit the usage and activities for the data share.

Track all changes to the data share and the shared database imported from the data share with the following code:

Select username, share_name, recordtime, action, 
         share_object_type, share_object_name 
  from svl_datashare_change_log
   order by recordtime desc;

Track data share access activity (usage), which is relevant only on the producer, with the following code:

Select * from svl_datashare_usage;

Pause and Resume

You can pause the producer cluster when batch processing is complete to save costs. The pause and resume actions on Amazon Redshift allow you to easily pause and resume clusters that may not be in operation at all times. You can schedule the pause and resume actions at specific times, or you can initiate a pause manually and resume later. Flexible on-demand pricing and per-second billing give you greater control of the costs of your Redshift compute clusters while maintaining your data in a way that is simple to manage.

Materialized views for fast access to data

Materialized views store precomputed results from complex queries on large tables for faster access. The producer cluster exposes data as materialized views to simplify access for the consumer cluster. This also gives the producer cluster the flexibility to update the underlying table structure to address new business use cases without affecting consumer-dependent queries, which enables loose coupling.

Conclusion

In this post, we demonstrated how to process and analyze large-scale data from streaming and batch sources using Amazon Redshift as the core of the platform guided by the Lambda architecture principles.

You started by collecting real-time data from connected vehicles, and storing the streaming data in an Amazon S3 data lake through Kinesis Data Firehose. The solution simultaneously processes the data for near-real-time analysis through Amazon Redshift streaming ingestion.

Through the data sharing feature, you were able to share live, up-to-date data to an Amazon Redshift Serverless endpoint (serving cluster), which merges the data from the speed layer (near-real time) and batch layer (batch analysis) to provide low-latency access to data from near-real-time analysis to historical trends.

Click here to get started with this solution today and let us know how you implemented this solution in your organization through the comments section.


About the Authors

Jagadish Kumar is a Sr Analytics Specialist Solutions Architect at AWS. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS. He is an avid college football fan and enjoys reading, watching sports, and riding motorcycles.

Thiyagarajan Arumugam is a Big Data Solutions Architect at Amazon Web Services and designs customer architectures to process data at scale. Prior to AWS, he built data warehouse solutions at Amazon.com. In his free time, he enjoys all outdoor sports and practices the Indian classical drum mridangam.

Eesha Kumar is an Analytics Solutions Architect with AWS. He works with customers to realize the business value of data by helping them build solutions using the AWS platform and tools.

Orchestrating Amazon S3 Glacier Deep Archive object retrieval using AWS Step Functions

Post Syndicated from Eric Johnson original https://aws.amazon.com/blogs/compute/orchestrating-amazon-s3-glacier-deep-archive-object-retrieval-using-aws-step-functions/

This blog was written by Monica Cortes Sack, Solutions Architect, Oskar Neumann, Partner Solutions Architect, and Dhiraj Mahapatro, Principal Specialist SA, Serverless.

AWS Step Functions now supports over 220 services and over 10,000 AWS API actions. This enables you to use the AWS SDK integration directly instead of writing an AWS Lambda function as a proxy.

One such service integration is with Amazon S3. Currently, you write scripts using AWS CLI S3 commands to automate S3 tasks. For example, such scripts integrate S3 with AWS Transfer Family, build a custom security check, take action on S3 buckets when objects are created, or orchestrate a workflow around S3 Glacier Deep Archive object retrieval. These script executions do not provide an execution history or an easy way to validate the behavior.

Step Functions’ AWS SDK integration with S3 declaratively creates serverless workflows around S3 tasks without relying on those scripts. You can validate the execution history and behavior of a Step Functions workflow.

This blog highlights one of the S3 use cases. It shows how to orchestrate workflows around S3 Glacier Deep Archive object retrieval, cost estimation, and interaction with the requester using Step Functions. The demo application provides additional details on the entire architecture.

S3 Glacier Deep Archive is a storage class in S3 used for data that is rarely accessed. The service provides durable and secure long-term storage, trading immediate access for cost-effectiveness. You must restore archived objects before they are downloadable. It supports two options for object retrieval:

  1. Standard – Access objects within 12 hours of the start of the restoration process.
  2. Bulk – Access objects within 48 hours of the start of the restoration process.

Business use case

Consider a research institute that stores backups on S3 Glacier Deep Archive. The backups are maintained in S3 Glacier Deep Archive for redundancy. The institute has multiple researchers with one central IT team. When a researcher requests an object from S3 Glacier Deep Archive, the central IT team retrieves it and charges the corresponding research group for retrieval and data transfer costs.

Researchers are the end users and do not operate in the AWS Cloud. They run computing clusters on-premises and depend on the central IT team to provide them with the restored archive. A member of the research team requesting an object retrieval provides the following information to the central IT team:

  1. Object key to be restored.
  2. The number of days the researcher needs the object accessible for download.
  3. Researcher’s email address.
  4. The retrieval SLA, either 12 or 48 hours. This determines whether Standard or Bulk retrieval is used, respectively.
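The request fields above map closely onto the parameters of the S3 RestoreObject API. The following Python sketch builds those parameters from a researcher's request; the function name, bucket, and key are hypothetical, and the researcher's email address is left out because it is not part of the S3 call.

```python
# Retrieval SLA in hours mapped to the S3 Glacier Deep Archive retrieval tier.
TIER_BY_SLA_HOURS = {12: "Standard", 48: "Bulk"}


def build_restore_request(bucket, key, days, sla_hours):
    """Build RestoreObject parameters from a researcher's request."""
    if sla_hours not in TIER_BY_SLA_HOURS:
        raise ValueError("sla_hours must be 12 or 48")
    if days < 1:
        raise ValueError("days must be at least 1")
    return {
        "Bucket": bucket,
        "Key": key,
        "RestoreRequest": {
            "Days": days,  # how long the restored copy stays downloadable
            "GlacierJobParameters": {"Tier": TIER_BY_SLA_HOURS[sla_hours]},
        },
    }


# Hypothetical bucket and object key.
request = build_restore_request("data-bucket", "backups/run-01.tar", days=3, sla_hours=48)
```

Validating the SLA and the number of days before calling S3 keeps malformed requests from starting a restore.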

The following overall architecture explains the setup on AWS and the interaction between a researcher and the central IT team’s architecture.

Architecture overview

Architecture diagram

  1. The researcher uses a front-end application to request object retrieval from S3 Glacier Deep Archive.
  2. Amazon API Gateway synchronously invokes an AWS Step Functions Express Workflow.
  3. Step Functions initiates RestoreObject from S3 Glacier Deep Archive.
  4. Step Functions stores the metadata of this retrieval in an Amazon DynamoDB table.
  5. Step Functions uses Amazon SES to email the researcher about archive retrieval initiation.
  6. Upon completion, S3 sends the RestoreComplete event to Amazon EventBridge.
  7. EventBridge rule triggers another Step Functions for post-processing after the restore is complete.
  8. A Lambda function inside the Step Functions calculates the estimated cost (retrieval and data transfer out) and updates existing metadata in the DynamoDB table.
  9. Amazon Athena Federated Queries read the data from the DynamoDB table to generate a reports dashboard in Amazon QuickSight.
  10. Step Functions uses SES to email the researcher with cost details.
  11. Once the researcher receives an email, the researcher uses the front-end application to call the /download API endpoint.
  12. API Gateway invokes a Lambda function that generates a pre-signed S3 URL of the retrieved object and returns it in the response.

Setup

  1. To run the sample application, you must install CDK v2, Node.js, and npm.
  2. To clone the repository, run:
    git clone https://github.com/aws-samples/aws-stepfunctions-examples.git
    cd cdk/app-glacier-deep-archive-retrieval
  3. To deploy the application, run:
    cdk deploy --all

Identifying workflow components

Starting the restore object workflow

The first component is accepting the researcher’s request to start the archive retrieval process. The sample application created from the demo provides a basic front-end app that shows the files from an S3 bucket that has objects stored in S3 Glacier Deep Archive. The researcher requests file retrieval through the front-end application, which is reached at the sample application’s Amazon CloudFront URL.

Glacier Deep Archive Retrieval menu

The front-end app asks the researcher for an email address, the number of days the researcher wants the object to be available for download, and the acceptable retrieval time. Based on the retrieval time, the researcher selects either Standard or Bulk object retrieval. To test this, put objects in the data bucket under the S3 Glacier Deep Archive storage class and use the front-end application to retrieve them.

Item retrieval prompt

The researcher then chooses Retrieve file. This action invokes an API endpoint provided by API Gateway. The API Gateway endpoint synchronously invokes a Step Functions Express Workflow. The workflow validates the restore object request, gets the object metadata, and starts to restore the object from S3 Glacier Deep Archive.

The state machine stores the metadata of the restore object AWS SDK call in a DynamoDB table for later use. You can use this metadata to build a dashboard in Amazon QuickSight for reporting and administration purposes. Finally, the state machine uses Amazon SES to email the researcher, notifying them about the restore object initiation process:

Restore object initiated

The following state machine shows the workflow:

Workflow diagram

The ability to use S3 APIs declaratively using AWS SDK from Step Functions makes it convenient to integrate with S3. This approach avoids writing a Lambda function to wrap the SDK calls. The following portion of the state machine definition shows the usage of S3 HeadObject and RestoreObject APIs:

"Get Object Metadata": {
    "Next": "Initiate Restore Object from Deep Archive",
    "Catch": [{
        "ErrorEquals": ["States.ALL"],
        "Next": "Bad Request"
    }],
    "Type": "Task",
    "ResultPath": "$.result.metadata",
    "Resource": "arn:aws:states:::aws-sdk:s3:headObject",
    "Parameters": {
        "Bucket": "glacierretrievalapp-databucket-abc123",
        "Key.$": "$.fileKey"
    }
}, 
"Initiate Restore Object from Deep Archive": {
    "Next": "Update restore operation metadata",
    "Type": "Task",
    "ResultPath": null,
    "Resource": "arn:aws:states:::aws-sdk:s3:restoreObject",
    "Parameters": {
        "Bucket": "glacierretrievalapp-databucket-abc123",
        "Key.$": "$.fileKey",
        "RestoreRequest": {
            "Days.$": "$.requestedForDays"
        }
    }
}

You can extend the previous workflow and build your own Step Functions workflows to orchestrate other S3 related workflows.
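The excerpt above shows only `Days` in the `RestoreRequest`. One way to extend it is to pass the researcher's retrieval tier as well. The sketch below builds such a parameter object; `Days` and `GlacierJobParameters.Tier` are fields of the S3 RestoreObject request, while the function name, bucket name, and key are hypothetical.

```typescript
// Tiers offered for S3 Glacier Deep Archive restores.
type RestoreTier = "Standard" | "Bulk";

// Subset of the S3 RestoreObject request used in this sketch.
interface RestoreObjectParameters {
  Bucket: string;
  Key: string;
  RestoreRequest: {
    Days: number;
    GlacierJobParameters: { Tier: RestoreTier };
  };
}

// Build the parameters for a restoreObject task state (illustrative helper).
function buildRestoreObjectParameters(
  bucket: string,
  key: string,
  days: number,
  tier: RestoreTier
): RestoreObjectParameters {
  if (!(days >= 1) || Math.floor(days) !== days) {
    throw new Error("days must be a positive integer");
  }
  return {
    Bucket: bucket,
    Key: key,
    RestoreRequest: {
      Days: days,
      GlacierJobParameters: { Tier: tier }
    }
  };
}

// Example with placeholder names.
const exampleRestoreParameters = buildRestoreObjectParameters(
  "example-data-bucket",
  "backups/run-01.tar",
  5,
  "Bulk"
);
```

In a state machine definition, the same structure would use `"Tier.$"` with a JSONPath into the workflow input, in the same way the excerpt uses `"Days.$"`.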

Processing after object restoration completion

S3 RestoreObject is a long-running process for S3 Glacier Deep Archive objects. When the restore completes, S3 emits an Object Restore Completed event notification to EventBridge. You set up an EventBridge rule to trigger another Step Functions workflow as a target for this event. This workflow takes care of the object restoration post-processing. The sample application first enables EventBridge notifications on the data bucket:

cfnDataBucket.addPropertyOverride('NotificationConfiguration.EventBridgeConfiguration.EventBridgeEnabled', true);

An EventBridge rule triggers the following Step Functions workflow and passes the event payload as an input to the Step Functions execution:

new aws_events.Rule(this, 'invoke-post-processing-rule', {
  eventPattern: {
    source: ["aws.s3"],
    detailType: [
      "Object Restore Completed"
    ],
    detail: {
      bucket: {
        name: [props.dataBucket.bucketName]
      }
    }
  },
  targets: [new aws_events_targets.SfnStateMachine(this.stateMachine, {
    input: aws_events.RuleTargetInput.fromObject({
      's3Event': aws_events.EventField.fromPath('$')
    })
  })]
});
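Because the rule wraps the whole event under `s3Event`, the post-processing workflow has to dig the bucket and key out of that wrapper. A minimal sketch of that step follows. It assumes the event carries the object key at `detail.object.key`; the interface lists only the fields read here, and the function name and sample values are hypothetical.

```typescript
// Only the fields this sketch reads; the real event carries more.
interface RestoreCompletedInput {
  s3Event: {
    "detail-type": string;
    detail: {
      bucket: { name: string };
      object: { key: string };
    };
  };
}

// Extract the restored object's location from the workflow input.
function restoredObjectFromInput(
  input: RestoreCompletedInput
): { bucket: string; key: string } {
  const restoreEvent = input.s3Event;
  if (restoreEvent["detail-type"] !== "Object Restore Completed") {
    throw new Error("Unexpected detail-type: " + restoreEvent["detail-type"]);
  }
  return {
    bucket: restoreEvent.detail.bucket.name,
    key: restoreEvent.detail.object.key
  };
}

// Example input with placeholder values.
const exampleRestored = restoredObjectFromInput({
  s3Event: {
    "detail-type": "Object Restore Completed",
    detail: {
      bucket: { name: "example-data-bucket" },
      object: { key: "backups/run-01.tar" }
    }
  }
});
```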

The Step Functions workflow gets object metadata from the DynamoDB table and then invokes a Lambda function to calculate the estimated cost. The Lambda function calculates the estimated retrieval and the data transfer costs using the contentLength of the retrieved object and the Price List API for the unit cost. The workflow then updates the calculated cost in the DynamoDB table.

The retrieval cost and the data transfer out cost are proportional to the size of the retrieved object. The Step Functions workflow also invokes a Lambda function to create the download API URL for object retrieval. Finally, it emails the researcher with the estimated cost and the download URL as a restoration completion notification.
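Since both costs scale linearly with object size, the estimate reduces to one multiplication per cost component. The sketch below illustrates the arithmetic only. The unit prices are made-up placeholders, not AWS prices (the sample application looks them up through the Price List API), and it assumes 1 GB = 1024³ bytes.

```typescript
// Per-GB unit prices; in the sample application these come from the Price List API.
interface UnitPrices {
  retrievalPerGb: number;
  transferOutPerGb: number;
}

interface RestoreCostEstimate {
  sizeGb: number;
  retrievalCost: number;
  transferCost: number;
  totalCost: number;
}

// Assumption: 1 GB = 1024^3 bytes.
const BYTES_PER_GB = 1024 * 1024 * 1024;

// Estimate retrieval and data-transfer-out cost from the object's contentLength.
function estimateRestoreCost(
  contentLengthBytes: number,
  prices: UnitPrices
): RestoreCostEstimate {
  if (contentLengthBytes < 0) {
    throw new Error("contentLengthBytes must not be negative");
  }
  const sizeGb = contentLengthBytes / BYTES_PER_GB;
  const retrievalCost = sizeGb * prices.retrievalPerGb;
  const transferCost = sizeGb * prices.transferOutPerGb;
  return {
    sizeGb: sizeGb,
    retrievalCost: retrievalCost,
    transferCost: transferCost,
    totalCost: retrievalCost + transferCost
  };
}

// Example: a 2 GB object with placeholder prices (NOT real AWS prices).
const exampleEstimate = estimateRestoreCost(2 * BYTES_PER_GB, {
  retrievalPerGb: 0.02,
  transferOutPerGb: 0.09
});
```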

Workflow studio diagram

The email notification to the researcher looks like:

Email example

Downloading the restored object

Once the object restoration is complete, the researcher can download the object from the front-end application.

Front end retrieval menu

The researcher chooses the Download action, which invokes another API Gateway endpoint. The endpoint integrates with a Lambda function as a backend that creates a pre-signed S3 URL sent as a response to the browser.

Administering object restoration usage

This architecture also provides a view for the central IT team to understand object restoration usage. You achieve this by creating reports and dashboards from the metadata stored in DynamoDB.

The sample application uses Amazon Athena Federated Queries and Amazon Athena DynamoDB Connector to generate a reports dashboard in Amazon QuickSight. You can also use Step Functions AWS SDK integration with Amazon Athena and visualize the workflows in the Athena console.

The following QuickSight visualization shows the count of restored S3 Glacier Deep Archive objects by their contentType:

QuickSight visualization

Considerations

With the preceding approach, you should consider that:

  1. You must start the object retrieval in the same Region as the Region of the archived object.
  2. S3 Glacier Deep Archive only supports standard and bulk retrievals.
  3. You must enable the “Object Restore Completed” event notification on the S3 bucket with the S3 Glacier Deep Archive object.
  4. The researcher’s email must be verified in SES.
  5. Use a Lambda function for the Price List GetProducts API because the service endpoints are available only in specific Regions.

Cleanup

To clean up the infrastructure used in this sample application, run:

cdk destroy --all

Conclusion

Step Functions’ AWS SDK integration opens up different opportunities to orchestrate a workflow. Step Functions provides native support for retries and error handling, which offloads the heavy lifting of handling them manually in scripts.

This blog shows one example use case with S3 Glacier Deep Archive. With AWS SDK integration in Step Functions, you can build any workflow orchestration using S3 or S3 control APIs. For example, a workflow to enforce AWS Key Management Service encryption based on an S3 event, or create static website hosting on-demand in a few steps.

With different S3 API calls available in Step Functions’ Workflow Studio, you can declaratively build a Step Functions workflow instead of imperatively calling each S3 API from a shell script or command line. Refer to the demo application for more details.

For more serverless learning resources, visit Serverless Land.

Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/simplify-your-etl-and-ml-pipelines-using-the-amazon-athena-unload-feature/

Many organizations prefer SQL for data preparation because they already have developers for extract, transform, and load (ETL) jobs and analysts preparing data for machine learning (ML) who understand and write SQL queries. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

By default, Athena automatically writes SELECT query results in CSV format to Amazon S3. However, you might often have to write SELECT query results in non-CSV files such as JSON, Parquet, and ORC for various use cases. In this post, we walk you through the UNLOAD statement in Athena and how it helps you implement several use cases, along with code snippets that you can use.

Athena UNLOAD overview

CSV is the only output format used by the Athena SELECT query, but you can use UNLOAD to write the output of a SELECT query to the formats and compression that UNLOAD supports. When you use UNLOAD in a SELECT query statement, it writes the results into Amazon S3 in specified data formats of Apache Parquet, ORC, Apache Avro, TEXTFILE, and JSON.

Although you can use the CTAS statement to output data in formats other than CSV, those statements also require the creation of a table in Athena. The UNLOAD statement is useful when you want to output the results of a SELECT query in a non-CSV format but don’t require the associated table. For example, a downstream application might require the results of a SELECT query to be in JSON format, and Parquet or ORC might provide a performance advantage over CSV if you intend to use the results of the SELECT query for additional analysis.

In this post, we walk you through the following use cases for the UNLOAD feature:

  • Compress Athena query results to reduce storage costs and speed up performance for downstream consumers
  • Store query results in JSON file format for specific downstream consumers
  • Feed downstream Amazon SageMaker ML models that require files as input
  • Simplify ETL pipelines with AWS Step Functions without creating a table

Use case 1: Compress Athena query results

When you’re using Athena to process and create large volumes of data, storage costs can increase significantly if you don’t compress the data. Furthermore, uncompressed formats like CSV and JSON require you to store and transfer a large number of files across the network, which can increase IOPS and network costs. To reduce costs and improve the performance of downstream big data processing applications such as Spark applications, a best practice is to store Athena output in compressed columnar file formats such as ORC and Parquet.

You can use the UNLOAD statement in your Athena SQL statement to create compressed ORC and Parquet file formats. In this example, we use a 3 TB TPC-DS dataset to find all items returned between a store and a website. The following query joins the four tables: item, store_returns, web_returns, and customer_address:

UNLOAD (
		select *
		from store_returns, item, web_returns, customer_address
		where sr_item_sk = i_item_sk and
		wr_item_sk = i_item_sk and
		wr_refunded_addr_sk = ca_address_sk
	) to 's3://your-bucket/temp/athenaunload/usecase1/' with (
		format = 'PARQUET',
		compression = 'SNAPPY',
		partitioned_by = ARRAY['ca_location_type']
		
	)

The resulting query output when Snappy compressed and stored in Parquet format resulted in a 62 GB dataset. The same output in a non-compressed CSV format resulted in a 248 GB dataset. The Snappy compressed Parquet format yielded a 75% smaller storage size, thereby saving storage costs and resulting in faster performance.
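The 75% figure follows directly from the two sizes reported above. A short sketch of the arithmetic (the function name is illustrative):

```typescript
// Percentage by which the compressed output is smaller than the uncompressed one.
function sizeReductionPercent(uncompressedGb: number, compressedGb: number): number {
  if (uncompressedGb <= 0) {
    throw new Error("uncompressedGb must be positive");
  }
  return (1 - compressedGb / uncompressedGb) * 100;
}

// 248 GB of CSV versus 62 GB of Snappy-compressed Parquet:
// 1 - 62/248 = 0.75, so the reduction is 75%.
const parquetReduction = sizeReductionPercent(248, 62);
```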

Use case 2: Store query results in JSON file format

Some downstream systems to Athena such as web applications or third-party systems require the data formats to be in JSON format. The JSON file format is a text-based, self-describing representation of structured data that is based on key-value pairs. It’s lightweight, and is widely used as a data transfer mechanism by different services, tools, and technologies. In these use cases, the UNLOAD statement with the parameter format value of JSON can unload files in JSON file format to Amazon S3.

The following SQL extracts the returns data for a specific customer on a specific date from the 3 TB catalog_returns table and stores it in Amazon S3 in JSON format:

UNLOAD (
		select cr_returned_date_sk, cr_returning_customer_sk, cr_catalog_page_sk, cr_net_loss
		from catalog_returns
		where cr_returned_date_sk = 2450821 and cr_returning_customer_sk = 11026691
	) to 's3://your-bucket/temp/athenaunload/usecase2/' with (
		format = 'JSON', compression = 'NONE'
	)

By default, Athena uses Gzip for JSON and TEXTFILE formats. You can set the compression to NONE to store the UNLOAD result without any compression. The query result is stored as the following JSON file:

{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}

The query result can now be consumed by a downstream web application.
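As an illustration of such a consumer, the sketch below parses the unloaded file contents into typed rows. It assumes each row is written as one JSON object per line, as in the sample output above, and that the file has already been downloaded and decompressed into a string; the function and type names are hypothetical.

```typescript
// Row shape matching the columns selected in the UNLOAD query above.
interface CatalogReturnRow {
  cr_returned_date_sk: number;
  cr_returning_customer_sk: number;
  cr_catalog_page_sk: number;
  cr_net_loss: number;
}

// Parse file contents with one JSON object per line, skipping blank lines.
function parseUnloadJsonLines(fileContents: string): CatalogReturnRow[] {
  const rows: CatalogReturnRow[] = [];
  for (const rawLine of fileContents.split("\n")) {
    const line = rawLine.trim();
    if (line.length === 0) {
      continue;
    }
    rows.push(JSON.parse(line) as CatalogReturnRow);
  }
  return rows;
}

// Example using the record shown above.
const exampleRows = parseUnloadJsonLines(
  '{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,' +
  '"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}\n'
);
```

The cast to `CatalogReturnRow` does no runtime checking, so a production consumer would validate the fields before using them.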

Use case 3: Feed downstream ML models

Analysts and data scientists rely on Athena for ad hoc SQL queries, data discovery, and analysis. They often like to quickly create derived columns such as aggregates or other features. These need to be written as files in Amazon S3 so a downstream ML model can directly read the files without having to rely on a table.

You can also parameterize repetitive queries using Athena prepared statements. Using the UNLOAD statement in a prepared statement gives less technical users, analysts, and data scientists a self-service way to export the files they need for downstream analysis without having to write queries.

In the following example, we create derived columns and engineer features for a downstream SageMaker ML model that predicts the best discount for catalog items in future promotions. We derive averages for quantity, list price, discount, and sales price for promotional items sold in stores where the promotion is not offered by mail or a special event. Then we restrict the results to a specific gender, marital, and educational status. We use the following query:

UNLOAD(
		Select i_item_id, 
	        avg(ss_quantity) avg_sales,
	        avg(ss_list_price) avg_list_price,
	        avg(ss_coupon_amt) avg_coupon_amt,
	        avg(ss_sales_price) avg_sales_price 
	 from store_sales, customer_demographics, date_dim, item, promotion
	 where cast(ss_sold_date_sk AS int) = d_date_sk and
	       ss_item_sk = i_item_sk and
	       ss_cdemo_sk = cd_demo_sk and
	       ss_promo_sk = p_promo_sk and
	       cd_gender = 'M' and 
	       cd_marital_status = 'M' and
	       cd_education_status = '4 yr Degree' and
	       (p_channel_email = 'N' or p_channel_event = 'N') and
	       d_year = 2001 
	 group by i_item_id
	 order by i_item_id
	) to 's3://your-bucket/temp/athenaunload/usecase3/' with (
		format = 'PARQUET',compression = 'SNAPPY'
	)

The output is written as Parquet files in Amazon S3 for a downstream SageMaker model training job to consume.

Use case 4: Simplify ETL pipelines with Step Functions

Step Functions is integrated with the Athena console to facilitate building workflows that include Athena queries and data processing operations. This helps you create repeatable and scalable data processing pipelines as part of a larger business application and visualize the workflows on the Athena console.

In this use case, we provide an example query result in Parquet format for downstream consumption. In this example, the raw data is in TSV format and gets ingested on a daily basis. We use the Athena UNLOAD statement to convert the data into Parquet format. After that, we send the location of the Parquet file as an Amazon Simple Notification Service (Amazon SNS) notification. Downstream applications can be notified via SNS to take further actions. One common example is to initiate a Lambda function that uploads the Athena transformation result into Amazon Redshift.

The following diagram illustrates the ETL workflow.

The workflow includes the following steps:

  1. Start an AWS Glue crawler pointing to the raw S3 bucket. The crawler updates the metadata of the raw table with new files and partitions.
  2. Invoke a Lambda function to clean up the previous UNLOAD result. This step is required because UNLOAD doesn’t write data to the specified location if the location already has data in it (UNLOAD doesn’t overwrite existing data). To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again. Another common pattern is to UNLOAD data to a new partition with incremental data processing.
  3. Start an Athena UNLOAD query to convert the raw data into Parquet.
  4. Send a notification to downstream data consumers when the file is updated.
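For the incremental pattern mentioned in step 2, each run can write to a fresh location so that no cleanup is needed. The sketch below derives a date-partitioned destination; the `dt=` partition naming, the function name, and the bucket name are assumptions for illustration and are not part of the provided template.

```typescript
// Build a per-day UNLOAD destination such as s3://bucket/parquet/dt=2022-01-15/.
// The date is taken in UTC so that every run on the same UTC day maps to one prefix.
function unloadDestinationForDate(bucket: string, prefix: string, runDate: Date): string {
  const day = runDate.toISOString().slice(0, 10); // YYYY-MM-DD
  const cleanPrefix = prefix.replace(/^\/+|\/+$/g, ""); // drop leading/trailing slashes
  const base = cleanPrefix.length > 0 ? cleanPrefix + "/" : "";
  return "s3://" + bucket + "/" + base + "dt=" + day + "/";
}

// Example with a placeholder bucket name.
const exampleDestination = unloadDestinationForDate(
  "example-athena-output-bucket",
  "parquet/",
  new Date(Date.UTC(2022, 0, 15))
);
```

Because UNLOAD refuses to write to a location that already holds data, a second run on the same day would still need either cleanup or a finer-grained prefix.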

Set up resources with AWS CloudFormation

To prepare for querying both data sources, launch the provided AWS CloudFormation template:

Keep all the provided parameters and choose Create stack.

The CloudFormation template creates the following resources:

  • An Athena workgroup etl-workgroup, which holds the Athena UNLOAD queries.
  • A data lake S3 bucket that holds the raw table. We use the Amazon Customer Reviews Dataset in this post.
  • An Athena output S3 bucket that holds the UNLOAD result and query metadata.
  • An AWS Glue database.
  • An AWS Glue crawler pointing to the data lake S3 bucket.
  • A LoadDataBucket Lambda function to load the Amazon Customer Reviews raw data into the S3 bucket.
  • A CleanUpS3Folder Lambda function to clean up previous Athena UNLOAD result.
  • An SNS topic to notify downstream systems when the UNLOAD is complete.

When the stack is fully deployed, navigate to the Outputs tab of the stack on the AWS CloudFormation console and note the value of the following resources:

  • AthenaWorkgroup
  • AthenaOutputBucket
  • CleanUpS3FolderLambda
  • GlueCrawler
  • SNSTopic

Build a Step Functions workflow

We use the Athena Workflows feature to build the ETL pipeline.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under Create Athena jobs with Step Functions workflows, for Query large datasets, choose Get started.
  3. Choose Create your own workflow.
  4. Choose Continue.

The following is a screenshot of the default workflow. Compare the default workflow against the earlier ETL workflow we described. The default workflow doesn’t contain a Lambda function invocation and has an additional GetQueryResults step.

Next, we add a Lambda Invoke step.

  1. Search for Lambda Invoke in the search bar.
  2. Choose the Lambda:Invoke step and drag it to above the Athena: StartQueryExecution step.
  3. Choose the Athena:GetQueryResults step (right-click) and choose Delete state.

  4. Now the workflow aligns with the earlier design.
  5. Choose the step Glue: StartCrawler.
  6. In the Configuration section, under API Parameters, enter the following JSON (provide the AWS Glue crawler name from the CloudFormation stack output):
    {
      "Name": "GlueCrawler"
    }

  7. Choose the step Glue: GetCrawler.
  8. In the Configuration section, under API Parameters, enter the following JSON:
    {
      "Name": "GlueCrawler"
    }

  9. Choose the step Lambda: Invoke.
  10. In the Configuration section, under API Parameters, for Function name, choose the function -CleanUpS3FolderLambda-.
  11. In the Payload section, enter the following JSON (include the Athena output bucket from the stack output):
    {
      "bucket_name": "AthenaOutputBucket",
      "prefix": "parquet/"
    }

  12. Choose the step Athena: StartQueryExecution.
  13. In the right Configuration section, under API Parameters, enter the following JSON (provide the Athena output bucket and workgroup name):
    {
      "QueryString": "UNLOAD (SELECT * FROM \"athena_unload_blog\".\"reviews\" )  TO 's3://AthenaOutputBucket/parquet' WITH (format = 'PARQUET',compression = 'SNAPPY')",
      "WorkGroup": "AthenaWorkgroup"
    }

Notice the Wait for task to complete check box is selected. This pauses the workflow while the Athena query is running.
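If you generate these API parameters in code rather than typing them into the console, the escaping of the nested quotes is easy to get wrong. The following sketch builds the same kind of `QueryString` and `WorkGroup` pair; the helper names and the bucket and workgroup values are hypothetical.

```typescript
// Quote an identifier for use in the query, doubling any embedded double quotes.
function quoteIdentifier(identifier: string): string {
  return '"' + identifier.replace(/"/g, '""') + '"';
}

// Build StartQueryExecution parameters for a Snappy-compressed Parquet UNLOAD.
function buildUnloadQueryParams(
  database: string,
  table: string,
  destination: string,
  workGroup: string
): { QueryString: string; WorkGroup: string } {
  if (destination.indexOf("'") !== -1) {
    throw new Error("destination must not contain single quotes");
  }
  const queryString =
    "UNLOAD (SELECT * FROM " + quoteIdentifier(database) + "." + quoteIdentifier(table) + ") " +
    "TO '" + destination + "' " +
    "WITH (format = 'PARQUET', compression = 'SNAPPY')";
  return { QueryString: queryString, WorkGroup: workGroup };
}

// Example with placeholder bucket and workgroup names.
const exampleQueryParams = buildUnloadQueryParams(
  "athena_unload_blog",
  "reviews",
  "s3://example-athena-output-bucket/parquet/",
  "etl-workgroup"
);

// JSON.stringify adds the backslash escaping needed in the API Parameters field.
const exampleQueryParamsJson = JSON.stringify(exampleQueryParams, null, 2);
```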

  1. Choose the step SNS: Publish.
  2. In the Configuration section, under API Parameters, for Topic, pick the SNSTopic created by the CloudFormation template.
  3. In the Message section, enter the following JSON to pass the data manifest file location to the downstream consumer:
    {
      "Input.$": "$.QueryExecution.Statistics.DataManifestLocation"
    }

For more information, refer to the GetQueryExecution response syntax.

  1. Choose Next.
  2. Review the generated code and choose Next.
  3. In the Permissions section, choose Create new role.
  4. Review the auto-generated permissions and choose Create state machine.
  5. In the Add additional permissions to your new execution role section, choose Edit role in IAM.
  6. Add permissions and choose Attach policies.
  7. Search for the AWSGlueConsoleFullAccess managed policy and attach it.

This policy grants full access to AWS Glue resources when using the AWS Management console. Generate a policy based on access activity in production following the least privilege principle.

Test the workflow

Next, we test out the Step Functions workflow.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under State machines, choose the workflow we just created.
  3. Choose Execute, then choose Start execution to start the workflow.
  4. Wait until the workflow completes, then verify there are UNLOAD Parquet files in the bucket AthenaOutputBucket.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post.

  1. On the Amazon S3 console, choose the -athena-unload-data-lake bucket.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. Repeat these steps to remove all files and folders in the -athena-unload-output bucket.
  5. On the AWS CloudFormation console, delete the stack you created.
  6. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

In this post, we introduced the UNLOAD statement in Athena with some common use cases. We demonstrated how to compress Athena query results to reduce storage costs and improve performance, store query results in JSON file format, feed downstream ML models, and create and visualize ETL pipelines with Step Functions without creating a table.

To learn more, refer to the Athena UNLOAD documentation and Visualizing AWS Step Functions workflows from the Amazon Athena console.


About the Authors

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Harsha Tadiparthi is a Principal Solutions Architect focused on providing analytics and AI/ML strategies and solution designs to customers.

Detecting data drift using Amazon SageMaker

Post Syndicated from Shibu Nair original https://aws.amazon.com/blogs/architecture/detecting-data-drift-using-amazon-sagemaker/

As companies continue to embrace the cloud and digital transformation, they use historical data in order to identify trends and insights. This data is foundational to power tools, such as data analytics and machine learning (ML), in order to achieve high quality results.

This is a time where major disruptions are not only lasting longer, but also happening more frequently, as discussed in a McKinsey article on risk and resilience. Any disruption—a pandemic, hurricane, or even blocked sailing routes—has a major impact on the patterns of data and can create anomalous behavior.

ML models depend on data insights to help plan and support production-ready applications. With any disruption, data drift can occur. Data drift is an unexpected and undocumented change to data structure, semantics, or infrastructure. If there is data drift, model performance degrades and the model no longer provides accurate guidance. To mitigate the effects of the disruption, data drift needs to be detected and the ML models quickly retrained and adjusted accordingly.

This blog post explains how to approach changing data patterns in the age of disruption and how to mitigate its effects on ML models. We also discuss the steps of building a feedback loop to capture the request data in the production environment and create a data pipeline to store the data for profiling and baselining. Then, we explain how Amazon SageMaker Clarify can help detect data drift.

How to detect data drift

There are three stages to detecting data drift: data quality monitoring, model quality monitoring, and drift evaluation (see Figure 1).

Figure 1. Stages in detecting data drift

Data quality monitoring establishes a profile of the input data during model training, and then continuously compares incoming data with the profile. Deviations in the data profile signal a drift in the input data.
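The idea of profiling and comparing can be illustrated with a deliberately simple check on one numeric feature. This is a sketch of the concept only, not how SageMaker Model Monitor computes drift: the profile holds just a mean and standard deviation, and the threshold and all names are assumptions.

```typescript
// Minimal profile of one numeric feature.
interface FeatureProfile {
  mean: number;
  stdDev: number;
  count: number;
}

// Compute the mean and population standard deviation of the values.
function profileFeature(values: number[]): FeatureProfile {
  if (values.length === 0) {
    throw new Error("cannot profile an empty sample");
  }
  let sum = 0;
  for (const value of values) {
    sum += value;
  }
  const mean = sum / values.length;
  let squaredDeviations = 0;
  for (const value of values) {
    squaredDeviations += (value - mean) * (value - mean);
  }
  const stdDev = Math.sqrt(squaredDeviations / values.length);
  return { mean: mean, stdDev: stdDev, count: values.length };
}

// Flag drift when the incoming mean is more than maxStdDevs baseline
// standard deviations away from the baseline mean.
function hasMeanDrift(baseline: FeatureProfile, incoming: number[], maxStdDevs: number): boolean {
  const current = profileFeature(incoming);
  if (baseline.stdDev === 0) {
    return current.mean !== baseline.mean;
  }
  return Math.abs(current.mean - baseline.mean) / baseline.stdDev > maxStdDevs;
}

// Baseline from training data, then two batches of production data.
const baselineProfile = profileFeature([10, 12, 11, 9, 13]);
const stableBatchDrifted = hasMeanDrift(baselineProfile, [11, 10, 12], 3);
const shiftedBatchDrifted = hasMeanDrift(baselineProfile, [20, 21, 19], 3);
```

Here the first batch has the same mean as the baseline and is not flagged, while the second batch sits more than six baseline standard deviations away and is.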

You can also detect drift through model quality monitoring, which requires capturing actual values that can be compared with the predictions. For example, using weekly demand forecasting, you can compare the forecast quantities one week later with the actual demand. Some use cases can require extra steps to collect actual values. For example, product recommendations may require you to ask a selected group of consumers for their feedback to the recommendation.
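For the demand forecasting example, comparing forecasts with actuals can be as simple as tracking an error metric week over week. The sketch below uses mean absolute percentage error (MAPE) as one possible metric; the metric choice, the record shape, and the names are assumptions for illustration.

```typescript
// One forecast paired with the demand observed a week later.
interface DemandObservation {
  week: string;
  forecast: number;
  actual: number;
}

// Mean absolute percentage error across the observations, in percent.
function meanAbsolutePercentageError(observations: DemandObservation[]): number {
  if (observations.length === 0) {
    throw new Error("at least one observation is required");
  }
  let total = 0;
  for (const observation of observations) {
    if (observation.actual === 0) {
      throw new Error("MAPE is undefined when actual demand is zero (" + observation.week + ")");
    }
    total += Math.abs(observation.actual - observation.forecast) / Math.abs(observation.actual);
  }
  return (total / observations.length) * 100;
}

// Two weeks in which the forecast missed actual demand by 10 units out of 100.
const exampleMape = meanAbsolutePercentageError([
  { week: "2021-W01", forecast: 110, actual: 100 },
  { week: "2021-W02", forecast: 90, actual: 100 }
]);
```

A rising value of this metric across successive weeks is the kind of signal that model quality monitoring would surface.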

SageMaker Clarify provides insights into your trained models, including the importance of model features and any biases towards certain segments of the input data. Changes in these attributes between re-trained models also signal drift. Drift evaluation comprises the monitoring data and the mechanisms that detect changes and trigger consequent actions. With Amazon CloudWatch, you can define rules and thresholds that prompt drift notifications.

Figure 2 illustrates a basic architecture with the data sources for training and production (on the left) and the observed data concerning drift (on the right). You can use Amazon SageMaker Data Wrangler, a visual data preparation tool, to clean and normalize your input data for your ML task. You can store the features that you defined for your models in the Amazon SageMaker Feature Store, a fully managed, purpose-built repository to store, update, retrieve, and share ML features.

The white, rectangular boxes in the architecture diagram represent the tasks for detecting data and model drift. You can integrate those tasks into your ML workflow with Amazon SageMaker Pipelines.

Figure 2. Basic architecture on how data drift is detected using Amazon SageMaker

The drift observation data can be captured in tabular format, such as comma-separated values or Parquet, on Amazon Simple Storage Service (S3) and analyzed with Amazon Athena and Amazon QuickSight.

How to build a feedback loop

The baselining task establishes a data profile from training data. It uses Amazon SageMaker Model Monitor and runs before training or re-training the model. The baseline profile is stored on Amazon S3 to be referenced by the data drift monitoring job.

The data drift monitoring task continuously profiles the input data and compares it with the baseline, and the results are captured in CloudWatch. This task runs on its own computation resources using Deequ, which ensures that the monitoring job does not slow down your ML inference flow and that it scales with the data. The frequency of running this task can be adjusted to control cost, and can depend on how rapidly you anticipate that the data may change.

The model quality monitoring task computes model performance metrics from actuals and predicted values. The origin of these data points depends on the use case. Demand forecasting use cases naturally capture actuals that can be used to validate past predictions. Other use cases can require extra steps to acquire ground-truth data.

CloudWatch is a monitoring and observability service with which you can define rules to act on deviation in model performance or data drift. With CloudWatch, you can set up alerts to users via e-mail or SMS, and it can automatically start the ML model re-training process.

Run the baseline task on your updated data set before re-training your model. Use the SageMaker model registry to catalog your ML models for production, manage model versions, and control the associated training metrics.

Gaining insight into data and models

SageMaker Clarify provides greater visibility into your training data and models, helping identify and limit bias and explain predictions. For example, the trained models may consider some features more strongly than others when generating predictions. Compare the feature importance and bias between model versions for a better understanding of the changes.

Conclusion

As companies continue to use data analytics and ML to inform daily activity, data drift may become a more common occurrence. Recognizing that drift can have a direct impact on models and production-ready applications, it is important to architect to identify potential data drift and avoid downgrading the models and negatively impacting results. Failure to capture changes in data can result in loss of process confidence, downgraded model accuracy, or a bottom-line impact to the business.

Secure data movement across Amazon S3 and Amazon Redshift using role chaining and ASSUMEROLE

Post Syndicated from Sudipta Mitra original https://aws.amazon.com/blogs/big-data/secure-data-movement-across-amazon-s3-and-amazon-redshift-using-role-chaining-and-assumerole/

Data lakes use a ring of purpose-built data services around a central data lake. Data needs to move between these services and data stores easily and securely. The following are some examples of such services:

  • Amazon Simple Storage Service (Amazon S3), which stores structured, unstructured, and semi-structured data
  • Amazon Redshift, a fully managed, petabyte-scale data warehouse product to analyze large-scale structured and semi-structured data across data warehouses and operational databases
  • Amazon SageMaker, which consumes data for machine learning (ML) capabilities

In multi-tenant architectures, groups or users within a group may require exclusive permissions to the group’s S3 bucket and also the schema and tables belonging to Amazon Redshift. These teams also need to be able to control loading and unloading of data between the team-owned S3 buckets and Amazon Redshift schemas. Additionally, individual users within the team may require fine-grained control over objects in S3 buckets and specific schemas in Amazon Redshift. Implementing this permissions control use case should be scalable as more teams and users are onboarded and permission-separation requirements evolve.

Amazon Redshift and Amazon S3 provide a unified, natively integrated storage layer for data lakes. You can move data between Amazon Redshift and Amazon S3 using the Amazon Redshift COPY and UNLOAD commands.

This post presents an approach that you can apply at scale to achieve fine-grained access controls to resources in S3 buckets and Amazon Redshift schemas for tenants, including groups of users belonging to the same business unit down to the individual user level. This solution provides tenant isolation and data security. In this approach, we use the bridge model to store data and control access for each tenant at the individual schema level in the same Amazon Redshift database. We utilize ASSUMEROLE and role chaining to provide fine-grained access control when data is being copied and unloaded between Amazon Redshift and Amazon S3, so the data flows within each tenant’s namespace. Role chaining also streamlines the new tenant onboarding process.

Solution overview

In this post, we explore how to achieve resource isolation, data security, scaling to multiple tenants, and fine-grained access control at the individual user level for teams that access, store, and move data across storage using Amazon S3 and Amazon Redshift.

We use the bridge model to store data and control access for each tenant at the individual schema level in the same Amazon Redshift database. In the bridge model, a separate database schema is created for each tenant, and data for each tenant is stored in its own schema. The tenant has access only to its own schema.

We use the COPY and UNLOAD commands to load and unload data into the Amazon Redshift cluster using an S3 bucket. These commands require Amazon Redshift to access Amazon S3 on your behalf, and security credentials are provided to your clusters.

We create an AWS Identity and Access Management (IAM) role—we call it the Amazon Redshift onboarding role—and associate it with the Amazon Redshift cluster. For each tenant, we create a tenant-specific IAM role—we call it the tenant role—to define the fine-grained access to its own Amazon S3 resources. The Amazon Redshift onboarding role doesn’t have any permissions granted except allowing sts:AssumeRole to the tenant roles. The trust relationship to the Amazon Redshift onboarding role is defined in each of the tenant roles. We use the Amazon Redshift ASSUMEROLE privilege to control IAM role access privileges for database users and groups on COPY and UNLOAD commands.

Each tenant database user or group is granted ASSUMEROLE on the Amazon Redshift onboarding role and its own tenant role, which restricts the tenant to access its own Amazon S3 resources when using COPY and UNLOAD commands. We use role chaining when ASSUMEROLE is granted. This means that the tenant role isn’t required to be attached to the Amazon Redshift cluster, and the only IAM role associated is the Amazon Redshift onboarding role. Role chaining streamlines the new tenant onboarding process. With role chaining, we don’t need to modify the cluster; we can make modifications on the tenant IAM role definition when onboarding a new tenant.

For our use case, we have two tenants: team 1 and team 2. A tenant here represents a group of users—a team from the same business unit. We want separate S3 buckets and Amazon Redshift schemas for each team. These teams should be able to access their own data only and also be able to support fine-grained access control over copying and unloading data from Amazon S3 to Amazon Redshift (and vice versa) within the team boundary. We can apply access control at the individual user level using the same approach.

The following architecture diagram shows the AWS resources and process flow of our solution.

In this tutorial, you create two S3 buckets, two Amazon Redshift tenant schemas, two Amazon Redshift tenant groups, one Amazon Redshift onboarding role, and two tenant roles. Then you grant ASSUMEROLE on the onboarding role and the matching tenant role to each tenant group, using role chaining. To verify that each tenant can only access its own S3 resources, you create two Amazon Redshift users, each assigned to its own tenant group, and run COPY and UNLOAD commands.

Prerequisites

To follow along with this solution, you need the following prerequisites:

Download the source code to your local environment

To implement this solution in your local development environment, you can download the source code from the GitHub repo or clone the source code using the following command:

git clone https://github.com/aws-samples/amazon-redshift-assume-role-sample.git

The following files are included in the source code:

  • redshift-onboarding-role.cf.yaml – A CloudFormation template to deploy the Amazon Redshift onboarding role redshift-s3-onboarding-role
  • redshift-tenant-resources.cf.yaml – A CloudFormation template to deploy an S3 bucket, KMS key, and IAM role for each tenant you want to onboard

Provision an IAM role for Amazon Redshift and attach this role to the Amazon Redshift cluster

Deploy the template redshift-onboarding-role.cf.yaml using the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI). For more information about stack creation, see Create the stack. This template doesn’t have any required parameters. The stack provisions an IAM role named redshift-s3-onboarding-role for Amazon Redshift. The following code is the policy defining sts:AssumeRole to the tenant-specific IAM roles:

{
  "Version": "2012-10-17",
  "Statement": [
  {
   "Action": [
     "sts:AssumeRole"
    ],
   "Resource": [
     "arn:aws:iam::xxxxxxxxxxxx:role/*-tenant-redshift-s3-access-role"
    ],
   "Effect": "Allow"
   }
  ]
}

Navigate to the Amazon Redshift console and select the cluster you want to update. On the Actions menu, choose Manage IAM roles. Choose the role redshift-s3-onboarding-role to associate with the cluster. For more information, see Associate the IAM role with your cluster.

Provision the IAM role and resources for tenants

Deploy the template redshift-tenant-resources.cf.yaml using the AWS CloudFormation console or the AWS CLI. For this post, you deploy the stack twice, supplying two unique tenant names for TenantName. For example, you can use team1 and team2 as the TenantName parameter values.

For each tenant, the stack provisions the following resources:

  • A KMS key
  • An S3 bucket named team1-data-<account id>-<region> with default encryption enabled with SSE-KMS using the created key
  • An IAM role named team1-tenant-redshift-s3-access-role

The policy attached to the role team1-tenant-redshift-s3-access-role can only access the team’s own S3 bucket. The role redshift-s3-onboarding-role is trusted to assume all tenant roles *-tenant-redshift-s3-access-role to enable role chaining. The tenant role *-tenant-redshift-s3-access-role has a trust relationship to redshift-s3-onboarding-role. See the following policy code:

        {
            "Action": [
                "s3:List*",
                "s3:Get*",
                "s3:Put*"
            ],
            "Resource": [
                "arn:aws:s3:::team1-data-<account id>-<region>/*",
                "arn:aws:s3:::team1-data-<account id>-<region>"
            ],
            "Effect": "Allow"
        }
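The trust relationship on the tenant role is described above but not shown. The following is a minimal sketch, assuming a standard IAM trust policy, of what that document could look like. The function name tenant_trust_policy and the account ID 123456789012 are illustrative assumptions, and the actual template in the sample repository may differ.

```python
import json


def tenant_trust_policy(account_id: str,
                        onboarding_role: str = "redshift-s3-onboarding-role") -> dict:
    """Build a trust policy that lets the onboarding role assume a tenant role.

    Illustrative only: the role name default comes from the post, while the
    function itself is a hypothetical helper.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The onboarding role attached to the cluster is the trusted principal.
                "Principal": {
                    "AWS": f"arn:aws:iam::{account_id}:role/{onboarding_role}"
                },
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    # 123456789012 is a placeholder account ID.
    print(json.dumps(tenant_trust_policy("123456789012"), indent=2))
```

Because the trusted principal is always the same onboarding role, the same trust policy can be reused for every tenant role; only the permissions policy changes per tenant.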

Create a tenant schema and tenant user with appropriate privileges

For this post, you create the following Amazon Redshift database objects using the query editor on the Amazon Redshift console or a SQL client tool like SQL Workbench/J. Replace <password> with your password and <account id> with your AWS account ID before running the following SQL statements:

create schema team1;
create schema team2;

create group team1_grp;
create group team2_grp;

create user team1_usr with password '<password>' in group team1_grp;
create user team2_usr with password '<password>' in group team2_grp;

grant usage on schema team1 to group team1_grp;
grant usage on schema team2 to group team2_grp;

GRANT ALL ON SCHEMA team1 TO group team1_grp;
GRANT ALL ON SCHEMA team2 TO group team2_grp;

revoke assumerole on all from public for all;

grant assumerole
on 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team1-tenant-redshift-s3-access-role'
to group team1_grp for copy;

grant assumerole
on 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team1-tenant-redshift-s3-access-role'
to group team1_grp for unload;

grant assumerole
on 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team2-tenant-redshift-s3-access-role'
to group team2_grp for copy;

grant assumerole
on 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team2-tenant-redshift-s3-access-role'
to group team2_grp for unload;

commit;
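The statements above follow the same pattern for every tenant, so onboarding can be scripted. The following is a rough sketch, not part of the sample repository, that generates the schema, group, and ASSUMEROLE statements for one tenant. The helper names chained_role and onboarding_sql are hypothetical, user creation is left out, and the input checks are a simple assumption to keep unexpected characters out of the generated SQL.

```python
from typing import List


def chained_role(account_id: str, tenant: str) -> str:
    """Comma-separated ARN chain: onboarding role first, then the tenant role."""
    base = f"arn:aws:iam::{account_id}:role"
    return (f"{base}/redshift-s3-onboarding-role,"
            f"{base}/{tenant}-tenant-redshift-s3-access-role")


def onboarding_sql(account_id: str, tenant: str) -> List[str]:
    """Return the per-tenant statements from the post, parameterized by tenant."""
    # Basic validation, because the values are interpolated into SQL text.
    if not tenant.isidentifier():
        raise ValueError(f"unexpected tenant name: {tenant!r}")
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError(f"unexpected account ID: {account_id!r}")

    group = f"{tenant}_grp"
    chain = chained_role(account_id, tenant)
    statements = [
        f"create schema {tenant};",
        f"create group {group};",
        f"grant all on schema {tenant} to group {group};",
    ]
    # One grant per command type, mirroring the COPY and UNLOAD grants above.
    for command in ("copy", "unload"):
        statements.append(
            f"grant assumerole on '{chain}' to group {group} for {command};"
        )
    return statements


if __name__ == "__main__":
    # team3 and the account ID are placeholders for a newly onboarded tenant.
    print("\n".join(onboarding_sql("123456789012", "team3")))
```

Onboarding a third tenant then requires only a new tenant stack and these generated statements; the cluster's attached IAM role stays unchanged.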

Verify that each tenant can only access its own resources

To verify your access control settings, you can create a test table in each tenant schema and upload a file to the tenant’s S3 bucket using the following commands. You can use the Amazon Redshift query editor or a SQL client tool.

  1. Sign in as team1_usr and enter the following commands:
    CREATE TABLE TEAM1.TEAM1_VENUE(
    VENUEID SMALLINT,
    VENUENAME VARCHAR(100),
    VENUECITY VARCHAR(30),
    VENUESTATE CHAR(2),
    VENUESEATS INTEGER
    ) DISTSTYLE EVEN;
    
    commit;

  2. Sign in as team2_usr and enter the following commands:
    CREATE TABLE TEAM2.TEAM2_VENUE(
    VENUEID SMALLINT,
    VENUENAME VARCHAR(100),
    VENUECITY VARCHAR(30),
    VENUESTATE CHAR(2),
    VENUESEATS INTEGER
    ) DISTSTYLE EVEN;
    
    commit;

  3. Create a file named test-venue.txt with the following contents:
    7|BMO Field|Toronto|ON|0
    16|TD Garden|Boston|MA|0
    23|The Palace of Auburn Hills|Auburn Hills|MI|0
    28|American Airlines Arena|Miami|FL|0
    37|Staples Center|Los Angeles|CA|0
    42|FedExForum|Memphis|TN|0
    52|PNC Arena|Raleigh|NC|0
    59|Scotiabank Saddledome|Calgary|AB|0
    66|SAP Center|San Jose|CA|0
    73|Heinz Field|Pittsburgh|PA|65050

  4. Upload this file to both team1 and team2 S3 buckets.
  5. Sign in as team1_usr and enter the following commands to test Amazon Redshift COPY and UNLOAD:
    copy team1.team1_venue
    from 's3://team1-data-<account id>-<region>/'
    iam_role 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team1-tenant-redshift-s3-access-role'
    delimiter '|' ;
    
    unload ('select * from team1.team1_venue')
    to 's3://team1-data-<account id>-<region>/unload/' 
    iam_role 'arn:aws:iam::<account id>:role/redshift-s3-onboarding-role,arn:aws:iam::<account id>:role/team1-tenant-redshift-s3-access-role';

The file test-venue.txt uploaded to the team1 bucket is copied to the table team1_venue in the team1 schema, and the data in table team1_venue is unloaded to the team1 bucket successfully.

  6. Replace team1 with team2 in the preceding commands and then run them again, this time signed in as team2_usr.

If you’re signed in as team1_usr and try to use the team2 S3 bucket, the team2 schema or table, or the team2 IAM role when running COPY and UNLOAD, you get an access denied error. You get the same error if you try to access team1 resources while signed in as team2_usr.

Clean up

To clean up the resources you created, delete the CloudFormation stacks you created in your AWS account: the onboarding role stack and one stack per tenant.

Conclusion

In this post, we presented a solution to achieve role-based secure data movement between Amazon S3 and Amazon Redshift. The approach combines role chaining with the ASSUMEROLE privilege in Amazon Redshift to allow fine-grained access control over the COPY and UNLOAD commands, down to the individual user level within a particular team. This in turn provides finer control over resource isolation and data security in a multi-tenant solution. Many use cases can benefit from this solution as more enterprises build data platforms to provide the foundations for highly scalable, customizable, and secure data consumption models.


About the Authors

Sudipta Mitra is a Senior Data Architect for AWS and is passionate about helping customers build modern data analytics applications by making innovative use of the latest AWS services and their constantly evolving features. He is a pragmatic architect who works backwards from customer needs, making customers comfortable with the proposed solution and helping them achieve tangible business outcomes. His main areas of work are Data Mesh, Data Lake, Knowledge Graph, Data Security, and Data Governance.

Michelle Deng is a Sr. Data Architect at Amazon Web Services. She works with AWS customers to provide guidance and technical assistance about database migrations and Big data projects.

Jared Cook is a Sr. Cloud Infrastructure Architect at Amazon Web Services. He is committed to driving business outcomes in the cloud, and uses Infrastructure as Code and DevOps best practices to build resilient architectures on AWS. In his leisure time, Jared enjoys the outdoors and music, and plays the drums.

Lisa Matacotta is a Senior Customer Solutions Manager at Amazon Web Services. She works with AWS customers to help them achieve business and strategic goals, understand their biggest challenges, and overcome them with guidance based on AWS best practices.

Best practices to optimize data access performance from Amazon EMR and AWS Glue to Amazon S3

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-data-access-performance-from-amazon-emr-and-aws-glue-to-amazon-s3/

Customers are increasingly building data lakes to store data at massive scale in the cloud. It’s common to use distributed computing engines, cloud-native databases, and data warehouses when you want to process and analyze your data in data lakes. Amazon EMR and AWS Glue are two key services you can use for such use cases. Amazon EMR is a managed big data framework that supports several different applications, including Apache Spark, Apache Hive, Presto, Trino, and Apache HBase. AWS Glue Spark jobs run on top of Apache Spark, and distribute data processing workloads in parallel to perform extract, transform, and load (ETL) jobs to enrich, denormalize, mask, and tokenize data on a massive scale.

For data lake storage, customers typically use Amazon Simple Storage Service (Amazon S3) because it’s secure, scalable, durable, and highly available. Amazon S3 is designed for 11 9’s of durability and stores over 200 trillion objects for millions of applications around the world, making it the ideal storage destination for your data lake. Amazon S3 averages over 100 million operations per second, so your applications can easily achieve high request rates when using Amazon S3 as your data lake.

This post describes best practices to achieve the performance scaling you need when analyzing data in Amazon S3 using Amazon EMR and AWS Glue. We specifically focus on optimizing for Apache Spark on Amazon EMR and AWS Glue Spark jobs.

Optimizing Amazon S3 performance for large Amazon EMR and AWS Glue jobs

Amazon S3 is a very large distributed system, and you can scale to thousands of transactions per second in request performance when your applications read and write data to Amazon S3. Amazon S3 performance isn’t defined per bucket, but per prefix in a bucket. Your applications can achieve at least 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. Additionally, there are no limits to the number of prefixes in a bucket, so you can horizontally scale your read or write performance using parallelization. For example, if you create 10 prefixes in an S3 bucket to parallelize reads, you could scale your read performance to 55,000 read requests per second. You can similarly scale writes by writing data across multiple prefixes.
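The per-prefix arithmetic can be made explicit. The following sketch uses the baseline figures quoted above; the function names are illustrative, and the calculation assumes requests are spread evenly across prefixes, which real workloads may not achieve.

```python
GET_PER_PREFIX = 5_500  # GET/HEAD requests per second per prefix (baseline from the text)
PUT_PER_PREFIX = 3_500  # PUT/COPY/POST/DELETE requests per second per prefix


def aggregate_rate(prefixes: int, per_prefix: int) -> int:
    """Baseline aggregate request rate when load is spread evenly over prefixes."""
    return prefixes * per_prefix


def prefixes_needed(target_rps: int, per_prefix: int) -> int:
    """Smallest number of prefixes whose combined baseline covers target_rps."""
    return -(-target_rps // per_prefix)  # ceiling division on integers


if __name__ == "__main__":
    print(aggregate_rate(10, GET_PER_PREFIX))        # the 10-prefix read example
    print(prefixes_needed(20_000, PUT_PER_PREFIX))   # prefixes for 20,000 writes/s
```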

You can scale performance by utilizing automatic scaling in Amazon S3 and scan millions of objects for queries run over petabytes of data. Amazon S3 automatically scales in response to sustained new request rates, dynamically optimizing performance. While Amazon S3 is internally optimizing for a new request rate, you receive HTTP 503 request responses temporarily until the optimization completes:

AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown)

Such situations require the application to retry momentarily, but after Amazon S3 internally optimizes performance for the new request rate, all requests are generally served without retries. One such situation is when multiple workers in distributed compute engines such as Amazon EMR and AWS Glue momentarily generate a high number of requests to access data under the same prefix.
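To illustrate what retrying momentarily can look like in application code, the following is a generic sketch of exponential backoff with full jitter. It is not the EMRFS implementation; the SlowDown exception, the function name, and all default values are assumptions chosen for the example.

```python
import random
import time


class SlowDown(Exception):
    """Stand-in for an HTTP 503 Slow Down response (hypothetical)."""


def call_with_backoff(request, max_retries=5, base_delay=0.1, max_delay=5.0,
                      sleep=time.sleep, rng=random.random):
    """Call request(), retrying on SlowDown with exponential backoff and jitter.

    sleep and rng are injectable so the behavior can be tested without waiting.
    """
    for attempt in range(max_retries + 1):
        try:
            return request()
        except SlowDown:
            if attempt == max_retries:
                raise  # retries exhausted; surface the error to the caller
            # The delay ceiling doubles on each attempt, capped at max_delay.
            ceiling = min(max_delay, base_delay * 2 ** attempt)
            # Full jitter: wait a random fraction of the ceiling so that many
            # workers do not retry at the same instant.
            sleep(rng() * ceiling)
```

The jitter matters most for distributed engines: without it, hundreds of Spark tasks throttled at the same moment would all retry together and hit the same prefix again.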

When using Amazon EMR and AWS Glue to process data in Amazon S3, you can employ certain best practices to manage request traffic and avoid HTTP Slow Down errors. Let’s look at some of these strategies.

Best practices to manage HTTP Slow Down responses

You can use the following approaches to take advantage of the horizontal scaling capability in Amazon S3 and improve the success rate of your requests when accessing Amazon S3 data using Amazon EMR and AWS Glue:

  • Modify the retry strategy for Amazon S3 requests
  • Adjust the number of Amazon S3 objects processed
  • Adjust the number of concurrent Amazon S3 requests

We recommend choosing and applying the options that fit best for your use case to optimize data processing on Amazon S3. In the following sections, we describe best practices of each approach.

Modify the retry strategy for Amazon S3 requests

This is the easiest way to avoid HTTP 503 Slow Down responses and improve the success rate of your requests. To access Amazon S3 data, both Amazon EMR and AWS Glue use the EMR File System (EMRFS), which retries Amazon S3 requests with jitter when it receives 503 Slow Down responses. To improve the success rate of your Amazon S3 requests, you can adjust your retry strategy by configuring certain properties. In Amazon EMR, you configure these parameters in your emrfs-site configuration. In AWS Glue, you configure them in job parameters. You can adjust your retry strategy in the following ways:

  • Increase the EMRFS default retry limit – By default, EMRFS uses an exponential backoff strategy to retry requests to Amazon S3. The default EMRFS retry limit is 15. However, you can increase this limit when you create a new cluster, on a running cluster, or at application runtime. To increase the retry limit, change the value of the fs.s3.maxRetries parameter. Note that you may experience longer job duration if you set a higher value for this parameter. We recommend experimenting with different values, starting with a value such as 20, confirming the duration overhead of the jobs for each value, and then adjusting the parameter based on your requirements.
  • For Amazon EMR, use the AIMD retry strategy – With Amazon EMR versions 6.4.0 and later, EMRFS supports an alternative retry strategy based on an additive-increase/multiplicative-decrease (AIMD) model. This strategy can be useful in shaping the request rate from large clusters. Instead of treating each request in isolation, this mode keeps track of the rate of recent successful and throttled requests. Requests are limited to a rate determined from the rate of recent successful requests. This decreases the number of throttled requests, and therefore the number of attempts needed per request. To enable the AIMD retry strategy, you can set the fs.s3.aimd.enabled property to true. You can further refine the AIMD retry strategy using the advanced AIMD retry settings.
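The additive-increase/multiplicative-decrease idea can be sketched in a few lines. This is a simplified illustration of the general AIMD model, not the EMRFS implementation: the class name, parameters, and defaults are assumptions, and EMRFS derives its limit from the rate of recent requests rather than from per-request callbacks.

```python
class AimdRateLimiter:
    """Toy AIMD controller for an allowed request rate (requests per second)."""

    def __init__(self, initial_rate=100.0, increase=10.0, decrease_factor=0.5,
                 min_rate=1.0, max_rate=5500.0):
        self.rate = initial_rate
        self.increase = increase                # additive step on success
        self.decrease_factor = decrease_factor  # multiplicative cut on throttle
        self.min_rate = min_rate
        self.max_rate = max_rate

    def on_success(self) -> None:
        # Additive increase: probe for more capacity slowly.
        self.rate = min(self.max_rate, self.rate + self.increase)

    def on_throttle(self) -> None:
        # Multiplicative decrease: back off sharply when throttled.
        self.rate = max(self.min_rate, self.rate * self.decrease_factor)


if __name__ == "__main__":
    limiter = AimdRateLimiter()
    for outcome in ("ok", "ok", "throttled", "ok"):
        limiter.on_throttle() if outcome == "throttled" else limiter.on_success()
        print(outcome, limiter.rate)
```

The asymmetry is the point of the design: the rate drops quickly when throttling appears and climbs back gradually, so a large cluster settles near the rate that Amazon S3 is currently serving instead of repeatedly overshooting it.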

Adjust the number of Amazon S3 objects processed

Another approach is to adjust the number of Amazon S3 objects processed so you have fewer requests made concurrently. When you lower the number of objects to be processed in a job, you use fewer Amazon S3 requests, thereby lowering the request rate or transactions per second (TPS) required for each job. Note the following considerations:

  • Preprocess the data by aggregating multiple smaller files into fewer, larger chunks – For example, use s3-dist-cp or an AWS Glue compaction blueprint to merge a large number of small files (generally less than 64 MB) into a smaller number of optimally sized files (such as 128–512 MB). This approach reduces the number of requests required, while simultaneously improving the aggregate throughput to read and process data in Amazon S3. You may need to experiment to arrive at the optimal size for your workload, because creating extremely large files can reduce the parallelism of the job.
  • Use partition pruning to scan data under specific partitions – In Apache Hive and Hive Metastore-compatible applications such as Apache Spark or Presto, one table can have multiple partition folders. Partition pruning is a technique to scan only the required data in a specific partition folder of a table. It’s useful when you want to read a specific portion from the entire table. To take advantage of predicate pushdown, you can use partition columns in the WHERE clause in Spark SQL or the filter expression in a DataFrame. In AWS Glue, you can also use a partition pushdown predicate when creating DynamicFrames.
  • For AWS Glue, enable job bookmarks – You can use AWS Glue job bookmarks to process continuously ingested data repeatedly. It only picks unprocessed data from the previous job run, thereby reducing the number of objects read or retrieved from Amazon S3.
  • For AWS Glue, enable bounded executions – AWS Glue bounded execution is a technique to only pick unprocessed data, with an upper bound on the dataset size or the number of files to be processed. This is another way to reduce the number of requests made to Amazon S3.
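To show how compaction reduces the object count, the following sketch plans how small files could be grouped into batches of roughly a target size before they are merged. It is a planning illustration only and does not call s3-dist-cp or AWS Glue; the function name, the 256 MB default, and the sample keys are assumptions.

```python
from typing import List, Tuple

MB = 1024 * 1024


def plan_compaction(files: List[Tuple[str, int]],
                    target_bytes: int = 256 * MB) -> List[List[str]]:
    """Greedily group (key, size_in_bytes) pairs into batches near target_bytes.

    Each batch would be merged into one output object, so the number of
    batches is the number of objects a later job has to read.
    """
    batches: List[List[str]] = []
    current: List[str] = []
    current_size = 0
    for key, size in files:
        # Start a new batch when adding this file would exceed the target.
        if current and current_size + size > target_bytes:
            batches.append(current)
            current, current_size = [], 0
        current.append(key)
        current_size += size
    if current:
        batches.append(current)
    return batches


if __name__ == "__main__":
    # 1,000 hypothetical 8 MB files collapse into far fewer merged objects.
    small_files = [(f"data/part-{i:05d}", 8 * MB) for i in range(1000)]
    print(len(plan_compaction(small_files)))
```

A smaller target keeps more parallelism, and a larger one cuts more requests, which is the trade-off noted in the first bullet.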

Adjust the number of concurrent Amazon S3 requests

To have fewer concurrent read requests per prefix, you can configure Spark parameters. By default, Spark can launch up to 10,000 tasks to list prefixes when creating Spark DataFrames. You may experience Slow Down responses, especially when you read from a table with a highly nested prefix structure. In this case, it’s a good idea to limit the maximum listing parallelism by decreasing the parameter spark.sql.sources.parallelPartitionDiscovery.parallelism (the default is 10000).

To have fewer concurrent write requests per prefix, you can use the following techniques:

  • Reduce the number of Spark RDD partitions before writes – You can do this by using df.repartition(n) or df.coalesce(n) in DataFrames. For Spark SQL, you can also use query hints like REPARTITION or COALESCE. You can see the number of tasks (=RDD partitions) on the Spark UI.
  • For AWS Glue, group the input data – If the datasets are made up of small files, we recommend grouping the input data because it reduces the number of RDD partitions, and reduces the number of Amazon S3 requests to write the files.
  • Use the EMRFS S3-optimized committer – The EMRFS S3-optimized committer is used by default in Amazon EMR 5.19.0 and later, and AWS Glue 3.0. In AWS Glue 2.0, you can configure it in the job parameter --enable-s3-parquet-optimized-committer. The committer uses Amazon S3 multipart uploads instead of renaming files, and it usually reduces the number of HEAD/LIST requests significantly.

The following are other techniques to adjust the Amazon S3 request rate in Amazon EMR and AWS Glue. These options have the net effect of reducing parallelism of the Spark job, thereby reducing the probability of Amazon S3 Slow Down responses, although it can lead to longer job duration. We recommend testing and adjusting these values for your use case.

  • Reduce the number of concurrent jobs – Start with the most read/write heavy jobs. If you configured cross-account access for Amazon S3, keep in mind that other accounts might also be submitting jobs to the prefix.
  • Reduce the number of concurrent Spark tasks – You have several options:
    • For Amazon EMR, set the number of Spark executors (for example, the spark-submit option --num-executors and the Spark parameter spark.executor.instances).
    • For AWS Glue, set the number of workers in the NumberOfWorkers parameter.
    • For AWS Glue, change the WorkerType parameter to a smaller one (for example, G.2X to G.1X).
    • Configure Spark parameters:
      • Decrease the value of spark.default.parallelism.
      • Decrease the value of spark.sql.shuffle.partitions.
      • Increase the value of spark.task.cpus (the default is 1) to allocate more CPU cores per Spark task.

Conclusion

In this post, we described the best practices to optimize data access from Amazon EMR and AWS Glue to Amazon S3. With these best practices, you can easily run Amazon EMR and AWS Glue jobs by taking advantage of Amazon S3 horizontal scaling, and process data in a highly distributed way at a massive scale.

For further guidance, please reach out to AWS Premium Support.

Appendix A: Configure CloudWatch request metrics

To monitor Amazon S3 requests, you can enable request metrics in Amazon CloudWatch for the bucket. Then, define a filter for the prefix. For a list of useful metrics to monitor, see Monitoring metrics with Amazon CloudWatch. After you enable metrics, use the data in the metrics to determine which of the aforementioned options is best for your use case.

Appendix B: Configure Spark parameters

To configure Spark parameters in Amazon EMR, there are several options:

  • spark-submit command – You can pass Spark parameters via the --conf option.
  • Job script – You can set Spark parameters in the SparkConf object in the job script code.
  • Amazon EMR configurations – You can configure Spark parameters via API using Amazon EMR configurations. For more information, see Configure Spark.

To configure Spark parameters in AWS Glue, you can configure AWS Glue job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50.

To set multiple configs, configure your job parameters using key --conf with value like spark.hadoop.fs.s3.maxRetries=50 --conf spark.task.cpus=2.
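Because the chained --conf format is easy to mistype, a small helper can assemble it. The following sketch builds the value string in the format shown above, plus the equivalent spark-submit arguments; both function names are hypothetical.

```python
from typing import Dict, List


def glue_conf_value(settings: Dict[str, object]) -> str:
    """Build the value for the AWS Glue job parameter whose key is --conf.

    The first setting is written as key=value; each further setting is
    chained with ' --conf ', matching the format described in the text.
    """
    pairs = [f"{key}={value}" for key, value in settings.items()]
    return " --conf ".join(pairs)


def spark_submit_conf_args(settings: Dict[str, object]) -> List[str]:
    """Build the repeated --conf arguments for a spark-submit command line."""
    args: List[str] = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


if __name__ == "__main__":
    settings = {"spark.hadoop.fs.s3.maxRetries": 50, "spark.task.cpus": 2}
    print(glue_conf_value(settings))
    print(" ".join(spark_submit_conf_args(settings)))
```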


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is passionate about releasing AWS Glue connector custom blueprints and other software artifacts to help customers build their data lakes. In his spare time, he enjoys watching hermit crabs with his children.

Aditya Kalyanakrishnan is a Senior Product Manager on the Amazon S3 team at AWS. He enjoys learning from customers about how they use Amazon S3 and helping them scale performance. Adi’s based in Seattle, and in his spare time enjoys hiking and occasionally brewing beer.

Building a serverless cloud-native EDI solution with AWS

Post Syndicated from Ripunjaya Pattnaik original https://aws.amazon.com/blogs/architecture/building-a-serverless-cloud-native-edi-solution-with-aws/

Electronic data interchange (EDI) is a technology that exchanges information between organizations in a structured digital form based on regulated message formats and standards. EDI has been used in healthcare for decades on the payer side for determination of coverage and benefits verification. There are different standards for exchanging electronic business documents, like American National Standards Institute (ANSI) X12, Electronic Data Interchange for Administration, Commerce and Transport (EDIFACT), and Health Level 7 (HL7).

HL7 is the standard to exchange messages between enterprise applications, like a Patient Administration System and a Pathology Laboratory Information System. However, HL7 messages are embedded in Health Insurance Portability and Accountability Act (HIPAA) X12 for transactions between enterprises, like hospitals and insurance companies.

HIPAA is a federal law that required the creation of national standards to protect sensitive patient health information from being disclosed without the patient’s consent or knowledge. It also mandates healthcare organizations to follow a standardized mechanism of EDI to submit and process insurance claims.

In this blog post, we will discuss how you can build a serverless cloud-native EDI implementation on AWS using the Edifecs XEngine Server.

EDI implementation challenges

Due to its structured format, EDI facilitates the consistency of business information for all participants in the exchange process. The primary EDI software processes the information and then translates it into a more readable format, which can be imported directly and automatically into your integration systems. Figure 1 shows a high-level transaction for a healthcare EDI process.

Figure 1. EDI Transaction Sets exchanges between healthcare provider and payer

Along with the implementation itself, the following are some of the common challenges encountered in EDI system development:

  1. Scaling. Despite the standard protocols of EDI, the document types and business rules differ across healthcare providers. You must scale the scope of your EDI judiciously to handle a diverse set of data rules with multiple EDI protocols.
  2. Flexibility in EDI integration. As standards evolve, your EDI system development must reflect those changes.
  3. Data volumes and handling bad data. As the volume of data increases, so does the chance for errors. Your storage plans must adjust as well.
  4. Agility. In healthcare, EDI handles business documents promptly, as real-time document delivery is critical.
  5. Compliance. State Medicaid and Medicare rules and compliance can be difficult to manage. HIPAA compliance and CAQH CORE certifications can be difficult to acquire.

Solution overview and architecture data flow

Providers and Payers can send requests such as enrollment inquiries, certification requests, or claim encounters to one another. This architecture uses these as source data requests, which arrive from the Providers and Payers (submitters) as flat files (.txt and .csv), Active Message Queues, and API calls.

The steps for the solution shown in Figure 2 are as follows:

1. Flat, on-premises files are transferred to Amazon Simple Storage Service (S3) buckets using AWS Transfer Family (2).
3. AWS Fargate on Amazon Elastic Container Service (Amazon ECS) runs Python packages to convert the transactions into JSON messages, then queues them on Amazon MQ (4).
5. Java Message Service (JMS) Bridge, which runs Apache Camel on Fargate, pulls the messages from the on-premises messaging systems and queues them on Amazon MQ (6).
7. Fargate also runs programs to call the on-premises API or web services to get the transactions and queues them on Amazon MQ (8).
9. Amazon CloudWatch monitors the queue depth. If queue depth goes beyond a set threshold, CloudWatch sends notifications to the containers through Amazon Simple Notification Service (SNS) (10).
11. Amazon SNS triggers AWS Lambda, which adds tasks to Fargate (12), horizontally scaling it to handle the spike.
13. Fargate runs Python programs to read the messages on Amazon MQ and uses PYX12 packages to convert the JSON messages to EDI file formats, depending on the type of transactions.
14. The container also may queue the EDI requests on different queues, as the solution uses multiple trading partners for these requests.
15. The solution runs Edifecs XEngine Server on Fargate as a Docker image. It polls the messages from the queues previously mentioned and converts them to the EDI specifications of the trading partners that are registered with Edifecs.
16. A Python module running on Fargate converts the response from the trading partners to JSON.
17. Fargate sends the JSON payload as a POST request using Amazon API Gateway, which updates requestors’ backend systems/databases (12) that are running microservices on Amazon ECS (11).
18. The solution also runs Elastic Load Balancing to balance the load across the Amazon ECS cluster to take care of any spikes.
19. Amazon ECS runs microservices that use Amazon RDS (20) for domain-specific data.
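The queue-depth scaling in steps 9 to 12 can be illustrated with a small decision function of the kind the Lambda function might apply. The post does not describe the actual logic, so everything here is an assumption: the function name, the threshold, the per-task throughput, and the task cap are hypothetical values for the sketch.

```python
import math


def desired_task_count(queue_depth: int, current_tasks: int,
                       threshold: int = 1000, messages_per_task: int = 500,
                       max_tasks: int = 20) -> int:
    """Decide how many Fargate tasks to run for a given Amazon MQ queue depth.

    Below the alarm threshold nothing changes. Above it, scale out so each
    task handles roughly messages_per_task messages, never scaling in here
    and never exceeding max_tasks.
    """
    if queue_depth <= threshold:
        return current_tasks
    needed = math.ceil(queue_depth / messages_per_task)
    return min(max_tasks, max(current_tasks, needed))


if __name__ == "__main__":
    for depth in (500, 3000, 100_000):
        print(depth, desired_task_count(depth, current_tasks=2))
```

Capping the task count keeps a sudden spike from launching an unbounded number of containers; scaling back in after the queue drains would need a separate rule that this sketch leaves out.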

Figure 2. EDI transaction-processing system architecture on AWS

Handling PII/PHI data

The EDI request and response file includes protected health information (PHI)/personally identifiable information (PII) data related to members, claims, and financial transactions. All AWS services that the solution uses are HIPAA eligible, and data is encrypted at rest and in transit. The file transfers are through FTP, and the on-premises request/response files are Pretty Good Privacy (PGP) encrypted. The Amazon S3 buckets are secured through bucket access policies and are AES-256 encrypted.

Amazon ECS tasks that are hosted in Fargate use ephemeral storage that is encrypted with AES-256 encryption, using an encryption key managed by Fargate. User data stored in Amazon MQ is encrypted at rest. Amazon MQ encryption at rest provides enhanced security by encrypting data using encryption keys stored in the AWS Key Management Service. All connections between Amazon MQ brokers use Transport Layer Security to provide encryption in transit. All APIs are accessed through API gateways secured through Amazon Cognito. Only authorized users can access the application.

The architecture provides many benefits to EDI processing:

  • Scalability. Because the solution is highly scalable, it can speed integration of new partner/provider requirements.
  • Compliance. Use the architecture to run sensitive, HIPAA-regulated workloads. If you plan to include PHI (as defined by HIPAA) on AWS services, first accept the AWS Business Associate Addendum (AWS BAA). You can review, accept, and check the status of your AWS BAA through a self-service portal available in AWS Artifact. Any AWS service can be used with a healthcare application, but only services covered by the AWS BAA can be used to store, process, and transmit protected health information under HIPAA.
  • Cost effective. Though serverless cost is calculated by usage, with this architecture you save as your traffic grows.
  • Visibility. Visualize and understand the flow of your EDI processing using Amazon CloudWatch to monitor your databases, queues, and operation portals.
  • Ownership. Gain ownership of your EDI and custom or standard rules for rapid change management and partner onboarding.

Conclusion

In this healthcare use case, we demonstrated how a combination of AWS services can be used to increase efficiency and reduce cost. This architecture provides a scalable, reliable, and secure foundation to develop your EDI solution, while using dependent applications. We established how to simplify complex tasks in order to manage and scale your infrastructure for a high volume of data. Finally, the solution provides for monitoring your workflow, services, and alerts.

For further reading:

Run AWS Glue crawlers using Amazon S3 event notifications

Post Syndicated from Pradeep Patel original https://aws.amazon.com/blogs/big-data/run-aws-glue-crawlers-using-amazon-s3-event-notifications/

The AWS Well-Architected Data Analytics Lens provides a set of guiding principles for analytics applications on AWS. One of the best practices it describes is to build a central Data Catalog to store, share, and track metadata changes. AWS Glue provides a Data Catalog to fulfill this requirement. AWS Glue also provides crawlers that automatically discover datasets stored in multiple source systems, including Amazon Redshift, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), MongoDB, Amazon DocumentDB (with MongoDB compatibility), and various other data stores using JDBC. A crawler extracts schemas of tables from these sources and stores the information in the AWS Glue Data Catalog. You can run a crawler on-demand or on a schedule.

When you schedule a crawler to discover data in Amazon S3, you can choose to crawl all folders or crawl new folders only. In the first mode, every time the crawler runs, it scans data in every folder under the root path it was configured to crawl. This can be slow for large tables because on every run, the crawler must list all objects and then compare metadata to identify new objects. In the second mode, commonly referred to as incremental crawls, every time the crawler runs, it processes only S3 folders that were added since the last crawl. Incremental crawls can reduce runtime and cost when used with datasets that append new objects with a consistent schema on a regular basis.

AWS Glue also supports incremental crawls using Amazon S3 Event Notifications. You can configure Amazon S3 Event Notifications to be sent to an Amazon Simple Queue Service (Amazon SQS) queue, which the crawler uses to identify the newly added or deleted objects. With each run of the crawler, the SQS queue is inspected for new events; if none are found, the crawler stops. If events are found in the queue, the crawler inspects their respective folders and processes the new objects. This new mode reduces cost and crawler runtime to update large and frequently changing tables.

In this post, we present two design patterns to create a crawler pipeline using this new feature. A crawler pipeline refers to components required to implement incremental crawling using Amazon S3 Event Notifications.

Crawler pipeline design patterns

We define design patterns for the crawler pipeline based on a simple question: do I have any applications other than the crawler that consume S3 event notifications?

If the answer is no, you can send event notifications directly to an SQS queue that has no other consumers. The crawler consumes events from the queue.

If you have multiple applications that want to consume the event notifications, send the notifications directly to an Amazon Simple Notification Service (Amazon SNS) topic, and then broadcast them to an SQS queue. If you have an application or microservice that consumes notifications, you can subscribe it to the SNS topic. This way, you can populate metadata in the Data Catalog while still supporting use cases around the files ingested into the S3 bucket.

The following are some considerations for these options:

  • S3 event notifications can only be sent to standard Amazon SNS; Amazon SNS FIFO is not supported. Refer to Amazon S3 Event Notifications for more details.
  • Similarly, S3 event notifications sent to Amazon SNS can only be forwarded to standard SQS queues and not Amazon SQS FIFO queues. For more information, see FIFO topics example use case.
  • The AWS Identity and Access Management (IAM) role used by the crawler needs to include an IAM policy for Amazon SQS. We provide an example policy later in this post.

Let’s take a deeper look into each design pattern to understand the architecture and its pros and cons.

Option 1: Publish events to an SQS queue

In this design pattern, S3 event notifications are published directly to a standard SQS queue. First, you need to configure an SQS queue as a target for S3 event notification on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the queue including permissions for Amazon S3 to send messages to Amazon SQS, and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when the SQS queue is used only for incremental crawling and no other application or service depends on it. The crawler removes events from the queue after they are processed, so they’re not available for other applications. The following diagram illustrates this architecture.

Figure 1: Crawler pipeline using Amazon SQS queue

Option 2: Publish events to an SNS topic and forward to an SQS queue

The following diagram represents a design pattern where S3 event notifications are sent to an SNS topic, which then forwards them to an SQS queue for the crawler to consume. First, you need to configure an SNS topic as a target for S3 event notification on the S3 bucket where the table you want to crawl is stored. Next, attach an IAM policy to the topic including permissions for Amazon S3 to send messages to Amazon SNS. Then, create an SQS queue and subscribe it to the SNS topic to receive S3 events. Finally, attach an IAM policy to the queue that includes permissions for Amazon SNS to publish messages to Amazon SQS and permissions for the crawler IAM role to read and delete messages from Amazon SQS. This approach is useful when other applications depend on the S3 event notifications. For more information about fanout capabilities in Amazon SNS, see Fanout S3 Event Notifications to Multiple Endpoints.

Figure 2: Crawler pipeline using Amazon SNS topic and Amazon SQS queue
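
With the second pattern, each SQS message body is an SNS envelope whose Message field carries the S3 event as a JSON string (assuming raw message delivery is not enabled on the subscription). Any other consumer that reads from such a queue has to unwrap the envelope before it can see object keys. The following sketch shows one way to do that; the sample payload is hand-written for illustration and trimmed to the fields the function reads.

```python
import json
from urllib.parse import unquote_plus


def s3_objects_from_sqs_body(body: str) -> list[tuple[str, str]]:
    """Return (bucket, key) pairs from an SQS body holding an SNS-wrapped S3 event."""
    envelope = json.loads(body)
    # SNS places the original S3 event, serialized as a string, under "Message".
    s3_event = json.loads(envelope["Message"])
    pairs = []
    for record in s3_event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys are URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


# Hand-written sample, trimmed to the fields this sketch reads.
sample_body = json.dumps({
    "Type": "Notification",
    "Message": json.dumps({
        "Records": [{
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "nyc_data/dt%3D202001/green_tripdata_2020-01.csv"},
            },
        }]
    }),
})

print(s3_objects_from_sqs_body(sample_body))
```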

Solution overview

It’s common to have multiple applications consuming S3 event notifications, so in this post we demonstrate how to implement the second design pattern using Amazon SNS and Amazon SQS.

We create the following AWS resources:

  • S3 bucket – The location where table data is stored. Event notifications are enabled.
  • SNS topic and access policy – Amazon S3 sends event notifications to the SNS topic. The topic must have a policy that gives permissions to Amazon S3.
  • SQS queue and access policy – The SNS topic publishes messages to the SQS queue. The queue must have a policy that gives the SNS topic permission to write messages.
  • Three IAM policies – The policies are as follows:
    • SQS queue policy – Lets the crawler consume messages from the SQS queue.
    • S3 policy – Lets the crawler read files from the S3 bucket.
    • AWS Glue crawler policy – Lets the crawler make changes to the AWS Glue Data Catalog.
  • IAM role – The IAM role used by the crawler. This role uses the three preceding policies.
  • AWS Glue crawler – Crawls the table’s objects and updates the AWS Glue Data Catalog.
  • AWS Glue database – The database in the Data Catalog.
  • AWS Glue table – The crawler creates a table in the Data Catalog.

In the following sections, we walk you through the steps to create your resources and test the solution.

Create an S3 bucket and set up a folder

To create your Amazon S3 resources, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter s3-event-notifications-bucket-<random-number>.
  4. Select Block all public access.
  5. Choose Create bucket.
  6. In the buckets list, select the bucket and choose Create a folder.
  7. For Folder name, enter nyc_data.
  8. Choose Create folder.

Create an IAM policy with permissions on Amazon S3

To create your IAM policy with Amazon S3 permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_s3.json.
  4. Update the S3 bucket name.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_s3.
  8. Choose Create policy.

Create an IAM policy with permissions on Amazon SQS

To create your IAM policy with Amazon SQS permissions, complete the following steps:

  1. On the IAM console, choose Policies.
  2. Choose Create policy.
  3. On the JSON tab, enter the policy code from s3_event_notifications_iam_policy_sqs.json.
  4. Update the AWS account number.
  5. Choose Next: Tags.
  6. Choose Next: Review.
  7. For Name, enter s3_event_notifications_iam_policy_sqs.
  8. Choose Create policy.

Create an IAM role for the crawler

To create your IAM role for the AWS Glue crawler, complete the following steps:

  1. On the IAM console, choose Roles.
  2. Choose Create role.
  3. For Choose a use case, choose Glue.
  4. Choose Next: Permissions.
  5. Attach the two policies you just created: s3_event_notifications_iam_policy_s3 and s3_event_notifications_iam_policy_sqs.
  6. Attach the AWS managed policy AWSGlueServiceRole.
  7. Choose Next: Tags.
  8. Choose Next: Review.
  9. For Role name, enter s3_event_notifications_crawler_iam_role.
  10. Review to confirm that all three policies are attached.
  11. Choose Create role.

Create an SNS topic

To create your SNS topic, complete the following steps:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard (FIFO isn’t supported).
  4. For Name, enter s3_event_notifications_topic.
  5. Choose Create topic.
  6. On the Access policy tab, choose Advanced.
  7. Enter the policy contents from s3_event_notifications_sns_topic_access_policy.json.
  8. Update the account number and S3 bucket.
  9. Choose Create topic.

Create an SQS queue

To create your SQS queue, complete the following steps:

  1. On the Amazon SQS console, choose Create a queue.
  2. For Type, choose Standard.
  3. For Name, enter s3_event_notifications_queue.
  4. Keep the remaining settings at their default.
  5. On the Access policy tab, choose Advanced.
  6. Enter the policy contents from s3_event_notifications_sqs_queue_policy.json.
  7. Update the account number.
  8. Choose Create queue.
  9. On the SNS subscription tab, choose Subscribe to SNS topic.
  10. Choose the topic you created, s3_event_notifications_topic.
  11. Choose Save.
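
The queue access policy entered in step 6 comes from the post's repository and is not reproduced here. As a rough illustration of the SNS-to-SQS portion only, the sketch below builds a statement that lets one topic send messages to one queue. The account ID and Region are placeholders, and the repository's file may differ, for example by also granting the crawler role permission to read and delete messages.

```python
import json


def sns_to_sqs_statement(queue_arn: str, topic_arn: str) -> dict:
    """Allow a single SNS topic to send messages to a single SQS queue."""
    return {
        "Sid": "AllowSnsTopicToSendMessage",
        "Effect": "Allow",
        "Principal": {"Service": "sns.amazonaws.com"},
        "Action": "sqs:SendMessage",
        "Resource": queue_arn,
        # Restrict delivery to the one topic that S3 publishes to.
        "Condition": {"ArnEquals": {"aws:SourceArn": topic_arn}},
    }


# Placeholder account ID and Region; resource names match the walkthrough.
queue_arn = "arn:aws:sqs:us-east-1:111122223333:s3_event_notifications_queue"
topic_arn = "arn:aws:sns:us-east-1:111122223333:s3_event_notifications_topic"

policy = {
    "Version": "2012-10-17",
    "Statement": [sns_to_sqs_statement(queue_arn, topic_arn)],
}
print(json.dumps(policy, indent=2))
```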

Create event notifications on the S3 bucket

To create event notifications for your S3 bucket, complete the following steps:

  1. Navigate to the Properties tab of the S3 bucket you created.
  2. In the Event notifications section, choose Create event notification.
  3. For Event name, enter crawler_event.
  4. For Prefix, enter nyc_data/.
  5. For Event Types, choose All Object Create Event.
  6. For Destination, choose SNS topic and the topic s3_event_notifications_topic.
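
The same notification can be configured programmatically. The sketch below builds the configuration that the console steps describe and, when given an S3 client such as `boto3.client("s3")`, applies it with `put_bucket_notification_configuration`. The bucket name and topic ARN are placeholders supplied by the caller. This call replaces the bucket's entire notification configuration, so merge any existing entries first.

```python
def crawler_notification_config(topic_arn: str, prefix: str = "nyc_data/") -> dict:
    """Build an S3 notification configuration for object-created events under a prefix."""
    return {
        "TopicConfigurations": [
            {
                "Id": "crawler_event",
                "TopicArn": topic_arn,
                "Events": ["s3:ObjectCreated:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}
                },
            }
        ]
    }


def apply_notification(s3, bucket: str, topic_arn: str) -> dict:
    """Apply the configuration; `s3` is any object exposing the boto3 S3 client method."""
    config = crawler_notification_config(topic_arn)
    s3.put_bucket_notification_configuration(
        Bucket=bucket, NotificationConfiguration=config
    )
    return config
```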

Create a crawler

To create your AWS Glue crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter s3_event_notifications_crawler.
  4. Choose Next.
  5. For Crawler source type, choose data stores.
  6. For Repeat crawls of S3 data stores, choose Crawl changes identified by Amazon S3 events.
  7. Choose Next.
  8. For Include path, enter an S3 path.
  9. For Include SQS ARN, add your Amazon SQS ARN.

Including a dead-letter queue is optional; we skip it for this post. Dead-letter queues help you isolate problematic event notifications that a crawler can’t process successfully. To understand the general benefits of dead-letter queues and how they receive messages from the main SQS queue, refer to Amazon SQS dead-letter queues.

  10. Choose Next.
  11. When asked to add another data store, choose No.
  12. For IAM role, select “Choose an existing role” and enter the IAM role created above.
  13. Choose Next.
  14. For Frequency, choose Run on demand.
  15. Choose Next.
  16. Under Database, choose Add database.
  17. For Database name, enter s3_event_notifications_database.
  18. Choose Create.
  19. Choose Next.
  20. Choose Finish to create your crawler.
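
For reference, the crawler settings chosen in the console map onto the AWS Glue `CreateCrawler` API roughly as follows. This is a sketch rather than the post's deployment method: the account ID, Region, and bucket name in the usage note are placeholders, and the resulting dictionary would be passed to `boto3.client("glue").create_crawler(**params)`.

```python
def event_mode_crawler_params(name, role, database, s3_path, queue_arn):
    """Build create_crawler arguments for a crawler driven by S3 events."""
    return {
        "Name": name,
        "Role": role,
        "DatabaseName": database,
        "Targets": {
            "S3Targets": [
                # EventQueueArn points the crawler at the SQS queue of S3 events.
                {"Path": s3_path, "EventQueueArn": queue_arn}
            ]
        },
        # CRAWL_EVENT_MODE corresponds to the console option
        # "Crawl changes identified by Amazon S3 events".
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVENT_MODE"},
    }


params = event_mode_crawler_params(
    name="s3_event_notifications_crawler",
    role="s3_event_notifications_crawler_iam_role",
    database="s3_event_notifications_database",
    s3_path="s3://example-bucket/nyc_data/",  # placeholder bucket
    queue_arn="arn:aws:sqs:us-east-1:111122223333:s3_event_notifications_queue",
)
```

Omitting a schedule leaves the crawler to run on demand, which matches the Frequency setting chosen in the console.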

Test the solution

The following steps show how adding new objects triggers an event notification that propagates to Amazon SQS, which the crawler uses on subsequent runs. For sample data, we use NYC taxi records from January and February, 2020.

  1. Download the following datasets:
    1. green_tripdata_2020-01.csv
    2. green_tripdata_2020-02.csv
  2. On the Amazon S3 console, navigate to the bucket you created earlier.
  3. Create a folder called nyc_data.
  4. Create a subfolder called dt=202001.

This sends a notification to the SNS topic, and a message is sent to the SQS queue.

  5. In the folder dt=202001, upload the file green_tripdata_2020-01.csv.
  6. To validate that this step generated an S3 event notification, navigate to the queue on the Amazon SQS console.
  7. Choose Send and receive messages.
  8. Under Receive messages, Messages available should show as 1.
  9. Return to the Crawlers page on the AWS Glue console and select the crawler s3_event_notifications_crawler.
  10. Choose Run crawler. After a few seconds, the crawler status changes to Starting and then to Running. The crawler should complete in 1–2 minutes and display a success message.
  11. Confirm that a new table, nyc_data, is in your database.
  12. Choose the table to verify its schema.

The dt column is marked as a partition key.

  13. Choose View partitions to see partition details.
  14. To validate that the crawler consumed this event, navigate to the queue on the Amazon SQS console and choose Send and receive messages.
  15. Under Receive messages, Messages available should show as 0.
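
The console check in the steps above can also be scripted. This sketch reads the queue's approximate message count through the SQS `GetQueueAttributes` API. The client is passed in (for example, `boto3.client("sqs")`), and the queue URL is whatever your account assigns. The count is approximate by design, so allow a short delay after an upload or crawler run before reading it.

```python
def messages_available(sqs, queue_url: str) -> int:
    """Return the approximate number of visible messages in the queue."""
    response = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    # SQS returns attribute values as strings.
    return int(response["Attributes"]["ApproximateNumberOfMessages"])
```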

Now upload another file and see how the crawler picks up the new S3 event on its next run.

  1. On the Amazon S3 console, in your nyc_data folder, create the subfolder dt=202002.
  2. Upload the file green_tripdata_2020-02.csv to this subfolder.
  3. Run the crawler again and wait for the success message.
  4. Return to the AWS Glue table and choose View partitions to see a new partition added.

Additional notes

Keep in mind the following when using this solution:

Clean up

When you’re finished evaluating this feature, you should delete the SNS topic and SQS queue, AWS Glue crawler, and S3 bucket and objects to avoid any further charges.

Conclusion

In this post, we discussed a new way for AWS Glue crawlers to use S3 Event Notifications to reduce the time and cost needed to incrementally process table data updates in the AWS Glue Data Catalog. We discussed two design patterns to implement this approach. The first pattern publishes events directly to an SQS queue, which is useful when only the crawler needs these events. The second pattern publishes events to an SNS topic, which forwards them to an SQS queue for the crawler to process. This is useful when other applications also depend on these events. We also discussed how to implement the second design pattern to incrementally crawl your data. Incremental crawls using S3 event notifications reduce the runtime and cost of your crawlers for large and frequently changing tables.

Let us know your feedback in the comments section. Happy crawling!


About the Authors

Pradeep Patel is a Sr. Software Engineer at AWS Glue. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a big data enthusiast and holds 13 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finding areas for home automation.

Ravi Itha is a Sr. Data Architect at AWS. He works with customers to design and implement data lakes, analytics, and microservices on AWS. He is an open-source committer and has published more than a dozen solutions using AWS CDK, AWS Glue, AWS Lambda, AWS Step Functions, Amazon ECS, Amazon MQ, Amazon SQS, Amazon Kinesis Data Streams, and Amazon Kinesis Data Analytics for Apache Flink. His solutions can be found at his GitHub handle. Outside of work, he is passionate about books, cooking, movies, and yoga.

Dream11: Blocking application attacks using AWS WAF at scale

Post Syndicated from Vatsal Shah original https://aws.amazon.com/blogs/architecture/dream11-blocking-application-attacks-using-aws-waf-at-scale/

As the world’s largest fantasy sports platform with more than 120 million registered users, Dream11 runs multiple contests simultaneously while processing millions of user requests per minute. Their user-centric and data-driven teams make it a priority to ensure that the Dream11 application (app) remains protected against all kinds of threats and vulnerabilities.

Introduction to AWS WAF Security Automations

AWS WAF is a web application firewall that helps protect apps and APIs against common web exploits and bots. These attacks may affect availability, compromise security, or consume excessive resources. AWS WAF gives you control over how traffic reaches your applications. You can create security rules that control bot traffic and block common attack patterns, such as SQL injection or cross-site scripting (XSS).

AWS WAF Security Automations use AWS CloudFormation to quickly configure AWS WAF rules that help block the following common types of attacks:

  • SQL injection
  • Cross-site scripting
  • HTTP floods
  • Scanners and probes
  • Known attacker origins (IP reputation lists)
  • Bots and scrapers

In this blog post, we explain how Dream11 uses AWS WAF Security Automations to protect its application from scanner and probe attacks.

Scanner and probe automation

To understand the scanner and probe automation, let’s look at a realistic attack scenario for a standard app that is protected by AWS WAF. Let’s assume that a malicious user is trying to scan the app and identify loopholes using their custom tool. They plan to conduct injection attacks (such as SQLi, XSS) or directory brute force attacks.

The app, secured by AWS WAF, has rules in place to block requests if certain signatures and patterns are matched. AWS WAF cannot have all possible payload lists for each attack vector. This means that after some trial and error, an attacker may find a payload that doesn’t get blocked by AWS WAF and try to exploit the vulnerability.

In this case, what if AWS WAF can detect the behavior of malicious user IPs and block them for a certain time period? Wouldn’t it be great if AWS WAF blocks the IP of a malicious user after receiving a couple of malicious requests? That way, new requests coming from that IP will be blocked without AWS WAF having to check all the rules in the web ACL. Any successful bypass attempts will also get blocked from that IP. Rather than permanently blocking the IP, this feature blocks the offending IP for a certain time period, discouraging the attacker from any further attempts. It acts as a first step of incident response. Here’s where automation can help.

Scanner and probe automation monitors Amazon CloudFront logs and analyses HTTP status codes for requests coming from different IPs. Based on the configured threshold of HTTP status codes, scanner and probe automation adds the malicious IP directly to the AWS WAF rule IPSet. It then blocks subsequent requests from that IP for a configured period of time.

The AWS WAF Security Automations solution creates an AWS WAF rule, an AWS Lambda function, and a Scanner and Probes Amazon Athena query. The Athena query parses Amazon CloudFront or Application Load Balancer access logs at regular intervals. It counts the number of bad requests per minute from unique source IP addresses. The Lambda function updates the AWS WAF IPSet rule to block further scans from IP addresses with a high error rate.
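
The counting step can be illustrated without Athena. The sketch below groups hand-written log records by source IP and minute, counts responses whose status codes are treated as errors, and returns the IPs that exceed a threshold. The set of status codes, the threshold, and the record layout are assumptions for illustration; the actual solution performs this aggregation as an Athena query over the access logs.

```python
from collections import Counter

ERROR_CODES = {400, 401, 403, 404, 405}  # assumed set of "bad request" statuses
THRESHOLD = 3                            # assumed max errors per IP per minute


def noisy_ips(records, threshold=THRESHOLD):
    """Return IPs whose error count in any single minute exceeds the threshold.

    Each record is (timestamp "YYYY-MM-DDTHH:MM:SS", source_ip, status_code).
    """
    per_minute = Counter()
    for timestamp, ip, status in records:
        if status in ERROR_CODES:
            minute = timestamp[:16]  # truncate to "YYYY-MM-DDTHH:MM"
            per_minute[(ip, minute)] += 1
    return sorted({ip for (ip, _), count in per_minute.items() if count > threshold})


# Made-up records using documentation-reserved IP ranges.
logs = [
    ("2022-01-01T10:00:01", "203.0.113.7", 404),
    ("2022-01-01T10:00:05", "203.0.113.7", 403),
    ("2022-01-01T10:00:09", "203.0.113.7", 404),
    ("2022-01-01T10:00:12", "203.0.113.7", 400),
    ("2022-01-01T10:00:20", "198.51.100.2", 200),
    ("2022-01-01T10:00:30", "198.51.100.2", 404),
]
print(noisy_ips(logs))
```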

Scanner and probe solution

Figure 1. Solution architecture for scanner and probe automation (xxx represents the numbers as defined by the use case)

The workflow of the solution is as follows, shown in Figure 1:

  • CloudFront logs are pushed to the Amazon S3 bucket
  • The Log Parser Lambda function runs the Athena query to count HTTP errors for each unique IP
  • If the HTTP error threshold is crossed for any IP, the Lambda function adds the IP to an AWS WAF IPSet for a certain time
  • The IP is removed from the IPSet automatically after the time period is over

Customizing the AWS WAF Security Automation solution

Scanner and probe automation blocks traffic if the error rate for a particular IP crosses the threshold. It adds the IP to the blocked IPSet, where it stays blocked for a configurable amount of time (for example, 12 hours, 2 days, 1 week).

During the customization of AWS WAF for Dream11, there were instances which required exceptions to the preceding rule. One was to prevent internal services/gateway IPs from getting blocked by the security automation. We needed to customize the rules for these predefined thresholds. For example: the solution should block the external traffic, but exclude any internal IP addresses.

The Dream11 Security team customized the Lambda logic to approve all internal NAT gateway IPs. Scanner and probe automation ignores these IPs even if there is a high number of errors from the approved IPs. Sample code is as follows:

log.info("[update_ip_set] \tIgnore the approved IP ")

# Append only addresses that are not in the approved list
if ip_type == "IPV4" and source_ip not in outstanding_requesters['ApprovedIPs']:
    addresses_v4.append(source_ip)
elif ip_type == "IPV6" and source_ip not in outstanding_requesters['ApprovedIPs']:
    addresses_v6.append(source_ip)

Note: Create a JSON file with a list of approved IPs and store it in APP_ACCESS_LOG_BUCKET.
We store our office-approved IPs as an xyz.json file in the same S3 bucket that holds our CloudFront access logs. This bucket is configurable in the CloudFormation template for Security Automations.

Code explanation:

  1. The custom code first validates the particular IP for which the error threshold is crossed against the approved IPs.
  2. If the IP is in IPv4 or IPv6 format and isn’t an approved IP, it is appended to the blocked IPSet for a certain period of time.
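
A self-contained version of the same idea is sketched below. It is not the Dream11 code: the function name and inputs are hypothetical, and the standard-library `ipaddress` module stands in for the solution's own IP type detection.

```python
import ipaddress


def split_blockable(source_ips, approved_ips):
    """Return (ipv4_list, ipv6_list) of addresses to block, skipping approved IPs."""
    approved = set(approved_ips)
    addresses_v4, addresses_v6 = [], []
    for source_ip in source_ips:
        if source_ip in approved:
            continue  # never block internal or office addresses
        # ip_address() raises ValueError for malformed input.
        version = ipaddress.ip_address(source_ip).version
        if version == 4:
            addresses_v4.append(source_ip)
        else:
            addresses_v6.append(source_ip)
    return addresses_v4, addresses_v6


# Made-up addresses: one approved internal IP and two external offenders.
offenders = ["10.0.0.5", "203.0.113.7", "2001:db8::1"]
print(split_blockable(offenders, approved_ips=["10.0.0.5"]))
```

The approved-list check here is an exact string match, so differently formatted spellings of the same IPv6 address would need normalizing first.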

The customization of the Lambda function provides a security automation solution that doesn’t block any legitimate request. At the same time, it provides protection against scanner and probe attacks. AWS WAF security automation is an open-source solution and is hosted on GitHub.

Conclusion

In this blog post, we’ve given a brief overview of how you can reduce attacks by using AWS WAF Security Automations against scanners and probes. We’ve also illustrated the customization implemented by the Dream11 security team.

By automating your security operations, you make incident response more effective. You can prioritize threats and handle cyber attacks automatically with automated courses of action. This reduces the need for human intervention, reduces response time, and addresses security issues without manual effort.

After implementing this at Dream11, we were able to create custom, application-specific rules that blocked attack patterns. This has improved application availability, secured resources, and prevented excessive resource consumption. With this solution, we are able to provide the best fantasy sports experience for over 120 million users.

Read more about Security Automations in AWS WAF.

Improve reusability and security using Amazon Athena parameterized queries

Post Syndicated from Blayze Stefaniak original https://aws.amazon.com/blogs/big-data/improve-reusability-and-security-using-amazon-athena-parameterized-queries/

Amazon Athena is a serverless interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL, and you only pay for the amount of data scanned by your queries. If you use SQL to analyze your business on a daily basis, you may find yourself repeatedly running the same queries, or similar queries with minor adjustments. Athena parameterized queries enable you to prepare statements you can reuse with different argument values you provide at run time. Athena parameterized queries also provide a layer of security against SQL injection attacks, and mask the query string in AWS CloudTrail for workloads with sensitive data.

This post shows you how to create and run parameterized queries in Athena. This post provides an example of how Athena parameterized queries protect against SQL injection, and shows the CloudTrail events with the masked query string. Lastly, the post reviews functions related to managing Athena prepared statements. If you want to follow along, this post provides steps to set up the components with a sample dataset; alternatively, you can use your own dataset.

Reusability

Athena prepared statements allow you to run and reuse queries within your Athena workgroup. By decoupling the queries from the code, you can update your prepared statements and your applications independent from one another. If a data lake has schema updates, it could require query updates. If multiple applications share the same Athena workgroup and are using similar queries, you can create a new query or update the existing query to serve multiple use cases, without each application being required to adjust similar queries in their own source code. Parameterized queries are currently supported for SELECT, INSERT INTO, CTAS, and UNLOAD statements. For the most current list, refer to Considerations and Limitations in Querying with Prepared Statements.

Security

Athena prepared statements provide a layer of protection against SQL injection. If you are using Athena behind an application interface, free text inputs inherently present a SQL injection threat vector which, if left unmitigated, could result in data exfiltration. When the parameterized query is run, Athena interprets the arguments as literal values, not as executable commands or SQL fragments such as SQL operators.
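
The difference between concatenating input into SQL and binding it as a parameter can be seen in any engine that supports placeholders. The sketch below uses Python's built-in `sqlite3` module purely as a stand-in for Athena, with a made-up table and values: the concatenated query lets a crafted input rewrite the WHERE clause, while the bound parameter is compared as a single literal string.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE reviews (product_id TEXT, star_rating INTEGER)")
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?)",
    [("A1", 4), ("A1", 5), ("B2", 1)],
)

malicious = "A1' OR '1'='1"

# Unsafe: the input becomes part of the SQL text and changes the predicate.
unsafe_sql = "SELECT COUNT(*) FROM reviews WHERE product_id = '" + malicious + "'"
unsafe_count = conn.execute(unsafe_sql).fetchone()[0]

# Safe: the input is bound to the placeholder and treated as one literal value.
safe_count = conn.execute(
    "SELECT COUNT(*) FROM reviews WHERE product_id = ?", (malicious,)
).fetchone()[0]

print(unsafe_count, safe_count)
```

The concatenated query matches every row because the injected `OR '1'='1'` is always true, whereas the bound query matches none because no product has that literal ID.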

When using Athena, CloudTrail captures all Athena API calls as audit events to provide a record of actions taken by an AWS user, role, or AWS service. Customers with sensitive data in their data lakes, such as personally identifiable information (PII), have told us they don’t want query strings in their CloudTrail event history for compliance reasons. When running parameterized queries, the query string is masked with HIDDEN_DUE_TO_SECURITY_REASONS in the CloudTrail event, so you don’t show protected data within your log streams.

Solution overview

This post documents the steps using the public Amazon.com customer reviews dataset; however, you can follow similar steps to use your own dataset.

The example query finds a product’s 4-star (out of 5 stars) reviews voted as the most helpful by other customers. The intent is to surface constructive product feedback, validate it, and incorporate the helpful feedback into the product roadmap. The product used in this use case is the Amazon Smile eGift Card.
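
One way to express this query as a prepared statement is sketched below. The statement name and the product ID are hypothetical, and the column names are assumed from the public reviews dataset schema, so verify them against your table. The strings follow Athena's `PREPARE ... FROM`, `EXECUTE ... USING`, and `DEALLOCATE PREPARE` syntax and could be submitted through the console or the `StartQueryExecution` API.

```python
STATEMENT_NAME = "product_helpful_reviews"  # hypothetical statement name

# Each question mark is a positional parameter supplied at run time.
PREPARE_SQL = (
    f"PREPARE {STATEMENT_NAME} FROM "
    "SELECT product_id, product_title, star_rating, helpful_votes, review_headline "
    "FROM amazon_reviews_parquet "
    "WHERE product_id = ? AND star_rating = ? "
    "ORDER BY helpful_votes DESC "
    "LIMIT 10"
)

# Arguments are matched to the question marks in order.
EXECUTE_SQL = f"EXECUTE {STATEMENT_NAME} USING 'EXAMPLE_PRODUCT_ID', 4"

# Removes the prepared statement from the workgroup when it is no longer needed.
DEALLOCATE_SQL = f"DEALLOCATE PREPARE {STATEMENT_NAME}"

for statement in (PREPARE_SQL, EXECUTE_SQL, DEALLOCATE_SQL):
    print(statement)
```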

Prerequisites

As a prerequisite, you need a foundational understanding of SQL syntax, as well as a foundational understanding of the following AWS services:

This post assumes you have:

Deploy resources for the example dataset

If you’re using the example dataset, follow the steps in this section. The data is in an S3 bucket in an AWS-managed AWS account. You need to create Athena and AWS Glue resources to get started.

This post provides a CloudFormation template that deploys the following resources in your AWS account:

  • AthenaWorkGroup – An Athena workgroup for your dataset and prepared statements. On the console, this workgroup is named PreparedStatementsWG.
  • GlueDatabase – A database in the AWS Glue Data Catalog for table metadata. The database is named athena_prepared_statements.
  • GlueTableAmazonReviews – An external table with Amazon.com customer reviews in the Data Catalog.

The following diagram shows how the resources interact when the query runs.
Diagram depicting a customer's AWS account and an AWS managed AWS account. In the customer account, there is a region box. In the region, there is an Amazon Athena workgroup taking 3 steps. In the first step, the workgroup accesses metadata from the AWS Glue Data Catalog named default. The catalog has a dotted line to an AWS Glue table called amazon_reviews_parquet, which has the attributes and S3 bucket location. The second step from the workgroup queries data from the S3 bucket. The S3 bucket is in the AWS managed AWS account. The bucket is for the Amazon Customer Reviews dataset. In the third step, the workgroup stores the query results in the Amazon S3 bucket in the customer AWS account. The query results can then be read by users with read access to the Athena workgroup.

To deploy the CloudFormation template, follow these steps:

  1. Navigate to this post’s GitHub repository.
  2. Clone the repository or copy the CloudFormation template athena-prepared-statements.yaml.
  3. On the AWS CloudFormation console, choose Create stack.
  4. Select Upload a template file and choose Choose file.
  5. Upload athena-prepared-statements.yaml, then choose Next.
  6. On the Specify stack details page, enter the stack name athena-prepared-statements-blog.
  7. For S3QueryResultsBucketName, enter your S3 bucket name.
  8. If you leave AthenaWorkGroupName as default, the Athena workgroup is named PreparedStatementsWG. If you change the value, the Athena workgroup name must be unique in your AWS Region.
  9. Choose Next.
  10. On the Configure stack options page, choose Next.
  11. On the Review page, choose Create stack.

The stack takes less than a minute to deploy and reach the CREATE_COMPLETE state. If you deploy the stack twice in the same AWS account and Region, the AWS Glue database, table, or Athena workgroup may already exist, and the deployment fails with a message indicating that the resource already exists in another template.
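The console steps above can also be scripted. The following is a minimal sketch using the AWS SDK for Python (Boto3), assuming the template has been copied locally and that its parameters are named S3QueryResultsBucketName and AthenaWorkGroupName as in the steps above. The helper names build_stack_request and deploy are hypothetical and are not part of the post's repository.

```python
def build_stack_request(template_body, results_bucket,
                        workgroup="PreparedStatementsWG"):
    """Build keyword arguments for CloudFormation's create_stack call."""
    return {
        "StackName": "athena-prepared-statements-blog",
        "TemplateBody": template_body,
        "Parameters": [
            {"ParameterKey": "S3QueryResultsBucketName",
             "ParameterValue": results_bucket},
            {"ParameterKey": "AthenaWorkGroupName",
             "ParameterValue": workgroup},
        ],
    }


def deploy(template_path, results_bucket):
    # boto3 is imported here so the request builder above can be used
    # without AWS credentials or the SDK installed.
    import boto3

    with open(template_path) as handle:
        request = build_stack_request(handle.read(), results_bucket)
    cloudformation = boto3.client("cloudformation")
    stack_id = cloudformation.create_stack(**request)["StackId"]
    # Block until the stack reaches CREATE_COMPLETE (raises on failure).
    cloudformation.get_waiter("stack_create_complete").wait(StackName=stack_id)
    return stack_id
```

Calling deploy("athena-prepared-statements.yaml", "<your-bucket-name>") would then mirror steps 3 through 11, with the workgroup name left at its default.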

For least-privilege authorization for deployment of the CloudFormation template, you can create an AWS CloudFormation service role with the following IAM policy actions. To do this, you must create an IAM policy and IAM role, and choose this role when configuring stack options.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "glue:CreateDatabase"
      ],
      "Resource": [
        "arn:${Partition}:glue:${Region}:${Account}:catalog",
        "arn:${Partition}:glue:${Region}:${Account}:database/athena_prepared_statements"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:DeleteDatabase"
      ],
      "Resource": [
        "arn:${Partition}:glue:${Region}:${Account}:catalog",
        "arn:${Partition}:glue:${Region}:${Account}:database/athena_prepared_statements",
        "arn:${Partition}:glue:${Region}:${Account}:table/athena_prepared_statements/*",
        "arn:${Partition}:glue:${Region}:${Account}:userDefinedFunction/athena_prepared_statements/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:CreateTable"
      ],
      "Resource": [
        "arn:${Partition}:glue:${Region}:${Account}:catalog",
        "arn:${Partition}:glue:${Region}:${Account}:database/athena_prepared_statements",
        "arn:${Partition}:glue:${Region}:${Account}:table/athena_prepared_statements/amazon_reviews_parquet"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:DeleteTable"
      ],
      "Resource": [
        "arn:${Partition}:glue:${Region}:${Account}:catalog",
        "arn:${Partition}:glue:${Region}:${Account}:database/athena_prepared_statements",
        "arn:${Partition}:glue:${Region}:${Account}:table/athena_prepared_statements/amazon_reviews_parquet"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "athena:CreateWorkGroup",
        "athena:DeleteWorkGroup",
        "athena:GetWorkGroup"
      ],
      "Resource": "arn:${Partition}:athena:${Region}:${Account}:workgroup/PreparedStatementsWG"
    }
  ]
}

For authorization for the IAM principal running the CloudFormation template and following along, this post was tested with the following AWS managed policies and the customer managed policy below.

AWS managed policies:

  • AmazonAthenaFullAccess
  • AWSCloudTrailReadOnlyAccess
  • AWSCloudFormationFullAccess

Customer managed policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "ViewS3BucketsWithoutErrors",
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "InteractWithMyBucketAndDataSetBucket",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:PutObject"
            ],
            "Resource": [
                "arn:aws:s3:::${my-bucket-name}*",
                "arn:aws:s3:::amazon-reviews-pds*"
            ]
        },
        {
            "Sid": "UploadCloudFormationTemplate",
            "Effect": "Allow",
            "Action": "s3:PutObject",
            "Resource": "arn:aws:s3:::cf-template*"
        },
        {
            "Sid": "CleanUpResults",
            "Effect": "Allow",
            "Action": [
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::${my-bucket-name}/results*"
            ]
        },
        {
            "Sid": "ListRolesForCloudFormationDeployment",
            "Effect": "Allow",
            "Action": [
                "iam:ListRoles"
            ],
            "Resource": [
                "*"
            ]
        },
        {
            "Sid": "IAMRoleForCloudFormationDeployment",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:${Partition}:iam::${Account}:role/${role-name}"
            ],
            "Condition": {
                "StringEquals": {
                    "iam:PassedToService": "cloudformation.amazonaws.com"
                }
            }
        }
    ]
}

Partition the example dataset

The CloudFormation template created an external table pointing at a dataset of over 130 million customer reviews from Amazon.com. Partitioning data improves query performance and reduces cost by restricting the amount of data scanned by each query. The external table for this dataset has Hive-compatible partitions. The MSCK REPAIR TABLE SQL statement scans the prefix paths in the S3 bucket and updates the metadata in the Data Catalog with the partition metadata. To access the dataset, the external table’s partitions must be updated.

After you deploy the CloudFormation template, complete the following steps:

  1. On the Athena console, choose Query editor in the navigation pane.
  2. For Data Source, enter AwsDataCatalog.
  3. For Database, enter athena_prepared_statements.
  4. On the Workgroup drop-down menu, choose PreparedStatementsWG.
  5. Choose Acknowledge to confirm.
  6. In the query editor pane, run the following SQL statement for your external table:
MSCK REPAIR TABLE athena_prepared_statements.amazon_reviews_parquet;

This query takes approximately 15 seconds to run when tested in us-east-1.

  7. Run the following query to list the available partitions. The example dataset has partitions based on product_category.
SHOW PARTITIONS athena_prepared_statements.amazon_reviews_parquet;
  8. Run a SELECT statement to output a sample of data available in the table:
SELECT * FROM athena_prepared_statements.amazon_reviews_parquet limit 10;
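The same statements can be run outside the console. The sketch below assumes a Boto3 Athena client and a workgroup that already defines a query results location (as the CloudFormation template's workgroup is expected to). It starts a query through StartQueryExecution and polls GetQueryExecution until the query finishes. The function name run_query and the one-second poll interval are illustrative choices.

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def run_query(athena, sql, workgroup="PreparedStatementsWG",
              database="athena_prepared_statements", poll_seconds=1.0):
    """Start a query and poll until it reaches a terminal state."""
    query_id = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        WorkGroup=workgroup,
    )["QueryExecutionId"]
    while True:
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return query_id, state
        time.sleep(poll_seconds)
```

For example, run_query(boto3.client("athena"), "MSCK REPAIR TABLE amazon_reviews_parquet;") would refresh the partitions and return the query ID together with its final state.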

Create prepared statements

To use Athena parameterized queries, first you run the PREPARE SQL statement and specify your positional parameters, denoted by question marks. The Athena prepared statement is stored with a name you specify.

Run the following PREPARE statement in the Athena query editor. This example query, named product_helpful_reviews, provides customer reviews with three parameters for a specified product ID, star rating provided by the reviewer, and minimum number of helpful votes provided to the review by other Amazon.com customers.

PREPARE product_helpful_reviews FROM
SELECT product_id, product_title, star_rating, helpful_votes, review_headline, review_body
FROM amazon_reviews_parquet WHERE product_id = ? AND star_rating = ? AND helpful_votes > ?
ORDER BY helpful_votes DESC
LIMIT 10;

You could also use the CreatePreparedStatement API or SDK. For example, to create your prepared statement from AWS CLI, run the following command:

aws athena create-prepared-statement \
--statement-name "product_helpful_reviews" \
--query-statement "SELECT product_id, product_title, star_rating, helpful_votes, review_headline, review_body FROM amazon_reviews_parquet WHERE product_id = ? AND star_rating = ? AND helpful_votes > ? ORDER BY helpful_votes DESC LIMIT 10;" \
--work-group PreparedStatementsWG \
--region region

For more information on creating prepared statements, refer to SQL Statements in Querying with Prepared Statements.
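For the SDK route, the following is a minimal sketch with Boto3's create_prepared_statement call, using the same statement name, workgroup, and query as the CLI example. The helper names are hypothetical, and the Region is left as an argument for you to supply.

```python
QUERY = (
    "SELECT product_id, product_title, star_rating, helpful_votes, "
    "review_headline, review_body "
    "FROM amazon_reviews_parquet "
    "WHERE product_id = ? AND star_rating = ? AND helpful_votes > ? "
    "ORDER BY helpful_votes DESC LIMIT 10"
)


def prepared_statement_request(name, query, workgroup="PreparedStatementsWG",
                               description=None):
    """Build keyword arguments for Athena's create_prepared_statement call."""
    request = {
        "StatementName": name,
        "WorkGroup": workgroup,
        "QueryStatement": query,
    }
    if description:
        # Description is optional, so it is only sent when provided.
        request["Description"] = description
    return request


def create_statement(region_name):
    import boto3  # imported lazily; requires AWS credentials at call time

    athena = boto3.client("athena", region_name=region_name)
    athena.create_prepared_statement(
        **prepared_statement_request("product_helpful_reviews", QUERY))
```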

Run parameterized queries

You can run a parameterized query against the prepared statement with the EXECUTE SQL statement and a USING clause. The USING clause specifies the argument values for the prepared statement’s parameters.

Run the following EXECUTE statement in the Athena query editor. The prepared statement created in the previous section is run with parameters to output 4-star reviews for the Amazon Smile eGift Card product ID with more than 10 helpful votes.

EXECUTE product_helpful_reviews USING 'BT00DDVMVQ', 4, 10;

If you receive the message PreparedStatement product_helpful_reviews was not found in workGroup primary, make sure you selected the PreparedStatementsWG workgroup.

For more information on running parameterized queries, refer to SQL Statements in Querying with Prepared Statements.
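When the EXECUTE statement is assembled in application code rather than typed in the query editor, the argument values still have to be rendered as SQL literals. The following sketch shows one way to do that. The helper names are hypothetical, and doubling single quotes inside string values follows the standard SQL convention, which is an assumption about how your inputs should be handled rather than Athena-specific guidance.

```python
def sql_literal(value):
    """Render a Python value as a SQL literal for an EXECUTE ... USING clause."""
    if isinstance(value, bool):
        return "TRUE" if value else "FALSE"
    if isinstance(value, int):
        return str(value)
    if isinstance(value, str):
        # Double any single quote so the value stays inside the literal.
        return "'" + value.replace("'", "''") + "'"
    raise TypeError(f"unsupported parameter type: {type(value).__name__}")


def build_execute(statement_name, params):
    """Build an EXECUTE statement for a named prepared statement."""
    if not statement_name.isidentifier():
        raise ValueError(f"invalid statement name: {statement_name!r}")
    if not params:
        return f"EXECUTE {statement_name}"
    rendered = ", ".join(sql_literal(p) for p in params)
    return f"EXECUTE {statement_name} USING {rendered}"
```

The resulting string can be passed as the QueryString of a StartQueryExecution request against the PreparedStatementsWG workgroup.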

Mask query string data in CloudTrail events using parameterized queries

You may want to use parameterized queries to redact sensitive data from the query string visible in CloudTrail events. For example, you may have columns containing PII as parameters, which you don’t want visible in logs. Athena automatically masks query strings from CloudTrail events for EXECUTE statements, replacing the query string with the value HIDDEN_DUE_TO_SECURITY_REASONS. This helps you avoid displaying protected data in your log streams.

To access the CloudTrail event for the query, complete the following steps:

  1. Navigate to the Event history page on the CloudTrail console.
  2. On the drop-down menu, choose Event name.
  3. Search for StartQueryExecution events.

CloudTrail event records for parameterized queries include a queryString value redacted with HIDDEN_DUE_TO_SECURITY_REASONS. The query string is visible in the Athena workgroup’s query history. You can control access by using least-privilege IAM policies to Athena, the AWS Glue Data Catalog, and the Amazon S3 query output location in your workgroup settings. For more information on viewing recent queries, refer to Viewing Recent Queries. For more information on IAM policies, refer to Actions, resources, and condition keys for AWS services.
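To check the masking programmatically, the sketch below looks up StartQueryExecution events with Boto3's CloudTrail lookup_events call and inspects each record. It assumes the redacted value appears under requestParameters.queryString in the event JSON; confirm the exact location against an event from your own account. The function names are illustrative.

```python
import json

MASK = "HIDDEN_DUE_TO_SECURITY_REASONS"


def query_string_of(event):
    """Return the queryString recorded in one CloudTrail lookup result."""
    detail = json.loads(event["CloudTrailEvent"])
    # Assumption: the query string sits under requestParameters.
    return (detail.get("requestParameters") or {}).get("queryString")


def masked_events(events):
    """Keep only events whose query string was redacted."""
    return [e for e in events if query_string_of(e) == MASK]


def recent_start_query_events():
    import boto3  # imported lazily; requires AWS credentials at call time

    cloudtrail = boto3.client("cloudtrail")
    response = cloudtrail.lookup_events(LookupAttributes=[
        {"AttributeKey": "EventName",
         "AttributeValue": "StartQueryExecution"}])
    return response["Events"]
```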

Layer of protection for SQL injection

This section shows an example of a SQL injection attack and how prepared statements can protect against the same attack. We use the Athena console to invoke the StartQueryExecution API against a table named users with three rows.

SQL injection is an attempt to insert malicious SQL code into requests to change the statement and extract data from your dataset’s tables. Without Athena parameterized queries, if you’re querying a dataset directly or appending user input to a SQL query, and users can append SQL fragments, the dataset may be vulnerable to SQL injection attacks which return unauthorized data in the result set.

This post shows an example of inserting a SQL fragment in a malicious way. In the example, an OR condition that always evaluates to true (such as OR 1=1) is appended to the WHERE clause. The same example query is then shown with Athena parameterized queries, where it fails because the parameter value is expected to be an integer but contains the characters “OR”. If the parameter were based on a String column, the same SQL injection attempt would return no results, because the positional argument is interpreted as a literal parameter value.

Athena provides an additional layer of defense against multi-statement SQL injection attacks. Attempting to perform SQL injection with an executable command (such as DROP) results in a failed query with Athena providing an error Only one sql statement is allowed, because Athena only accepts one executable command per SQL statement submission.

Although Athena prepared statements provide a layer of protection against SQL injection attacks, other precautions provide additional layers of defense. Athena prepared statements can be a part of your defense-in-depth strategy. For more information on layers of security, refer to Amazon Athena Security.

SQL injection example

The intended use of the SELECT query in the example is to receive a small set of values. However, an attacker can manipulate the input to append malicious SQL code. For example, an attacker can input a value of 1 OR 1=1, which appends a true condition to the WHERE clause and returns all records in the table:

SELECT * FROM users WHERE id = 1 OR 1=1;

By appending malicious SQL code, the attacker can retrieve all rows of the users table, as shown in the following screenshot.
An image of the Athena graphical user interface. A query SELECT * FROM users WHERE id = 1 OR 1=1; has been run. All 3 users in the table, with ids 1, 2, and 3, returned with all columns of the table.

SQL injection attempt with a prepared statement

If we create prepared statements with the same query from the previous example, the executable command is passed as a literal argument for the parameter’s value. If a user tries to pass additional SQL, they receive a syntax error because the WHERE clause is based on ID, which expects an integer value.

  1. Create a prepared statement using the same query against the users table:
PREPARE get_user FROM SELECT * FROM users WHERE id = ?
  2. Set the parameter to a legitimate value:
EXECUTE get_user USING 1

The expected result returns, as shown in the following screenshot.

Graphical user interface of Athena running query EXECUTE get_user USING 1. Only the user with id 1 returned.

  3. Now, attempt to pass a malicious value:
EXECUTE get_user USING 1 OR 1=1

Running this prepared statement produces a syntax error, because an integer value is expected, but it receives an invalid integer value of 1 OR 1=1. The query and syntax error are shown in the following screenshot.

Graphical user interface of Athena querying EXECUTE get_user USING 1 OR 1=1. There is an error. The error says "SYNTAX_ERROR: line 1:24: Left side of logical expression must evaluate to a boolean (actual: integer). This query ran against the "default" database, unless qualified by the query. Please post the error message in our forum."
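The contrast between concatenating input and binding it as a parameter can be reproduced locally. The sketch below uses Python's built-in sqlite3 module as a stand-in for Athena, so it illustrates the general principle rather than Athena's exact behavior. The three-row users table and the function names are made up for the illustration.

```python
import sqlite3


def make_users_table():
    """Create an in-memory users table with three rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
    conn.executemany("INSERT INTO users VALUES (?, ?)",
                     [(1, "alice"), (2, "bob"), (3, "carol")])
    return conn


def lookup_concatenated(conn, user_input):
    # Vulnerable: the input becomes part of the SQL text itself.
    return conn.execute(
        "SELECT * FROM users WHERE id = " + user_input).fetchall()


def lookup_parameterized(conn, user_input):
    # Safer: the input is bound as a single literal value.
    return conn.execute(
        "SELECT * FROM users WHERE id = ?", (user_input,)).fetchall()
```

With the input 1 OR 1=1, the concatenated version returns all three rows, while the parameterized version returns none because the whole string is compared against the id column as one value. SQLite quietly returns an empty result here, whereas Athena rejects the EXECUTE statement with the syntax error described above.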

Working with prepared statements

This section describes administrative functions to make it easier to work with prepared statements.

List all prepared statements in my AWS account

To list all prepared statements in an Athena workgroup from the AWS Command Line Interface (AWS CLI), you can run the following command:

aws athena list-prepared-statements --work-group workgroup_name --region region_name

If you followed the example above, the command returns the following response.

{
  "PreparedStatements": [
    {
      "StatementName": "product_helpful_reviews",
      "LastModifiedTime": "2022-01-14T15:33:07.935000+00:00"
    }
  ]
}

To list all available prepared statements in your AWS account, you can use the AWS APIs. This post provides a sample script using the AWS SDK for Python (Boto3) to loop through all Regions in your account, and provide the prepared statements per Athena workgroup.

Make sure you have AWS credentials where you plan to run the Python script. For more information, refer to Credentials.

Clone the GitHub repo or copy the Python script list-prepared-statements.py from the repo and run the script:

python3 list-prepared-statements.py

Replace <my-profile-name> with your AWS profile name when it prompts you, or leave empty to use default local credentials.

Enter the AWS CLI profile name or leave blank if using instance profile: <my-profile-name>

The following text is the output of the script. If following along, the response returns only the product_helpful_reviews prepared statement.

eu-north-1:
ap-south-1:
eu-west-3:
eu-west-2:
eu-west-1:
ap-northeast-3:
ap-northeast-2:
ap-northeast-1:
sa-east-1:
ca-central-1:
ap-southeast-1:
ap-southeast-2:
eu-central-1:
us-east-1:
        athena-v2-wg: my_select
        PreparedStatementsWG: get_user
        PreparedStatementsWG: get_contacts_by_company
        PreparedStatementsWG: product_helpful_reviews
        PreparedStatementsWG: count_stars
        PreparedStatementsWG: helpful_reviews
        PreparedStatementsWG: get_product_info
        PreparedStatementsWG: check_avg_stars_of_category
        PreparedStatementsWG: my_select_v1
        PreparedStatementsWG: my_select_v2
us-east-2:
us-west-1:
us-west-2:
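A script along these lines could be sketched as follows. This is not the list-prepared-statements.py script from the repository, but a minimal illustration built on the Athena ListWorkGroups and ListPreparedStatements APIs. It assumes every Region returned by the SDK is enabled in your account. Calls to Regions that are not enabled raise errors that this sketch does not handle.

```python
def _paged(call, key, **kwargs):
    """Yield items from an Athena list call, following NextToken."""
    while True:
        page = call(**kwargs)
        yield from page.get(key, [])
        token = page.get("NextToken")
        if not token:
            return
        kwargs["NextToken"] = token


def prepared_statements_by_workgroup(athena):
    """Map each workgroup name to the names of its prepared statements."""
    result = {}
    for workgroup in _paged(athena.list_work_groups, "WorkGroups"):
        name = workgroup["Name"]
        statements = _paged(athena.list_prepared_statements,
                            "PreparedStatements", WorkGroup=name)
        result[name] = [s["StatementName"] for s in statements]
    return result


def scan_all_regions():
    import boto3  # imported lazily; requires AWS credentials at call time

    session = boto3.Session()
    for region in session.get_available_regions("athena"):
        athena = session.client("athena", region_name=region)
        print(f"{region}:")
        found = prepared_statements_by_workgroup(athena)
        for workgroup, names in found.items():
            for name in names:
                print(f"        {workgroup}: {name}")
```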

Update prepared statements

You have a few options for updating prepared statements. You may want to do this to optimize your query performance, change the values you select, or for several other reasons.

  1. Rerun the PREPARE statement with the changes in the Athena query editor or against the StartQueryExecution API.
  2. Use the UpdatePreparedStatement API via the AWS CLI or SDK.

You can use this API to add a description to your prepared statements or update your queries. To update your query statement via this method, you must provide the statement name, workgroup name, updated query statement, and optionally a new description. For more information about the UpdatePreparedStatement API, refer to update-prepared-statement.

You may want to roll out versions of your query. To maintain backward-compatibility for users, you could create a new prepared statement with a different name. For example, the prepared statement could have a version number in its name (such as my_select_v1 and my_select_v2). When necessary, you could communicate changes to teams who rely on the prepared statement, and later deallocate the old prepared statement versions.

Delete prepared statements

To delete a prepared statement, you can use the following query syntax when running against the StartQueryExecution API, or from within the Athena query editor:

DEALLOCATE PREPARE product_helpful_reviews

You could also use the DeletePreparedStatement API or SDK. For example, to delete your prepared statement from AWS CLI, run the following command:

aws athena delete-prepared-statement --statement-name product_helpful_reviews --work-group PreparedStatementsWG --region region
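If a workgroup holds many prepared statements, deleting them one at a time becomes tedious. The following sketch assumes a Boto3 Athena client and removes every prepared statement in a workgroup using the ListPreparedStatements and DeletePreparedStatement APIs. The function name is hypothetical. Because the deletion cannot be undone, review the returned names in a test workgroup first.

```python
def delete_all_prepared_statements(athena, workgroup="PreparedStatementsWG"):
    """Delete every prepared statement in a workgroup; return their names."""
    names = []
    kwargs = {"WorkGroup": workgroup}
    # Collect all names first so deletions do not disturb pagination.
    while True:
        page = athena.list_prepared_statements(**kwargs)
        names += [s["StatementName"]
                  for s in page.get("PreparedStatements", [])]
        if not page.get("NextToken"):
            break
        kwargs["NextToken"] = page["NextToken"]
    for name in names:
        athena.delete_prepared_statement(StatementName=name,
                                         WorkGroup=workgroup)
    return names
```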

Clean up

If you followed along with this post, you created several components that may incur costs. To avoid future charges, remove the resources with the following steps:

  1. Delete the S3 bucket’s results prefix created after you run a query on your workgroup.

With the default template, it’s named <S3QueryResultsBucketName>/athena-results. Use caution in this step. Unless you are using versioning on your S3 bucket, deleting S3 objects cannot be undone.

  2. Delete the Athena prepared statements in the PreparedStatementsWG workgroup.

You can follow the steps in the Delete prepared statements section of this post using either the DEALLOCATE PREPARE statement or delete-prepared-statement API for each prepared statement you created.

  3. To remove the CloudFormation stack, select the stack on the AWS CloudFormation console, choose Delete, and confirm.

Conclusion

Athena parameterized queries make it easy to decouple your code base from your queries by providing a way to store common queries within your Athena workgroup. This post provided information about how Athena parameterized queries can improve your code reusability and data lake security. We showed how you can set up a sample data lake and start using parameterized queries today. We also provided an example of the protections parameterized queries offer, and detailed additional administrative functions.

You can get started with Athena prepared statements via the Athena console, the AWS CLI, or the AWS SDK. To learn more about Athena, refer to the Amazon Athena User Guide.

Thanks for reading this post! If you have questions about Athena parameterized queries, don’t hesitate to leave a comment in the comments section.


About the Authors

Blayze Stefaniak is a Senior Solutions Architect at AWS who works with public sector, federal financial, and healthcare organizations. Blayze is based out of Pittsburgh. He is passionate about breaking down complex situations into something practical and actionable. His interests include artificial intelligence, distributed systems, and Excel formula gymnastics. Blayze holds a B.S.B.A. in Accounting and B.S. in Information Systems from Clarion University of Pennsylvania. In his spare time, you can find Blayze listening to Star Wars audiobooks, trying to make his dogs laugh, and probably talking on mute.

Daniel Tatarkin is a Solutions Architect at Amazon Web Services (AWS) supporting Federal Financial organizations. He is passionate about big data analytics and serverless technologies. Outside of work, he enjoys learning about personal finance, coffee, and trying out new programming languages for fun.

Choosing the right solution for AWS Lambda external parameters

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/choosing-the-right-solution-for-aws-lambda-external-parameters/

This post is written by Thomas Moore, Solutions Architect, Serverless.

When using AWS Lambda to build serverless applications, customers often need to retrieve parameters from an external source at runtime. This allows you to share parameter values across multiple functions or microservices, providing a single source of truth for updates. A common example is retrieving database connection details from an external source and then using the retrieved hostname, user name, and password to connect to the database:

Lambda function retrieving database credentials from an external source

AWS provides a number of options to store parameter data, including AWS Systems Manager Parameter Store, AWS AppConfig, Amazon S3, and Lambda environment variables. This blog explores the different parameter data that you may need to store. I cover considerations for choosing the right parameter solution and how to retrieve and cache parameter data efficiently within the Lambda function execution environment.

Common use cases

Common parameter examples include:

  • Securely storing secret data, such as credentials or API keys.
  • Database connection details such as hostname, port, and credentials.
  • Schema data (for example, a structured JSON response).
  • TLS certificate for mTLS or JWT validation.
  • Email template.
  • Tenant configuration in a multitenant system.
  • Details of external AWS resources to communicate with such as an Amazon SQS queue URL, Amazon EventBridge event bus name, or AWS Step Functions ARN.

Key considerations

There are a number of key considerations when choosing the right solution for external parameter data.

  1. Cost – how much does it cost to store the data and retrieve it via an API call?
  2. Security – what encryption and fine-grained access control is required?
  3. Performance – what are the retrieval latency requirements?
  4. Data size – how much data is there to store and retrieve?
  5. Update frequency – how often does the parameter change and how does the function handle stale parameters?
  6. Access scope – do multiple functions or services access the parameter?

These considerations help to determine where to store the parameter data and how often to retrieve it.

For example, a 4 KB parameter that updates hourly and is used by hundreds of functions needs to be optimized for low retrieval costs and high performance. Choosing a solution that supports low-cost API GET requests at a high transactions per second (TPS) rate would be better than one that supports large data.

AWS service options

There are a number of AWS services available to store external parameter data.

Amazon S3

S3 is an object storage service offering 99.999999999% (11 9s) of data durability and virtually unlimited scalability at low cost. Objects can be up to 5 TB in size in any format, making S3 a good solution to store larger parameter data.

Amazon DynamoDB

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed for single-digit millisecond performance at any scale. Due to the high performance of this service, it’s a great place to store parameters when low retrieval latency is important.

AWS Secrets Manager

AWS Secrets Manager makes it easier to rotate, manage, and retrieve secret data. This makes it the ideal place to store sensitive parameters such as passwords and API keys.

AWS Systems Manager Parameter Store

Parameter Store provides a centralized store to manage configuration data. This data can be plaintext or encrypted using AWS Key Management Service (KMS). Parameters can be tagged and organized into hierarchies for simpler management. Parameter Store is a good default choice for general-purpose parameters in AWS. The standard version (no additional charge) can store parameters up to 4 KB in size and the advanced version (additional charges apply) up to 8 KB.

For a code example using Parameter Store for Lambda parameters, see the Serverless Land pattern.

AWS AppConfig

AppConfig is a capability of AWS Systems Manager to create, manage, and quickly deploy application configurations. AppConfig allows you to validate changes during roll-outs and automatically roll back if there is an error. AppConfig deployment strategies help to manage configuration changes safely.

AppConfig also provides a Lambda extension to retrieve and locally cache configuration data. This results in fewer API calls and reduced function duration, reducing costs.

AWS Lambda environment variables

You can store parameter data as Lambda environment variables as part of the function’s version-specific configuration. Lambda environment variables are stored during function creation or updates. You can access these variables directly from your code without needing to contact an external source. Environment variables are ideal for parameter values that don’t need updating regularly and help make function code reusable across different environments. However, unlike the other options, values cannot be accessed centrally by multiple functions or services.

Lambda execution lifecycle

It is worth understanding the Lambda execution lifecycle, which has a number of stages. This helps to decide when to handle parameter retrieval within your Lambda code, including cache management.

Lambda execution lifecycle

When a Lambda function is invoked for the first time, or when Lambda is scaling to handle additional requests, an execution environment is created. The first phase in the execution environment’s lifecycle is initialization (Init), during which the code outside the main handler function runs. This is known as a cold start.

The execution environment can then be re-used for subsequent invocations. This means that the Init phase does not need to run again and only the main handler function code runs. This is known as a warm start.

An execution environment can only run a single invocation at a time. Concurrent invocations require additional execution environments. When a new execution environment is required, this starts a new Init phase, which runs the cold start process.

Caching and updates

Retrieving the parameter during Init

Retrieving the parameter during Init

As Lambda execution environments are re-used, you can improve the performance and reduce the cost of retrieving an external parameter by caching the value. Writing the value to memory or the Lambda /tmp file system allows it to be available during subsequent invokes in the same execution environment.

This approach reduces API calls, as they are not made during every invocation. However, this can cause an out-of-date parameter and potentially different values across concurrent execution environments.

The following Python example shows how to retrieve a Parameter Store value outside the Lambda handler function during the Init phase.

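import boto3
# Runs once per execution environment, during the Init phase (cold start).
ssm = boto3.client('ssm', region_name='eu-west-1')
parameter = ssm.get_parameter(Name='/my/parameter')

def lambda_handler(event, context):
    # Warm invocations reuse the value retrieved during Init.
    value = parameter['Parameter']['Value']
    # My function code...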

Retrieving the parameter on every invocation

Retrieving the parameter on every invocation

Another option is to retrieve the parameter during every invocation by making the API call inside the handler code. This keeps the value up to date, but can lead to higher retrieval costs and longer function durations due to the added API call during every invocation.

The following Python example shows this approach:

import boto3
ssm = boto3.client('ssm', region_name='eu-west-1')
def lambda_handler(event, context):
    parameter = ssm.get_parameter(Name='/my/parameter')
    # My function code...
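A middle ground between the two approaches above is to cache the value in memory and refresh it after a fixed time-to-live (TTL). The following is a minimal sketch of that idea. The class name TtlCache, the injected fetch function, and the clock argument are illustrative choices rather than part of any AWS SDK.

```python
import time


class TtlCache:
    """Cache one fetched value and refresh it after ttl_seconds."""

    def __init__(self, fetch, ttl_seconds, clock=time.time):
        self._fetch = fetch            # zero-argument callable returning the value
        self._ttl = ttl_seconds
        self._clock = clock            # injectable so expiry can be tested
        self._value = None
        self._expires_at = None        # None means nothing has been fetched yet

    def get(self):
        now = self._clock()
        if self._expires_at is None or now >= self._expires_at:
            # First use or expired entry: fetch again and restart the timer.
            self._value = self._fetch()
            self._expires_at = now + self._ttl
        return self._value
```

Created outside the handler, for example with a fetch function that wraps ssm.get_parameter and a 60-second TTL, the cache is shared by all warm invocations in the same execution environment, and the handler calls get() to obtain the value. A shorter TTL trades more API calls for fresher values.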

Using AWS AppConfig Lambda extension

Using AWS AppConfig Lambda extension

AppConfig allows you to retrieve and cache values from the service using a Lambda extension. The extension retrieves the values and makes them available via a local HTTP server. The Lambda function then queries the local HTTP server for the value. The AppConfig extension refreshes the values at a configurable poll interval, which defaults to 45 seconds. This improves performance and reduces costs, as the function only needs to make a local HTTP call.

The following Python code example shows how to access the cached parameters.

import urllib.request
def lambda_handler(event, context):
    url = 'http://localhost:2772/applications/application_name/environments/environment_name/configurations/configuration_name'
    config = urllib.request.urlopen(url).read()
    # My function code...

For caching secret values using a Lambda extension local HTTP cache and AWS Secrets Manager, see the AWS Prescriptive Guidance documentation.

Using Lambda Powertools for Python or Java

Lambda Powertools for Python or Lambda Powertools for Java contains utilities to manage parameter caching. You can configure the cache interval, which defaults to 5 seconds. Supported parameter stores include Secrets Manager, AWS Systems Manager Parameter Store, AppConfig, and DynamoDB. You also have the option to bring your own provider. The following example shows the Powertools for Python parameters utility retrieving a single value from Systems Manager Parameter Store.

from aws_lambda_powertools.utilities import parameters
def handler(event, context):
    value = parameters.get_parameter("/my/parameter")
    # My function code…

Security

Parameter security is a key consideration. You should evaluate encryption at rest and in transit, private network access, and fine-grained permissions for each external parameter solution based on the use case.

All services highlighted in this post support server-side encryption at rest, and you can choose to use AWS KMS to manage your own keys. When accessing parameters using the AWS SDK and CLI tools, connections are encrypted in transit using TLS by default. For most of these services, you can enforce the use of TLS 1.2.

To access parameters from inside an Amazon Virtual Private Cloud (Amazon VPC) without internet access, you can use AWS PrivateLink and create a VPC endpoint for each service. All the services mentioned in this post support AWS PrivateLink connections.

Use AWS Identity and Access Management (IAM) policies to manage which users or roles can access specific parameters.
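As an illustration of fine-grained permissions, the sketch below builds an IAM policy document that allows reading a single Parameter Store parameter. The function name, Region, account ID, and parameter name are placeholders. Note that a SecureString parameter encrypted with a customer managed KMS key also requires kms:Decrypt on that key, which this sketch does not include.

```python
def parameter_read_policy(region, account_id, parameter_name, partition="aws"):
    """Build an IAM policy allowing reads of one Parameter Store parameter."""
    # The leading slash of a hierarchical name is not repeated in the ARN.
    path = parameter_name.lstrip("/")
    resource = f"arn:{partition}:ssm:{region}:{account_id}:parameter/{path}"
    return {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Action": ["ssm:GetParameter"],
            "Resource": [resource],
        }],
    }
```

Serializing the result with json.dumps gives a policy document that can be attached to the Lambda function's execution role.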

General guidance

This blog explores a number of considerations to make when using an external source for Lambda parameters. The correct solution is use-case dependent. There are some general guidelines when selecting an AWS service.

  • For general-purpose low-cost parameters, use AWS Systems Manager Parameter Store.
  • For single function, small parameters, use Lambda environment variables.
  • For secret values that require automatic rotation, use AWS Secrets Manager.
  • When you need a managed cache, use the AWS AppConfig Lambda extension or Lambda Powertools for Python/Java.
  • For items larger than 400 KB, use Amazon S3.
  • When access frequency is high, and low latency is required, use Amazon DynamoDB.

Conclusion

External parameters provide a central source of truth across distributed systems, allowing for efficient updates and code reuse. This blog post highlights a number of considerations when using external parameters with Lambda to help you choose the most appropriate solution for your use case.

Consider how you cache and reuse parameters inside the Lambda execution environment. Doing this correctly can help you reduce costs and improve the performance of your Lambda functions.

There are a number of services to choose from to store parameter data. These include DynamoDB, S3, Parameter Store, Secrets Manager, AppConfig, and Lambda environment variables. Each comes with a number of advantages, depending on the use case. This blog guidance, along with the AWS documentation and Service Quotas, can help you select the most appropriate service for your workload.

For more serverless learning resources, visit Serverless Land.

Welcome to AWS Pi Day 2022

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/welcome-to-aws-pi-day-2022/

We launched Amazon Simple Storage Service (Amazon S3) sixteen years ago today!

As I often told my audiences in the early days, I wanted them to think big thoughts and dream big dreams! Looking back, I think it is safe to say that the launch of S3 empowered them to do just that, and initiated a wave of innovation that continues to this day.

Bigger, Busier, and more Cost-Effective
Our customers count on Amazon S3 to provide them with reliable and highly durable object storage that scales to meet their needs, while growing more and more cost-effective over time. We’ve met those needs and many others; here are some new metrics that prove my point:

Object Storage – Amazon S3 now holds more than 200 trillion (2 x 10^14) objects. That’s almost 29,000 objects for each resident of planet Earth. Counting at one object per second, it would take 6.342 million years to reach this number! According to Ethan Siegel, there are about 2 trillion galaxies in the visible Universe, so that’s 100 objects per galaxy! Shortly after the 2006 launch of S3, I was happy to announce the then-impressive metric of 800 million stored objects, so the object count has grown by a factor of 250,000 in less than 16 years.
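
For readers who like to check the arithmetic, here is a short sketch that reproduces these figures. It assumes a 365-day year and takes the object count as exactly 2 x 10^14; the variable names are chosen for this example.

```python
OBJECTS = 2 * 10**14                    # "more than 200 trillion" objects
SECONDS_PER_YEAR = 365 * 24 * 60 * 60   # assumption: 365-day year

# Time to count every object at one object per second.
years_to_count = OBJECTS / SECONDS_PER_YEAR

# About 2 trillion galaxies in the visible Universe.
objects_per_galaxy = OBJECTS / (2 * 10**12)

# Growth relative to the 800 million objects reported shortly after launch.
growth_factor = OBJECTS / (800 * 10**6)

print(f"{years_to_count / 1e6:.3f} million years")
print(f"{objects_per_galaxy:.0f} objects per galaxy")
print(f"growth factor of {growth_factor:,.0f}")
```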

Request Rate – Amazon S3 now averages over 100 million requests per second.

Cost Effective – Over time we have added multiple storage classes to S3 in order to optimize cost and performance for many different workloads. For example, AWS customers are making great use of Amazon S3 Intelligent Tiering (the only cloud storage class that delivers automatic storage cost savings when data access patterns change), and have saved more than $250 million in storage costs as compared to Amazon S3 Standard. When I first wrote about this storage class in 2018, I said:

In order to make it easier for you to take advantage of S3 without having to develop a deep understanding of your access patterns, we are launching a new storage class, S3 Intelligent-Tiering.

With the improved cost optimizations for small and short-lived objects and the archiving capabilities that we launched late last year, you can now use S3 Intelligent-Tiering as the default storage class for just about every workload, especially data lakes, analytics use cases, and new applications.

Customer Innovation
As you can see from the metrics above, our customers use S3 to store and protect vast amounts of data in support of an equally vast number of use cases and applications. Here are just a few of the ways that our customers are innovating:

NASCAR
After spending 15 years collecting video, image, and audio assets representing over 70 years of motor sports history, NASCAR built a media library that encompassed over 8,600 LTO 6 tapes and a few thousand LTO 4 tapes, with a growth rate of between 1.5 PB and 2 PB per year. Over the course of 18 months they migrated all of this content (a total of 15 PB) to AWS, making use of the Amazon S3 Standard, Amazon S3 Glacier Flexible Retrieval, and Amazon S3 Glacier Deep Archive storage classes. To learn more about how they migrated this massive and invaluable archive, read Modernizing NASCAR’s multi-PB media archive at speed with AWS Storage.

Electronic Arts
This game maker’s core telemetry systems handle tens of petabytes of data, tens of thousands of tables, and over 2 billion objects. As their games became more popular and the volume of data grew, they were facing challenges around data growth, cost management, retention, and data usage. In a series of updates, they moved archival data to Amazon S3 Glacier Deep Archive, implemented tag-driven retention management, and implemented Amazon S3 Intelligent-Tiering. They have reduced their costs and made their data assets more accessible; read Electronic Arts optimizes storage costs and operations using Amazon S3 Intelligent-Tiering and S3 Glacier to learn more.

NRGene / CRISPR-IL
This team came together to build a best-in-class gene-editing prediction platform. CRISPR (A Crack In Creation is a great introduction) is a very new and very precise way to edit genes and effect changes to an organism’s genetic makeup. The CRISPR-IL consortium is built around an iterative learning process that allows researchers to send results to a predictive engine that helps to shape the next round of experiments. As described in A gene-editing prediction engine with iterative learning cycles built on AWS, the team identified five key challenges and then used AWS to build GoGenome, a web service that performs predictions and delivers the results to users. GoGenome stores over 20 terabytes of raw sequencing data, and hundreds of millions of feature vectors, making use of Amazon S3 and other AWS storage services as the foundation of their data lake.

Some other cool recent S3 success stories include Liberty Mutual (How Liberty Mutual built a highly scalable and cost-effective document management solution), Discovery (Discovery Accelerates Innovation, Cuts Linear Playout Infrastructure Costs by 61% on AWS), and Pinterest (How Pinterest worked with AWS to create a new way to manage data access).

Join Us Online Today
In celebration of AWS Pi Day 2022 we have put together an entire day of educational sessions, live demos, and even a launch or two. We will also take a look at some of the newest S3 launches including Amazon S3 Glacier Instant Retrieval, Amazon S3 Batch Replication and AWS Backup Support for Amazon S3.

Designed for system administrators, engineers, developers, and architects, our sessions will bring you the latest and greatest information on security, backup, archiving, certification, and more. Join us at 9:30 AM PT on Twitch for Kevin Miller’s kickoff keynote, and stick around for the entire day to learn a lot more about how you can put Amazon S3 to use in your applications. See you there!

Jeff;

Make data available for analysis in seconds with Upsolver low-code data pipelines, Amazon Redshift Streaming Ingestion, and Amazon Redshift Serverless

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/make-data-available-for-analysis-in-seconds-with-upsolver-low-code-data-pipelines-amazon-redshift-streaming-ingestion-and-amazon-redshift-serverless/

Amazon Redshift is the most widely used cloud data warehouse. Amazon Redshift makes it easy and cost-effective to perform analytics on vast amounts of data. Amazon Redshift launched Streaming Ingestion for Amazon Kinesis Data Streams, which enables you to load data into Amazon Redshift with low latency and without having to stage the data in Amazon Simple Storage Service (Amazon S3). This new capability enables you to build reports and dashboards and perform analytics using fresh and current data, without needing to manage custom code that periodically loads new data.

Upsolver is an AWS Advanced Technology Partner that enables you to ingest data from a wide range of sources, transform it, and load the results into your target of choice, such as Kinesis Data Streams and Amazon Redshift. Data analysts, engineers, and data scientists define their transformation logic using SQL, and Upsolver automates the deployment, scheduling, and maintenance of the data pipeline. It’s pipeline ops simplified!

There are multiple ways to stream data to Amazon Redshift and in this post we will cover two options that Upsolver can help you with: First, we show you how to configure Upsolver to stream events to Kinesis Data Streams that are consumed by Amazon Redshift using Streaming Ingestion. Second, we demonstrate how to write event data to your data lake and consume it using Amazon Redshift Serverless so you can go from raw events to analytics-ready datasets in minutes.

Prerequisites

Before you get started, you need to install Upsolver. You can sign up for Upsolver and deploy it directly into your VPC to securely access Kinesis Data Streams and Amazon Redshift.

Configure Upsolver to stream events to Kinesis Data Streams

The following diagram represents the architecture to write events to Kinesis Data Streams and Amazon Redshift.

To implement this solution, you complete the following high-level steps:

  1. Configure the source Kinesis data stream.
  2. Execute the data pipeline.
  3. Create an Amazon Redshift external schema and materialized view.

Configure the source Kinesis data stream

For the purpose of this post, you create an Amazon S3 data source that contains sample retail data in JSON format. Upsolver ingests this data as a stream; as new objects arrive, they’re automatically ingested and streamed to the destination.

  1. On the Upsolver console, choose Data Sources in the navigation sidebar.
  2. Choose New.
  3. Choose Amazon S3 as your data source.
  4. For Bucket, you can use the bucket with the public dataset or a bucket with your own data.
  5. Choose Continue to create the data source.
  6. Create a data stream in Kinesis Data Streams, as shown in the following screenshot.

This is the output stream Upsolver uses to write events that are consumed by Amazon Redshift.

Next, you create a Kinesis connection in Upsolver. Creating a connection enables you to define the authentication method Upsolver uses—for example, an AWS Identity and Access Management (IAM) access key and secret key or an IAM role.

  1. On the Upsolver console, choose More in the navigation sidebar.
  2. Choose Connections.
  3. Choose New Connection.
  4. Choose Amazon Kinesis.
  5. For Region, enter your AWS Region.
  6. For Name, enter a name for your connection (for this post, we name it upsolver_redshift).
  7. Choose Create.

Before you can consume the events in Amazon Redshift, you must write them to the output Kinesis data stream.

  1. On the Upsolver console, navigate to Outputs and choose Kinesis.
  2. For Data Sources, choose the Kinesis data source you created in the previous step.
  3. Depending on the structure of your event data, you have two choices:
    1. If the event data you’re writing to the output doesn’t contain any nested fields, select Tabular. Upsolver automatically flattens nested data for you.
    2. To write your data in a nested format, select Hierarchical.
  4. Because we’re working with Kinesis Data Streams, select Hierarchical.
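
To illustrate the difference between the two output formats, here is a small sketch of what flattening a nested event into tabular columns can look like. It is only a conceptual illustration: the function name flatten, the dotted column naming, and the sample fields are assumptions, and Upsolver's own column naming may differ.

```python
from typing import Any, Dict


def flatten(event: Dict[str, Any], parent: str = "", sep: str = ".") -> Dict[str, Any]:
    """Turns nested objects into a single level of dotted column names."""
    flat: Dict[str, Any] = {}
    for key, value in event.items():
        column = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into nested objects and prefix their keys with the parent name.
            flat.update(flatten(value, column, sep))
        else:
            flat[column] = value
    return flat


if __name__ == "__main__":
    # Hypothetical retail event, for illustration only.
    nested = {"orderId": "A1", "customer": {"name": "Ana", "address": {"city": "Seattle"}}}
    print(flatten(nested))
```

A hierarchical output would instead keep the event in its nested form.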

Execute the data pipeline

Now that the stream is connected from the source to an output, you must select which fields of the source event you wish to pass through. You can also choose to apply transformations to your data—for example, adding correct timestamps, masking sensitive values, and adding computed fields. For more information, refer to Quick guide: SQL data transformation.

After adding the columns you want to include in the output and applying transformations, choose Run to start the data pipeline. As new events arrive in the source, Upsolver automatically transforms them and forwards the results to the output stream. There is no need to schedule or orchestrate the pipeline; it’s always on.

Create an Amazon Redshift external schema and materialized view

First, create an IAM role with the appropriate permissions (for more information, refer to Streaming ingestion). Now you can use the Amazon Redshift query editor, AWS Command Line Interface (AWS CLI), or API to run the following SQL statements.

  1. Create an external schema that is backed by Kinesis Data Streams. The following command requires you to include the IAM role you created earlier:
    CREATE EXTERNAL SCHEMA upsolver
    FROM KINESIS
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshiftadmin';

  2. Create a materialized view that allows you to run a SELECT statement against the event data that Upsolver produces:
    CREATE MATERIALIZED VIEW mv_orders AS
    SELECT ApproximateArrivalTimestamp, SequenceNumber,
       json_extract_path_text(from_varbyte(Data, 'utf-8'), 'orderId') as order_id,
       json_extract_path_text(from_varbyte(Data, 'utf-8'), 'shipmentStatus') as shipping_status
    FROM upsolver.upsolver_redshift;

  3. Instruct Amazon Redshift to materialize the results to a table called mv_orders:
    REFRESH MATERIALIZED VIEW mv_orders;

  4. You can now run queries against your streaming data, such as the following:
    SELECT * FROM mv_orders;
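
To show what the materialized view computes for each record, the following sketch mirrors the two expressions in the SELECT statement: decode the Kinesis payload as UTF-8, parse it as JSON, and pull out two keys as text. It is a local approximation for understanding the projection, not Amazon Redshift code. The helper names and the sample payload are hypothetical, and returning None for a missing key is a simplification that may not match how Amazon Redshift handles missing paths.

```python
import json
from typing import Any, Dict, Optional


def _as_text(value: Any) -> Optional[str]:
    """Returns strings as-is and other JSON values as their JSON text."""
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return json.dumps(value)


def project_order(data: bytes) -> Dict[str, Optional[str]]:
    """Mimics the order_id and shipping_status columns of mv_orders."""
    # Equivalent in spirit to from_varbyte(Data, 'utf-8') followed by JSON parsing.
    payload = json.loads(data.decode("utf-8"))
    return {
        "order_id": _as_text(payload.get("orderId")),
        "shipping_status": _as_text(payload.get("shipmentStatus")),
    }


if __name__ == "__main__":
    print(project_order(b'{"orderId": "1001", "shipmentStatus": "SHIPPED"}'))
```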

Use Upsolver to write data to a data lake and query it with Amazon Redshift Serverless

The following diagram represents the architecture to write events to your data lake and query the data with Amazon Redshift.

To implement this solution, you complete the following high-level steps:

  1. Configure the source Kinesis data stream.
  2. Connect to the AWS Glue Data Catalog and update the metadata.
  3. Query the data lake.

Configure the source Kinesis data stream

We already completed this step earlier in the post, so you don’t need to do anything different.

Connect to the AWS Glue Data Catalog and update the metadata

To update the metadata, complete the following steps:

  1. On the Upsolver console, choose More in the navigation sidebar.
  2. Choose Connections.
  3. Choose the AWS Glue Data Catalog connection.
  4. For Region, enter your Region.
  5. For Name, enter a name (for this post, we call it redshift serverless).
  6. Choose Create.
  7. Create a Redshift Spectrum output, following the same steps from earlier in this post.
  8. Select Tabular as we’re writing output in table-formatted data to Amazon Redshift.
  9. Map the data source fields to the Redshift Spectrum output.
  10. Choose Run.
  11. On the Amazon Redshift console, create an Amazon Redshift Serverless endpoint.
  12. Make sure you associate your Upsolver role to Amazon Redshift Serverless.
  13. When the endpoint launches, open the new Amazon Redshift query editor to create an external schema that points to the AWS Glue Data Catalog (see the following screenshot).

This enables you to run queries against data stored in your data lake.

Query the data lake

Now that your Upsolver data is being automatically written and maintained in your data lake, you can query it using your preferred tool and the Amazon Redshift query editor, as shown in the following screenshot.

Conclusion

In this post, you learned how to use Upsolver to stream event data into Amazon Redshift using streaming ingestion for Kinesis Data Streams. You also learned how you can use Upsolver to write the stream to your data lake and query it using Amazon Redshift Serverless.

Upsolver makes it easy to build data pipelines using SQL and automates the complexity of pipeline management, scaling, and maintenance. Upsolver and Amazon Redshift enable you to quickly and easily analyze data in real time.

If you have any questions, or wish to discuss this integration or explore other use cases, start the conversation in our Upsolver Community Slack channel.


About the Authors

Roy Hasson is the Head of Product at Upsolver. He works with customers globally to simplify how they build, manage and deploy data pipelines to deliver high quality data as a product. Previously, Roy was a Product Manager for AWS Glue and AWS Lake Formation.

Mei Long is a Product Manager at Upsolver. She is on a mission to make data accessible, usable and manageable in the cloud. Previously, Mei played an instrumental role working with the teams that contributed to the Apache Hadoop, Spark, Zeppelin, Kafka, and Kubernetes projects.

Maneesh Sharma is a Senior Database Engineer at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

Build a serverless pipeline to analyze streaming data using AWS Glue, Apache Hudi, and Amazon S3

Post Syndicated from Nikhil Khokhar original https://aws.amazon.com/blogs/big-data/build-a-serverless-pipeline-to-analyze-streaming-data-using-aws-glue-apache-hudi-and-amazon-s3/

Organizations typically accumulate massive volumes of data and continue to generate ever-increasing data volumes, ranging from terabytes to petabytes and at times to exabytes of data. Such data is usually generated in disparate systems and requires aggregation into a single location for analysis and insight generation. A data lake architecture allows you to aggregate data present in various silos, store it in a centralized repository, enforce data governance, and support analytics and machine learning (ML) on top of this stored data.

Typical building blocks to implement such an architecture include a centralized repository built on Amazon Simple Storage Service (Amazon S3) providing the least possible unit cost of storage per GB, big data ETL (extract, transform, and load) frameworks such as AWS Glue, and analytics using Amazon Athena, Amazon Redshift, and Amazon EMR notebooks.

Building such systems involves technical challenges. For example, data residing in S3 buckets can’t be updated in-place using standard data ingestion approaches. Therefore, you must perform constant ad-hoc ETL jobs to consolidate data into new S3 files and buckets.

This is especially the case with streaming sources, which require constant support for increasing data velocity to provide faster insights generation. An example use case might be an ecommerce company looking to build a real-time data lake. They need their solution to do the following:

  • Ingest continuous changes (like customer orders) from upstream systems
  • Capture tables into the data lake
  • Provide ACID properties on the data lake to support interactive analytics by enabling consistent views on data while new data is being ingested
  • Provide schema flexibility due to upstream data layout changes and provisions for late arrival of data

To deliver on these requirements, organizations have to build custom frameworks to handle in-place updates (also referred to as upserts), handle small files created due to the continuous ingestion of changes from upstream systems (such as databases), handle schema evolution, and compromise on providing ACID guarantees on their data lake.

A processing framework like Apache Hudi can be a good way to solve such challenges. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi is integrated with various AWS analytics services, like AWS Glue, Amazon EMR, Athena, and Amazon Redshift. This helps you ingest data from a variety of sources via batch or streaming while enabling in-place updates to an append-oriented storage system such as Amazon S3 (or HDFS). In this post, we discuss a serverless approach to integrate Hudi with a streaming use case and create an in-place updatable data lake on Amazon S3.

Solution overview

We use Amazon Kinesis Data Generator to send sample streaming data to Amazon Kinesis Data Streams. To consume this streaming data, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector for AWS Glue to write ingested and transformed data to Amazon S3, and also creates a table in the AWS Glue Data Catalog.

After the data is ingested, Hudi organizes a dataset into a partitioned directory structure under a base path pointing to a location in Amazon S3. Data layout in these partitioned directories depends on the Hudi dataset type used during ingestion, such as Copy on Write (CoW) and Merge on Read (MoR). For more information about Hudi storage types, see Using Athena to Query Apache Hudi Datasets and Storage Types & Views.

CoW is the default storage type of Hudi. In this storage type, data is stored in columnar format (Parquet). Each ingestion creates a new version of files during a write. With CoW, each time there is an update to a record, Hudi rewrites the original columnar file containing the record with the updated values. Therefore, this is better suited for read-heavy workloads on data that changes less frequently.

With the MoR storage type, data is stored using a combination of columnar (Parquet) and row-based (Avro) formats. Updates are logged to row-based delta files and are compacted to create new versions of columnar files. With MoR, each time there is an update to a record, Hudi writes only the row for the changed record into the row-based (Avro) format, which is compacted (synchronously or asynchronously) to create columnar files. Therefore, MoR is better suited for write-heavy or change-heavy workloads with fewer reads.
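
The trade-off between the two storage types can be sketched with a toy in-memory model. This is not how Hudi is implemented; the class names, the key field, and the rows_written counter are hypothetical and exist only to show that CoW pays the cost at write time while MoR defers it to read and compaction time.

```python
from typing import Dict, List

Record = Dict[str, object]


class CopyOnWriteTable:
    """Every upsert rewrites the whole base file (toy model)."""

    def __init__(self, records: List[Record]) -> None:
        self.base: Dict[object, Record] = {r["key"]: dict(r) for r in records}
        self.rows_written = len(self.base)

    def upsert(self, record: Record) -> None:
        new_base = dict(self.base)            # copy the existing file contents
        new_base[record["key"]] = dict(record)
        self.base = new_base                  # the new file version replaces the old one
        self.rows_written += len(new_base)    # the whole file is written again

    def read(self) -> Dict[object, Record]:
        return dict(self.base)                # reads need no merging


class MergeOnReadTable:
    """Upserts append to a log that is merged at read or compaction time (toy model)."""

    def __init__(self, records: List[Record]) -> None:
        self.base: Dict[object, Record] = {r["key"]: dict(r) for r in records}
        self.log: List[Record] = []
        self.rows_written = len(self.base)

    def upsert(self, record: Record) -> None:
        self.log.append(dict(record))         # only the changed row is written
        self.rows_written += 1

    def read(self) -> Dict[object, Record]:
        merged = dict(self.base)
        for record in self.log:               # readers merge base and log
            merged[record["key"]] = record
        return merged

    def compact(self) -> None:
        self.base = self.read()               # fold the log into a new base file
        self.log = []
        self.rows_written += len(self.base)
```

In this model, a single update to a three-row table writes three rows under CoW but only one under MoR, and the MoR reader does the extra merge work until compaction runs.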

For this post, we use the CoW storage type to illustrate our use case of creating a Hudi dataset and serving it to a variety of readers. You can extend this solution to support MoR storage by selecting the specific storage type during ingestion. We use Athena to read the dataset. We also illustrate the capabilities of this solution in terms of in-place updates, nested partitioning, and schema flexibility.

The following diagram illustrates our solution architecture.

Create the Apache Hudi connection using the Apache Hudi Connector for AWS Glue

To create your AWS Glue job with an AWS Glue custom connector, complete the following steps:

  1. On the AWS Glue Studio console, choose Marketplace in the navigation pane.
  2. Search for and choose Apache Hudi Connector for AWS Glue.
  3. Choose Continue to Subscribe.

  4. Review the terms and conditions and choose Accept Terms.
  5. Make sure that the subscription is complete and you see the Effective date populated next to the product, then choose Continue to Configuration.
  6. For Delivery Method, choose Glue 3.0.
  7. For Software Version, choose the latest version (as of this writing, 0.9.0 is the latest version of the Apache Hudi Connector for AWS Glue).
  8. Choose Continue to Launch.
  9. Under Launch this software, choose Usage Instructions and then choose Activate the Glue connector for Apache Hudi in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, hudi-connection).
  2. For Description, enter a description.
  3. Choose Create connection and activate connector.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

For this post, we provide an AWS CloudFormation template to create the following resources:

  • An S3 bucket named hudi-demo-bucket-<your-stack-id> that contains a JAR artifact copied from another public S3 bucket outside of your account. This JAR artifact is then used to define the AWS Glue streaming job.
  • A Kinesis data stream named hudi-demo-stream-<your-stack-id>.
  • An AWS Glue streaming job named Hudi_Streaming_Job-<your-stack-id> with a dedicated AWS Glue Data Catalog database named hudi-demo-db-<your-stack-id>. Refer to the aws-samples GitHub repository for the complete code of the job.
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions.
  • AWS Lambda functions to copy artifacts to the S3 bucket and empty buckets first upon stack deletion.

To create your resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-connector-blog-for-streaming-data.
  3. For HudiConnectionName, use the name you specified in the previous section.
  4. Leave the other parameters as default.
  5. Choose Next.
  6. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  7. Choose Create stack.

Set up Kinesis Data Generator

In this step, you configure Kinesis Data Generator to send sample data to a Kinesis data stream.

  1. On the Kinesis Data Generator console, choose Create a Cognito User with CloudFormation.

You’re redirected to the AWS CloudFormation console.

  1. On the Review page, in the Capabilities section, select I acknowledge that AWS CloudFormation might create IAM resources.
  2. Choose Create stack.
  3. On the Stack details page, in the Stacks section, verify that the status shows CREATE_COMPLETE.
  4. On the Outputs tab, copy the URL value for KinesisDataGeneratorUrl.
  5. Navigate to this URL in your browser.
  6. Enter the user name and password provided and choose Sign In.

Start an AWS Glue streaming job

To start an AWS Glue streaming job, complete the following steps:

  1. On the AWS CloudFormation console, navigate to the Resources tab of the stack you created.
  2. Copy the physical ID corresponding to the AWS::Glue::Job resource.
  3. On the AWS Glue Studio console, find the job name using the physical ID.
  4. Choose the job to review the script and job details.
  5. Choose Run to start the job.
  6. On the Runs tab, verify that the job is running successfully.

Send sample data to a Kinesis data stream

Kinesis Data Generator generates records using random data based on a template you provide. Kinesis Data Generator extends faker.js, an open-source random data generator.

In this step, you use Kinesis Data Generator to send sample data, based on a template written with reference to the faker.js documentation, to the data stream you created earlier at a rate of one record per second. Keep the ingestion running until the end of this tutorial so that you accumulate enough data for analysis while performing the remaining steps.

  1. On the Kinesis Data Generator console, for Records per second, choose the Constant tab, and change the value to 1.
  2. For Record template, choose the Template 1 tab, and enter the following code sample into the text box:
    {
     "name" : "{{random.arrayElement(["Person1","Person2","Person3", "Person4"])}}",  
     "date": "{{date.utc(YYYY-MM-DD)}}",
     "year": "{{date.utc(YYYY)}}",
     "month": "{{date.utc(MM)}}",
     "day": "{{date.utc(DD)}}",
     "column_to_update_integer": {{random.number(1000000000)}},
     "column_to_update_string": "{{random.arrayElement(["White","Red","Yellow", "Silver"])}}" 
    }

  3. Choose Test template.
  4. Verify the structure of the sample JSON records and choose Close.
  5. Choose Send data.
  6. Leave the Kinesis Data Generator page open to ensure sustained streaming of random records into the data stream.
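
If you want to inspect the record shape outside the Kinesis Data Generator console, the following sketch produces records with the same fields as the template above. It is an approximation for local experimentation: the function name sample_record is hypothetical, and the exact bounds of the random integer are an assumption.

```python
import json
import random
from datetime import datetime, timezone
from typing import Any, Dict, Optional

NAMES = ["Person1", "Person2", "Person3", "Person4"]
COLORS = ["White", "Red", "Yellow", "Silver"]


def sample_record(now: Optional[datetime] = None) -> Dict[str, Any]:
    """Builds one record with the same fields as the template (illustrative only)."""
    now = now or datetime.now(timezone.utc)
    return {
        "name": random.choice(NAMES),
        "date": now.strftime("%Y-%m-%d"),
        "year": now.strftime("%Y"),
        "month": now.strftime("%m"),
        "day": now.strftime("%d"),
        # Assumption: integer between 0 and 1000000000 inclusive.
        "column_to_update_integer": random.randint(0, 1000000000),
        "column_to_update_string": random.choice(COLORS),
    }


if __name__ == "__main__":
    print(json.dumps(sample_record(), indent=1))
```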

Continue through the remaining steps while you generate your data.

Verify dynamically created resources

While you’re generating data for analysis, you can verify the resources you created.

Amazon S3 dataset

When the AWS Glue streaming job runs, the records from the Kinesis data stream are consumed and stored in an S3 bucket. While creating Hudi datasets in Amazon S3, the streaming job can also create a nested partition structure. This is enabled through the usage of Hudi configuration properties hoodie.datasource.write.partitionpath.field and hoodie.datasource.write.keygenerator.class in the streaming job definition.

In this example, nested partitions have been created by name, year, month, and day. The values of these properties are set as follows in the script for the AWS Glue streaming job.

For further details on how CustomKeyGenerator works to generate such partition paths, refer to Apache Hudi Key Generators.
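
As a rough sketch of how these two properties lead to the nested layout, the following snippet pairs plausible option values with a helper that derives the partition path for one record. The option values are assumptions based on the CustomKeyGenerator field:TYPE syntax rather than a copy of the job script, and partition_path is a hypothetical helper that only imitates the resulting directory layout.

```python
from typing import Dict

# Assumed values; check the job script in the aws-samples repository for the actual ones.
PARTITION_OPTIONS = {
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
    "hoodie.datasource.write.partitionpath.field": "name:SIMPLE,year:SIMPLE,month:SIMPLE,day:SIMPLE",
}


def partition_path(
    record: Dict[str, str],
    options: Dict[str, str] = PARTITION_OPTIONS,
    hive_style: bool = True,
) -> str:
    """Derives the nested partition path for one record (illustration only)."""
    spec = options["hoodie.datasource.write.partitionpath.field"]
    # Each entry has the form field:TYPE; only the field name is needed here.
    fields = [part.split(":")[0] for part in spec.split(",")]
    if hive_style:
        return "/".join(f"{field}={record[field]}" for field in fields)
    return "/".join(str(record[field]) for field in fields)


if __name__ == "__main__":
    record = {"name": "Person1", "year": "2022", "month": "03", "day": "14"}
    print(partition_path(record))
```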

The following screenshot shows the nested partitions created in Amazon S3.

AWS Glue Data Catalog table

A Hudi table is also created in the AWS Glue Data Catalog and mapped to the Hudi datasets on Amazon S3. See the following code in the AWS Glue streaming job.

The following table provides more details on the configuration options.

hoodie.datasource.hive_sync.enable: Indicates if the table is synced to Apache Hive Metastore.
hoodie.datasource.hive_sync.sync_as_datasource: Avoids breaking changes introduced with HUDI-1415 (JIRA).
hoodie.datasource.hive_sync.database: The database name for your Data Catalog.
hoodie.datasource.hive_sync.table: The table name in your Data Catalog.
hoodie.datasource.hive_sync.use_jdbc: Uses JDBC for Hive synchronization. For more information, see the GitHub repo.
hoodie.datasource.write.hive_style_partitioning: Creates partitions with <partition_column_name>=<partition_value> format.
hoodie.datasource.hive_sync.partition_extractor_class: Required for nested partitioning.
hoodie.datasource.hive_sync.partition_fields: Columns in the table to use for Hive partition columns.
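
For orientation, here is a sketch of how these options might be collected in a Python dictionary before being passed to the Hudi writer. All values are assumptions for illustration, and the database and table names are hypothetical placeholders; refer to the job script for the values actually used.

```python
# Assumed values for illustration; not copied from the job script.
HIVE_SYNC_OPTIONS = {
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.hive_sync.sync_as_datasource": "false",
    "hoodie.datasource.hive_sync.database": "hudi_demo_db",      # hypothetical name
    "hoodie.datasource.hive_sync.table": "hudi_demo_table",      # hypothetical name
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.datasource.hive_sync.partition_extractor_class":
        "org.apache.hudi.hive.MultiPartKeysValueExtractor",
    "hoodie.datasource.hive_sync.partition_fields": "name,year,month,day",
}

# In a Spark job, a dictionary like this is typically passed to the writer, for example:
#   df.write.format("hudi").options(**HIVE_SYNC_OPTIONS).mode("append").save(base_path)
# where df and base_path are defined by the job.
```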

The following screenshot shows the Hudi table in the Data Catalog and the associated S3 bucket.

Read results using Athena

Using Hudi with an AWS Glue streaming job allows us to have in-place updates (upserts) on the Amazon S3 data lake. This functionality allows for incremental processing, which enables faster and more efficient downstream pipelines. Apache Hudi enables in-place updates with the following steps:

  1. Define an index (using columns of the ingested record).
  2. Use this index to map every subsequent ingestion to the record storage locations (in our case Amazon S3) ingested previously.
  3. Perform compaction (synchronously or asynchronously) to allow the retention of the latest record for a given index.

In reference to our AWS Glue streaming job, the following Hudi configuration options enable us to achieve in-place updates for the generated schema.

The following table provides more details of the highlighted configuration options.

hoodie.datasource.write.recordkey.field: Indicates the column to be used within the ingested record for the Hudi index.
hoodie.datasource.write.operation: Defines the nature of operation on the Hudi dataset. In this example, it’s set to upsert for in-place updates.
hoodie.datasource.write.table.type: Indicates the Hudi storage type to be used. In this example, it’s set to COPY_ON_WRITE.
hoodie.datasource.write.precombine.field: When two records have the same key value, Apache Hudi picks the one with the largest value for the precombine field.

To demonstrate an in-place update, consider the following input records sent to the AWS Glue streaming job via Kinesis Data Generator. The record identifier highlighted indicates the Hudi record key in the AWS Glue configuration. In this example, Person3 receives two updates. In the first update, column_to_update_string is set to White; in the second update, it’s set to Red.

The streaming job processes these records and creates the Hudi datasets in Amazon S3. You can query the dataset using Athena. In the following example, we get the latest update.
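
The record key and precombine rule can be sketched in a few lines. This is a simplified model, not Hudi's implementation: it applies the largest-precombine-value rule uniformly to stored and incoming records, lets a later record win ties, and uses a hypothetical seq field as the precombine field.

```python
from typing import Dict, Iterable

Record = Dict[str, object]


def upsert(
    table: Dict[object, Record],
    incoming: Iterable[Record],
    record_key: str,
    precombine: str,
) -> Dict[object, Record]:
    """Keeps one record per key, preferring the largest precombine value."""
    for record in incoming:
        key = record[record_key]
        current = table.get(key)
        # Insert new keys; replace existing ones only if the incoming record is not older.
        if current is None or record[precombine] >= current[precombine]:
            table[key] = dict(record)
    return table


if __name__ == "__main__":
    table: Dict[object, Record] = {}
    updates = [
        {"name": "Person3", "seq": 1, "column_to_update_string": "White"},
        {"name": "Person3", "seq": 2, "column_to_update_string": "Red"},
    ]
    upsert(table, updates, record_key="name", precombine="seq")
    print(table["Person3"]["column_to_update_string"])
```

With the two Person3 updates described above, only the record carrying Red remains for that key.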

Schema flexibility

The AWS Glue streaming job allows for automatic handling of different record schemas encountered during the ingestion. This is specifically useful in situations where record schemas can be subject to frequent changes. To elaborate on this point, consider the following scenario:

  • Case 1 – At time t1, the ingested record has the layout <col 1, col 2, col 3, col 4>
  • Case 2 – At time t2, the ingested record has an extra column, with new layout <col 1, col 2, col 3, col 4, col 5>
  • Case 3 – At time t3, the ingested record dropped the extra column and therefore has the layout <col 1, col 2, col 3, col 4>

For Cases 1 and 2, the AWS Glue streaming job relies on the built-in schema evolution capabilities of Hudi, which enable an update to the Data Catalog with the extra column (col 5 in this case). Hudi also adds the extra column to the output files (Parquet files written to Amazon S3). This allows the query engine (Athena) to query the Hudi dataset with the extra column without any issues.

Because Case 2 ingestion updates the Data Catalog, the extra column (col 5) is expected to be present in every subsequent ingested record. If we don’t resolve this difference, the job fails.

To overcome this and achieve Case 3, the streaming job defines a custom function named evolveSchema, which handles the record layout mismatches. The function queries the AWS Glue Data Catalog for each to-be-ingested record and gets the current Hudi table schema. It then merges the Hudi table schema with the schema of the to-be-ingested record and enriches the schema of the record before exposing it to the Hudi dataset.

For this example, the to-be-ingested record’s schema <col 1, col 2, col 3, col 4> is modified to <col 1, col 2, col 3, col 4, col 5>, where the value of the extra col 5 is set to NULL.
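
The merge step can be pictured with a small Python sketch. This is not the job's actual evolveSchema function; it only illustrates the idea of padding a record with the columns it is missing, and the column names and values are placeholders.

```python
def evolve_record(record, table_columns):
    """Return a copy of `record` that carries every column of the table schema.

    Columns present in the table but absent from the record are set to None
    (NULL). Columns that are new in the record are kept as they are.
    """
    evolved = {column: record.get(column) for column in table_columns}
    for column, value in record.items():
        if column not in evolved:
            evolved[column] = value
    return evolved


# Table schema after Case 2 (placeholder column names).
table_columns = ["col1", "col2", "col3", "col4", "col5"]

# Case 3 record: col5 is missing from the incoming layout.
old_layout_record = {"col1": "a", "col2": "b", "col3": "c", "col4": "d"}

evolved = evolve_record(old_layout_record, table_columns)
print(evolved)
```

The evolved record has the same five columns as the table, with col5 set to None, so it no longer conflicts with the schema registered in the Data Catalog.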

To illustrate this, we stop the existing ingestion of Kinesis Data Generator and modify the record layout to send an extra column called new_column:

{
 "name" : "{{random.arrayElement(["Person1","Person2","Person3", "Person4"])}}",  
 "date": "{{date.utc(YYYY-MM-DD)}}",
 "year": "{{date.utc(YYYY)}}",
 "month": "{{date.utc(MM)}}",
 "day": "{{date.utc(DD)}}",
 "column_to_update_integer": {{random.number(1000000000)}},
 "column_to_update_string": "{{random.arrayElement(["White","Red","Yellow", "Silver"])}}",
 "new_column": "{{random.number(1000000000)}}" 
}

The Hudi table in the Data Catalog updates as follows, with the newly added column (Case 2).

When we query the Hudi dataset using Athena, we can see the presence of a new column.

We can now use Kinesis Data Generator to send records with an old schema—without the newly added column (Case 3).

In this scenario, our AWS Glue job keeps running. When we query using Athena, the extra added column gets populated with NULL values.

If we stop Kinesis Data Generator and start sending records with a schema containing extra columns, the job keeps running and the Athena query continues to return the latest values.

Clean up

To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack.

Summary

This post illustrated how to set up a serverless pipeline using an AWS Glue streaming job with the Apache Hudi Connector for AWS Glue, which runs continuously and consumes data from Kinesis Data Streams to create a near-real-time data lake that supports in-place updates, nested partitioning, and schema flexibility.

You can also use Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) as the source of a similar streaming job. We encourage you to use this approach for setting up a near-real-time data lake. As always, AWS welcomes feedback, so please leave your thoughts or questions in the comments.


About the Authors

Nikhil Khokhar is a Solutions Architect at AWS. He joined AWS in 2016 and specializes in building and supporting data streaming solutions that help customers analyze and get value out of their data. In his free time, he makes use of his 3D printing skills to solve everyday problems.

Dipta S Bhattacharya is a Solutions Architect Manager at AWS. Dipta joined AWS in 2018. He works with large startup customers to design and develop architectures on AWS and support their journey on the cloud.

Disaster recovery approaches for Db2 databases on AWS

Post Syndicated from Sai Parthasaradhi original https://aws.amazon.com/blogs/architecture/disaster-recovery-approaches-for-db2-databases-on-aws/

As you migrate your critical enterprise workloads from an IBM Db2 on-premises database to the AWS Cloud, it’s critical to have a reliable and effective disaster recovery (DR) strategy. This helps the database applications operate with little or no disruption from unexpected events like a natural disaster.

Recovery point objective (RPO), recovery time objective (RTO), and cost are three key metrics to consider when developing your DR strategy (see Figure 1). Based on these metrics, you can define your DR strategy for Db2 databases on AWS. It can be either an on-demand backup and restore approach or a nearly continuous replication method.

Figure 1. Disaster recovery strategies

In this post, we show an overview of active/passive cross-Region disaster recovery options for the Db2 database on Amazon Elastic Compute Cloud (Amazon EC2). This solution uses native Db2 features and AWS services such as Amazon Simple Storage Service (Amazon S3), Amazon Elastic File System (Amazon EFS), and Amazon VPC Peering connection.

Approach 1: Db2 log shipping

In this approach, the transactional log files produced by the primary database are made available to the standby database via a log archive location. The transaction logs from the archive location can be replayed on the standby database by manually applying the Rollforward command, or by setting up user exit programs.

We can use Amazon S3 or Amazon EFS as the log archive location to share the logs with the standby database hosted in a secondary AWS Region.

Using Amazon S3:

Starting with Db2 11.5.7, we can specify DB2REMOTE Amazon S3 storage for the LOGARCHMETH1 and LOGARCHMETH2 database log archive method configuration parameters. This enables us to archive transaction logs to, and retrieve them from, Amazon S3.

In Figure 2, we enable Amazon S3 Cross-Region Replication (CRR) between the S3 buckets in the primary and the DR AWS Regions. This permits the transaction logs to be replicated into the S3 bucket in the DR Region.

We set up an AWS Lambda function to tell AWS Systems Manager (SSM) to run a command document. This document runs a bash script containing the Rollforward command on the standby database instance. The Lambda function can be invoked by S3 bucket events in the DR Region.

Figure 2. Db2 log shipping using S3 Cross-Region Replication

This approach works as follows:

  • The transactions are committed and the active transaction log file gets closed on the primary database. The database then marks the log file as ready for archiving to the destination (the S3 bucket).
  • The database asynchronously archives the log files into the S3 bucket archive location in the primary Region. This gets replicated to the S3 bucket in the DR Region.
  • This S3 event in the DR Region will initiate an AWS Lambda function to apply the Rollforward database operation on the standby database.
  • Db2 pulls the logs from the S3 bucket in the DR Region and applies them to the standby database.
  • When the primary Region is unavailable, initiate failover manually or by using scripts on the standby database. Run the Rollforward command so that the standby replays up to the end of the logs, stops rollforward recovery, and is ready to accept client connections.
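
The Lambda step in the flow above could be sketched as follows. This is a minimal sketch under several assumptions: it uses the AWS-managed AWS-RunShellScript document rather than a custom command document, the environment variable names are hypothetical, and db2inst1 is assumed to be the Db2 instance owner on the standby host.

```python
import os


def build_rollforward_script(database, instance_owner="db2inst1"):
    """Shell commands that replay archived logs without ending recovery.

    SSM runs commands as root, so the Db2 command is run as the instance
    owner (db2inst1 is an assumed name).
    """
    return [
        f"su - {instance_owner} -c "
        f"'db2 \"ROLLFORWARD DATABASE {database} TO END OF LOGS\"'",
    ]


def handler(event, context, ssm_client=None):
    """Lambda entry point, invoked by S3 events from the DR Region bucket."""
    if ssm_client is None:
        import boto3  # imported lazily so the module also loads without boto3

        ssm_client = boto3.client("ssm")

    # Hypothetical environment variables naming the standby instance and database.
    instance_id = os.environ["STANDBY_INSTANCE_ID"]
    database = os.environ.get("DB2_DATABASE", "SAMPLE")

    response = ssm_client.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        Parameters={"commands": build_rollforward_script(database)},
        Comment="Replay replicated Db2 transaction logs on the standby",
    )
    return response["Command"]["CommandId"]
```

Accepting the SSM client as an optional argument keeps the handler easy to exercise locally with a stub, while Lambda itself still calls it with only the event and context.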

Using Amazon EFS:

In this approach, we configure the database parameter LOGARCHMETH1 with Amazon EFS as an archive location for transaction logs using the DISK option. It will push the transaction logs to a directory on Amazon EFS.

As shown in Figure 3, we configure Amazon EFS replication to automatically replicate the database archive logs to the EFS file system in the DR Region. This file system can be mounted on the standby database instance.

Figure 3. Db2 log shipping using Amazon EFS replication

This approach replicates transaction logs to EFS. We can schedule a script for every few minutes that runs the Rollforward command to replay the logs on the standby database.
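
A scheduled replay script might look like the following sketch. It assumes that the script runs as the Db2 instance owner on the standby host, that the standby can retrieve the archived logs from the mounted EFS directory, and that the database is named SAMPLE. None of these details come from the setup described here.

```python
import shlex
import subprocess


def rollforward_command(database, stop=False):
    """Build the Db2 command line that replays archived logs.

    With stop=False the standby stays in rollforward pending state, so later
    runs can apply more logs. With stop=True recovery is completed, which is
    what a failover needs.
    """
    statement = f"ROLLFORWARD DATABASE {database} TO END OF LOGS"
    if stop:
        statement += " AND STOP"
    return ["db2", statement]


def replay_logs(database, stop=False, run=subprocess.run):
    """Run the rollforward and return the exit code of the db2 process."""
    command = rollforward_command(database, stop=stop)
    print("Running:", shlex.join(command))
    return run(command, check=False).returncode


# Example call from a scheduler such as cron (database name is an assumption):
#     replay_logs("SAMPLE")
# During a failover, the same helper can complete recovery:
#     replay_logs("SAMPLE", stop=True)
```

Keeping the regular replay and the failover variant in one helper makes it less likely that a scheduled run ends rollforward recovery by accident.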

Alternatively, we can use the user exit programs provided with the Db2 installation. With the log archive method LOGARCHMETH1 set to USEREXIT, these programs apply the logs automatically.

This approach has the following advantages:

  1. Straightforward setup, with minimal database configurations.
  2. This can be a DR option for multi-partitioned database environments or environments where federation is set up with two-phase commit for federated transactions.
  3. Bulk load operations on the primary database can be replayed on standby by sharing the load image using EFS.
  4. Rollforward operation progress can be checked on standby using monitoring commands.

Limitations of this approach are as follows:

  1. We cannot connect to the standby database to offload read-only workloads as the database will be in Rollforward recovery mode.
  2. We must write custom code, such as Lambda functions, user exit programs, or bash scripts, to replay the logs on the standby database.
  3. Non-logged operations, such as database configuration parameter changes or nonrecoverable bulk data loads, are not replayed on the standby database.
  4. Automated failover to standby is not possible.

Approach 2: Db2 high availability disaster recovery (HADR) auxiliary standby

In this approach, we set up Db2 High Availability Disaster Recovery (HADR) to deploy an auxiliary Db2 standby database in a secondary or DR AWS Region.

The architecture for this approach is shown in Figure 4, and works as follows:

  • We establish TCP/IP connectivity between the primary and auxiliary Db2 standby database using Amazon VPC Peering connection.
  • Any transaction written on the primary Db2 database is committed without waiting for replication onto the auxiliary standby database.
  • Replicated transactions are replayed on the auxiliary standby database, which connects with the primary database in a remote catchup state.
  • When the primary AWS Region is unavailable, manually promote the standby database to primary using the takeover commands.

Figure 4. Db2 HADR with auxiliary standby database

This approach has the following advantages:

  1. The replication is handled by the database automatically without the need for custom scripts.
  2. We can enable reads on standby to offload read-only workloads, such as reporting, from the primary database to the standby. This reduces the load on the primary database.
  3. Key metrics such as replication lag, connection status, and errors can be monitored from the primary database.
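
To make the monitoring point concrete, the sketch below parses `KEY = VALUE` lines of the kind printed by `db2pd -hadr` and derives a simple health flag. The sample text is an abbreviated, illustrative excerpt rather than captured output, and the 1 MB log gap threshold is an arbitrary assumption.

```python
def parse_hadr_status(db2pd_output):
    """Parse `KEY = VALUE` lines from `db2pd -hadr` output into a dict."""
    status = {}
    for line in db2pd_output.splitlines():
        key, separator, value = line.partition("=")
        if separator:
            status[key.strip()] = value.strip()
    return status


# Abbreviated, illustrative excerpt; real output contains many more fields.
SAMPLE_OUTPUT = """
                 HADR_ROLE = PRIMARY
             HADR_SYNCMODE = SUPERASYNC
                HADR_STATE = REMOTE_CATCHUP
       HADR_CONNECT_STATUS = CONNECTED
       HADR_LOG_GAP(bytes) = 4096
"""

status = parse_hadr_status(SAMPLE_OUTPUT)
log_gap_bytes = int(status["HADR_LOG_GAP(bytes)"])

# Arbitrary example rule: connected and less than 1 MB of unshipped log data.
healthy = status["HADR_CONNECT_STATUS"] == "CONNECTED" and log_gap_bytes < 1_000_000
print(status["HADR_STATE"], log_gap_bytes, healthy)
```

A check like this could feed an alarm so that a growing log gap on the auxiliary standby is noticed before a failover is needed.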

Limitations of this approach are as follows:

  1. Non-logged operations, such as database configuration parameter changes or nonrecoverable bulk data loads, are not replayed on the standby database.
  2. This approach is not supported in a multi-partitioned database environment or with two-phase commit federated transactions.
  3. Automated failover to standby is not possible.
  4. There are various other restrictions, which must be evaluated.

Conclusion

In this post, we discussed how to set up a disaster recovery Db2 database using database native features and AWS services. We discussed the advantages and restrictions for each. You can use this post as a reference for setting up the right disaster recovery approach for your database to minimize data loss and maintain business continuity. Let us know your comments; we always love your feedback!

New – Additional Checksum Algorithms for Amazon S3

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/new-additional-checksum-algorithms-for-amazon-s3/

Amazon Simple Storage Service (Amazon S3) is designed to provide 99.999999999% (11 9s) of durability for your objects and for the metadata associated with your objects. You can rest assured that S3 stores exactly what you PUT, and returns exactly what is stored when you GET. In order to make sure that the object is transmitted back-and-forth properly, S3 uses checksums, basically a kind of digital fingerprint.

S3’s PutObject function already allows you to pass the MD5 checksum of the object, and only accepts the operation if the value that you supply matches the one computed by S3. While this allows S3 to detect data transmission errors, it does mean that you need to compute the checksum before you call PutObject or after you call GetObject. Further, computing checksums for large (multi-GB or even multi-TB) objects can be computationally intensive, and can lead to bottlenecks. In fact, some large S3 users have built special-purpose EC2 fleets solely to compute and validate checksums.

New Checksum Support
Today I am happy to tell you about S3’s new support for four checksum algorithms. It is now very easy for you to calculate and store checksums for data stored in Amazon S3 and to use the checksums to check the integrity of your upload and download requests. You can use this new feature to implement the digital preservation best practices and controls that are specific to your industry. In particular, you can specify the use of any one of four widely used checksum algorithms (SHA-1, SHA-256, CRC-32, and CRC-32C) when you upload each of your objects to S3.

Here are the principal aspects of this new feature:

Object Upload – The newest versions of the AWS SDKs compute the specified checksum as part of the upload, and include it in an HTTP trailer at the conclusion of the upload. You also have the option to supply a precomputed checksum. Either way, S3 will verify the checksum and accept the operation if the value in the request matches the one computed by S3. In combination with the use of HTTP trailers, this feature can greatly accelerate client-side integrity checking.

Multipart Object Upload – The AWS SDKs now take advantage of client-side parallelism and compute checksums for each part of a multipart upload. The checksums for all of the parts are themselves checksummed and this checksum-of-checksums is transmitted to S3 when the upload is finalized.

Checksum Storage & Persistence – The verified checksum, along with the specified algorithm, are stored as part of the object’s metadata. If Server-Side Encryption with KMS Keys is requested for the object, then the checksum is stored in encrypted form. The algorithm and the checksum stick to the object throughout its lifetime, even if it changes storage classes or is superseded by a newer version. They are also transferred as part of S3 Replication.

Checksum Retrieval – The new GetObjectAttributes function returns the checksum for the object and (if applicable) for each part.
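
The checksum-of-checksums idea for multipart uploads can be sketched in a few lines of Python. This is an illustration of the concept, not SDK code: the choice of SHA-256, the part contents, and the "-N" part-count suffix (which reflects how S3 is understood to report multipart checksums, and is not stated in this post) are all assumptions.

```python
import base64
import hashlib


def composite_sha256(parts):
    """Checksum-of-checksums for a sequence of byte-string parts.

    Each part is hashed on its own, the raw part digests are concatenated,
    and the concatenation is hashed again.
    """
    part_digests = [hashlib.sha256(part).digest() for part in parts]
    combined = hashlib.sha256(b"".join(part_digests)).digest()
    encoded = base64.b64encode(combined).decode("ascii")
    return f"{encoded}-{len(parts)}"


# Hypothetical parts of one object.
parts = [b"part-1", b"part-2", b"part-3"]
print(composite_sha256(parts))
```

Because every part is hashed independently, the per-part digests can be computed in parallel, which is the client-side parallelism mentioned above. Note that the result differs from a checksum computed over the whole object in one pass.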

Checksums in Action
You can access this feature from the AWS Command Line Interface (CLI), AWS SDKs, or the S3 Console. In the console, I enable the Additional Checksums option when I prepare to upload an object:

Then I choose a Checksum function:

If I have already computed the checksum I can enter it, otherwise the console will compute it.

After the upload is complete I can view the object’s properties to see the checksum:

The checksum function for each object is also listed in the S3 Inventory Report.

From my own code, the SDK can compute the checksum for me:

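import boto3

s3 = boto3.client('s3')

# Let the SDK compute a SHA-1 checksum and send it with the upload
with open(file_path, 'rb') as file:
    r = s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=file,
        ChecksumAlgorithm='SHA1'
    )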

Or I can compute the checksum myself and pass it to put_object:

with open(file_path, 'rb') as file:
    r = s3.put_object(
        Bucket=bucket,
        Key=key,
        Body=file,
        ChecksumSHA1='fUM9R+mPkIokxBJK7zU5QfeAHSy='
    )
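
For reference, a value suitable for ChecksumSHA1 can be produced with the Python standard library alone. The helper names below are made up for this sketch; the important detail is that the raw 20-byte digest is base64-encoded, not its hexadecimal form.

```python
import base64
import hashlib


def sha1_base64_of_chunks(chunks):
    """Return the base64-encoded SHA-1 digest of an iterable of byte chunks."""
    digest = hashlib.sha1()
    for chunk in chunks:
        digest.update(chunk)
    return base64.b64encode(digest.digest()).decode("ascii")


def sha1_base64_of_file(path, chunk_size=1024 * 1024):
    """Hash a file in chunks so that large objects are never fully in memory."""
    with open(path, "rb") as file:
        return sha1_base64_of_chunks(iter(lambda: file.read(chunk_size), b""))


print(sha1_base64_of_chunks([b"hello"]))  # qvTGHdzF6KLavt4PO0gs2a6pQ00=
```

The resulting string is always 28 characters long for SHA-1, which is a quick sanity check before passing it to put_object.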

When I retrieve the object, I specify checksum mode to indicate that I want the returned object validated:

r = s3.get_object(Bucket=bucket, Key=key, ChecksumMode='ENABLED')

The actual validation happens when I read the object from r['Body'], and an exception will be raised if there’s a mismatch.

Watch the Demo
Here’s a demo (first shown at re:Invent 2021) of this new feature in action:

Available Now
The four additional checksums are now available in all commercial AWS Regions and you can start using them today at no extra charge.

Jeff;

Export JSON data to Amazon S3 using Amazon Redshift UNLOAD

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/export-json-data-to-amazon-s3-using-amazon-redshift-unload/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL. Amazon Redshift offers up to three times better price performance than any other cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as high-performance business intelligence (BI) reporting, dashboarding applications, data exploration, and real-time analytics.

As the amount of data generated by IoT devices, social media, and cloud applications continues to grow, organizations are looking to easily and cost-effectively analyze this data with minimal time-to-insight. A vast amount of this data is available in semi-structured format and needs additional extract, transform, and load (ETL) processes to make it accessible or to integrate it with structured data for analysis. Amazon Redshift powers the modern data architecture, which enables you to query data across your data warehouse, data lake, and operational databases to gain faster and deeper insights not possible otherwise. With a modern data architecture, you can store data in semi-structured format in your Amazon Simple Storage Service (Amazon S3) data lake and integrate it with structured data on Amazon Redshift. This allows you to make this data available to other analytics and machine learning applications rather than locking it in a silo.

In this post, we discuss the UNLOAD feature in Amazon Redshift and how to export data from an Amazon Redshift cluster to JSON files on an Amazon S3 data lake.

JSON support features in Amazon Redshift

Amazon Redshift features such as COPY, UNLOAD, and Amazon Redshift Spectrum enable you to move and query data between your data warehouse and data lake.

With the UNLOAD command, you can export a query result set in text, JSON, or Apache Parquet file format to Amazon S3. The UNLOAD command is also recommended when you need to retrieve large result sets from your data warehouse. Because UNLOAD processes and exports data in parallel from Amazon Redshift’s compute nodes to Amazon S3, it reduces the network overhead and therefore the time needed to read a large number of rows. When using the JSON option with UNLOAD, Amazon Redshift unloads to a JSON file with each line containing a JSON object, representing a full record in the query result. In the JSON file, Amazon Redshift types are unloaded as the closest JSON representation. For example, Boolean values are unloaded as true or false, NULL values are unloaded as null, and timestamp values are unloaded as strings. If a default JSON representation doesn’t suit a particular use case, you can modify it by casting to the desired type in the SELECT query of the UNLOAD statement.
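
To show what a consumer of such a file sees, the sketch below parses a made-up two-line excerpt with the Python standard library. The rows and the created_at column are hypothetical and only demonstrate the type mapping described above.

```python
import json

# Hypothetical two-line excerpt of an unloaded file (one JSON object per line).
unloaded = (
    '{"c_customer_sk":1,"c_preferred_cust_flag_bool":true,"c_login":null,'
    '"created_at":"2022-03-01 10:15:00"}\n'
    '{"c_customer_sk":2,"c_preferred_cust_flag_bool":false,"c_login":null,'
    '"created_at":"2022-03-02 08:00:00"}\n'
)

# Each non-empty line is parsed on its own; the file as a whole is not one JSON document.
records = [json.loads(line) for line in unloaded.splitlines() if line.strip()]

for record in records:
    print(record["c_customer_sk"], record["c_preferred_cust_flag_bool"], record["c_login"])
```

Booleans arrive as Python True or False, null becomes None, and the timestamp stays a string that the consumer has to parse if it needs a datetime value.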

Additionally, to create a valid JSON object, the name of each column in the query result must be unique. If the column names in the query result aren’t unique, the JSON UNLOAD process fails. To avoid this, we recommend using proper column aliases so that each column in the query result remains unique while getting unloaded. We illustrate this behavior later in this post.

With the Amazon Redshift SUPER data type, you can store data in JSON format on local Amazon Redshift tables. This way, you can process the data without any network overhead and use Amazon Redshift schema properties to optimally save and query semi-structured data locally. In addition to achieving low latency, you can also use the SUPER data type when your query requires strong consistency, predictable query performance, complex query support, and ease of use with evolving schemas and schemaless data. Amazon Redshift supports writing nested JSON when the query result contains SUPER columns.

Updating and maintaining data with constantly evolving schemas can be challenging and adds extra ETL steps to the analytics pipeline. The JSON file format provides support for schema definition, is lightweight, and is widely used as a data transfer mechanism by different services, tools, and technologies.

Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) is a distributed, open-source search and analytics suite used for a broad set of use cases like real-time application monitoring, log analytics, and website search. It uses JSON as the supported file format for data ingestion. The ability to unload data natively in JSON format from Amazon Redshift into the Amazon S3 data lake reduces complexity and additional data processing steps if that data needs to be ingested into Amazon OpenSearch Service for further analysis.

This is one example of how seamless data movement can help you build an integrated data platform with a data lake on Amazon S3, data warehouse on Amazon Redshift and search and log analytics using Amazon OpenSearch Service and any other JSON-oriented downstream analytics solution. For more information about the Lake House approach, see Build a Lake House Architecture on AWS.

Examples of Amazon Redshift JSON UNLOAD

In this post, we show you the following different scenarios:

  • Example 1 – Unload customer data in JSON format into Amazon S3, partitioning output files into partition folders, following the Apache Hive convention, with customer birth month as the partition key. We make a few changes to the columns in the SELECT statement of the UNLOAD command:
    • Convert the c_preferred_cust_flag column from character to Boolean
    • Remove leading and trailing spaces from the c_first_name, c_last_name, and c_email_address columns using the Amazon Redshift built-in function btrim
  • Example 2 – Unload line item data (with SUPER column) in JSON format into Amazon S3 with data not partitioned
  • Example 3 – Unload line item data (with SUPER column) in JSON format into Amazon S3, partitioning output files into partition folders, following the Apache Hive convention, with customer key as the partition key

For the first example, we used the customer table and data from the TPC-DS dataset. For the examples involving a table with a SUPER column, we used the customer_orders_lineitem table and data from the following tutorial.

Example 1: Export customer data

For this example, we used the customer table and data from the TPC-DS dataset. We created the database schema and customer table, and copied data into it. See the following code:

-- Created a new schema
create schema json_unload_demo; 

-- created and populated customer table in the new schema

create table json_unload_demo.customer
(
  c_customer_sk int4 not null ,                 
  c_customer_id char(16) not null ,             
  c_current_cdemo_sk int4 ,   
  c_current_hdemo_sk int4 ,   
  c_current_addr_sk int4 ,    
  c_first_shipto_date_sk int4 ,                 
  c_first_sales_date_sk int4 ,
  c_salutation char(10) ,     
  c_first_name char(20) ,     
  c_last_name char(30) ,      
  c_preferred_cust_flag char(1) ,               
  c_birth_day int4 ,          
  c_birth_month int4 ,        
  c_birth_year int4 ,         
  c_birth_country varchar(20) ,                 
  c_login char(13) ,          
  c_email_address char(50) ,  
  c_last_review_date_sk int4 ,
  primary key (c_customer_sk)
) distkey(c_customer_sk);

copy json_unload_demo.customer from 's3://redshift-downloads/TPC-DS/2.13/3TB/customer/' 
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>' 
gzip delimiter '|' EMPTYASNULL;

You can create a default AWS Identity and Access Management (IAM) role for your Amazon Redshift cluster to copy from and unload to your Amazon S3 location. For more information, see Use the default IAM role in Amazon Redshift to simplify accessing other AWS services.

In this example, we unloaded customer data for all customers with birth year 1992 in JSON format into Amazon S3, partitioned by customer birth month. We made the following changes in the UNLOAD statement:

  • Convert the c_preferred_cust_flag column from character to Boolean
  • Remove leading and trailing spaces from the c_first_name, c_last_name, and c_email_address columns using the btrim function
  • Set the maximum size of exported files in Amazon S3 to 64 MB

See the following code:

unload ('SELECT c_customer_sk,
    c_customer_id ,
    c_current_cdemo_sk ,
    c_current_hdemo_sk ,
    c_current_addr_sk ,
    c_first_shipto_date_sk ,
    c_first_sales_date_sk ,
    c_salutation ,
    btrim(c_first_name),
    btrim(c_last_name),
    c_birth_day ,
    c_birth_month ,
    c_birth_year ,
    c_birth_country ,
    c_last_review_date_sk,
    DECODE(c_preferred_cust_flag, ''Y'', TRUE, ''N'', FALSE)::boolean as c_preferred_cust_flag_bool,
    c_login, 
    btrim(c_email_address) 
    from customer where c_birth_year = 1992;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/non-super/customer/' 
FORMAT JSON 
partition by (c_birth_month)  include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>'
MAXFILESIZE 64 MB;

When we ran the UNLOAD command, we encountered an error because the columns that used the btrim function all attempted to be exported as btrim (which is the default behavior of Amazon Redshift when the same function is applied to multiple columns that are selected together). To avoid this error, we need to use a unique column alias for each column where the btrim function was used.
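
The underlying problem is easy to reproduce outside Amazon Redshift. The following Python sketch uses a hypothetical list of result column names to flag duplicates, and then shows why duplicate keys are unsafe in a JSON object: a typical parser keeps only one value per key.

```python
import json
from collections import Counter


def duplicate_columns(column_names):
    """Return the column names that appear more than once, sorted."""
    counts = Counter(column_names)
    return sorted(name for name, count in counts.items() if count > 1)


# Hypothetical result-set column names when btrim is used without aliases.
result_columns = ["c_customer_sk", "btrim", "btrim", "c_login", "btrim"]
print(duplicate_columns(result_columns))  # ['btrim']

# A JSON object with repeated keys silently loses data when parsed.
print(json.loads('{"btrim": "Ann", "btrim": "Lee"}'))  # {'btrim': 'Lee'}
```

Failing the UNLOAD instead of writing such objects avoids this silent loss, and unique aliases remove the ambiguity.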

To see this behavior, we select the c_first_name, c_last_name, and c_email_address columns with the btrim function applied, and convert the c_preferred_cust_flag column from character to Boolean.

We ran the following query in Amazon Redshift Query Editor v2:

SELECT btrim(c_first_name) ,
    btrim(c_last_name),
    btrim(c_email_address) , 
    DECODE(c_preferred_cust_flag, 'Y', TRUE, 'N', FALSE)::boolean c_preferred_cust_flag_bool  
    from customer where c_birth_year = 1992 limit 10; 

All three columns that used the btrim function are set as btrim in the output result instead of their respective column name.

An error occurred in UNLOAD because we didn’t use a column alias.

We added column aliases in the following code:

unload ('SELECT c_customer_sk,
    c_customer_id ,
    c_current_cdemo_sk ,
    c_current_hdemo_sk ,
    c_current_addr_sk ,
    c_first_shipto_date_sk ,
    c_first_sales_date_sk ,
    c_salutation ,
    btrim(c_first_name) as c_first_name,
    btrim(c_last_name) as c_last_name,
    c_birth_day ,
    c_birth_month ,
    c_birth_year ,
    c_birth_country ,
    c_last_review_date_sk,
    DECODE(c_preferred_cust_flag, ''Y'', TRUE, ''N'', FALSE)::boolean as c_preferred_cust_flag_bool,
    c_login, 
    btrim(c_email_address) as c_email_addr_trimmed 
    from customer where c_birth_year = 1992;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/non-super/customer/' 
FORMAT JSON 
partition by (c_birth_month)  include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>'
MAXFILESIZE 64 MB;

After we added column aliases, the UNLOAD command completed successfully and files were exported to the desired location in Amazon S3.

The following screenshot shows the data unloaded from the Amazon Redshift customer table into Amazon S3 in JSON format, with output files organized into partition folders that follow the Apache Hive convention and use customer birth month as the partition key.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that was unloaded.

The column aliases c_first_name, c_last_name, and c_email_addr_trimmed were written into the JSON record as per the SELECT query. Boolean values were saved in c_preferred_cust_flag_bool as well.
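
Downstream readers often need the partition value back from the object key. The sketch below extracts Hive-style name=value segments from a key. The example key, including the file name, is hypothetical and only follows the folder convention described above.

```python
def hive_partitions(s3_key):
    """Extract `name=value` path segments from a Hive-style object key."""
    partitions = {}
    for segment in s3_key.split("/"):
        name, separator, value = segment.partition("=")
        if separator and name:
            partitions[name] = value
    return partitions


# Hypothetical object key; the file name is an assumption for illustration.
key = "non-partitioned/non-super/customer/c_birth_month=5/0000_part_00.json"
print(hive_partitions(key))  # {'c_birth_month': '5'}
```

Partition values come back as strings, so a consumer that needs the month as a number has to convert it explicitly.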

Examples 2 and 3: Using the SUPER column

For the next two examples, we used the customer_orders_lineitem table and data. We created the customer_orders_lineitem table and copied data into it with the following code:

-- Created a new table with SUPER column

CREATE TABLE JSON_unload_demo.customer_orders_lineitem
(c_custkey bigint
,c_name varchar
,c_address varchar
,c_nationkey smallint
,c_phone varchar
,c_acctbal decimal(12,2)
,c_mktsegment varchar
,c_comment varchar
,c_orders super
);

-- Loaded data into the new table
COPY json_unload_demo.customer_orders_lineitem 
FROM 's3://redshift-downloads/semistructured/tpch-nested/data/json/customer_orders_lineitem'
IAM_ROLE '<<AWS IAM role attached to your amazon redshift cluster>>'
FORMAT JSON 'auto';

Next, we ran a few queries to explore the customer_orders_lineitem table’s data:

select * from json_unload_demo.customer_orders_lineitem;

select c_orders from json_unload_demo.customer_orders_lineitem;

SELECT attr as attribute_name, val as object_value FROM json_unload_demo.customer_orders_lineitem c, c.c_orders o, UNPIVOT o AS val AT attr;

Example 2: Without partitions

In this example, we unloaded all the rows of the customer_orders_lineitem table in JSON format into Amazon S3 without any partitions:

unload ('select * from json_unload_demo.customer_orders_lineitem;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/super/customer-order-lineitem/'
FORMAT JSON
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>';

After we run the UNLOAD command, the data is available in the desired Amazon S3 location. The following screenshot shows the data unloaded from the Amazon Redshift customer_orders_lineitem table into Amazon S3 in JSON format without any partitions.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that was unloaded.

Example 3: With partitions

In this example, we unloaded all the rows of the customer_orders_lineitem table in JSON format into Amazon S3, partitioning the output files into partition folders that follow the Apache Hive convention, with customer key as the partition key:

unload ('select * from json_unload_demo.customer_orders_lineitem;')
to 's3://<<Your Amazon S3 Bucket>>/partitioned/super/customer-order-lineitem-1/'
FORMAT JSON
partition by (c_custkey) include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>';

After we run the UNLOAD command, the data is available in the desired Amazon S3 location. The following screenshot shows the data unloaded from the Amazon Redshift customer_orders_lineitem table into Amazon S3 in JSON format, with output files organized into partition folders that follow the Apache Hive convention and use customer key as the partition key.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that got unloaded.

Conclusion

In this post, we showed how you can use the Amazon Redshift UNLOAD command to unload the result of a query to one or more JSON files into your Amazon S3 location. We also showed how you can partition the data using your choice of partition key while you unload the data. You can use this feature to export data to JSON files into Amazon S3 from your Amazon Redshift cluster or your Amazon Redshift Serverless endpoint to make your data processing simpler and build an integrated data analytics platform.


About the Authors

Dipankar Kushari is a Senior Analytics Solutions Architect with AWS.

Sayali Jojan is a Senior Analytics Solutions Architect with AWS. She has 7 years of experience working with customers to design and build solutions on the AWS Cloud, with a focus on data and analytics.

Cody Cunningham is a Software Development Engineer with AWS, working on data ingestion for Amazon Redshift.