Tag Archives: Analytics

Real-time anomaly detection via Random Cut Forest in Amazon Kinesis Data Analytics

Post Syndicated from Daren Wong original https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/

Real-time anomaly detection refers to detecting and flagging unexpected behavior in streaming data as it occurs. Online machine learning (ML) algorithms are popular for this use case because they don’t require any explicit rules and are able to adapt to a changing baseline, which is particularly useful for continuous data streams whose characteristics change over time.

Random Cut Forest (RCF) is one such algorithm widely used for anomaly detection use cases. In typical setups, we want to be able to run the RCF algorithm on input data with large throughput, and streaming data processing frameworks can help with that. We are excited to share that you can run RCF with Amazon Kinesis Data Analytics for Apache Flink. Apache Flink is a popular open-source framework for real-time, stateful computations over data streams, and can be used to run RCF on input streams with large throughput.

This post demonstrates how we can use Kinesis Data Analytics for Apache Flink to run an online RCF algorithm for anomaly detection.

Solution overview

The following diagram illustrates our architecture, which consists of three components: an input data stream using Amazon Kinesis Data Streams, a Flink job, and an output Kinesis data stream. In terms of data flow, we use a Python script to generate anomalous sine wave data into the input data stream, the data is then processed by RCF in a Flink job, and the resultant anomaly score is delivered to the output data stream.

The following graph shows an example of our expected result, which indicates that the anomaly score peaked when the sine wave data source anomalously dropped to a constant -17.

We can implement this solution in three simple steps:

  1. Set up AWS resources via AWS CloudFormation.
  2. Set up a data generator to produce data into the source data stream.
  3. Run the RCF Flink Java code on Kinesis Data Analytics.

Set up AWS resources via AWS CloudFormation

The following CloudFormation stack will create all the AWS resources we need for this tutorial, including two Kinesis data streams, a Kinesis Data Analytics app, and an Amazon Simple Storage Service (Amazon S3) bucket.

Sign in to your AWS account, then choose Launch Stack:

BDB-2063-launch-cloudformation-stack

Follow the steps on the AWS CloudFormation console to create the stack.

Set up a data generator

Run the following Python script to populate the input data stream with the anomalous sine wave data:

import json
import boto3
import math 

STREAM_NAME = "ExampleInputStream-RCF"


def get_data(time):
    # Base signal: a sine wave that oscillates between 0 and 20.
    rad = (time / 100) % 360
    val = math.sin(rad) * 10 + 10

    # Inject an anomaly: briefly force the value to a constant -17.
    if rad > 2.4 and rad < 2.6:
        val = -17

    return {'time': time, 'value': val}

def generate(stream_name, kinesis_client):
    time = 0

    while True:
        data = get_data(time)
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")

        time += 1


if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))

Run the RCF Flink Java code on Kinesis Data Analytics

The CloudFormation stack automatically downloaded and packaged the RCF Flink job JAR file for you. Therefore, you can simply go to the Kinesis Data Analytics console to run your application.

That’s it! We now have a running Flink job that continuously reads in data from an input Kinesis data stream and calculates the anomaly score for each new data point given the previous data points it has seen.

The following sections explain the RCF implementation and Flink job code in more detail.

RCF implementation

Numerous RCF implementations are publicly available. For this tutorial, we use the AWS implementation, wrapped in a custom operator (RandomCutForestOperator) so it can be used in our Flink job.

RandomCutForestOperator is implemented as an Apache Flink ProcessFunction, which is a function that allows us to write custom logic to process every element in the stream. Our custom logic starts with a data transformation via inputDataMapper.apply, followed by getting the anomaly score by calling the AWS RCF library via rcf.getAnomalyScore. The code implementation of RandomCutForestOperator can be found on GitHub.

RandomCutForestOperatorBuilder requires two main types of parameters:

  • RandomCutForestOperator hyperparameters – We use the following:
    • Dimensions – We set this to 1 because our input data is a 1-dimensional sine wave of float values.
    • ShingleSize – We set this to 1, which means each anomaly score is computed from the current data point on its own. Note that this can be increased so the algorithm considers a window of consecutive data points, which helps account for seasonality in the data.
    • SampleSize – We set this to 628, which means a maximum of 628 data points is kept in the data sample for each tree.
  • DataMapper parameters for input and output processing – We use the following:
    • InputDataMapper – We use RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER to map input data from float to float[].
    • ResultMapper – We use RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER, which is a BiFunction that joins the anomaly score with the corresponding sine wave data point into a tuple.

Flink job code

The following code snippet illustrates the core streaming structure of our Apache Flink streaming Java code. It first reads in data from the source Kinesis data stream, then processes it using the RCF algorithm. The computed anomaly score is then written to an output Kinesis data stream.

DataStream<Float> sineWaveSource = createSourceFromStaticConfig(env);

sineWaveSource
        .process(
                RandomCutForestOperator.<Float, Tuple2<Float, Double>>builder()
                        .setDimensions(1)
                        .setShingleSize(1)
                        .setSampleSize(628)
                        .setInputDataMapper(RandomCutForestOperator.SIMPLE_FLOAT_INPUT_DATA_MAPPER)
                        .setResultMapper(RandomCutForestOperator.SIMPLE_TUPLE_RESULT_DATA_MAPPER)
                        .build(),
                TupleTypeInfo.getBasicTupleTypeInfo(Float.class, Double.class))
        .addSink(createSinkFromStaticConfig());

In this example, our baseline input data is a sine wave. As shown in the following screenshot, a low anomaly score is returned when the data is regular. However, when there is an anomaly in the data (when the sine wave input data drops to a constant), a high anomaly score is returned. The anomaly score is delivered into an output Kinesis data stream. You can visualize this result by creating a Kinesis Data Analytics Studio app; for instructions, refer to Interactive analysis of streaming data.
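You can also inspect the output stream directly from Python. The following is a minimal sketch, assuming the output stream is named ExampleOutputStream-RCF (check the CloudFormation stack outputs for the actual name); it simply prints the raw record payloads, whose format depends on the serialization schema configured in the Flink sink.

import time
import boto3

OUTPUT_STREAM = "ExampleOutputStream-RCF"  # assumed name; verify in your stack outputs

kinesis = boto3.client("kinesis", region_name="us-west-2")

shard_id = kinesis.list_shards(StreamName=OUTPUT_STREAM)["Shards"][0]["ShardId"]
iterator = kinesis.get_shard_iterator(
    StreamName=OUTPUT_STREAM,
    ShardId=shard_id,
    ShardIteratorType="LATEST",
)["ShardIterator"]

while True:
    resp = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in resp["Records"]:
        print(record["Data"])  # raw bytes: sine wave value and anomaly score
    iterator = resp["NextShardIterator"]
    time.sleep(1)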

Because this is an unsupervised algorithm, you don’t need to provide any explicit rules or labeled datasets for this operator. In short, only the input data stream, data conversions, and some hyperparameters were provided. The RCF algorithm itself determined the expected baseline based on the input data and identified any unexpected behavior.

Furthermore, this means the model continuously adapts even if the baseline changes over time, so little to no retraining is required. This is powerful for anomaly detection on streaming data because the data often drifts slowly over time due to seasonal trends, inflation, equipment calibration drift, and so on.

Clean up

To avoid incurring future charges, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket created by the CloudFormation stack.
  2. On the AWS CloudFormation console, delete the CloudFormation stack.

Conclusion

This post demonstrated how to perform anomaly detection on input streaming data with RCF, an online unsupervised ML algorithm using Kinesis Data Analytics. We also showed how this algorithm learns the data baseline on its own, and can adapt to changes in the baseline over time. We hope you consider this solution for your real-time anomaly detection use cases.


About the Authors

Daren Wong is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Aleksandr Pilipenko is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Hong Liang Teoh is a Software Development Engineer in AWS. He works on Amazon Kinesis Data Analytics, the managed offering for running Apache Flink applications on AWS.

Introducing Athena Provisioned Capacity

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/introducing-athena-provisioned-capacity/

Today we launch the ability to provision capacity to run your Amazon Athena queries.

Athena is a query service that makes it simple to analyze data in Amazon Simple Storage Service (Amazon S3) data lakes and 30 different data sources, including on-premises data sources or other cloud systems, using standard SQL queries. Athena is serverless, so there is no infrastructure to manage, and–until today–you pay only for the queries that you run. Starting today, you can get dedicated capacity for your queries and use new workload management features to prioritize, control, and scale your most important queries, paying only for the capacity you provision.

At AWS, 90 percent of the new services and features are driven by your direct feedback. Many of you, Athena customers, told us that, when running a large volume of queries, you sometimes experience queuing, which might slow down some applications or business processes. To work around this, you typically create a query prioritization mechanism to prioritize mission-critical queries over less critical, interactive, or exploratory queries. This helps the highest-priority queries run first, at the cost of building and maintaining code or business processes outside of Athena itself. You also told us it is difficult to forecast your Athena costs. Athena charges by the volume of data scanned, which is often difficult to predict because it depends on the size of your dataset, the construction of the user queries, and the storage format of the data.

We heard this feedback, and today, we introduce the capability to provision dedicated query processing capacity at scale. With provisioned capacity, you provision a dedicated set of compute resources to run your queries. This always-on capacity can serve your business-critical queries with near-zero latency and no queuing. It gives you control over workload performance characteristics such as cost, concurrency, and query prioritization. Similar to provisioned capacity for other AWS services, you pay only for the capacity provisioned, not for the actual usage. With provisioned capacity, your Athena bills are predictable, and you do not have to limit user queries to stay within your monthly budget. I’ll share more about the billing model down below.

Behind the scenes, Athena maintains a large pool of compute in each AWS Region that it operates in. You can think of this as one large pool of compute, divided logically across customers. When you reserve capacity in Athena, the capacity is held for your exclusive use. You can choose which queries run on the capacity you provisioned and which run on Athena’s multi-tenant, on-demand capacity. Multiple queries can share the capacity you provisioned. You may add additional capacity units at any time, based on your evolving business requirements. You may also adjust the provisioned capacity down after a minimum period of 8 hours.

The unit of capacity is a Data Processing Unit (DPU). A single DPU is equivalent to 4 vCPUs and 16 GB of RAM. The minimum capacity you may provision is 24 DPUs for 8 hours. This new provisioned capacity for Athena is ideal for those of you running any volume of queries, but the sweet spot to start using provisioned capacity is when you spend $100 or more per month on Athena.

The number of DPUs you need depends on your goals and analysis patterns. For example, if you need queries to start immediately and without queuing, you should provision enough DPUs to meet your peak concurrent query demand. Provisioning fewer DPUs than your peak demand is allowed, but may result in queuing. When this occurs, queries are held in a queue and executed when capacity is available. If your goal is to run queries within a fixed budget, you can use the AWS Pricing Calculator to determine the number of DPUs that meets your budget. Lastly, remember that data size, storage format, and query construction influence the number of DPUs a query requires. You can increase query performance by compressing, partitioning, and converting your data into columnar formats. Athena’s documentation provides you with guidelines to determine how much capacity you might require to run multiple queries at the same time.

How Does It Work?
Getting started is a three-step process. I navigate to the Athena page in the AWS Management Console and select Capacity Reservations on the left-side navigation menu.
(The console you see in this demo is based on the new Cloudscape open-source design system; yours might still show the traditional design on your AWS account.)

Athena Capacity Reservation landing page in the console

I select the Create capacity reservation button at the top right of the page.

On the Create capacity reservation page, I enter a Capacity reservation name and the number of DPUs I want to provision.

Athena Capacity Reservation - Create Reservation

I select Review to review my choices, and I select Create capacity reservation to create my reservation. After a brief period of time, the capacity reservation status becomes ✅ Active.

Athena Capacity Reservation - Status

The third and last step is to create a workgroup and assign it to the provisioned capacity. A workgroup is an Athena mechanism that lets you separate users, teams, applications, or workloads; set limits on the amount of data each query or the entire workgroup can process; and track costs.

Queries belonging to the assigned workgroup will run on the capacity you provisioned. Capacity may be shared with multiple workgroups as long as they all use the same Athena engine version. This concept, depicted in the diagram below, is surfaced through a capacity allocation policy, which defines how capacity is assigned over workgroups. This gives you the flexibility to run queries with more or less capacity, depending on your business needs.

Athena Capacity Reservation - shared workgroups

To create a workgroup, I navigate to the Workgroups section of the Athena page. Then, I select Create workgroup.

Athena Capacity Reservation - Create Workgroup

I make sure the analytics engine selected in the reservation matches the one in the workgroup.

Athena Capacity Reservation - select analytic engine

Then, I go back to the capacity reservation I just created, and I select Add workgroups to add the workgroup I just created.

Athena Capacity Reservation - Add workgroup

That’s it! Now that the configuration is ready, I can run my queries. Existing queries will run on the provisioned capacity unmodified. I make sure to select the workgroup I just created when I run queries. I choose a workgroup on the top right side of the query editor, or use the --work-group argument on the AWS command line, such as:

aws athena start-query-execution --work-group AWSNewsBlog

Athena Capacity Reservation - Select workgroup
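If you prefer to submit queries programmatically, the equivalent call with the AWS SDK for Python (Boto3) might look like the following minimal sketch. The query string and database are placeholders, and the workgroup is assumed to have a query result location configured (otherwise you can pass a ResultConfiguration explicitly).

import boto3

athena = boto3.client("athena")

# The workgroup name matches the example above; the query and database
# are placeholders for illustration.
response = athena.start_query_execution(
    QueryString="SELECT 1",
    QueryExecutionContext={"Database": "default"},
    WorkGroup="AWSNewsBlog",
)
print(response["QueryExecutionId"])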

Availability and Pricing
As I explained in the introduction, we charge for the number of DPUs you provisioned and the duration. The minimum duration is 8 hours, and after that, billing is per minute. You can release the provisioned capacity at any time. Cancellations within the minimum duration period are billed for the full term, and capacity is deallocated as soon as all currently running queries are terminated.

Queries run from a workgroup assigned to a provisioned capacity are not billed for the amount of data scanned. You effectively pay a flat rate depending on the provisioned capacity, not the usage. If you have excess capacity, you can reduce the number of DPUs you provisioned or add workgroups to consume the excess capacity.
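To make the billing model concrete, here is a small back-of-the-envelope sketch in Python. The per-DPU-hour rate is deliberately left as a parameter (the value below is hypothetical); look up the actual rate for your Region on the Athena pricing page.

import math

def capacity_reservation_cost(dpus, hours_active, price_per_dpu_hour):
    # The minimum billable duration is 8 hours; after that, billing is per
    # minute, approximated here by rounding up to the next minute.
    billable_hours = max(8, math.ceil(hours_active * 60) / 60)
    return dpus * billable_hours * price_per_dpu_hour

# Example: 24 DPUs released after 10.5 hours, with a hypothetical rate.
print(capacity_reservation_cost(24, 10.5, price_per_dpu_hour=0.30))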

As usual, the Athena pricing page has all the details.

Athena provisioned capacity is available today in US East (Ohio, N. Virginia), US West (Oregon), Asia Pacific (Singapore, Sydney, Tokyo), and Europe (Ireland, Stockholm) AWS Regions.

Go and provision your Athena capacity today!

— seb

Connect Kafka client applications securely to your Amazon MSK cluster from different VPCs and AWS accounts

Post Syndicated from Ali Alemi original https://aws.amazon.com/blogs/big-data/connect-kafka-client-applications-securely-to-your-amazon-msk-cluster-from-different-vpcs-and-aws-accounts/

You can now use Amazon Managed Streaming for Apache Kafka (Amazon MSK) multi-VPC private connectivity (powered by AWS PrivateLink) and cluster policy support for MSK clusters to simplify connectivity of your Kafka clients to your brokers. Amazon MSK is a fully managed service that makes it easy for you to build and run applications that use Kafka to process streaming data. When you create an MSK cluster, the cluster resources are available to clients within the same Amazon VPC. This allows you to launch the cluster within specific subnets of the VPC, associate it with security groups, and attach IP addresses from your VPC’s address space through elastic network interfaces (ENIs). Network traffic between clients and the cluster stays within the AWS network, with internet access to the cluster not possible by default.

If you have workloads segmented across several VPCs and AWS accounts, there may be scenarios in which you need to make your MSK brokers accessible to Kafka clients across VPCs. With the launch of Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account without enabling public access or creating and managing your own networking infrastructure for private connectivity. A cluster policy is an AWS Identity and Access Management (IAM) resource-based policy, which is defined for your MSK cluster to provide cross-account IAM principals permissions to set up private connectivity to the cluster.

This post introduces Amazon MSK multi-VPC connectivity and how you can privately access your MSK clusters from your clients in other VPCs. It also shows how to define a cluster policy for your MSK clusters. These two new capabilities simplify configuring cross-VPC network access and setting up the permissions needed for Kafka clients to privately connect to MSK brokers in a different account.

Before Amazon MSK multi-VPC connectivity

Before Amazon MSK multi-VPC connectivity, the network admin needed to choose one of the following secure connectivity patterns. Admins had to repeat certain steps for each broker in the cluster.

  • Amazon VPC peering is the simplest networking construct that enables bidirectional connectivity between two VPCs. In this approach, the network admin had to update each VPC with the IP addresses of each broker in the routing tables of all subnets. You can’t use this connectivity pattern when there are overlapping IPv4 or IPv6 CIDR blocks in the VPCs.
  • AWS Transit Gateway provides a highly available and scalable design for connecting VPCs. In this approach, the network admin constantly had to update the routing tables attached to each transit gateway. Unlike VPC peering, which can go cross-Region, AWS Transit Gateway is a regional service, but you can use inter-Region peering between transit gateways to route traffic across Regions. AWS Transit Gateway has a maximum (burst) bandwidth of 50 Gbps per Availability Zone per VPC connection, which could become a challenge for some workloads.
  • AWS PrivateLink is an AWS networking service that provides private access to a specific service instead of all resources within a VPC and without traversing the public internet. It also eliminates the need to expose the entire VPC or subnet, and prevents issues like having to deal with overlapping CIDR blocks between the VPC that hosts the MSK cluster ENIs and the Kafka client VPC. AWS PrivateLink can scale to an unlimited number of VPCs and unlike the other options, traffic here is unidirectional. Because of these benefits, AWS PrivateLink is a popular choice to manage private connectivity. However, this connectivity pattern comes with additional complexity. It requires creating multiple Network Load Balancers (NLBs) per cluster and creating private service endpoints per NLB in the service account. Furthermore, admins had to create private endpoints per private service endpoint, and an Amazon Route 53 alias record per private endpoint in every client account.

The following diagram illustrates the architecture of customer-managed VPC endpoints between different VPCs in different AWS accounts with IAM authentication.

Before multi-vpc connectivity

After Amazon MSK multi-VPC connectivity and cluster policy

You can now enable multi-VPC and cross-account connectivity for your MSK clusters in a few simple steps and pay for what you use. This eliminates the overhead of creating and managing AWS PrivateLink infrastructure. When new brokers are added to a cluster, private connectivity is maintained without the need to make configuration changes, saving you from the overhead and complexity of managing the underlying network infrastructure.

The following diagram illustrates this updated architecture of using Amazon MSK multi-VPC connectivity to connect a client from a different AWS account.

after multi-vpc connectivity

Solution overview

Establishing multi-VPC private connectivity involves turning on this feature for the cluster and configuring the Kafka clients to connect privately to the cluster.

The following are the high-level steps to configure the cluster:

  1. Enable the multi-VPC private connectivity feature for a subset of authentication schemes that are enabled for your MSK cluster.
  2. If a Kafka client is in a different AWS account than the cluster, attach a resource-based policy to the MSK cluster to authorize IAM principals for creating cross-account connectivity.
  3. Share the cluster ARN with the IAM principal associated with the Kafka client that needs to create cross-account access to the MSK cluster.

The following are the high-level steps to configure the clients:

  1. Create a managed VPC endpoint for the client VPC that needs to connect privately to the MSK cluster.
  2. Update the VPC endpoint’s security group settings to enable outbound connectivity to the MSK cluster.
  3. Set up the client to use the cluster’s connection string to connect privately to the cluster.

Cluster setup

In this post, we only show the steps for enabling Amazon MSK multi-VPC connectivity for a provisioned cluster.

  1. To enable Amazon MSK multi-VPC connectivity on your existing cluster, choose Turn on multi-VPC connectivity on the Amazon MSK console.
    turn on multi-vpc connectivity
    Note that multi-VPC connectivity cannot be turned on with a cluster that allows unauthenticated access. This is to prevent unauthenticated access from different VPCs.
  2. Select the authentication methods that you allow clients in other VPCs to use.
    The list of authentication methods is populated based on your cluster’s security configuration.
  3. Review the settings and choose Turn on selection. After multi-VPC connectivity is enabled on your cluster, Amazon MSK creates the NLB and VPC endpoint service infrastructure required for private connectivity. Amazon MSK also vends a new set of bootstrap broker strings that can be used for private connectivity; these can be accessed using the View client info option on the Amazon MSK console. The next step is to provide the IAM principals associated with your clients the permissions to connect privately to your cluster. To do this, you need to attach a cluster policy to the cluster.
    Turn on selection
  4. Choose Edit cluster policy in the Security section of the cluster details page on the Amazon MSK console.
    The new cluster policy allows for defining a Basic or Advanced cluster policy. With the Basic option, you can simply enter the AWS account IDs of your clients’ VPCs. This policy allows the principals in those AWS accounts to perform the CreateVpcConnection, GetBootstrapBrokers, DescribeCluster, and DescribeClusterV2 actions that are required for creating the cross-VPC connectivity to your cluster. However, in other cases, you may need a more complex policy that allows for additional actions, or principals other than AWS accounts, such as IAM roles, role sessions, IAM users, and more. You can author a cluster policy according to IAM JSON policy guidance and provide that to the cluster in Advanced mode.
  5. Define your cluster policy and choose Save changes. (If you prefer to script this step, a sketch using the AWS SDK follows this list.)
    cluster policy
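As a scripted alternative to the console, a minimal sketch with the AWS SDK for Python (Boto3) and the PutClusterPolicy API (available in recent SDK versions) might look like the following. The cluster ARN and account ID are placeholders, and the actions mirror what the Basic policy grants.

import json
import boto3

kafka = boto3.client("kafka")

# Placeholder values for illustration only.
cluster_arn = "arn:aws:kafka:us-east-1:111122223333:cluster/demo-cluster/abcd1234"
client_account_id = "444455556666"

policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"AWS": f"arn:aws:iam::{client_account_id}:root"},
            "Action": [
                "kafka:CreateVpcConnection",
                "kafka:GetBootstrapBrokers",
                "kafka:DescribeCluster",
                "kafka:DescribeClusterV2",
            ],
            "Resource": cluster_arn,
        }
    ],
}

kafka.put_cluster_policy(ClusterArn=cluster_arn, Policy=json.dumps(policy))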

Client setup

On the client side, first you need to attach an identity policy to the IAM principal who wants to create a managed VPC connection. The identity policy must provide permission for creating a managed VPC connection. The necessary permissions are part of the AWS managed policy AmazonMSKFullAccess.

  1. In the other AWS account with the IAM principal you configured, use the new Managed VPC connection page on the Amazon MSK console to create Amazon MSK managed VPC connections.
    A managed VPC connection maps to an AWS PrivateLink endpoint under the hood, and Amazon MSK uses the managed VPC connection to orchestrate private connectivity to the cluster. You simply need to create the managed VPC connection and pay standard AWS PrivateLink charges for the underlying endpoint.
    Create a connection
  2. Enter the AWS Resource Name (ARN) of the cluster that you want to connect to.
  3. Choose Verify to verify the cluster information and its minimum requirements for cross-connectivity.
  4. Select an authentication method from the provided values.
  5. Choose the VPC ID where your Kafka clients are located, and choose their subnet IDs. You can add more subnets using the Add subnet option.
    The specified client subnets must have Availability Zone IDs that match the cluster’s Availability Zone IDs. This makes sure the clients are located in the same physical Availability Zones as the cluster brokers. Amazon MSK uses the port range 14001:14100 for all authentication methods. You need to select a security group that allows outbound traffic to this port range. The following screenshot shows an example.
  6. Review the settings and choose Create connection.
    Review and create a connection
    The process will take a few minutes.
  7. When it’s complete, you can obtain the clients’ connection string from the details page of your connection.
  8. The next step is to update the outbound rules for the VPC endpoint security group to allow communication to the port range 14001:14100 (a scripted sketch of this step follows these instructions).
    client setup review
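As a rough sketch, the outbound rule can also be added with the AWS SDK for Python (Boto3). The security group ID and destination CIDR range below are placeholders; use the security group attached to your managed VPC connection and restrict the destination appropriately.

import boto3

ec2 = boto3.client("ec2")

ec2.authorize_security_group_egress(
    GroupId="sg-0123456789abcdef0",  # placeholder: security group of the managed VPC connection
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 14001,
            "ToPort": 14100,
            # Illustrative CIDR; tighten this to your actual broker network.
            "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "MSK multi-VPC brokers"}],
        }
    ],
)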

Use the Amazon MSK-managed VPC connection

After you create the managed VPC connection, connecting privately to the cluster is easy. Simply use the new connection string to connect to the cluster. For example, you may connect from an Amazon Elastic Compute Cloud (Amazon EC2) instance on your client VPC. Then run the following command to verify if you can connect and perform actions against the topics in the MSK cluster:

export MSK_VPC=<YOUR CLIENT CONNECTION STRING GOES HERE>
bin/kafka-topics.sh --bootstrap-server $MSK_VPC --command-config /home/ec2-user/kafka/config/client-config.properties --list

console results

IAM authentication

Before the launch of Amazon MSK multi-VPC connectivity, Kafka clients in other AWS accounts that opted in to IAM authentication needed to assume another IAM role in the cluster’s account. To facilitate this, admins had to create multiple IAM roles and write a trust policy that allows authenticated principals from the client accounts to assume corresponding roles through the sts:AssumeRole API call. This approach was challenging to scale as the number of VPCs or AWS accounts grew. With the launch of this cluster policy, cross-account access control is now simplified because you can attach a cluster policy to your clusters to specify which cross-account clients have what permissions on resources within the cluster.

This capability allows you to manage all access to the cluster and topics in one place. For example, you can control which IAM principals have write access to certain topics, and which principals can only read from them. Users who are using IAM client authentication can also add permissions for required kafka-cluster actions in the cluster resource policy.

Availability and pricing

You can now use Amazon MSK multi-VPC connectivity in all commercial Regions where Amazon MSK is offered, including China and GovCloud (US) Regions.

You pay $0.006 per GB data processed for private connectivity and $0.0225 per private connectivity hour per authentication scheme in US East (Ohio). Refer to our Pricing page for more details.

Conclusion

With Amazon MSK multi-VPC private connectivity, you can now privately access your MSK brokers from your client applications in another VPC within the same AWS account or another AWS account, with minimal configuration. You no longer have to create, manage, and update multiple networking resources in multiple VPCs, or make Amazon MSK configuration changes to connect your Kafka clients across VPCs and accounts. Amazon MSK creates and manages the resources for you. With cluster policy support, you can easily provide your cross-account client principals the permissions to connect privately to your MSK cluster. Furthermore, if you are using IAM client authentication, you can also use the cluster policy to centrally control clients’ permissions to perform operations on the cluster. Use Amazon MSK multi-VPC connectivity and the cluster policy feature today to simplify your secure connectivity infrastructure.

For further reading on Amazon MSK, visit the official product page and our AWS Documentation.


About the authors

Ali Alemi is a Streaming Specialist Solutions Architect at AWS. Ali advises AWS customers with architectural best practices and helps them design real-time analytics data systems that are reliable, secure, efficient, and cost-effective. He works backward from customers’ use cases and designs data solutions to solve their business problems. Prior to joining AWS, Ali supported several public sector customers and AWS consulting partners in their application modernization journey and migration to the cloud.

Rajeev Chakrabarti is a Kinesis specialist solutions architect.

Use the Amazon Redshift Data API to interact with Amazon Redshift Serverless

Post Syndicated from Debu Panda original https://aws.amazon.com/blogs/big-data/use-the-amazon-redshift-data-api-to-interact-with-amazon-redshift-serverless/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics. Amazon Redshift Serverless makes it convenient for you to run and scale analytics without having to provision and manage data warehouses. With Redshift Serverless, data analysts, developers, and data scientists can now use Amazon Redshift to get insights from data in seconds by loading data into and querying records from the data warehouse.

As a data engineer or application developer, for some use cases, you want to interact with the Redshift Serverless data warehouse to load or query data with a simple API endpoint without having to manage persistent connections. With the Amazon Redshift Data API, you can interact with Redshift Serverless without having to configure JDBC or ODBC. This makes it easier and more secure to work with Redshift Serverless and opens up new use cases.

This post explains how to use the Data API with Redshift Serverless from the AWS Command Line Interface (AWS CLI) and Python. If you want to use the Data API with Amazon Redshift clusters, refer to Using the Amazon Redshift Data API to interact with Amazon Redshift clusters.

Introducing the Data API

The Data API enables you to seamlessly access data from Redshift Serverless with all types of traditional, cloud-native, and containerized serverless web service-based applications and event-driven applications.

The following diagram illustrates this architecture.

The Data API simplifies data access, ingest, and egress from programming languages and platforms supported by the AWS SDK such as Python, Go, Java, Node.js, PHP, Ruby, and C++.

The Data API simplifies access to Amazon Redshift by eliminating the need for configuring drivers and managing database connections. Instead, you can run SQL commands to Redshift Serverless by simply calling a secured API endpoint provided by the Data API. The Data API takes care of managing database connections and buffering data. The Data API is asynchronous, so you can retrieve your results later. Your query results are stored for 24 hours. The Data API federates AWS Identity and Access Management (IAM) credentials so you can use identity providers like Okta or Azure Active Directory or database credentials stored in Secrets Manager without passing database credentials in API calls.

For customers using AWS Lambda, the Data API provides a secure way to access your database without the additional overhead for Lambda functions to be launched in an Amazon VPC. Integration with the AWS SDK provides a programmatic interface to run SQL statements and retrieve results asynchronously.
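For example, a minimal Lambda handler that submits a query through the Data API could look like the following sketch. The workgroup, database, and SQL text are assumptions taken from the event and environment variables; adjust them to your own configuration.

import os
import boto3

client = boto3.client("redshift-data")

def handler(event, context):
    resp = client.execute_statement(
        WorkgroupName=os.environ.get("WORKGROUP", "default"),
        Database=os.environ.get("DATABASE", "dev"),
        Sql=event.get("sql", "SELECT 1;"),
    )
    # The call is asynchronous; return the statement ID so a later step
    # (or an EventBridge-triggered function) can fetch the result.
    return {"statementId": resp["Id"]}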

Relevant use cases

The Data API is not a replacement for JDBC and ODBC drivers, and is suitable for use cases where you don’t need a persistent connection to a serverless data warehouse. It’s applicable in the following use cases:

  • Accessing Amazon Redshift from custom applications with any programming language supported by the AWS SDK. This enables you to integrate web service-based applications to access data from Amazon Redshift using an API to run SQL statements. For example, you can run SQL from JavaScript.
  • Building a serverless data processing workflow.
  • Designing asynchronous web dashboards because the Data API lets you run long-running queries without having to wait for them to complete.
  • Running your query one time and retrieving the results multiple times without having to run the query again within 24 hours.
  • Building your ETL pipelines with AWS Step Functions, Lambda, and stored procedures.
  • Having simplified access to Amazon Redshift from Amazon SageMaker and Jupyter notebooks.
  • Building event-driven applications with Amazon EventBridge and Lambda.
  • Scheduling SQL scripts to simplify data load, unload, and refresh of materialized views.

The Data API GitHub repository provides examples for different use cases for both Redshift Serverless and provisioned clusters.

Create a Redshift Serverless workgroup

If you haven’t already created a Redshift Serverless data warehouse, or want to create a new one, refer to the Getting Started Guide. This guide walks you through the steps of creating a namespace and workgroup with their names as default. Also, ensure that you have created an IAM role and make sure that the IAM role you attach to your Redshift Serverless namespace has AmazonS3ReadOnlyAccess permission. You can use the AWS Management Console to create an IAM role and assign Amazon Simple Storage Service (Amazon S3) privileges (refer to Loading in data from Amazon S3). In this post, we create a table and load data using the COPY command.

Prerequisites for using the Data API

You must be authorized to access the Data API. Amazon Redshift provides the RedshiftDataFullAccess managed policy, which offers full access to Data API. This policy also allows access to Redshift Serverless workgroups, Secrets Manager, and API operations needed to authenticate and access a Redshift Serverless workgroup by using IAM credentials.

You can also create your own IAM policy that allows access to specific resources by starting with RedshiftDataFullAccess as a template.

The Data API allows you to access your database either using your IAM credentials or secrets stored in Secrets Manager. In this post, we use IAM credentials.

When you federate your IAM credentials to connect with Amazon Redshift, it automatically creates a database user for the IAM user that is being used. It uses the GetCredentials API to get temporary database credentials. If you want to provide specific database privileges to your users with this API, you can use an IAM role with the tag name RedshiftDBRoles with a list of roles separated by colons. For example, if you want to assign database roles such as sales and analyst, you can have a value sales:analyst assigned to RedshiftDBRoles.
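For example, you could attach that tag to a role with the AWS SDK for Python (Boto3) as sketched below. The role name is hypothetical, and the tag key follows the name used above; verify the exact key in the Amazon Redshift documentation for your setup.

import boto3

iam = boto3.client("iam")

iam.tag_role(
    RoleName="AnalyticsDataApiRole",  # hypothetical role used with the Data API
    Tags=[{"Key": "RedshiftDBRoles", "Value": "sales:analyst"}],  # key as named above
)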

Use the Data API from the AWS CLI

You can use the Data API from the AWS CLI to interact with the Redshift Serverless workgroup and namespace. For instructions on configuring the AWS CLI, see Setting up the AWS CLI. The Amazon Redshift Serverless CLI (aws redshift-serverless) is part of the AWS CLI and lets you manage Redshift Serverless workgroups and namespaces, such as creating, deleting, setting usage limits, tagging resources, and more. The Data API provides a command line interface in the AWS CLI (aws redshift-data) that allows you to interact with the databases in Redshift Serverless.

You can invoke help using the following command:

aws redshift-data help

The following table shows you the different commands available with the Data API CLI.

Command Description
list-databases Lists the databases in a workgroup.
list-schemas Lists the schemas in a database. You can filter this by a matching schema pattern.
list-tables Lists the tables in a database. You can filter the tables list by a schema name pattern, a matching table name pattern, or a combination of both.
describe-table Describes the detailed information about a table including column metadata.
execute-statement Runs a SQL statement, which can be SELECT, DML, DDL, COPY, or UNLOAD.
batch-execute-statement Runs multiple SQL statements in a batch as a part of single transaction. The statements can be SELECT, DML, DDL, COPY, or UNLOAD.
cancel-statement Cancels a running query. To be canceled, a query must not be in the FINISHED or FAILED state.
describe-statement Describes the details of a specific SQL statement run. The information includes when the query started, when it finished, the number of rows processed, and the SQL statement.
list-statements Lists the SQL statements in the last 24 hours. By default, only finished statements are shown.
get-statement-result Fetches the temporarily cached result of the query. The result set contains the complete result set and the column metadata. You can paginate through a set of records to retrieve the entire result as needed.

If you want to get help on a specific command, run the following command:

aws redshift-data list-tables help

Now we look at how you can use these commands.

List databases

Most organizations use a single database in their Amazon Redshift workgroup. You can use the following command to list the databases in your Serverless endpoint. This operation requires you to connect to a database and therefore requires database credentials.

aws redshift-data list-databases --database dev --workgroup-name default

List schemas

Similar to listing databases, you can list your schemas by using the list-schemas command:

aws redshift-data list-schemas --database dev --workgroup-name default

If you have several schemas that match demo (demo, demo2, demo3, and so on), you can optionally provide a pattern to filter your results matching to that pattern:

aws redshift-data list-schemas --database dev --workgroup-name default --schema-pattern "demo%"

List tables

The Data API provides a simple command, list-tables, to list tables in your database. You might have thousands of tables in a schema; the Data API lets you paginate your result set or filter the table list by providing filter conditions.

You can search across your schema with table-pattern; for example, you can filter the table list by a table name prefix across all your schemas in the database or filter your tables list in a specific schema pattern by using schema-pattern.

The following is a code example that uses both:

aws redshift-data list-tables --database dev --workgroup-name default --schema-pattern "demo%" --table-pattern "orders%"

Run SQL commands

You can run SELECT, DML, DDL, COPY, or UNLOAD commands for Amazon Redshift with the Data API. You can optionally specify the --with-event option if you want to send an event to EventBridge after the query runs; the Data API then sends the event with the queryId and final run status.

Create a schema

Let’s use the Data API to see how you can create a schema. The following command lets you create a schema in your database. You don’t have to run this SQL if you have pre-created the schema. You use the --sql parameter to specify your SQL commands.

aws redshift-data execute-statement --database dev --workgroup-name default \
--sql "CREATE SCHEMA demo;"

The following shows an example output of execute-statement:

{
    "CreatedAt": "2023-04-07T17:14:43.038000+00:00",
    "Database": "dev",
    "DbUser": "IAMR:Admin",
    "Id": "8e4e5af3-9af9-4567-8e70-7849515b3a79",
    "WorkgroupName": "default"
}

We discuss later in this post how you can check the status of a SQL that you ran with execute-statement.

Create a table

You can use the following command to create a table with the CLI:

aws redshift-data execute-statement --database dev --workgroup-name default  \
   --sql "CREATE TABLE demo.green_201601( \
  vendorid                VARCHAR(4), \
  pickup_datetime         TIMESTAMP, \
  dropoff_datetime        TIMESTAMP, \
  store_and_fwd_flag      VARCHAR(1), \
  ratecode                INT, \
  pickup_longitude        FLOAT4, \
  pickup_latitude         FLOAT4, \
  dropoff_longitude       FLOAT4, \
  dropoff_latitude        FLOAT4, \
  passenger_count         INT, \
  trip_distance           FLOAT4, \
  fare_amount             FLOAT4, \
  extra                   FLOAT4, \
  mta_tax                 FLOAT4, \
  tip_amount              FLOAT4, \
  tolls_amount            FLOAT4, \
  ehail_fee               FLOAT4, \
  improvement_surcharge   FLOAT4, \
  total_amount            FLOAT4, \
  payment_type            VARCHAR(4),\
  trip_type               VARCHAR(4));" 

Load sample data

The COPY command lets you load bulk data into your table in Amazon Redshift. You can use the following command to load data into the table we created earlier:

aws redshift-data execute-statement --database dev --workgroup-name default --sql "COPY demo.green_201601 \
FROM 's3://us-west-2.serverless-analytics/NYC-Pub/green/green_tripdata_2016-01' \
IAM_ROLE default \
DATEFORMAT 'auto' \
IGNOREHEADER 1 \
DELIMITER ',' \
IGNOREBLANKLINES \
REGION 'us-west-2';" 

Retrieve data

The following query uses the table we created earlier:

aws redshift-data execute-statement --database dev --workgroup-name default --sql "SELECT ratecode,  \
COUNT(*) FROM demo.green_201601 WHERE \
trip_distance > 5 GROUP BY 1 ORDER BY 1;"

The following shows an example output:

{
    "CreatedAt": "2023-04-07T17:25:16.030000+00:00",
    "Database": "dev",
    "DbUser": "IAMR:Admin",
    "Id": "cae88c08-0bb4-4279-8845-d5a8fefafade",
    "WorkgroupName": "default"
}

You can fetch results using the statement ID that you receive as an output of execute-statement.

Check the status of a statement

You can check the status of your statement by using describe-statement. The output for describe-statement provides additional details such as PID, query duration, number of rows in and size of the result set, and the query ID given by Amazon Redshift. You have to specify the statement ID that you get when you run the execute-statement command. See the following command:

aws redshift-data describe-statement --id cae88c08-0bb4-4279-8845-d5a8fefafade

The following is an example output:

{
     "CreatedAt": "2023-04-07T17:27:15.937000+00:00",
     "Duration": 2602410468,
     "HasResultSet": true,
     "Id": "cae88c08-0bb4-4279-8845-d5a8fefafade",
     "QueryString": " SELECT ratecode, COUNT(*) FROM 
     demo.green_201601 WHERE
     trip_distance > 5 GROUP BY 1 ORDER BY 1;",
     "RedshiftPid": 1073815670,
     "WorkgroupName": "default",
     "UpdatedAt": "2023-04-07T17:27:18.539000+00:00"
}

The status of a statement can be STARTED, FINISHED, ABORTED, or FAILED.

Run SQL statements with parameters

You can run SQL statements with parameters. The following example uses two named parameters in the SQL that is specified using a name-value pair:

aws redshift-data execute-statement --database dev --workgroup-name default --sql "select sellerid,sum(pricepaid) totalsales from sales where eventid >= :eventid and sellerid > :selrid group by sellerid"  --parameters "[{\"name\": \"selrid\", \"value\": \"100\"},{\"name\": \"eventid\", \"value\": \"100\"}]"

The describe-statement returns QueryParameters along with QueryString.

You can map the name-value pair in the parameters list to one or more parameters in the SQL text, and the name-value parameter can be in random order. You can’t specify a NULL value or zero-length value as a parameter.
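The same query can be submitted with the AWS SDK for Python (Boto3) by passing the parameters as a list of name-value pairs. The following minimal sketch reuses the sample sales query above, assuming the default workgroup and dev database.

import boto3

rsd = boto3.client("redshift-data")

resp = rsd.execute_statement(
    WorkgroupName="default",
    Database="dev",
    Sql="select sellerid, sum(pricepaid) totalsales from sales "
        "where eventid >= :eventid and sellerid > :selrid group by sellerid",
    Parameters=[
        {"name": "selrid", "value": "100"},
        {"name": "eventid", "value": "100"},
    ],
)
print(resp["Id"])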

Cancel a running statement

If your query is still running, you can use cancel-statement to cancel a SQL query. See the following command:

aws redshift-data cancel-statement --id 39a0de2f-e85e-45ff-a0d7-cd074c348120

Fetch results from your query

You can fetch the query results by using get-statement-result. The query result is stored for 24 hours. See the following command:

aws redshift-data get-statement-result --id 7b61da88-1b11-4ade-956a-21085a29118d

The output of the result contains metadata such as the number of records fetched, column metadata, and a token for pagination.
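For larger result sets, you can follow the pagination token programmatically. The following is a minimal sketch in Python, assuming a statement ID for a finished query; it collects all pages of the cached result.

import boto3

rsd = boto3.client("redshift-data")

def fetch_all_records(statement_id):
    # Page through the cached result set by following NextToken.
    records, token = [], None
    while True:
        kwargs = {"Id": statement_id}
        if token:
            kwargs["NextToken"] = token
        page = rsd.get_statement_result(**kwargs)
        records.extend(page["Records"])
        token = page.get("NextToken")
        if not token:
            return page["ColumnMetadata"], records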

Run multiple SQL statements

You can run multiple SELECT, DML, DDL, COPY, or UNLOAD commands for Amazon Redshift in a single transaction with the Data API. The batch-execute-statement enables you to create tables and run multiple COPY commands or create temporary tables as part of your reporting system and run queries on that temporary table. See the following code:

aws redshift-data batch-execute-statement --database dev --workgroup-name default \
--sqls "create temporary table mysales \
(firstname, lastname, total_quantity ) as \
SELECT firstname, lastname, total_quantity \
FROM   (SELECT buyerid, sum(qtysold) total_quantity \
        FROM  sales  \
        GROUP BY buyerid \
        ORDER BY total_quantity desc limit 10) Q, users \
WHERE Q.buyerid = userid \ 
ORDER BY Q.total_quantity desc;" "select * from mysales limit 100;"

The describe-statement for a multi-statement query shows the status of all sub-statements:

{
    "CreatedAt": "2023-04-10T14:01:11.257000-07:00",
    "Duration": 30564173,
    "HasResultSet": true,
    "Id": "23d99d7f-fd13-4686-92c8-e2c279715c21",
    "RedshiftPid": 1073922185,
    "RedshiftQueryId": 0,
    "ResultRows": -1,
    "ResultSize": -1,
    "Status": "FINISHED",
    "SubStatements": [
        {
            "CreatedAt": "2023-04-10T14:01:11.357000-07:00",
            "Duration": 12779028,
            "HasResultSet": false,
            "Id": "23d99d7f-fd13-4686-92c8-e2c279715c21:1",
            "QueryString": "create temporary table mysales (firstname, lastname, total_quantity ) as \nSELECT firstname, lastname, total_quantity \nFROM (SELECT buyerid, sum(qtysold) total_quantity\nFROM sales\nGROUP BY buyerid\nORDER BY total_quantity desc limit 10) Q, users\nWHERE Q.buyerid = userid\nORDER BY Q.total_quantity desc;",
            "RedshiftQueryId": 0,
            "ResultRows": 0,
            "ResultSize": 0,
            "Status": "FINISHED",
            "UpdatedAt": "2023-04-10T14:01:11.807000-07:00"
        },
        {
            "CreatedAt": "2023-04-10T14:01:11.357000-07:00",
            "Duration": 17785145,
            "HasResultSet": true,
            "Id": "23d99d7f-fd13-4686-92c8-e2c279715c21:2",
            "QueryString": "select *\nfrom mysales limit 100;",
            "RedshiftQueryId": 0,
            "ResultRows": 40,
            "ResultSize": 1276,
            "Status": "FINISHED",
            "UpdatedAt": "2023-04-10T14:01:11.911000-07:00"
        }
    ],
    "UpdatedAt": "2023-04-10T14:01:11.970000-07:00",
    "WorkgroupName": "default"
}

In the preceding example, we had two SQL statements and therefore the output includes the ID for the SQL statements as 23d99d7f-fd13-4686-92c8-e2c279715c21:1 and 23d99d7f-fd13-4686-92c8-e2c279715c21:2. Each sub-statement of a batch SQL statement has a status, and the status of the batch statement is updated with the status of the last sub-statement. For example, if the last statement has status FAILED, then the status of the batch statement shows as FAILED.

You can fetch query results for each statement separately. In our example, the first statement is a SQL statement to create a temporary table, so there are no results to retrieve for the first statement. You can retrieve the result set for the second statement by providing the statement ID for the sub-statement:

aws redshift-data get-statement-result --id 23d99d7f-fd13-4686-92c8-e2c279715c21:2

Use the Data API with Secrets Manager

The Data API allows you to use database credentials stored in Secrets Manager. You can create a secret of the type Other type of secret and then specify a user name and password. Note that you can’t choose the Amazon Redshift cluster secret type, because a Redshift Serverless workgroup is different from a cluster.

Let’s assume that you created a secret named defaultWG for your credentials. You can pass it with the secret-arn parameter as follows:

aws redshift-data list-tables --database dev --workgroup-name default --secret-arn defaultWG --region us-west-1

Export the data

Amazon Redshift allows you to export from database tables to a set of files in an S3 bucket by using the UNLOAD command with a SELECT statement. You can unload data in either text or Parquet format. The following command shows you an example of how to use the data lake export with the Data API:

aws redshift-data execute-statement --database dev --workgroup-name default --sql "unload ('select * from demo.green_201601') to '<your-S3-bucket>' iam_role '<your-iam-role>'; " 

You can use batch-execute-statement if you want to use multiple statements with UNLOAD or combine UNLOAD with other SQL statements.

Use the Data API from the AWS SDK

You can use the Data API in any of the programming languages supported by the AWS SDK. For this post, we use the AWS SDK for Python (Boto3) as an example to illustrate the capabilities of the Data API.

We first import the Boto3 package and establish a session:

import botocore.session as bc
import boto3

def get_client(service, endpoint=None, region="us-west-2"):
    session = bc.get_session()
    s = boto3.Session(botocore_session=session, region_name=region)
    if endpoint:
        return s.client(service, endpoint_url=endpoint)
    return s.client(service)

Get a client object

You can create a client object from the boto3.Session object for the redshift-data service:

rsd = get_client('redshift-data')

If you don’t want to create a session, your client is as simple as the following code:

import boto3
client = boto3.client('redshift-data')

Run a statement

The following example code runs a statement against your Redshift Serverless workgroup using the table we created earlier. You can use DDL, DML, COPY, and UNLOAD in the Sql parameter:

resp = rsd.execute_statement(
    WorkgroupName="default",
    Database="dev",
    Sql="SELECT ratecode, COUNT(*) totalrides FROM demo.green_201601 WHERE trip_distance > 5 GROUP BY 1 ORDER BY 1;"
)

As we discussed earlier, running a query is asynchronous; running a statement returns an ExecuteStatementOutput, which includes the statement ID.

If you want to publish an event to EventBridge when the statement is complete, you can use the additional parameter WithEvent set to true:

resp = rsd.execute_statement(
    Database="dev",
    WorkgroupName="default",
    Sql="SELECT ratecode, COUNT(*) totalrides FROM demo.green_201601 WHERE trip_distance > 5 GROUP BY 1 ORDER BY 1;",
    WithEvent=True
)

Describe a statement

You can use describe_statement to find the status of the query and number of records retrieved:

qid = resp['Id']
desc = rsd.describe_statement(Id=qid)
if desc["Status"] == "FINISHED":
    print(desc["ResultRows"])

Fetch results from your query

You can use get_statement_result to retrieve results for your query if your query is complete:

if desc and desc["ResultRows"] > 0:
    result = rsd.get_statement_result(Id=qid)

The get_statement_result command returns a JSON object that includes metadata for the result and the actual result set. You might need to process the data to format the result if you want to display it in a user-friendly format.

Fetch and format results

For this post, we demonstrate how to format the results with the Pandas framework. The post_process function processes the metadata and results to populate a DataFrame. The query function submits the SQL to your Redshift Serverless workgroup, waits for it to finish, and retrieves the result. See the following code:

import time
import pandas as pd

def post_process(meta, records):
    columns = [k["name"] for k in meta]
    rows = []
    for r in records:
        tmp = []
        for c in r:
            tmp.append(c[list(c.keys())[0]])
        rows.append(tmp)
    return pd.DataFrame(rows, columns=columns)

def query(sql, workgroup="default", database="dev"):
    resp = rsd.execute_statement(
        Database=database,
        WorkgroupName=workgroup,
        Sql=sql
    )
    qid = resp["Id"]
    print(qid)
    # Poll until the statement finishes or fails.
    while True:
        desc = rsd.describe_statement(Id=qid)
        if desc["Status"] == "FINISHED" or desc["Status"] == "FAILED":
            break
        time.sleep(0.5)
    print(desc["ResultRows"])
    if desc and desc["ResultRows"] > 0:
        result = rsd.get_statement_result(Id=qid)
        rows, meta = result["Records"], result["ColumnMetadata"]
        return post_process(meta, rows)

pf = query("select * from demo.customer_activity limit 100;")
print(pf)

In this post, we demonstrated using the Data API with Python with Redshift Serverless. However, you can use the Data API with other programming languages supported by the AWS SDK. You can read how Roche democratized access to Amazon Redshift data using the Data API with Google Sheets. You can also address this type of use case with Redshift Serverless.

Best practices

We recommend the following best practices when using the Data API:

  • Federate your IAM credentials to the database to connect with Amazon Redshift. Redshift Serverless allows users to get temporary database credentials with GetCredentials. Redshift Serverless scopes the access to the specific IAM user and the database user is automatically created.
  • Use a custom policy to provide fine-grained access to the Data API in the production environment if you don’t want your users to use temporary credentials. You have to use Secrets Manager to manage your credentials in such use cases.
  • Avoid retrieving large amounts of data through the Data API; instead, use the UNLOAD command to export query results to Amazon S3. You’re limited to retrieving only 100 MB of data with the Data API.
  • Don’t forget to retrieve your results within 24 hours; results are stored only for 24 hours.

Conclusion

In this post, we introduced how to use the Data API with Redshift Serverless. We also demonstrated how to use the Data API from the AWS CLI and from Python using the AWS SDK. Additionally, we discussed best practices for using the Data API.

To learn more, refer to Using the Amazon Redshift Data API or visit the Data API GitHub repository for code examples.


About the authors

Debu Panda, a Senior Manager of Product Management at AWS, is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world. Debu has published numerous articles on analytics, enterprise Java, and databases and has presented at multiple conferences such as re:Invent, Oracle Open World, and Java One. He is the lead author of EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt).

Fei Peng is a Software Dev Engineer working in the Amazon Redshift team.

How Novo Nordisk built distributed data governance and control at scale

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-distributed-data-governance-and-control-at-scale/

This is a guest post co-written with Jonatan Selsing and Moses Arthur from Novo Nordisk.

This is the second post of a three-part series detailing how Novo Nordisk, a large pharmaceutical enterprise, partnered with AWS Professional Services to build a scalable and secure data and analytics platform. The first post of this series describes the overall architecture and how Novo Nordisk built a decentralized data mesh architecture, including Amazon Athena as the data query engine. The third post will show how end-users can consume data from their tool of choice, without compromising data governance. This will include how to configure Okta, AWS Lake Formation, and a business intelligence tool to enable SAML-based federated use of Athena for an enterprise BI activity.

When building a scalable data architecture on AWS, giving autonomy and ownership to the data domains is crucial for the success of the platform. By providing the right mix of freedom and control to those people with the business domain knowledge, your business can maximize value from the data as quickly and effectively as possible. At the same time, data is a strategic asset that needs to be protected with the highest degree of rigor. How can organizations strike the right balance between freedom and control?

In this post, you will learn how to build decentralized governance with Lake Formation and AWS Identity and Access Management (IAM) using attribute-based access control (ABAC). We discuss some of the patterns we use, including Amazon Cognito identity pool federation using ABAC in permission policies, and Okta-based SAML federation with ABAC enforcement on role trust policies.

Solution overview

In the first post of this series, we explained how Novo Nordisk and AWS Professional Services built a modern data architecture based on data mesh tenets. This architecture enables data governance on distributed data domains, using an end-to-end solution to create data products and providing federated data access control. This post dives into three elements of the solution:

  • How IAM roles and Lake Formation are used to manage data access across data domains
  • How data access control is enforced at scale, using a group membership mapping with an ABAC pattern
  • How the system maintains state across the different layers, so that the ecosystem of trust is configured appropriately

From the end-user perspective, the objective of the mechanisms described in this post is to enable simplified data access from the different analytics services adopted by Novo Nordisk, such as those provided by software as a service (SaaS) vendors like Databricks, or self-hosted ones such as JupyterHub. At the same time, the platform must guarantee that any change in a dataset is immediately reflected at the service user interface. The following figure illustrates at a high level the expected behavior.

High-level data platform expected behavior

Following the layer nomenclature established in the first post, the services are created and managed in the consumption layer. The domain accounts are created and managed in the data management layer. Because changes can occur from both layers, continuous communication in both directions is required. The state information is kept in the virtualization layer along with the communication protocols. Additionally, at sign-in time, the services need information about data resources required to provide data access abstraction.

Managing data access

The data access control in this architecture is designed around the core principle that all access is encapsulated in isolated IAM role sessions. The layer pattern that we described in the first post ensures that the creation and curation of the IAM role policies involved can be delegated to the different data management ecosystems. Each integrated data management platform can use its own data access mechanisms, with the unique requirement that the data is accessed via specific IAM roles.

To illustrate the potential mechanisms available to data management solutions, we show two examples of data access permission patterns used by two different data management solutions. Both systems utilize the same trust policies as described in the following sections, but have a completely different permission space.

Example 1: Identity-based ABAC policies

The first mechanism we discuss is an ABAC role that provides access to a home-like data storage area, where users can share within their departments and with the wider organization in a structure that mimics the organizational structure. Here, we don't utilize the group names, but instead forward user attributes from the corporate Active Directory directly into the permission policy through claim overrides. We do this by having the corporate Active Directory as the identity provider (IdP) for the Amazon Cognito user pool and mapping the relevant IdP attributes to user pool attributes. Then, in the Amazon Cognito identity pool, we map the user pool attributes to session tags to use them for access control. Custom overrides can be included in the claim mapping, through the use of a pre token generation Lambda trigger. This way, claims from AD can be mapped to Amazon Cognito user pool attributes and then ultimately used in the Amazon Cognito identity pool to control IAM role permissions. The following is an example of an IAM policy with session tags:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "",
                        "public/",
                        "public/*",
                        "home/",
                        "home/${aws:PrincipalTag/initials}/*",
                        "home/${aws:PrincipalTag/department}/*"
                    ]
                }
            },
            "Action": "s3:ListBucket",
            "Resource": [
                "arn:aws:s3:::your-home-bucket"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:PutObject*",
                "s3:DeleteObject*"
            ],
            "Resource": [
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": "s3:GetObject*",
            "Resource": [
                "arn:aws:s3:::your-home-bucket/public/",
                "arn:aws:s3:::your-home-bucket/public/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This role is then embedded in the analytics layer (together with the data domain roles) and assumed on behalf of the user. This enables users to mix and match between data domains—as well as utilizing private and public data paths that aren’t necessarily tied to any data domain. For more examples of how ABAC can be used with permission policies, refer to How to scale your authorization needs by using attribute-based access control with S3.
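To illustrate the claim overrides mentioned earlier, a pre token generation Lambda trigger for the Amazon Cognito user pool might look roughly like the following sketch. The attribute names (custom:initials, custom:department) are assumptions for illustration, not the exact mapping used by Novo Nordisk:

def lambda_handler(event, context):
    # Copy selected user pool attributes (mapped from the corporate AD)
    # into ID token claims, so the identity pool can turn them into
    # principal tags such as initials and department.
    user_attributes = event["request"]["userAttributes"]

    event["response"] = {
        "claimsOverrideDetails": {
            "claimsToAddOrOverride": {
                "initials": user_attributes.get("custom:initials", ""),
                "department": user_attributes.get("custom:department", ""),
            }
        }
    }
    return event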

Example 2: Lake Formation name-based access controls

In the data management solution named Novo Nordisk Enterprise Datahub (NNEDH), which we introduced in the first post, we use Lake Formation to enable standardized data access. The NNEDH datasets are registered in the Lake Formation Data Catalog as databases and tables, and permissions are granted using the named resource method. The following screenshot shows an example of these permissions.

Lake Formation named resource method for permissions management

In this approach, data access governance is delegated to Lake Formation. Every data domain in NNEDH has isolated permissions synthesized by NNEDH as the central governance management layer. This is a similar pattern to what is adopted for other domain-oriented data management solutions. Refer to Use an event-driven architecture to build a data mesh on AWS for an example of tag-based access control in Lake Formation.

These patterns don’t exclude implementations of peer-to-peer type data sharing mechanisms, such as those that can be achieved using AWS Resource Access Manager (AWS RAM), where a single IAM role session can have permissions that span across accounts.

Delegating role access to the consumption layer

The following figure illustrates the data access workflow from an external service.

Data access workflow from external service

The workflow steps are as follows:

  1. A user authenticates on an IdP used by the analytics tool that they are trying to access. A wide range of analytics tools are supported by the Novo Nordisk platform, such as Databricks and JupyterHub, and the IdP can be either SAML or OIDC type depending on the capabilities of the third-party tool. In this example, an Okta SAML application is used to sign into a third-party analytics tool, and an IAM SAML IdP is configured in the data domain AWS account to federate with the external IdP. The third post of this series describes how to set up an Okta SAML application for IAM role federation on Athena.
  2. The SAML assertion obtained during the sign-in process is used to request temporary security credentials of an IAM role through the AssumeRole operation. In this example, the SAML assertion is used with the AssumeRoleWithSAML operation. For OpenID Connect-compatible IdPs, the AssumeRoleWithWebIdentity operation must be used with the JWT. The SAML attributes in the assertion or the claims in the token can be generated at sign-in time, to ensure that the group memberships are forwarded, for the ABAC policy pattern described in the following sections.
  3. The analytics tool, such as Databricks or JupyterHub, abstracts the usage of the IAM role session credentials in the tool itself, and data can be accessed directly according to the permissions of the IAM role assumed. This pattern is similar in nature to IAM passthrough as implemented by Databricks, but in Novo Nordisk it’s extended across all analytics services. In this example, the analytics tool accesses the data lake on Amazon Simple Storage Service (Amazon S3) through Athena queries.

As the data mesh pattern expands across domains covering more downstream services, we need a mechanism to keep IdPs and IAM role trusts continuously updated. We come back to this part later in the post, but first we explain how role access is managed at scale.

Attribute-based trust policies

In previous sections, we emphasized that this architecture relies on IAM roles for data access control. Each data management platform can implement its own data access control method using IAM roles, such as identity-based policies or Lake Formation access control. For data consumption, it’s crucial that these IAM roles are only assumable by users that are part of Active Directory groups with the appropriate entitlements to use the role. To implement this at scale, the IAM role’s trust policy uses ABAC.

When a user authenticates on the external IdP of the consumption layer, we add in the access token a claim derived from their Active Directory groups. This claim is propagated by the AssumeRole operation into the trust policy of the IAM role, where it is compared with the expected Active Directory group. Only users that belong to the expected groups can assume the role. This mechanism is illustrated in the following figure.

Architecture of the integration with the identity provider

Translating group membership to attributes

To enforce the group membership entitlement at the role assumption level, we need a way to compare the required group membership with the group memberships that a user comes with in their IAM role session. To achieve this, we use a form of ABAC, where we have a way to represent the sum of context-relevant group memberships in a single attribute. A single IAM role session tag value is limited to 256 characters. The corresponding limit for SAML assertions is 100,000 characters, so for systems where a very large number of either roles or group-type mappings are required, SAML can support a wider range of configurations.

In our case, we have opted for a compression algorithm that takes a group name and compresses it to a 4-character string hash. This means that, together with a group-separation character, we can fit 51 groups in a single attribute. This gets pushed down to approximately 20 groups for OIDC-type role assumption due to the PackedPolicySize limit, but is higher for a SAML-based flow. This has proven sufficient for our case. There is a risk that two different groups could hash to the same character combination; however, we have checked that there are no collisions in the existing groups. To mitigate this risk going forward, we have introduced guardrails in multiple places. First, before adding new group entitlements in the virtualization layer, we check whether there is a hash collision with any existing group. When someone attempts to add a duplicate group, our service team is notified and we can react accordingly. But as stated earlier, the probability of clashes is low, so the flexibility this provides outweighs the overhead associated with managing clashes (we have not had any yet). We also enforce this at SAML assertion creation time, to ensure that there are no duplicated groups in the user's group list; in cases of duplication, we remove both entirely. This means malicious actors can at most limit the access of other users, but not gain unauthorized access.
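The following sketch shows one possible way to build such a packed attribute. The hash function and separator are illustrative assumptions; the exact compression algorithm used at Novo Nordisk isn't shown in this post:

import hashlib

SEPARATOR = ":"  # illustrative group-separation character

def group_hash(group_name):
    # Compress an Active Directory group name into a 4-character hash
    return hashlib.sha256(group_name.encode("utf-8")).hexdigest()[:4]

def pack_groups(group_names):
    # 51 hashes of 4 characters plus 50 separators fit in a 256-character session tag
    packed = SEPARATOR.join(group_hash(g) for g in group_names)
    if len(packed) > 256:
        raise ValueError("Too many groups for a single session tag value")
    return packed

print(pack_groups(["data-domain-a-readers", "data-domain-b-writers"]))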

Enforcing audit functionality across sessions

As mentioned in the first post, on top of governance, there are strict requirements around auditability of data accesses. This means that for all data access requests, it must be possible to trace the specific user across services and retain this information. We achieve this by setting (and enforcing) a source identity for all role sessions and by making sure the enterprise identity is propagated to this attribute. We use a combination of Okta inline hooks and SAML session tags to achieve this. This means that the AWS CloudTrail logs for an IAM role session have the following information:

{
    "eventName": "AssumeRoleWithSAML",
    "requestParameters": {
        "SAMLAssertionID": "id1111111111111111111111111",
        "roleSessionName": "user@novonordisk.com",
        "principalTags": {
            "nn-initials": "user",
            "department": "NNDepartment",
            "GroupHash": "xxxx",
            "email": "user@novonordisk.com",
            "cost-center": "9999"
        },
        "sourceIdentity": "user@novonordisk.com",
        "roleArn": "arn:aws:iam::111111111111:role/your-assumed-role",
        "principalArn": "arn:aws:iam::111111111111:saml-provider/your-saml-provider",
        ...
    },
    ...
}

On the IAM role level, we can enforce the required attribute configuration with the following example trust policy. This is an example for a SAML-based app. We support the same patterns through OpenID Connect IdPs.

We now go through the elements of an IAM role trust policy, based on the following example:

{
    "Version": "2008-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": {
            "Federated": [SAML_IdP_ARN]
        },
        "Action": [
            "sts:AssumeRoleWithSAML",
            "sts:TagSession",
            "sts:SetSourceIdentity"
        ],
        "Condition": {
            "StringEquals": {
                "SAML:aud": "https://signin.aws.amazon.com/saml"
            },
            "StringLike": {
                "sts:SourceIdentity": "*@novonordisk.com",
                "aws:RequestTag/GroupHash": ["*xxxx*"]
            },
            "StringNotLike": {
                "sts:SourceIdentity": "*"
            }
        }
    }
}

The policy contains the following details:

  • The Principal statement should point to the list of apps that are served through the consumption layer. These can be Azure app registrations, Okta apps, or Amazon Cognito app clients. This means that SAML assertions (in the case of SAML-based flows) minted from these applications can be used to run the AssumeRoleWithSAML operation if the remaining elements are also satisfied.
  • The Action statement includes the required permissions for the AssumeRole call to succeed, including adding the contextual information to the role session.
  • In the first condition, the audience of the assertion needs to be targeting AWS.
  • In the second condition, there are two StringLike requirements:
    • A requirement that the source identity follows the naming convention at Novo Nordisk (users must come with an enterprise identity, following our audit requirements).
    • The aws:RequestTag/GroupHash needs to be xxxx, which represents the hashed group name mentioned in the previous section.
  • Lastly, we enforce that sessions can’t be started without setting the source identity.

This policy enforces that all calls are from recognized services, include auditability, have the right target, and enforces that the user has the right group memberships.

Building a central overview of governance and trust

In this section, we discuss how Novo Nordisk keeps track of the relevant group-role relations and maps these at sign-in time.

Entitlements

In Novo Nordisk, all accesses are based on Active Directory group memberships. There is no user-based access. Because this pattern is so central, we have extended this access philosophy into our data accesses. As mentioned earlier, at sign-in time, the hooks need to be able to know which roles to assume for a given user, given this user’s group membership. We have modeled this data in Amazon DynamoDB, where just-in-time provisioning ensures that only the required user group memberships are available. By building our application around the use of groups, and by having the group propagation done by the application code, we avoid having to make a more general Active Directory integration, which would, for a company the size of Novo Nordisk, severely impact the application, simply due to the volume of users and groups.

The DynamoDB entitlement table contains all relevant information for all roles and services, including role ARNs and IdP ARNs. This means that when users log in to their analytics services, the sign-in hook can construct the required information for the Roles SAML attribute.
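As an illustration, the sign-in hook could read the entitlement table and build the SAML Roles attribute values roughly as follows. The table name, key schema, and item attributes are hypothetical:

import boto3

dynamodb = boto3.resource("dynamodb")
entitlements = dynamodb.Table("entitlements")  # hypothetical table name

def roles_for_groups(group_names):
    # Each item is assumed to hold the role ARN and IdP ARN for one group
    role_attribute_values = []
    for group in group_names:
        item = entitlements.get_item(Key={"group_name": group}).get("Item")
        if item:
            # The SAML Roles attribute expects "<role-arn>,<saml-provider-arn>"
            role_attribute_values.append(f"{item['role_arn']},{item['idp_arn']}")
    return role_attribute_values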

When new data domains are added to the data management layer, the data management layer needs to communicate both the role information and the group name that gives access to the role.

Single sign-on hub for analytics services

When scaling this permission model and data management pattern to a large enterprise such as Novo Nordisk, we ended up creating a large number of IAM roles distributed across different accounts. Then, a solution is required to map and provide access for end-users to the required IAM role. To simplify user access to multiple data sources and analytics tools, Novo Nordisk developed a single sign-on hub for analytics services. From the end-user perspective, this is a web interface that glues together different offerings in a unified system, making it a one-stop tool for data and analytics needs. When signing in to each of the analytical offerings, the authenticated sessions are forwarded, so users never have to reauthenticate.

Common for all the services supported in the consumption layer is that we can run a piece of application code at sign-in time, allowing sign-in time permissions to be calculated. The hooks that achieve this functionality can, for instance, be run by Okta inline hooks. This means that each of the target analytics services can have custom code to translate relevant contextual information or provide other types of automations for the role forwarding.

The sign-in flow is demonstrated in the following figure.

Sign-in flow

The workflow steps are as follows:

  1. A user accesses an analytical service such as Databricks in the Novo Nordisk analytics hub.
  2. The service uses Okta as the SAML-based IdP.
  3. Okta invokes an AWS Lambda-based SAML assertion inline hook.
  4. The hook uses the entitlement database, converting application-relevant group memberships into role entitlements.
  5. Relevant contextual information is returned from the entitlement database.
  6. The Lambda-based hook adds new SAML attributes to the SAML assertion, including the hashed group memberships and other contextual information such as source identity.
  7. A modified SAML assertion is used to sign users in to the analytical service.
  8. The user can now use the analytical tool with active IAM role sessions.

Synchronizing role trust

The preceding section gives an overview of how federation works in this solution. Now we can go through how we ensure that all participating AWS environments and accounts are in sync with the latest configuration.

From the end-user perspective, the synchronization mechanism must ensure that every analytics service instantiated can access the data domains assigned to the groups that the user belongs to. Also, changes in data domains—such as granting data access to an Active Directory group—must be effective immediately to every analytics service.

Two event-based mechanisms are used to maintain all the layers synchronized, as detailed in this section.

Synchronize data access control on the data management layer with changes to services in the consumption layer

As described in the previous section, the IAM roles used for data access are created and managed by the data management layer. These IAM roles have a trust policy providing federated access to the external IdPs used by the analytics tools of the consumption layer. It implies that for every new analytical service created with a different IdP, the IAM roles used for data access on data domains must be updated to trust this new IdP.

Using NNEDH as an example of a data management solution, the synchronization mechanism is demonstrated in the following figure.

Synchronization mechanism in a data management solution

Taking as an example a scenario where a new analytics service is created, the steps in this workflow are as follows:

  1. A user with access to the administration console of the consumption layer instantiates a new analytics service, such as JupyterHub.
  2. A job running on AWS Fargate creates the resources needed for this new analytics service, such as an Amazon Elastic Compute Cloud (Amazon EC2) instance for JupyterHub, and the IdP required, such as a new SAML IdP.
  3. When the IdP is created in the previous step, an event is added in an Amazon Simple Notification Service (Amazon SNS) topic with its details, such as name and SAML metadata.
  4. In the NNEDH control plane, a Lambda job is triggered by new events on this SNS topic. This job creates the IAM IdP, if needed, and updates the trust policy of the required IAM roles in all the AWS accounts used as data domains, adding the trust on the IdP used by the new analytics service.

In this architecture, all the update steps are event-triggered and scalable. This means that users of new analytics services can access their datasets almost instantaneously when they are created. In the same way, when a service is removed, the federation to the IdP is automatically removed if not used by other services.
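To make step 4 more concrete, the trust policy update could be sketched as follows. This is a simplified illustration with hypothetical names; the actual job also creates the IAM IdP when needed and iterates over all affected data domain accounts:

import json
import boto3

iam = boto3.client("iam")  # in practice, a client assumed into each data domain account

def add_trusted_idp(role_name, idp_arn):
    # Append the new SAML IdP to the Federated principals of the role's trust policy
    trust_policy = iam.get_role(RoleName=role_name)["Role"]["AssumeRolePolicyDocument"]
    statements = trust_policy["Statement"]
    statement = statements[0] if isinstance(statements, list) else statements

    federated = statement["Principal"].get("Federated", [])
    if isinstance(federated, str):
        federated = [federated]

    if idp_arn not in federated:
        federated.append(idp_arn)
        statement["Principal"]["Federated"] = federated
        iam.update_assume_role_policy(
            RoleName=role_name,
            PolicyDocument=json.dumps(trust_policy),
        )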

Propagate changes on data domains to analytics services

Changes to data domains, such as the creation of a new S3 bucket used as a dataset, or adding or removing data access to a group, must be reflected immediately on analytics services of the consumption layer. To accomplish this, a mechanism is used to synchronize the entitlement database with the relevant changes made in NNEDH. This flow is demonstrated in the following figure.

Changes propagation flow

Taking as an example a scenario where access to a specific dataset is granted to a new group, the steps in this workflow are as follows:

  1. Using the NNEDH admin console, a data owner approves a dataset sharing request that grants access on a dataset to an Active Directory group.
  2. In the AWS account of the related data domain, the dataset components such as the S3 bucket and Lake Formation are updated to provide data access to the new group. The cross-account data sharing in Lake Formation uses AWS RAM.
  3. An event is added in an SNS topic with the current details about this dataset, such as the location of the S3 bucket and the groups that currently have access to it.
  4. In the virtualization layer, the updated information from the data management layer is used to update the entitlement database in DynamoDB.

These steps make sure that changes on data domains are automatically and immediately reflected on the entitlement database, which is used to provide data access to all the analytics services of the consumption layer.

Limitations

Many of these patterns rely on the analytical tool to support a clever use of IAM roles. When this is not the case, the platform teams themselves need to develop custom functionality at the host level to ensure that role accesses are correctly controlled. This, for example, includes writing custom authenticators for JupyterHub.
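As an example of what such host-level functionality can look like, a custom JupyterHub authenticator could resolve a user's role entitlements at login and stash them in the auth state for the spawner. This is only a rough sketch; credential validation is omitted and the entitlement lookup is a placeholder:

from jupyterhub.auth import Authenticator

def lookup_entitled_roles(username):
    # Placeholder for the entitlement database lookup described earlier
    return []

class EntitlementAuthenticator(Authenticator):
    # Sketch: attach role entitlements to the user's auth state at login.
    # A real implementation must also validate the user against the corporate IdP.
    async def authenticate(self, handler, data):
        username = data["username"]
        entitled_roles = lookup_entitled_roles(username)
        return {
            "name": username,
            "auth_state": {"entitled_roles": entitled_roles},
        }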

Conclusion

This post shows an approach to building a scalable and secure data and analytics platform. It showcases some of the mechanisms used at Novo Nordisk and how to strike the right balance between freedom and control. The architecture laid out in the first post in this series enables layer independence, and exposes some extremely useful primitives for data access and governance. We make heavy use of contextual attributes to modulate role permissions at the session level, which provide just-in-time permissions. These permissions are propagated at scale across data domains. The upside is that a lot of the complexity related to managing data access permissions can be delegated to the relevant business groups, while enabling the end-user consumers of data to think as little as possible about data accesses and focus on providing value for the business use cases. In the case of Novo Nordisk, this means better outcomes for patients and accelerated innovation.

The next post in this series describes how end-users can consume data from their analytics tool of choice, aligned with the data access controls detailed in this post.


About the Authors

Jonatan Selsing is a former research scientist with a PhD in astrophysics who has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving full benefit of the advantages of cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Alessandro Fior is a Sr. Data Architect at AWS Professional Services. He is passionate about designing and building modern and scalable data platforms that accelerate companies to extract value from their data.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk, building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Kumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Top strategies for high volume tracing with Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/top-strategies-for-high-volume-tracing-with-amazon-opensearch-ingestion/

Amazon OpenSearch Ingestion is a serverless, auto-scaled, managed data collector that receives, transforms, and delivers data to Amazon OpenSearch Service domains or Amazon OpenSearch Serverless collections. OpenSearch Ingestion is powered by Data Prepper, an open-source, streaming ETL (extract, transform, and load) solution that’s part of the OpenSearch project. When you use OpenSearch Ingestion, you don’t need to maintain self-managed data pipelines to ingest logs, traces, metrics, and other data with OpenSearch Service. Amazon OpenSearch Ingestion responds to changing volumes of data, automatically scaling your ingest pipeline.

Distributed tracing is the leading way to locate, alert on, and remediate problems with your application and infrastructure. Distributed tracing is part of a broader observability solution, often combined with metrics and log data. OpenSearch Service gives you a native toolset to store and analyze large volumes of log, metric, and trace data. However, moving these large volumes of data is non-trivial to set up, monitor, and maintain.

In this post, we outline steps to set up a trace pipeline and strategies to deal with high volume tracing with Amazon OpenSearch Ingestion.

Solution overview

There is now a new option on the OpenSearch Service console called Pipelines under Ingestion in the navigation pane. We use this feature to create a trace pipeline.

You can also use the AWS Command Line Interface (AWS CLI), AWS CloudFormation, or AWS APIs to create a trace pipeline.

Prerequisites

Refer to Security in OpenSearch Ingestion to set up the permissions you need to create a pipeline and write to a pipeline, and the permissions the pipeline needs to write to a sink.

Create a trace pipeline

To create a trace pipeline, complete the following steps:

  1. On the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.

Amazon OpenSearch Ingestion, powered by Data Prepper, uses pipelines as a mechanism to move the data from a source to a sink, with optional processors to mutate, route, sample, and detect anomalies for the data in the pipe. For more information, refer to Data Prepper. When you use Data Prepper, you build a YAML configuration file. When you use OpenSearch Ingestion, you upload your YAML configuration to the service. If you’re using the OpenSearch Service console, you can use one of the configuration blueprints that we provide. For distributed tracing, you will use an otel_trace_source and an OpenSearch Service domain as the sink.

  3. On the Configuration blueprints menu, choose AWS-TraceAnalyticsPipeline.

Choosing this blueprint will create a sample pipeline with an otel_trace_source, an OpenSearch sink, and the span-pipeline and service-map-pipeline sub-pipelines.

  4. Enter a name for this pipeline along with a minimum (1) and maximum (96) capacity value for Ingestion-OCUs.

Amazon OpenSearch Ingestion will scale automatically between these values to suit the volume of data you are ingesting.

  5. Edit the configuration's hosts, aws.sts_role_arn, and region fields of the OpenSearch Service sink.
  6. Follow the rest of the steps to complete the trace pipeline creation.

Sample trace pipeline

The following code shows the components of a sample trace pipeline:

version: "2"
entry-pipeline: 
  source:
    otel_trace_source:
      path: "/${pipelineName}/v1/traces"
  processor:
    - trace_peer_forwarder:
  sink:
    - pipeline:
        name: "span-pipeline"
    - pipeline:
        name: "service-map-pipeline"
span-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - otel_trace_raw:
  sink:
    - opensearch:
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        aws:
          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
          region: "us-east-1"
        index_type: "trace-analytics-raw"
service-map-pipeline:
  source:
    pipeline:
      name: "entry-pipeline"
  processor:
    - service_map_stateful:
  sink:
    - opensearch:
        hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ]
        aws:
          sts_role_arn: "arn:aws:iam::123456789012:role/Example-Role"
          region: "us-east-1"
        index_type: "trace-analytics-service-map"

The sample trace pipeline has three sub-pipelines in its configuration. These are entry-pipeline, span-pipeline, and service-map-pipeline. The following diagram illustrates the workflow.

entry-pipeline specifies the source of data as otel_trace_source, which creates an HTTP listener for receiving OpenTelemetry traces at the ingestion URL for the pipeline. You use a trace_peer_forwarder processor to eliminate duplicate HTTP requests and forward the data to the span-pipeline and service-map pipelines. span-pipeline gets the raw trace data from entry-pipeline and uses the otel_trace_raw processor to complete trace group-related fields for the incoming span records. You use the service_map_stateful processor to have Data Prepper create the distributed service map for visualization in OpenSearch Dashboards. After the sample trace pipeline is created, it’s ready to receive OpenTelemetry traces at its ingestion URL!

Reduce your storage footprint and optimize for cost

The volume of traces collected from instrumenting a modern production enterprise application can reach tens or hundreds of terabytes very quickly, especially when you store every trace from every request. The problem of managing the storage footprint becomes important. In this section, we discuss strategies for reducing your storage footprint and optimizing for cost.

Use storage tiering

OpenSearch Service has three storage tiers: hot, UltraWarm, and cold. You use the hot tier to store frequently accessed data for quick reading and writing, the UltraWarm tier for infrequently used, read-only data backed by Amazon Simple Storage Service (Amazon S3) for lower cost, and the cold tier to maintain re-attachable data at near-Amazon S3 cost. By adjusting relative retention periods between these tiers, you can store a high volume of traces. For example, instead of storing 1 week's worth of traces in the hot tier, you can store 2 days of traces in the hot tier and 15 days in the UltraWarm tier.
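One way to implement such a retention split is an Index State Management (ISM) policy on the trace indexes. The following Python dictionary is a hedged sketch of such a policy; the index pattern and the exact ages are assumptions, and you would apply it through the ISM API of your domain:

# Sketch of an ISM policy: keep indexes hot for 2 days, migrate them to
# UltraWarm, and delete them 17 days after creation (2 days hot + 15 days warm).
# The index pattern assumes Data Prepper's default trace index naming.
ism_policy = {
    "policy": {
        "description": "Tier trace indexes from hot to UltraWarm",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [],
                "transitions": [
                    {"state_name": "warm", "conditions": {"min_index_age": "2d"}}
                ],
            },
            {
                "name": "warm",
                "actions": [{"warm_migration": {}}],
                "transitions": [
                    {"state_name": "delete", "conditions": {"min_index_age": "17d"}}
                ],
            },
            {"name": "delete", "actions": [{"delete": {}}], "transitions": []},
        ],
        "ism_template": {"index_patterns": ["otel-v1-apm-span-*"]},
    }
}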

Extract metrics without storing traces

You can also use Data Prepper's aggregate processor to extract metrics in the pipeline to avoid delivering all of your data to OpenSearch Service. For example, you may want to analyze request, error, and duration (RED) metrics of your traces to know the current state of your services. OpenSearch Ingestion can calculate these metrics in the pipeline, aggregating them and storing them in separate indexes for analysis, reducing the ingestion and storage footprint of your traces. The following pipeline configuration snippet shows how to use the aggregate processor to calculate a histogram of the duration metric:

...
  processor:
    - aggregate:
        identification_keys: ["serviceName", "traceId"]
        action:
          histogram:
            key: "durationInNanos"
            record_minmax: true
            units: "nanoseconds"
            buckets: [1000000000, 1500000000, 2000000000]
        group_duration: "20s"
  sink:
    - opensearch:
        hosts: ...
        aws_sts_role_arn: ...
        aws_region: ...
        aws_sigv4: true
        index: "red_metrics_from_traces"
  ...

Use sampling

When your application is running without issues, the proportion of error traces is just a small percentage of your overall trace volume. Storing all of the traces for successful requests increases the cost substantially, while offering low value. To reduce cost, you can sample your trace data, reducing the number of traces you store in OpenSearch Service. There are generally two techniques for sampling:

  • Head sampling – When you do head sampling, you ask OpenSearch Ingestion to make a sampling decision without looking at the whole trace. Head sampling is easy to configure and is efficient, but has a downside of possibly missing important traces.
  • Tail sampling – Tail sampling is where you analyze the entirety of the trace and then decide whether to sample the trace or not. This accurately captures all the needed traces at the cost of complexity in configuring and implementing.

The following configuration snippet shows an example of the percent_sampler, from the aggregate processor. In this example, you send only 25% of your traces to OpenSearch Service, based on head sampling:

  ...
  processor:
    - aggregate:
        identification_keys: ["serviceName"]
        action:
          percent_sampler:
            percent: 25
        group_duration: "30s"
  sink:
    - opensearch:
        hosts: ...
        aws_sts_role_arn: ...
        aws_region: ...
        aws_sigv4: true
        index: "sampled-traces"
  ...

Use conditional routing with sampling

Head sampling using the percent_sampler is simple and straightforward, but is a blunt tool. A better way to sample would be to gather, for example, 10% of successful responses, and 100% of failed responses or 100% of high-duration traces. To solve this, use conditional routing. Routes define conditions that can be used within processors and sinks to direct the data flowing through different parts of the pipeline. For example, the following configuration snippet routes traces whose status code indicates a failure to the error_trace pipeline. You forward 100% of the data in that pipe. You route traces whose duration metric is more than 1 second to the high_latency pipeline, where you sample them at 80%. Other normal traces are only sampled at 20%.

  processor:
    - otel_trace_raw:
  route:
    - error_traces: "/traceGroupFields/statusCode == 2"
    - high_latency_traces: '/durationInNanos >= 1000000000'
    - normal_traces: '/traceGroupFields/statusCode != 2 and /durationInNanos < 1000000000'
  sink:
    - pipeline:
        name: "trace-error-pipeline"
        routes:
          - error_traces
    - pipeline: 
        name: "trace-high-latency-metrics-pipeline"
        routes: 
          - high_latency_traces
    - pipeline: 
        name: "trace-normal-pipeline"
        routes: 
          - normal_traces
  ...

Conclusion

In this post, you learned how to configure an OpenSearch Ingestion pipeline and several strategies that help minimize cost while supporting a large-scale production system for distributed tracing. As a next step, refer to the Amazon OpenSearch Developer Guide to explore log and metric pipelines that you can use to build a scalable observability solution for your enterprise applications.


About the author

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Perform upserts in a data lake using Amazon Athena and Apache Iceberg

Post Syndicated from Ranjit Rajan original https://aws.amazon.com/blogs/big-data/perform-upserts-in-a-data-lake-using-amazon-athena-and-apache-iceberg/

Amazon Athena supports the MERGE command on Apache Iceberg tables, which allows you to perform inserts, updates, and deletes in your data lake at scale using familiar SQL statements that are compliant with ACID (Atomic, Consistent, Isolated, Durable). Apache Iceberg is an open table format for data lakes that manages large collections of files as tables. It supports modern analytical data lake operations such as create table as select (CTAS), upsert and merge, and time travel queries. Athena also supports the ability to create views and perform VACUUM (snapshot expiration) on Apache Iceberg tables to optimize storage and performance. With these features, you can now build data pipelines completely in standard SQL that are serverless, simpler to build, and able to operate at scale. This enables developers to:

  • Focus on writing business logic and not worry about setting up and managing the underlying infrastructure
  • Perform data transformations with Athena
  • Help comply with certain data deletion requirements
  • Apply change data capture (CDC) from source databases

With data lakes, data pipelines are typically configured to write data into a raw zone, which is an Amazon Simple Storage Service (Amazon S3) bucket or folder that contains data as is from source systems. Data is accumulated in this zone, such that inserts, updates, or deletes on the source database appear as records in new files as transactions occur on the source. Although the raw zone can be queried, any downstream processing or analytical queries typically need to deduplicate data to derive a current view of the source table. For example, if a single record is updated multiple times in the source database, these changes need to be deduplicated and the most recent record selected.

Typically, data transformation processes are used to perform this operation, and a final consistent view is stored in an S3 bucket or folder. Data transformation processes can be complex, requiring more coding and more testing, and are also error prone. This was a challenge because data lakes are based on files and have been optimized for appending data. Previously, you had to overwrite the complete S3 object or folder, which was not only inefficient but also interrupted users who were querying the same data. With the evolution of frameworks such as Apache Iceberg, you can perform SQL-based upsert in-place in Amazon S3 using Athena, without blocking user queries and while still maintaining query performance.

In this post, we demonstrate how you can use Athena to apply CDC from a relational database to target tables in an S3 data lake.

Overview of solution

For this post, consider a mock sports ticketing application based on the following project. We use a single table in that database that contains sporting events information and ingest it into an S3 data lake on a continuous basis (initial load and ongoing changes). This data ingestion pipeline can be implemented using AWS Database Migration Service (AWS DMS) to extract both full and ongoing CDC extracts. With CDC, you can determine and track data that has changed and provide it as a stream of changes that a downstream application can consume. Most databases use a transaction log to record changes made to the database. AWS DMS reads the transaction log by using engine-specific API operations and captures the changes made to the database in a nonintrusive manner.

Specifically, to extract changed data including inserts, updates, and deletes from the database, you can configure AWS DMS with two replication tasks, as described in the following workshop. The first task performs an initial copy of the full data into an S3 folder. The second task is configured to replicate ongoing CDC into a separate folder in S3, which is further organized into date-based subfolders based on the source databases' transaction commit date. With full and CDC data in separate S3 folders, it's easier to maintain and operate data replication and downstream processing jobs. To enable this, you can apply the following extra connection attributes to the S3 endpoint in AWS DMS (refer to S3Settings for other CSV and related settings); a sample endpoint configuration is sketched after the list:

  • TimestampColumnName – AWS DMS adds a column that you name with timestamp information for the commit of that row in the source database.
  • includeOpForFullLoad – AWS DMS adds a column named Op to every file to indicate if the record is an I (INSERT), U (UPDATE), or D (DELETE).
  • DatePartitionEnabled, DatePartitionSequence, DatePartitionDelimiter – These settings are used to configure AWS DMS to write changed data to date/time-based folders in the data lake. By partitioning folders, you can better manage S3 objects and optimize data lake queries for subsequent downstream processing.
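For example, an S3 target endpoint with these settings could be created roughly as follows. This is a hedged sketch; the bucket, folder, role ARN, and identifier are placeholders:

import boto3

dms = boto3.client("dms")

# Sketch: S3 target endpoint with a timestamp column, an Op column,
# and date-based folder partitioning for the CDC task.
response = dms.create_endpoint(
    EndpointIdentifier="sporting-event-s3-target",
    EndpointType="target",
    EngineName="s3",
    S3Settings={
        "BucketName": "your-bucket",
        "BucketFolder": "sporting_event_cdc",
        "ServiceAccessRoleArn": "arn:aws:iam::111111111111:role/your-dms-s3-role",
        "TimestampColumnName": "cdc_timestamp",
        "IncludeOpForFullLoad": True,
        "DatePartitionEnabled": True,
        "DatePartitionSequence": "YYYYMMDD",
        "DatePartitionDelimiter": "SLASH",
    },
)
print(response["Endpoint"]["EndpointArn"])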

We use the support in Athena for Apache Iceberg tables called MERGE INTO, which can express row-level updates. Apache Iceberg supports MERGE INTO by rewriting data files that contain rows that need to be updated. After the data is merged, we demonstrate how to use Athena to perform time travel on the sporting_event table, and use views to abstract and present different versions of the data to end-users. Finally, to simplify table maintenance, we demonstrate performing VACUUM on Apache Iceberg tables to delete older snapshots, which will optimize latency and cost of both read and write operations.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1 and 2 use AWS DMS, which connects to the source database to load initial data and ongoing changes (CDC) to Amazon S3 in CSV format. For this post, we have provided sample full and CDC datasets in CSV format that have been generated using AWS DMS.
    • Step 3 is comprised of the following actions:
      • Create an external table in Athena pointing to the source data ingested in Amazon S3.
      • Create an Apache Iceberg target table and load data from the source table.
      • Merge CDC data into the Apache Iceberg table using MERGE INTO.
  • Data access:
    • In Step 4, create a view on the Apache Iceberg table.
    • Use the view to query data using standard SQL.

Prerequisites

Before getting started, make sure you have the required permissions to perform the following in your AWS account:

Create tables on the raw data

First, create a database for this demo.

  1. Navigate to the Athena console and choose Query editor.
    If this is your first time using the Athena query editor, you need to configure and specify an S3 bucket to store the query results.
  2. Create a database with the following code:
    CREATE DATABASE raw_demo;

  3. Next, create a folder in an S3 bucket that you can use for this demo. Name this folder sporting_event_full.
  4. Upload LOAD00000001.csv into the folder.
  5. Switch to the raw_demo database and create a table to point to the raw input data:
    CREATE EXTERNAL TABLE raw_demo.sporting_event(
      op string,
      cdc_timestamp timestamp, 
      id bigint, 
      sport_type_name string, 
      home_team_id int, 
      away_team_id int, 
      location_id smallint, 
      start_date_time timestamp, 
      start_date date, 
      sold_out smallint)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your bucket>/sporting_event_full/'
      ;

  6. Run the following query to review the data:
    SELECT * FROM raw_demo.sporting_event LIMIT 5;

  7. Next, create another folder in the same S3 bucket called sporting_event_cdc.
  8. Within this folder, create three subfolders in a time hierarchy folder structure such that the final S3 folder URI looks like s3://<your-bucket>/sporting_event_cdc/2022/09/22/.
  9. Upload 20220922-184314489.csv into this folder. This folder structure is similar to how AWS DMS stores CDC data when you enable date-based folder partitioning.
  10. Create a table to point to the CDC data. This table also includes a partition column because the source data in Amazon S3 is organized into date-based folders.
    CREATE EXTERNAL TABLE raw_demo.sporting_event_cdc(
    op string,
    cdc_timestamp timestamp,
    id bigint,
    sport_type_name string,
    home_team_id int,
    away_team_id int,
    location_id smallint,
    start_date_time timestamp,
    start_date date,
    sold_out smallint)
    PARTITIONED BY (partition_date string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://<your-bucket>/sporting_event_cdc/'
    ;

  11. Next, alter the table to add new partitions. Because the data is stored in non-Hive style format by AWS DMS, to query this data, add this partition manually or use an AWS Glue crawler. As data accumulates, continue to add new partitions.
    ALTER TABLE raw_demo.sporting_event_cdc ADD PARTITION (partition_date='2022-09-22') location 's3://<your-bucket>/sporting_event_cdc/2022/09/22/'

  12. Run the following query to review the CDC data:
    SELECT * FROM raw_demo.sporting_event_cdc;

There are two records with IDs 1 and 11 that are updates with op code U. The record with ID 21 has a delete (D) op code, and the record with ID 5 is an insert (I).

cdc data

Use CTAS to create the target Iceberg table in Parquet format

CTAS statements create new tables using standard SELECT queries. The resultant table is added to the AWS Glue Data Catalog and made available for querying.

  1. First, create another database to store the target table:
    CREATE DATABASE curated_demo;

  2. Next, switch to this database and run the CTAS statement to select data from the raw input table to create the target Iceberg table (replace the location with an appropriate S3 bucket in your account):
    CREATE TABLE curated_demo.sporting_event
    WITH (table_type='ICEBERG',
    location='s3://<your-bucket>/curated/sporting_event',
    format='PARQUET',
    is_external=false)
    AS SELECT
    id,
    sport_type_name,
    home_team_id,
    away_team_id,
    cast(location_id as int) as location_id,
    cast(start_date_time as timestamp(6)) as start_date_time,
    start_date,
    cast(sold_out as int) as sold_out
    FROM raw_demo.sporting_event
    ;

  3. Run the following query to review data in the Iceberg table:
    SELECT * FROM curated_demo.sporting_event LIMIT 5;

iceberg data

Use MERGE INTO to insert, update, and delete data into the Iceberg table

The MERGE INTO command updates the target table with data from the CDC table. The following statement uses a combination of primary keys and the Op column in the source data, which indicates if the source row is an insert, update, or delete. We use the id column as the primary key to join the target table to the source table, and we use the Op column to determine if a record needs to be deleted.

MERGE INTO curated_demo.sporting_event t
USING (SELECT op,
cdc_timestamp,
id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date,
sold_out
FROM raw_demo.sporting_event_cdc
WHERE partition_date ='2022-09-22') s
ON t.id = s.id
WHEN MATCHED AND s.op = 'D' THEN DELETE
WHEN MATCHED THEN
UPDATE SET
sport_type_name = s.sport_type_name,
home_team_id = s.home_team_id,
location_id = s.location_id,
start_date_time = s.start_date_time,
start_date = s.start_date,
sold_out = s.sold_out
WHEN NOT MATCHED THEN
INSERT (id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
start_date_time,
start_date)
VALUES
(s.id,
s.sport_type_name,
s.home_team_id,
s.away_team_id,
s.location_id,
s.start_date_time,
s.start_date)

Run the following query to verify data in the Iceberg table:

SELECT * FROM curated_demo.sporting_event WHERE id in (1, 5, 11, 21);

The record with ID 21 has been deleted, and the other records in the CDC dataset have been updated and inserted, as expected.

merge and delete

Create a view that contains the previous state

When you write to an Iceberg table, a new snapshot or version of a table is created each time.

A snapshot represents the state of a table at a point in time and is used to access the complete set of data files in the table. Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time or a specified snapshot ID. However, this requires knowledge of a table’s current snapshots. To abstract this information from users, you can create views on top of Iceberg tables:

CREATE VIEW curated_demo.v_sporting_event_previous_snapshot AS
SELECT id,
sport_type_name,
home_team_id,
away_team_id,
location_id,
cast(start_date_time as timestamp(3)) as start_date_time,
start_date,
sold_out
FROM curated_demo.sporting_event
FOR TIMESTAMP AS OF current_timestamp + interval '-5' minute;

Run the following query using this view to retrieve the snapshot of data before the CDC was applied:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

You can see the record with ID 21, which was deleted earlier.

view data

Compliance with privacy regulations may require that you permanently delete records in all snapshots. To accomplish this, you can set properties for snapshot retention in Athena when creating the table, or you can alter the table:

ALTER TABLE curated_demo.sporting_event SET TBLPROPERTIES (
'vacuum_min_snapshots_to_keep'='1',
'vacuum_max_snapshot_age_seconds'='1'
)

This instructs Athena to store only one version of the data and not maintain any transaction history. After a table has been updated with these properties, run the VACUUM command to remove the older snapshots and clean up storage:

VACUUM curated_demo.sporting_event;

Run the following query again:

SELECT * FROM curated_demo.v_sporting_event_previous_snapshot WHERE id = 21;

The record with ID 21 has been permanently deleted.

final validation

Considerations

As data accumulates in the CDC folder of your raw zone, older files can be archived to Amazon S3 Glacier. Subsequently, the MERGE INTO statement can also be run on a single source file if needed by using $path in the WHERE condition of the USING clause:

MERGE INTO curated_demo.sporting_event t
USING (SELECT op, cdc_timestamp, id, sport_type_name, home_team_id, away_team_id, location_id, start_date_time, start_date, sold_out FROM raw_demo.sporting_event_cdc WHERE partition_date='2022-09-22' AND regexp_like("$path", '/sporting_event_cdc/2022/09/22/20220922-184314489.csv')
………..

This results in Athena scanning all files in the partition's folder before the filter is applied, but can be minimized by choosing fine-grained hourly partitions. With this approach, you can trigger the MERGE INTO to run on Athena as files arrive in your S3 bucket using Amazon S3 event notifications. This could enable near-real-time use cases where users need to query a consistent view of data in the data lake as soon as it is created in source systems.
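A minimal sketch of such an event-driven trigger is shown below, assuming a Lambda function subscribed to the S3 event notifications of the CDC folder. The database, output location, and the build_merge_sql helper are hypothetical:

import urllib.parse

import boto3

athena = boto3.client("athena")

DATABASE = "curated_demo"                              # placeholder
OUTPUT_LOCATION = "s3://your-bucket/athena-results/"   # placeholder

def build_merge_sql(partition_date, file_key):
    # Placeholder: return the MERGE INTO statement shown earlier, with the
    # partition_date filter and a regexp_like("$path", ...) filter on file_key
    raise NotImplementedError

def lambda_handler(event, context):
    # Triggered by an S3 event notification for each new CDC file
    for record in event["Records"]:
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        # e.g. sporting_event_cdc/2022/09/22/20220922-184314489.csv
        _, year, month, day, _ = key.split("/")
        partition_date = f"{year}-{month}-{day}"

        athena.start_query_execution(
            QueryString=build_merge_sql(partition_date, key),
            QueryExecutionContext={"Database": DATABASE},
            ResultConfiguration={"OutputLocation": OUTPUT_LOCATION},
        )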

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following SQL to drop the tables and views:
    DROP TABLE raw_demo.sporting_event;
    DROP TABLE raw_demo.sporting_event_cdc;
    DROP TABLE curated_demo.sporting_event;
    DROP VIEW curated_demo.v_sporting_event_previous_snapshot;

    Because Iceberg tables are considered managed tables in Athena, dropping an Iceberg table also removes all the data in the corresponding S3 folder.

  2. Run the following SQL to drop the databases:
    DROP DATABASE raw_demo;
    DROP DATABASE curated_demo;

  3. Delete the S3 folders and CSV files that you had uploaded.

Conclusion

This post showed you how to apply CDC to a target Iceberg table using CTAS and MERGE INTO statements in Athena. You can perform bulk load using a CTAS statement. When new data or changed data arrives, use the MERGE INTO statement to merge the CDC changes. To optimize storage and improve performance of queries, use the VACUUM command regularly.

As next steps, you can orchestrate these SQL statements using AWS Step Functions to implement end-to-end data pipelines for your data lake. For more information, refer to Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions.


About the Authors

Ranjit Rajan is a Principal Data Lab Solutions Architect with AWS. Ranjit works with AWS customers to help them design and build data and analytics applications in the cloud.

Kannan Iyer is a Senior Data Lab Solutions Architect with AWS. Kannan works with AWS customers to help them design and build data and analytics applications in the cloud.

Alexandre Rezende is a Data Lab Solutions Architect with AWS. Alexandre works with customers on their Business Intelligence, Data Warehouse, and Data Lake use cases, design architectures to solve their business problems, and helps them build MVPs to accelerate their path to production.

How the BMW Group analyses semiconductor demand with AWS Glue

Post Syndicated from Göksel SARIKAYA original https://aws.amazon.com/blogs/big-data/how-the-bmw-group-analyses-semiconductor-demand-with-aws-glue/

This is a guest post co-written by Maik Leuthold and Nick Harmening from BMW Group.

The BMW Group is headquartered in Munich, Germany, where the company oversees 149,000 employees and manufactures cars and motorcycles in over 30 production sites across 15 countries. This multinational production strategy follows an even more international and extensive supplier network.

Like many automobile companies across the world, the BMW Group has been facing challenges in its supply chain due to the worldwide semiconductor shortage. Creating transparency about BMW Group’s current and future demand of semiconductors is one key strategic aspect to resolve shortages together with suppliers and semiconductor manufacturers. The manufacturers need to know BMW Group’s exact current and future semiconductor volume information, which will effectively help steer the available worldwide supply.

The main requirement is to have an automated, transparent, and long-term semiconductor demand forecast. Additionally, this forecasting system needs to provide data enrichment steps including byproducts, serve as the master data around the semiconductor management, and enable further use cases at the BMW Group.

To enable this use case, we used the BMW Group’s cloud-native data platform called the Cloud Data Hub. In 2019, the BMW Group decided to re-architect and move its on-premises data lake to the AWS Cloud to enable data-driven innovation while scaling with the dynamic needs of the organization. The Cloud Data Hub processes and combines anonymized data from vehicle sensors and other sources across the enterprise to make it easily accessible for internal teams creating customer-facing and internal applications. To learn more about the Cloud Data Hub, refer to BMW Group Uses AWS-Based Data Lake to Unlock the Power of Data.

In this post, we share how the BMW Group analyzes semiconductor demand using AWS Glue.

Logic and systems behind the demand forecast

The first step towards the demand forecast is the identification of semiconductor-relevant components of a vehicle type. Each component is described by a unique part number, which serves as a key in all systems to identify this component. A component can be a headlight or a steering wheel, for example.

For historical reasons, the required data for this aggregation step is siloed and represented differently in diverse systems. Because each source system and data type has its own schema and format, it’s particularly difficult to perform analytics based on this data. Some source systems are already available in the Cloud Data Hub (for example, part master data), so it’s straightforward to consume them from our AWS account. To access the remaining data sources, we need to build specific ingest jobs that read data from the respective system.

The following diagram illustrates the approach.

The data enrichment starts with an Oracle Database (Software Parts) that contains part numbers that are related to software. This can be the control unit of a headlight or a camera system for automated driving. Because semiconductors are the basis for running software, this database builds the foundation of our data processing.

In the next step, we use REST APIs (Part Relations) to enrich the data with further attributes. This includes how parts are related (for example, a specific control unit that will be installed into a headlight) and over which timespan a part number will be built into a vehicle. The knowledge about the part relations is essential to understand how a specific semiconductor, in this case the control unit, is relevant for a more general part, the headlight. The temporal information about the use of part numbers allows us to filter out outdated part numbers, which will not be used in the future and therefore have no relevance in the forecast.

The data (Part Master Data) can directly be consumed from the Cloud Data Hub. This database includes attributes about the status and material types of a part number. This information is required to filter out part numbers that we gathered in the previous steps but have no relevance for semiconductors. With the information that was gathered from the APIs, this data is also queried to extract further part numbers that weren’t ingested in the previous steps.

After data enrichment and filtering, a third-party system reads the filtered part data and enriches the semiconductor information. Subsequently, it adds the volume information of the components. Finally, it provides the overall semiconductor demand forecast centrally to the Cloud Data Hub.

Applied services

Our solution uses the serverless services AWS Glue and Amazon Simple Storage Service (Amazon S3) to run ETL (extract, transform, and load) workflows without managing an infrastructure. It also reduces the costs by paying only for the time jobs are running. The serverless approach fits our workflow’s schedule very well because we run the workload only once a week.

Because we’re using diverse data source systems as well as complex processing and aggregation, it’s important to decouple ETL jobs. This allows us to process each data source independently. We also split the data transformation into several modules (Data Aggregation, Data Filtering, and Data Preparation) to make the system more transparent and easier to maintain. This approach also helps in case of extending or modifying existing jobs.

Although each module is specific to a data source or a particular data transformation, we utilize reusable blocks inside of every job. This allows us to unify each type of operation and simplifies the procedure of adding new data sources and transformation steps in the future.
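
As an illustration of what such a reusable block could look like, the following is a minimal sketch of shared read and write helpers in a PySpark-based AWS Glue job. The function names, bucket paths, and join column are assumptions for the example, not the actual BMW Group code.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session


def read_source(path, fmt="parquet"):
    """Reusable ingest block: read one data source from Amazon S3 into a DataFrame."""
    return spark.read.format(fmt).load(path)


def write_intermediate(df, path):
    """Reusable output block: write an intermediate result to the module's bucket."""
    df.write.mode("overwrite").parquet(path)


# Example usage inside a Data Aggregation module job (bucket paths and the join
# column part_number are assumptions for the example)
parts_df = read_source("s3://example-ingest-bucket/software-parts/")
relations_df = read_source("s3://example-ingest-bucket/part-relations/")
enriched_df = parts_df.join(relations_df, on="part_number", how="left")
write_intermediate(enriched_df, "s3://example-intermediate-bucket/aggregated-parts/")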

In our setup, we follow the security best practice of the least privilege principle, to ensure the information is protected from accidental or unnecessary access. Therefore, each module has AWS Identity and Access Management (IAM) roles with only the necessary permissions, namely access to only data sources and buckets the job deals with. For more information regarding security best practices, refer to Security best practices in IAM.
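
As a sketch of what a module-scoped policy could look like, the following attaches an inline policy to a hypothetical Data Aggregation role that grants access only to the buckets that module works with. The role, policy, and bucket names are assumptions for the example.

import json
import boto3

iam = boto3.client("iam")

# Hypothetical role, policy, and bucket names for a single module (Data Aggregation);
# each module gets its own role scoped to only the sources and buckets it works with
policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": ["s3:GetObject", "s3:PutObject", "s3:ListBucket"],
            "Resource": [
                "arn:aws:s3:::example-ingest-bucket",
                "arn:aws:s3:::example-ingest-bucket/*",
                "arn:aws:s3:::example-intermediate-bucket",
                "arn:aws:s3:::example-intermediate-bucket/*",
            ],
        }
    ],
}

iam.put_role_policy(
    RoleName="glue-data-aggregation-role",
    PolicyName="data-aggregation-s3-access",
    PolicyDocument=json.dumps(policy),
)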

Solution overview

The following diagram shows the overall workflow where several AWS Glue jobs are interacting with each other sequentially.

As we mentioned earlier, we used the Cloud Data Hub, Oracle DB, and other data sources that we communicate with via the REST API. The first step of the solution is the Data Source Ingest module, which ingests the data from different data sources. For that purpose, AWS Glue jobs read information from the different data sources and write it into the S3 source buckets. Ingested data is stored in encrypted buckets, and the keys are managed by AWS Key Management Service (AWS KMS).

After the Data Source Ingest step, intermediate jobs aggregate and enrich the tables with other data sources like component versions and categories, model manufacture dates, and so on. They then write the results into the intermediate buckets in the Data Aggregation module, creating a comprehensive and rich data representation. Additionally, according to the business logic workflow, the Data Filtering and Data Preparation modules create the final master data table with only current and production-relevant information.

The AWS Glue workflow manages all these ingestion and filtering jobs end to end. An AWS Glue workflow schedule is configured to run the workflow weekly, on Wednesdays. While the workflow is running, each job writes execution logs (info or error) to Amazon Simple Notification Service (Amazon SNS) and Amazon CloudWatch for monitoring purposes. Amazon SNS forwards the execution results to the monitoring tools, such as mail, Teams, or Slack channels. If a job fails, Amazon SNS also alerts subscribers about the job execution result so they can take action.
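
The following is a minimal sketch of how a job could publish its execution result to Amazon SNS for this kind of monitoring; the topic ARN, job name, and message format are assumptions for the example.

import json
import boto3

sns = boto3.client("sns")

# Hypothetical monitoring topic; mail, Teams, or Slack integrations subscribe to it
MONITORING_TOPIC_ARN = "arn:aws:sns:eu-central-1:111122223333:glue-job-monitoring"


def notify(job_name, status, details=""):
    """Publish an info or error notification for a job run."""
    sns.publish(
        TopicArn=MONITORING_TOPIC_ARN,
        Subject=f"[{status}] {job_name}",
        Message=json.dumps({"job": job_name, "status": status, "details": details}),
    )


# Example usage at the end of a job
try:
    # ... the job's transformations run here ...
    notify("data-aggregation", "INFO", "Weekly run completed")
except Exception as exc:
    notify("data-aggregation", "ERROR", str(exc))
    raise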

As the last step of the solution, the third-party system reads the master table from the prepared data bucket via Amazon Athena. After further data engineering steps like semiconductor information enrichment and volume information integration, the final master data asset is written into the Cloud Data Hub. With the data now provided in the Cloud Data Hub, other use cases can use this semiconductor master data without building several interfaces to different source systems.

Business outcome

The project results provide the BMW Group with substantial transparency into its semiconductor demand for the entire vehicle portfolio, both in the present and in the future. A database of that magnitude enables the BMW Group to establish further use cases that benefit from more supply chain transparency and a clearer, deeper exchange with first-tier suppliers and semiconductor manufacturers. It helps not only to resolve the current demanding market situation, but also to be more resilient in the future. Therefore, it’s one major step towards a digital, transparent supply chain.

Conclusion

This post describes how to analyze semiconductor demand from many data sources with big data jobs in an AWS Glue workflow. A serverless architecture with minimal diversity of services makes the code base and architecture simple to understand and maintain. To learn more about how to use AWS Glue workflows and jobs for serverless orchestration, visit the AWS Glue service page.


About the authors

Maik Leuthold is a Project Lead at the BMW Group for advanced analytics in the business field of supply chain and procurement, and leads the digitalization strategy for the semiconductor management.

Nick Harmening is an IT Project Lead at the BMW Group and an AWS certified Solutions Architect. He builds and operates cloud-native applications with a focus on data engineering and machine learning.

Göksel Sarikaya is a Senior Cloud Application Architect at AWS Professional Services. He enables customers to design scalable, cost-effective, and competitive applications through the innovative production of the AWS platform. He helps them to accelerate customer and partner business outcomes during their digital transformation journey.

Alexander Tselikov is a Data Architect at AWS Professional Services who is passionate about helping customers to build scalable data, analytics and ML solutions to enable timely insights and make critical business decisions.

Rahul Shaurya is a Senior Big Data Architect at Amazon Web Services. He helps and works closely with customers building data platforms and analytical applications on AWS. Outside of work, Rahul loves taking long walks with his dog Barney.

How Huron built an Amazon QuickSight Asset Catalogue with AWS CDK Based Deployment Pipeline

Post Syndicated from Corey Johnson original https://aws.amazon.com/blogs/big-data/how-huron-built-an-amazon-quicksight-asset-catalogue-with-aws-cdk-based-deployment-pipeline/

This is a guest blog post co-written with Corey Johnson from Huron.

Having an accurate and up-to-date inventory of all technical assets helps an organization keep track of its resources with metadata such as their assigned owners, last updated date, who uses them, how frequently, and more. It helps engineers, analysts, and businesses access the most up-to-date release of a software asset, which brings accuracy to the decision-making process. By keeping track of this information, organizations can identify technology gaps, plan refresh cycles, and expire assets as needed for archival.

In addition, an inventory of all assets is one of the foundational elements of an organization that enables the security and compliance team to audit the assets, improving privacy and security posture and mitigating risk so that business operations run smoothly. Organizations may maintain an asset inventory in different ways, whether an Excel spreadsheet or a database backed by a fully automated system, but the common objective is keeping it accurate. Although organizations can follow manual approaches to update inventory records, it is recommended to build automation so that the inventory is accurate at any point in time.

The DevOps practices that revolutionized software engineering in the last decade have yet to come to the world of business intelligence solutions. Business intelligence tools by their nature use a paradigm of UI-driven development, with code-first practices being secondary or nonexistent. As the need for applications that can leverage the organization’s internal and client data increases, the same DevOps practices (BIOps) can drive and deliver quality insights more reliably.

In this post, we walk you through the solution that Huron built to catalog and manage the lifecycle of all Amazon QuickSight resources across the organization, in collaboration with the AWS Data Lab Resident Architect and the AWS Professional Services team.

About Huron

Huron is a global professional services firm that collaborates with clients to put possible into practice by creating sound strategies, optimizing operations, accelerating digital transformation, and empowering businesses and their people to own their future. By embracing diverse perspectives, encouraging new ideas, and challenging the status quo, Huron creates sustainable results for the organizations we serve. To help address its clients’ growing cloud needs, Huron is an AWS Partner.

Use Case Overview

Huron’s Business Intelligence use case represents visualizations as a service, where Huron has a core set of visualizations and dashboards available as products for its customers. The products exist in different industry verticals (healthcare, education, commercial) with independent development teams. Huron’s consultants leverage the products to provide insights as part of consulting engagements. The insights from the product help Huron’s consultants accelerate their customer’s transformation. As part of its overall suite of offerings, there are product dashboards that are featured in a software application following a standardized development lifecycle. In addition, these product dashboards may be forked for customer-specific customization to support a consulting engagement while still consuming from Huron’s productized data assets and datasets. In the next stage of the cycle, Huron’s consultants experiment with new data sources and insights that are in turn fed back into the product dashboards.

When changes are made to a product analysis, challenges arise when a base reference analysis gets updated because of new feature releases or bug fixes, and all the customer visualizations that are created from it also need to be updated. To maintain the integrity of embedded visualizations, all metadata and lineage must be available to the parent application. This access to the metadata supports the need for updating visuals based on changes as well as automating row and column level security ensuring customer data is properly governed.

In addition, a few customers request customizations on top of the base visualizations, for which the Huron team needs to create a replica of the base reference and then customize it for the customer. These replicas are maintained by Huron’s consultants in the field rather than the product development team. These customer-specific visualizations create operational overhead because they require Huron to keep track of them and maintain them for future releases when the product visuals change.

Huron leverages Amazon QuickSight for its Business Intelligence (BI) reporting needs, enabling it to embed visualizations at scale with higher efficiency and lower cost. A large attraction for Huron to adopt QuickSight came from the forward-looking API capabilities that enable and set the foundation for a BIOps culture and technical infrastructure. To address the above requirements, the Huron Global Product team decided to build a QuickSight Asset Tracker and a QuickSight Asset Deployment Pipeline.

The QuickSight Asset Tracker serves as a catalogue of all QuickSight resources (datasets, analyses, templates, dashboards, and so on) with their interdependent relationships. It helps:

  • Create an inventory of all QuickSight resources across all business units
  • Enable dynamic embedding of visualizations and dashboards based on logged in user
  • Enable dynamic row and column level security on the dashboards and visualizations based on the logged-in user
  • Meet compliance and audit requirements of the organization
  • Maintain the current state of all customer specific QuickSight resources

The solution integrates an AWS CDK based pipeline to deploy QuickSight Assets that:

  • Supports Infrastructure-as-a-code for QuickSight Asset Deployment and enables rollbacks if required.
  • Enables separation of development, staging and production environments using QuickSight folders that reduces the burden of multi-account management of QuickSight resources.
  • Enables a hub-and-spoke model for Data Access in multiple AWS accounts in a data mesh fashion.

QuickSight Asset Tracker and QuickSight Asset Management Pipeline – Architecture Overview

The QuickSight Asset Tracker was built as an independent service, which was deployed in a shared AWS service account that integrated Amazon Aurora Serverless PostgreSQL to store metadata information, AWS Lambda as the serverless compute and Amazon API Gateway to provide the REST API layer.

It also integrated AWS CDK and AWS CloudFormation to deploy the product and customer specific QuickSight resources and keep them in consistent and stable state. The metadata of QuickSight resources, created using either AWS console or the AWS CDK based deployment were maintained in Amazon Aurora database through the QuickSight Asset Tracker REST API service.

The CDK-based deployment pipeline is triggered via a CI/CD pipeline, which performs the following functions (a minimal API sketch follows this list):

  1. Takes the ARN of the QuickSight assets (dataset, analysis, etc.)
  2. Describes the asset and dependent resources (if selected)
  3. Creates a copy of the resource in another environment (in this case a QuickSight folder) using CDK
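
The actual copy is performed by the AWS CDK stack, but the following minimal boto3 sketch illustrates the describe-and-place idea behind steps 1 to 3. The account ID, analysis ID, and folder ID are placeholders for this example.

import boto3

quicksight = boto3.client("quicksight")

# Placeholder identifiers; in the pipeline, the asset ARN/ID arrives as a parameter
ACCOUNT_ID = "111122223333"
ANALYSIS_ID = "product-sales-analysis"
STAGING_FOLDER_ID = "staging-folder"

# Steps 1 to 2: describe the asset and list its dependent datasets
analysis = quicksight.describe_analysis(
    AwsAccountId=ACCOUNT_ID, AnalysisId=ANALYSIS_ID
)["Analysis"]
dataset_arns = analysis.get("DataSetArns", [])
print(f"Analysis {analysis['Name']} depends on {len(dataset_arns)} dataset(s)")

# Step 3: the CDK stack recreates the asset in the target environment; here we only
# show placing an existing asset into the environment's QuickSight folder
quicksight.create_folder_membership(
    AwsAccountId=ACCOUNT_ID,
    FolderId=STAGING_FOLDER_ID,
    MemberId=ANALYSIS_ID,
    MemberType="ANALYSIS",
)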

The solution architecture integrated the following AWS services.

  • Amazon Aurora Serverless integrated as the backend database to store metadata information of all QuickSight resources with customer and product information they are related to.
  • Amazon QuickSight as the BI service using which visualization and dashboards can be created and embedded into the online applications.
  • AWS Lambda as the serverless compute service that gets invoked by online applications using Amazon API Gateway service.
  • Amazon SQS to store customer request messages, so that the AWS CDK based pipeline can read from it for processing.
  • AWS CodeCommit stores the AWS CDK deployment scripts, while AWS CodeBuild and AWS CloudFormation deploy the AWS resources using an infrastructure as code approach.
  • AWS CloudTrail is integrated to audit user actions and trigger Amazon EventBridge rules when a QuickSight resource is created, updated or deleted, so that the QuickSight Asset Tracker is up-to-date.
  • Amazon S3 integrated to store metadata information, which is used by AWS CDK based pipeline to deploy the QuickSight resources.
  • AWS Lake Formation enables cross-account data access in support of the QuickSight data mesh.

The following provides a high-level view of the solution architecture.

Architecture Walkthrough:

The following provides a detailed walkthrough of the above architecture.

  • QuickSight Dataset, Template, Analysis, Dashboard and visualization relationships:
    • Steps 1 to 2 represent QuickSight reference analysis reading data from different data sources that may include Amazon S3, Amazon Athena, Amazon Redshift, Amazon Aurora or any other JDBC based sources.
    • Step 3 represents QuickSight templates being created from reference analysis when a customer specific visualization needs to be created and step 4.1 to 4.2 represents customer analysis and dashboards being created from the templates.
    • Steps 7 to 8 represent QuickSight visualizations getting generated from analysis/dashboard and step 6 represents the customer analysis/dashboard/visualizations referring their own customer datasets.
    • Step 10 represents a new fork being created from the base reference analysis for a specific customer, which will create a new QuickSight template and reference analysis for that customer.
    • Step 9 represents end users accessing QuickSight visualizations.
  • Asset Tracker REST API service:
    • Steps 15.2 to 15.4 represent the Asset Tracker service, which is deployed in a shared AWS service account, where Amazon API Gateway provides the REST API layer and invokes an AWS Lambda function to read from or write to the backend Aurora database (Aurora Serverless v2, PostgreSQL engine). The database captures all relationship metadata between QuickSight resources, their owners, and their assigned customers and products.
  • Online application – QuickSight asset discovery and creation
    • Step 15.1 represents the front-end online application reading QuickSight metadata information from the Asset Tracker service to help customers or end users discover visualizations available and be able to dynamically render based on the user login.
    • Steps 11 to 12 represent the online application requesting creation of new QuickSight resources, which pushes requests to Amazon SQS; AWS Lambda then triggers AWS CodeBuild to deploy the new QuickSight resources. Steps 13.1 and 13.2 represent the CDK-based pipeline maintaining the QuickSight resources to keep them in a consistent state. Finally, the AWS CDK stack invokes the Asset Tracker service to update its metadata, as represented in step 13.3.
  • Tracking QuickSight resources created outside of the AWS CDK Stack
    • Step 14.1 represents users creating QuickSight resources using the AWS Console and step 14.2 represents that activity getting logged into AWS CloudTrail.
    • Steps 14.3 to 14.5 represent triggering an EventBridge rule for CloudTrail activities when a QuickSight resource is created, updated, or deleted, which then invokes the Asset Tracker REST API to register the QuickSight resource metadata (see the sketch after this list).
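
The following is a minimal sketch of the EventBridge rule behind steps 14.3 to 14.5, matching QuickSight API calls recorded by CloudTrail and routing them to a hypothetical Lambda function that calls the Asset Tracker REST API. The rule name, event name list, and target ARN are assumptions for the example.

import json
import boto3

events = boto3.client("events")

# Hypothetical rule name and target; the target is the Lambda function that calls
# the Asset Tracker REST API (it also needs a resource-based permission that allows
# events.amazonaws.com to invoke it)
RULE_NAME = "quicksight-asset-changes"
TRACKER_LAMBDA_ARN = "arn:aws:lambda:us-east-1:111122223333:function:asset-tracker-sync"

event_pattern = {
    "source": ["aws.quicksight"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["quicksight.amazonaws.com"],
        "eventName": [
            "CreateAnalysis", "UpdateAnalysis", "DeleteAnalysis",
            "CreateDashboard", "UpdateDashboard", "DeleteDashboard",
            "CreateDataSet", "UpdateDataSet", "DeleteDataSet",
        ],
    },
}

events.put_rule(Name=RULE_NAME, EventPattern=json.dumps(event_pattern), State="ENABLED")
events.put_targets(
    Rule=RULE_NAME,
    Targets=[{"Id": "asset-tracker-sync", "Arn": TRACKER_LAMBDA_ARN}],
)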

Architecture Decisions:

The following are few architecture decisions we took while designing the solution.

  • Choosing Aurora database for Asset Tracker: We evaluated Amazon Neptune for the Asset Tracker database because most of the metadata we capture is primarily about maintaining relationships between QuickSight resources. But when we looked at the query patterns, we found the query pattern is always just one level deep, to find the parent of a specific QuickSight resource, and that can be solved with a relational database’s primary key / foreign key relationship and a simple self-join SQL query. Because the query pattern doesn’t require a graph database, we decided to go with Amazon Aurora to keep it simple, avoid introducing a new database technology, and reduce the operational overhead of maintaining it. In the future, as the use case evolves, we can evaluate the need for a graph database and plan for integrating it. For Amazon Aurora, we chose Amazon Aurora Serverless because the usage pattern is not consistent enough to justify reserving server capacity, and the serverless tech stack helps reduce operational overhead.
  • Decoupling Asset Tracker as a common REST API service: The Asset Tracker has future scope to become a centralized metadata layer that keeps track of all the QuickSight resources across all business units of Huron. So instead of each business unit having its own metadata database, building it as a service and deploying it in a shared AWS service account reduces operational overhead, avoids duplicate infrastructure cost, and provides a consolidated view of all assets and their integrations. The service gives applications the ability to consume metadata about the QuickSight assets and then apply their own mapping of security policies to the assets based on their own application data and access control policies.
  • Central QuickSight account with subfolder for environments: The choice was made to use a central account which reduces developer friction of having multiple accounts with multiple identities, end users having to manage multiple accounts and access to resources. QuickSight folders allow for appropriate permissions for separating “environments”. Furthermore, by using folder-based sharing with QuickSight groups, users with appropriate permissions already have access to the latest versions of QuickSight assets without having to share their individual identities.

The solution included an automated Continuous Integration (CI) and Continuous Deployment (CD) pipeline to deploy the resources from development to staging and then finally to production. The following provides a high-level view of the QuickSight CI/CD deployment strategy.

Aurora Database Tables and Reference Analysis update flow

The following are the database tables integrated to capture the QuickSight resource metadata.

  • QS_Dataset: This captures metadata of all QuickSight datasets that are integrated in the reference analysis or customer analysis. This includes AWS ARN (Amazon Resource Name), data source type, ID and more.
  • QS_Template: This table captures metadata of all QuickSight templates, from which customer analysis and dashboards will be created. This includes AWS ARN, parent reference analysis ID, name, version number and more.
  • QS_Folder: This table captures metadata about QuickSight folders which logically groups different visualizations. This includes AWS ARN, name, and description.
  • QS_Analysis: This table captures metadata of all QuickSight analysis that includes AWS ARN, name, type, dataset IDs, parent template ID, tags, permissions and more.
  • QS_Dashboard: This table captures metadata information of QuickSight dashboards that includes AWS ARN, parent template ID, name, dataset IDs, tags, permissions and more.
  • QS_Folder_Asset_Mapping: This table captures folder to QuickSight asset mapping that includes folder ID, Asset ID, and asset type.

As the solution moves to the next phase of implementation, we plan to introduce additional database tables to capture metadata information about QuickSight sheets and asset mapping to customers and products. We will extend the functionality to support visual based embedding to enable truly integrated customer data experiences where embedded visuals mesh with the native content on a web page.

While explaining the use case, we highlighted the challenge that arises when a base reference analysis gets updated: we need to track the templates inherited from it and make sure the change is pushed to the linked customer analyses and dashboards. The following example scenario explains how the database tables change when a reference analysis is updated.

Example Scenario: When “reference analysis” is updated with a new release

When a base reference analysis is updated because of a new feature release, then a new QuickSight reference analysis and template needs to be created. Then we need to update all customer analysis and dashboard records to point to the new template ID to form the lineage.

The following sequential steps represent the database changes that need to happen (a minimal SQL sketch follows the list).

  • Insert a new record to the “Analysis” table to represent the new reference analysis creation.
  • Insert a new record to the “Template” table with new reference analysis ID as parent, created in step 1.
  • Retrieve “Analysis” and “Dashboard” table records that points to previous template ID and then update those records with the new template ID, created in step 2.
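
The following is a minimal sketch of these three steps as a single database transaction. The connection details, table column names, and placeholder ARNs are assumptions for the example, since the actual Asset Tracker schema may differ.

import psycopg2

# Placeholder inputs for the new release; real values come from the deployment pipeline
new_analysis_arn = "arn:aws:quicksight:us-east-1:111122223333:analysis/reference-v2"
new_template_arn = "arn:aws:quicksight:us-east-1:111122223333:template/reference-v2"
old_template_id = 1

conn = psycopg2.connect(
    host="asset-tracker.cluster-xxxxxxxx.us-east-1.rds.amazonaws.com",
    dbname="asset_tracker",
    user="tracker_app",
    password="<retrieved-from-aws-secrets-manager>",
)

with conn:
    with conn.cursor() as cur:
        # Step 1: register the new reference analysis
        cur.execute(
            "INSERT INTO qs_analysis (analysis_arn, name, type) "
            "VALUES (%s, %s, 'REFERENCE') RETURNING analysis_id",
            (new_analysis_arn, "reference analysis v2"),
        )
        new_analysis_id = cur.fetchone()[0]

        # Step 2: register the new template pointing at the new reference analysis
        cur.execute(
            "INSERT INTO qs_template (template_arn, parent_analysis_id, version) "
            "VALUES (%s, %s, %s) RETURNING template_id",
            (new_template_arn, new_analysis_id, 2),
        )
        new_template_id = cur.fetchone()[0]

        # Step 3: repoint customer analyses and dashboards from the old template
        cur.execute(
            "UPDATE qs_analysis SET parent_template_id = %s WHERE parent_template_id = %s",
            (new_template_id, old_template_id),
        )
        cur.execute(
            "UPDATE qs_dashboard SET parent_template_id = %s WHERE parent_template_id = %s",
            (new_template_id, old_template_id),
        )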

How will it enable a more robust embedding experience

The QuickSight asset tracker integration with Huron’s products provides users with a personalized, secure, and modern analytics experience. When users log in through Huron’s online application, it uses the logged-in user’s information to dynamically identify the products they are mapped to, and then renders the QuickSight visualizations and dashboards that the user is entitled to see. This improves the user experience, enables granular permission management, and also increases performance.

How AWS collaborated with Huron to help build the solution

The AWS team collaborated with the Huron team to design and implement the solution. The AWS Data Lab Resident Architect worked with Huron’s lead architect on the initial architecture design, comparing different integration options and deriving the tradeoffs between them before finalizing the architecture. Then, with the help of an AWS Professional Services engineer, we built the base solution that the Huron team can extend to roll it out to all business units and integrate additional reporting features on top of it.

The AWS Data Lab Resident Architect program provides AWS customers with guidance in refining and executing their data strategy and solutions roadmap. Resident Architects are dedicated to customers for 6 months, with opportunities for extension, and help customers (Chief Data Officers, VPs of Data Architecture, and Builders) make informed choices and tradeoffs about accelerating their data and analytics workloads and implementation.

The AWS Professional Services organization is a global team of experts that can help customers realize their desired business outcomes when using the AWS Cloud. The Professional Services team works together with the customer’s team and their chosen member of the AWS Partner Network (APN) to execute their enterprise cloud computing initiatives.

Next Steps

Huron has rolled out the solution for one business unit. As a next step, we plan to roll it out to all business units so that the asset tracker service is populated with assets from across the organization and provides a consolidated view.

In addition, Huron will build a reporting layer on top of the Amazon Aurora asset tracker database so that leadership has a way to discover assets by business unit or owner, assets created within a specific date range, or reports that haven’t been updated in a while.

Once the asset tracker is populated with all QuickSight assets, it will be integrated into the front-end online application that can help end users discover existing assets and request creation of new assets.

Newer QuickSight APIs such as assets-as-a-bundle and assets-as-code further accelerate the capabilities of the service by improving the development velocity and reliability of making changes.

Conclusion

This blog explained how Huron built an Asset Tracker to keep track of all QuickSight resources across the organization. This solution may provide a reference to other organizations who would like to build an inventory of visualization reports, ML models or other technical assets. This solution leveraged Amazon Aurora as the primary database, but if an organization would also like to build a detailed lineage of all the assets to understand how they are interrelated then they can consider integrating Amazon Neptune as an alternate database too.

If you have a similar use case and would like to collaborate with AWS Data Analytics Specialist Architects to brainstorm on the architecture, rapidly prototype it and implement a production ready solution then connect with your AWS Account Manager or AWS Solution Architect to start an engagement with AWS Data Lab team.


About the Authors

Corey Johnson is the Lead Data Architect at Huron, where he leads its data architecture for their Global Products Data and Analytics initiatives.

Sakti Mishra is a Principal Data Analytics Architect at AWS, where he helps customers modernize their data architecture, help define end to end data strategy including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Simplify and speed up Apache Spark applications on Amazon Redshift data with Amazon Redshift integration for Apache Spark

Post Syndicated from Gagan Brahmi original https://aws.amazon.com/blogs/big-data/simplify-and-speed-up-apache-spark-applications-on-amazon-redshift-data-with-amazon-redshift-integration-for-apache-spark/

Customers use Amazon Redshift to run their business-critical analytics on petabytes of structured and semi-structured data. Apache Spark is a popular framework that you can use to build applications for use cases such as ETL (extract, transform, and load), interactive analytics, and machine learning (ML). Apache Spark enables you to build applications in a variety of languages, such as Java, Scala, and Python, by accessing the data in your Amazon Redshift data warehouse.

Amazon Redshift integration for Apache Spark helps developers seamlessly build and run Apache Spark applications on Amazon Redshift data. Developers can use AWS analytics and ML services such as Amazon EMR, AWS Glue, and Amazon SageMaker to effortlessly build Apache Spark applications that read from and write to their Amazon Redshift data warehouse. You can do so without compromising on the performance of your applications or transactional consistency of your data.

In this post, we discuss why Amazon Redshift integration for Apache Spark is critical and efficient for analytics and ML. In addition, we discuss use cases that use Amazon Redshift integration with Apache Spark to drive business impact. Finally, we walk you through step-by-step examples of how to use this official AWS connector in an Apache Spark application.

Amazon Redshift integration for Apache Spark

The Amazon Redshift integration for Apache Spark minimizes the cumbersome and often manual process of setting up a spark-redshift connector (community version) and shortens the time needed to prepare for analytics and ML tasks. You only need to specify the connection to your data warehouse, and you can start working with Amazon Redshift data from your Apache Spark-based applications within minutes.

You can use several pushdown capabilities for operations such as sort, aggregate, limit, join, and scalar functions so that only the relevant data is moved from your Amazon Redshift data warehouse to the consuming Apache Spark application. This allows you to improve the performance of your applications. Amazon Redshift admins can easily identify the SQL generated from Spark-based applications. In this post, we show how you can find out the SQL generated by the Apache Spark job.

Moreover, Amazon Redshift integration for Apache Spark uses Parquet file format when staging the data in a temporary directory. Amazon Redshift uses the UNLOAD SQL statement to store this temporary data on Amazon Simple Storage Service (Amazon S3). The Apache Spark application retrieves the results from the temporary directory (stored in Parquet file format), which improves performance.

You can also help make your applications more secure by utilizing AWS Identity and Access Management (IAM) credentials to connect to Amazon Redshift.

Amazon Redshift integration for Apache Spark is built on top of the spark-redshift connector (community version) and enhances it for performance and security, helping you gain up to 10 times faster application performance.

Use cases for Amazon Redshift integration with Apache Spark

For our use case, the leadership of the product-based company wants to know the sales for each product across multiple markets. As sales for the company fluctuate dynamically, it has become a challenge for the leadership to track the sales across multiple markets. However, the overall sales are declining, and the company leadership wants to find out which markets aren’t performing so that they can target these markets for promotion campaigns.

For sales across multiple markets, the product sales data such as orders, transactions, and shipment data is available on Amazon S3 in the data lake. The data engineering team can use Apache Spark with Amazon EMR or AWS Glue to analyze this data in Amazon S3.

The inventory data is available in Amazon Redshift. Similarly, the data engineering team can analyze this data with Apache Spark using Amazon EMR or an AWS Glue job by using the Amazon Redshift integration for Apache Spark to perform aggregations and transformations. The aggregated and transformed dataset can be stored back into Amazon Redshift using the Amazon Redshift integration for Apache Spark.

Using a distributed framework like Apache Spark with the Amazon Redshift integration for Apache Spark can provide the visibility across the data lake and data warehouse to generate sales insights. These insights can be made available to the business stakeholders and line of business users in Amazon Redshift to make informed decisions to run targeted promotions for the low revenue market segments.

Additionally, we can use the Amazon Redshift integration with Apache Spark in the following use cases:

  • An Amazon EMR or AWS Glue customer running Apache Spark jobs wants to transform data and write that into Amazon Redshift as a part of their ETL pipeline
  • An ML customer uses Apache Spark with SageMaker for feature engineering for accessing and transforming data in Amazon Redshift
  • An Amazon EMR, AWS Glue, or SageMaker customer uses Apache Spark for interactive data analysis with data on Amazon Redshift from notebooks

Examples for Amazon Redshift integration for Apache Spark in an Apache Spark application

In this post, we show the steps to connect Amazon Redshift from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2), Amazon EMR Serverless, and AWS Glue using a common script. In the following sample code, we generate a report showing the quarterly sales for the year 2008. To do that, we join two Amazon Redshift tables using an Apache Spark DataFrame, run a predicate pushdown, aggregate and sort the data, and write the transformed data back to Amazon Redshift. The script uses PySpark.

The script uses IAM-based authentication for Amazon Redshift. IAM roles used by Amazon EMR and AWS Glue should have the appropriate permissions to authenticate Amazon Redshift, and access to an S3 bucket for temporary data storage.

The following example policy allows the IAM role to call the GetClusterCredentials operations:

{
  "Version": "2012-10-17",
  "Statement": {
    "Effect": "Allow",
    "Action": "redshift:GetClusterCredentials",
    "Resource": "arn:aws:redshift:<aws_region_name>:xxxxxxxxxxxx:dbuser:*/temp_*"
  }
}

The following example policy allows access to an S3 bucket for temporary data storage:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "arn:aws:s3:::<s3_bucket_name>"
        }
    ]
}

The complete script is as follows:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initiate Apache Spark session
spark = SparkSession \
        .builder \
        .appName("SparkRedshiftConnector") \
        .enableHiveSupport() \
        .getOrCreate()

# Set connection options for Amazon Redshift
jdbc_iam_url = "jdbc:redshift:iam://redshift-spark-connector-1.xxxxxxxxxxx.<aws_region_name>.redshift.amazonaws.com:5439/sample_data_dev"
temp_dir = 's3://<s3_bucket_name>/redshift-temp-dir/'
aws_role = 'arn:aws:iam::xxxxxxxxxxxx:role/redshift-s3'

# Set query group for the query. More details on Amazon Redshift WLM https://docs.aws.amazon.com/redshift/latest/dg/cm-c-executing-queries.html
queryGroup = "emr-redshift"
jdbc_iam_url_withQueryGroup = jdbc_iam_url+'?queryGroup='+queryGroup

# Set User name for the query
userName = 'awsuser'
jdbc_iam_url_withUserName = jdbc_iam_url_withQueryGroup+';user='+userName

# Define the Amazon Redshift context
redshiftOptions = {
    "url": jdbc_iam_url_withUserName,
    "tempdir": temp_dir,
    "aws_iam_role" : aws_role
}

# Create the sales DataFrame from Amazon Redshift table using io.github.spark_redshift_community.spark.redshift class
sales_df = (
    spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tickit.sales")
        .load()
)

# Create the date Data Frame from Amazon Redshift table
date_df = (
    spark.read
        .format("io.github.spark_redshift_community.spark.redshift")
        .options(**redshiftOptions)
        .option("dbtable", "tickit.date")
        .load()
)

# Assign a Data Frame to the above output which will be written back to Amazon Redshift
output_df= sales_df.join(date_df, sales_df.dateid == date_df.dateid, 'inner').where(
    col("year") == 2008).groupBy("qtr").sum("qtysold").select(
        col("qtr"), col("sum(qtysold)")).sort(["qtr"], ascending=[1]).withColumnRenamed("sum(qtysold)","total_quantity_sold")

# Display the output
output_df.show()

# Let's drop the queryGroup for easy validation of pushdown queries
# Set User name for the query
userName = 'awsuser'
jdbc_iam_url_withUserName = jdbc_iam_url+'?user='+userName

# Define the Amazon Redshift context
redshiftWriteOptions = {
    "url": jdbc_iam_url_withUserName,
    "tempdir": temp_dir,
    "aws_iam_role" : aws_role
}

# Write the Data Frame back to Amazon Redshift
output_df.write \
    .format("io.github.spark_redshift_community.spark.redshift") \
    .mode("overwrite") \
    .options(**redshiftWriteOptions) \
    .option("dbtable", "tickit.test") \
    .save()

If you plan to use the preceding script in your environment, make sure you replace the values for the following variables with the appropriate values for your environment: jdbc_iam_url, temp_dir, and aws_role.

In the next section, we walk through the steps to run this script to aggregate a sample dataset that is made available in Amazon Redshift.

Prerequisites

Before we begin, make sure the following prerequisites are met:

Deploy resources using AWS CloudFormation

Complete the following steps to deploy the CloudFormation stack:

  1. Sign in to the AWS Management Console, then launch the CloudFormation stack:
    BDB-2063-launch-cloudformation-stack

You can also download the CloudFormation template to create the resources mentioned in this post through infrastructure as code (IaC). Use this template when launching a new CloudFormation stack.

  1. Scroll down to the bottom of the page to select I acknowledge that AWS CloudFormation might create IAM resources under Capabilities, then choose Create stack.

The stack creation process takes 15–20 minutes to complete. The CloudFormation template creates the following resources:

    • An Amazon VPC with the needed subnets, route tables, and NAT gateway
    • An S3 bucket with the name redshift-spark-databucket-xxxxxxx (note that xxxxxxx is a random string to make the bucket name unique)
    • An Amazon Redshift cluster with sample data loaded inside the database dev and the primary user redshiftmasteruser. For the purpose of this blog post, redshiftmasteruser with administrative permissions is used. However, it is recommended to use a user with fine-grained access control in a production environment.
    • An IAM role to be used for Amazon Redshift with the ability to request temporary credentials from the Amazon Redshift cluster’s dev database
    • Amazon EMR Studio with the needed IAM roles
    • Amazon EMR release version 6.9.0 on an EC2 cluster with the needed IAM roles
    • An Amazon EMR Serverless application release version 6.9.0
    • An AWS Glue connection and AWS Glue job version 4.0
    • A Jupyter notebook to run using Amazon EMR Studio using Amazon EMR on an EC2 cluster
    • A PySpark script to run using Amazon EMR Studio and Amazon EMR Serverless
  1. After the stack creation is complete, choose the stack name redshift-spark and navigate to the Outputs tab.

We utilize these output values later in this post.

In the next sections, we show the steps for Amazon Redshift integration for Apache Spark from Amazon EMR on Amazon EC2, Amazon EMR Serverless, and AWS Glue.

Use Amazon Redshift integration with Apache Spark on Amazon EMR on EC2

Starting with Amazon EMR release version 6.9.0, the Amazon Redshift integration for Apache Spark connector and the Amazon Redshift JDBC driver are available locally on Amazon EMR. These files are located under the /usr/share/aws/redshift/ directory. In earlier versions of Amazon EMR, only the community version of the spark-redshift connector is available.

The following example shows how to connect Amazon Redshift using a PySpark kernel via an Amazon EMR Studio notebook. The CloudFormation stack created Amazon EMR Studio, Amazon EMR on an EC2 cluster, and a Jupyter notebook available to run. To go through this example, complete the following steps:

  1. Download the Jupyter notebook made available in the S3 bucket for you:
    • In the CloudFormation stack outputs, look for the value for EMRStudioNotebook, which should point to the redshift-spark-emr.ipynb notebook available in the S3 bucket.
    • Choose the link or open the link in a new tab by copying the URL for the notebook.
    • After you open the link, download the notebook by choosing Download, which will save the file locally on your computer.
  1. Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
  2. In the navigation pane, choose Workspaces.
  3. Choose Create Workspace.
  4. Provide a name for the Workspace, for instance redshift-spark.
  5. Expand the Advanced configuration section and select Attach Workspace to an EMR cluster.
  6. Under Attach to an EMR cluster, choose the EMR cluster with the name emrCluster-Redshift-Spark.
  7. Choose Create Workspace.
  8. After the Amazon EMR Studio Workspace is created and in Attached status, you can access the Workspace by choosing the name of the Workspace.

This should open the Workspace in a new tab. Note that if you have a pop-up blocker, you may have to allow the Workspace to open or disable the pop-up blocker.

In the Amazon EMR Studio Workspace, we now upload the Jupyter notebook we downloaded earlier.

  1. Choose Upload to browse your local file system and upload the Jupyter notebook (redshift-spark-emr.ipynb).
  2. Choose (double-click) the redshift-spark-emr.ipynb notebook within the Workspace to open the notebook.

The notebook provides the details of different tasks that it performs. Note that in the section Define the variables to connect to Amazon Redshift cluster, you don’t need to update the values for jdbc_iam_url, temp_dir, and aws_role because these are updated for you by AWS CloudFormation. AWS CloudFormation has also performed the steps mentioned in the Prerequisites section of the notebook.

You can now start running the notebook.

  1. Run the individual cells by selecting them and then choosing Play.

You can also use the key combination of Shift+Enter or Shift+Return. Alternatively, you can run all the cells by choosing Run All Cells on the Run menu.

  1. Find the predicate pushdown operation performed on the Amazon Redshift cluster by the Amazon Redshift integration for Apache Spark.

We can also see the temporary data stored on Amazon S3 in the optimized Parquet format. The output can be seen from running the cell in the section Get the last query executed on Amazon Redshift.

  1. To validate the table created by the job from Amazon EMR on Amazon EC2, navigate to the Amazon Redshift console and choose the cluster redshift-spark-redshift-cluster on the Provisioned clusters dashboard page.
  2. In the cluster details, on the Query data menu, choose Query in query editor v2.
  3. Choose the cluster in the navigation pane and connect to the Amazon Redshift cluster when it requests for authentication.
  4. Select Temporary credentials.
  5. For Database, enter dev.
  6. For User name, enter redshiftmasteruser.
  7. Choose Save.
  8. In the navigation pane, expand the cluster redshift-spark-redshift-cluster, expand the dev database, expand tickit, and expand Tables to list all the tables inside the schema tickit.

You should find the table test_emr.

  1. Choose (right-click) the table test_emr, then choose Select table to query the table.
  2. Choose Run to run the SQL statement.
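
As an alternative to query editor v2, you can also validate the table programmatically with the Amazon Redshift Data API, assuming the calling role has the redshift-data permissions and the ability to fetch temporary cluster credentials. The following is a minimal sketch using the cluster, database, and user created by the CloudFormation stack.

import time
import boto3

redshift_data = boto3.client("redshift-data")

# Cluster, database, user, and table names come from this post's CloudFormation stack
response = redshift_data.execute_statement(
    ClusterIdentifier="redshift-spark-redshift-cluster",
    Database="dev",
    DbUser="redshiftmasteruser",
    Sql="SELECT * FROM tickit.test_emr LIMIT 10",
)
statement_id = response["Id"]

# Poll until the statement finishes, then fetch the rows
while True:
    status = redshift_data.describe_statement(Id=statement_id)["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(1)

if status == "FINISHED":
    for row in redshift_data.get_statement_result(Id=statement_id)["Records"]:
        print(row)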

Use Amazon Redshift integration with Apache Spark on Amazon EMR Serverless

The Amazon EMR release version 6.9.0 and above provides the Amazon Redshift integration for Apache Spark JARs (managed by Amazon Redshift) and Amazon Redshift JDBC JARs locally on Amazon EMR Serverless as well. These files are located under the /usr/share/aws/redshift/ directory. In the following example, we use the Python script made available in the S3 bucket by the CloudFormation stack we created earlier.

  1. In the CloudFormation stack outputs, make a note of the value for EMRServerlessExecutionScript, which is the location of the Python script in the S3 bucket.
  2. Also note the value for EMRServerlessJobExecutionRole, which is the IAM role to be used with running the Amazon EMR Serverless job.
  3. Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
  4. Choose Applications under Serverless in the navigation pane.

You will find an EMR application created by the CloudFormation stack with the name emr-spark-redshift.

  1. Choose the application name to submit a job.
  2. Choose Submit job.
  3. Under Job details, for Name, enter an identifiable name for the job.
  4. For Runtime role, choose the IAM role that you noted from the CloudFormation stack output earlier.
  5. For Script location, provide the path to the Python script you noted earlier from the CloudFormation stack output.
  6. Expand the section Spark properties and choose Edit in text box.
  7. Enter the following value in the text box, which provides the path to the redshift-connector, Amazon Redshift JDBC driver, spark-avro JAR, and minimal-json JAR files:
    --jars /usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar,/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar,/usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar

  8. Choose Submit job.
  9. Wait for the job to complete and the run status to show as Success.
  10. Navigate to the Amazon Redshift query editor to view if the table was created successfully.
  11. Check the pushdown queries run for Amazon Redshift query group emr-serverless-redshift. You can run the following SQL statement against the database dev:
    SELECT query_text FROM SYS_QUERY_HISTORY WHERE query_label = 'emr-serverless-redshift' ORDER BY start_time DESC LIMIT 1

You can see that the pushdown query and return results are stored in Parquet file format on Amazon S3.
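
Instead of submitting the job through the console as described above, you can also start the same Amazon EMR Serverless job programmatically. The following is a minimal sketch using boto3; the application ID is a placeholder, and the execution role and script location come from the CloudFormation stack outputs noted earlier.

import boto3

emr_serverless = boto3.client("emr-serverless")

# The application ID, runtime role, and script location are placeholders; use the
# values from the CloudFormation stack outputs (EMRServerlessJobExecutionRole and
# EMRServerlessExecutionScript) and the emr-spark-redshift application
response = emr_serverless.start_job_run(
    applicationId="<emr-serverless-application-id>",
    executionRoleArn="<EMRServerlessJobExecutionRole value>",
    name="emr-serverless-redshift",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "<EMRServerlessExecutionScript value>",
            "sparkSubmitParameters": (
                "--jars /usr/share/aws/redshift/jdbc/RedshiftJDBC.jar,"
                "/usr/share/aws/redshift/spark-redshift/lib/spark-redshift.jar,"
                "/usr/share/aws/redshift/spark-redshift/lib/spark-avro.jar,"
                "/usr/share/aws/redshift/spark-redshift/lib/minimal-json.jar"
            ),
        }
    },
)
print("Job run ID:", response["jobRunId"])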

Use Amazon Redshift integration with Apache Spark on AWS Glue

Starting with AWS Glue version 4.0 and above, the Apache Spark jobs connecting to Amazon Redshift can use the Amazon Redshift integration for Apache Spark and Amazon Redshift JDBC driver. Existing AWS Glue jobs that already use Amazon Redshift as source or target can be upgraded to AWS Glue 4.0 to take advantage of this new connector. The CloudFormation template provided with this post creates the following AWS Glue resources:

  • AWS Glue connection for Amazon Redshift – The connection to establish connection from AWS Glue to Amazon Redshift using the Amazon Redshift integration for Apache Spark
  • IAM role attached to the AWS Glue job – The IAM role to manage permissions to run the AWS Glue job
  • AWS Glue job – The script for the AWS Glue job performing transformations and aggregations using the Amazon Redshift integration for Apache Spark

The following example uses the AWS Glue connection attached to the AWS Glue job with PySpark and includes the following steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Under Connections, choose the AWS Glue connection for Amazon Redshift created by the CloudFormation template.
  3. Verify the connection details.

You can now reuse this connection within a job or across multiple jobs.

  1. On the Connectors page, choose the AWS Glue job created by the CloudFormation stack under Your jobs, or access the AWS Glue job by using the URL provided for the key GlueJob in the CloudFormation stack output.
  2. Access and verify the script for the AWS Glue job.
  3. On the Job details tab, make sure that Glue version is set to Glue 4.0.

This ensures that the job uses the latest redshift-spark connector.

  1. Expand Advanced properties and in the Connections section, verify that the connection created by the CloudFormation stack is attached.
  2. Verify the job parameters added for the AWS Glue job. These values are also available in the output for the CloudFormation stack.
  3. Choose Save and then Run.

You can view the status for the job run on the Run tab.

  1. After the job run completes successfully, you can verify the output of the table test-glue created by the AWS Glue job.
  2. We check the pushdown queries run for Amazon Redshift query group glue-redshift. You can run the following SQL statement against the database dev:
    SELECT query_text FROM SYS_QUERY_HISTORY WHERE query_label = 'glue-redshift' ORDER BY start_time DESC LIMIT 1
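
You can also start the same AWS Glue job programmatically instead of choosing Run on the console. The following is a minimal sketch using boto3; replace the placeholder with the job name from the GlueJob CloudFormation stack output.

import boto3

glue = boto3.client("glue")

# Placeholder job name; use the value from the GlueJob CloudFormation stack output
job_name = "<GlueJob value from stack output>"

run = glue.start_job_run(JobName=job_name)
print("Glue job run ID:", run["JobRunId"])

# Optionally check the run status
state = glue.get_job_run(JobName=job_name, RunId=run["JobRunId"])["JobRun"]["JobRunState"]
print("Status:", state)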

Best practices

Keep in mind the following best practices:

  • Consider using the Amazon Redshift integration for Apache Spark from Amazon EMR instead of using the redshift-spark connector (community version) for your new Apache Spark jobs.
  • If you have existing Apache Spark jobs using the redshift-spark connector (community version), consider upgrading them to use the Amazon Redshift integration for Apache Spark
  • The Amazon Redshift integration for Apache Spark automatically applies predicate and query pushdown to optimize performance. We recommend using supported functions (autopushdown) in your query. The Amazon Redshift integration for Apache Spark turns the function into a SQL query and runs the query in Amazon Redshift. This optimization results in only the required data being retrieved, so Apache Spark can process less data and perform better (a minimal example follows this list).
    • Consider using aggregate pushdown functions like avg, count, max, min, and sum to retrieve filtered data for data processing.
    • Consider using Boolean pushdown operators like in, isnull, isnotnull, contains, endswith, and startswith to retrieve filtered data for data processing.
    • Consider using logical pushdown operators like and, or, and not (or !) to retrieve filtered data for data processing.
  • It’s recommended to pass an IAM role using the parameter aws_iam_role for the Amazon Redshift authentication from your Apache Spark application on Amazon EMR or AWS Glue. The IAM role should have necessary permissions to retrieve temporary IAM credentials to authenticate to Amazon Redshift as shown in this blog’s “Examples for Amazon Redshift integration for Apache Spark in an Apache Spark application” section.
  • With this feature, you don’t have to maintain your Amazon Redshift user name and password in AWS Secrets Manager or in the Amazon Redshift database.
  • Amazon Redshift uses the UNLOAD SQL statement to store this temporary data on Amazon S3. The Apache Spark application retrieves the results from the temporary directory (stored in Parquet file format). This temporary directory on Amazon S3 is not cleaned up automatically, and therefore could add additional cost. We recommend using Amazon S3 lifecycle policies to define the retention rules for the S3 bucket.
  • It’s recommended to turn on Amazon Redshift audit logging to log the information about connections and user activities in your database.
  • It’s recommended to turn on Amazon Redshift at-rest encryption to encrypt your data as Amazon Redshift writes it in its data centers and decrypt it for you when you access it.
  • It’s recommended to upgrade to AWS Glue v4.0 and above to use the Amazon Redshift integration for Apache Spark, which is available out of the box. Upgrading to this version of AWS Glue will automatically make use of this feature.
  • It’s recommended to upgrade to Amazon EMR v6.9.0 and above to use the Amazon Redshift integration for Apache Spark. You don’t have to manage any drivers or JAR files explicitly.
  • Consider using Amazon EMR Studio notebooks to interact with your Amazon Redshift data in your Apache Spark application.
  • Consider using AWS Glue Studio to create Apache Spark jobs using a visual interface. You can also switch to writing Apache Spark code in either Scala or PySpark within AWS Glue Studio.
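
The following PySpark sketch illustrates the aws_iam_role parameter and pushdown-friendly operations mentioned in the list above. It is a minimal example, not code from this post: the cluster endpoint, IAM role ARN, S3 temporary directory, and table and column names are placeholders, and the data source name shown (io.github.spark_redshift_community.spark.redshift) is the one used by the Amazon Redshift integration for Apache Spark, which must be available on your Amazon EMR 6.9+ cluster or AWS Glue 4.0 job.

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

spark = SparkSession.builder.appName("redshift-pushdown-example").getOrCreate()

# Placeholder values -- replace with your own cluster endpoint, role, and bucket
jdbc_url = "jdbc:redshift://example-cluster.xxxxx.us-east-1.redshift.amazonaws.com:5439/dev"
iam_role = "arn:aws:iam::123456789012:role/redshift-spark-example-role"
temp_dir = "s3://example-bucket/redshift-temp/"

# Read a Redshift table; aws_iam_role lets the connector fetch temporary
# credentials instead of relying on a stored user name and password
df = (spark.read
      .format("io.github.spark_redshift_community.spark.redshift")
      .option("url", jdbc_url)
      .option("dbtable", "public.sales")       # placeholder table
      .option("tempdir", temp_dir)
      .option("aws_iam_role", iam_role)
      .load())

# Supported filters and aggregates are pushed down and run in Amazon Redshift
result = (df.filter(df.qtysold > 1)            # placeholder columns
            .groupBy("sellerid")
            .agg(avg("pricepaid").alias("avg_price")))

result.show()

Because the filter and aggregation are supported by autopushdown, they are translated into SQL and run in Amazon Redshift, so only the aggregated result is unloaded to Amazon S3 for Spark to read.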

Clean up

Complete the following steps to clean up the resources created by the CloudFormation template so that you’re not billed for resources you no longer need:

  1. Stop the Amazon EMR Serverless application:
    • Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
    • Choose Applications under Serverless in the navigation pane.

You will find an EMR application created by the CloudFormation stack with the name emr-spark-redshift.

    • If the application status shows as Stopped, you can move to the next steps. However, if the application status is Started, choose the application name, then choose Stop application and Stop application again to confirm.
  2. Delete the Amazon EMR Studio Workspace:
    • Access Amazon EMR Studio by choosing or copying the link provided in the CloudFormation stack outputs for the key EMRStudioURL.
    • Choose Workspaces in the navigation pane.
    • Select the Workspace that you created and choose Delete, then choose Delete again to confirm.
  3. Delete the CloudFormation stack:
    • On the AWS CloudFormation console, navigate to the stack you created earlier.
    • Choose the stack name and then choose Delete to remove the stack and delete the resources created as a part of this post.
    • On the confirmation screen, choose Delete stack.

Conclusion

In this post, we explained how you can use the Amazon Redshift integration for Apache Spark to build and deploy applications with Amazon EMR on Amazon EC2, Amazon EMR Serverless, and AWS Glue to automatically apply predicate and query pushdown to optimize query performance for data in Amazon Redshift. We highly recommend using the Amazon Redshift integration for Apache Spark for a seamless and secure connection to Amazon Redshift from your Amazon EMR or AWS Glue applications.

Here is what some of our customers have to say about the Amazon Redshift integration for Apache Spark:

“We empower our engineers to build their data pipelines and applications with Apache Spark using Python and Scala. We wanted a tailored solution that simplified operations and delivered faster and more efficiently for our clients, and that’s what we get with the new Amazon Redshift integration for Apache Spark.”

—Huron Consulting

“GE Aerospace uses AWS analytics and Amazon Redshift to enable critical business insights that drive important business decisions. With the support for auto-copy from Amazon S3, we can build simpler data pipelines to move data from Amazon S3 to Amazon Redshift. This accelerates our data product teams’ ability to access data and deliver insights to end-users. We spend more time adding value through data and less time on integrations.”

—GE Aerospace

“Our focus is on providing self-service access to data for all of our users at Goldman Sachs. Through Legend, our open-source data management and governance platform, we enable users to develop data-centric applications and derive data-driven insights as we collaborate across the financial services industry. With the Amazon Redshift integration for Apache Spark, our data platform team will be able to access Amazon Redshift data with minimal manual steps, allowing for zero-code ETL that will increase our ability to make it easier for engineers to focus on perfecting their workflow as they collect complete and timely information. We expect to see a performance improvement of applications and improved security as our users can now easily access the latest data in Amazon Redshift.”

—Goldman Sachs


About the Authors

Gagan Brahmi is a Senior Specialist Solutions Architect focused on big data analytics and AI/ML platform at Amazon Web Services. Gagan has over 18 years of experience in information technology. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. In his spare time, he spends time with his family and explores new places.

Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing data lakes, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Beaux Sharifi is a Software Development Engineer within the Amazon Redshift drivers’ team where he leads the development of the Amazon Redshift Integration with Apache Spark connector. He has over 20 years of experience building data-driven platforms across multiple industries. In his spare time, he enjoys spending time with his family and surfing.

Exploring new ETL and ELT capabilities for Amazon Redshift from the AWS Glue Studio visual editor

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/exploring-new-etl-and-elt-capabilities-for-amazon-redshift-from-the-aws-glue-studio-visual-editor/

In a modern data architecture, unified analytics enable you to access the data you need, whether it’s stored in a data lake or a data warehouse. In particular, we have observed an increasing number of customers who combine and integrate their data into an Amazon Redshift data warehouse to analyze huge data at scale and run complex queries to achieve their business goals.

One of the most common use cases for data preparation on Amazon Redshift is to ingest and transform data from different data stores into an Amazon Redshift data warehouse. This is commonly achieved via AWS Glue, which is a serverless, scalable data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources. AWS Glue provides an extensible architecture that enables users with different data processing use cases, and works well with Amazon Redshift. At AWS re:Invent 2022, we announced support for the new Amazon Redshift integration with Apache Spark available in AWS Glue 4.0, which provides enhanced ETL (extract, transform, and load) and ELT capabilities with improved performance.

Today, we are pleased to announce new and enhanced visual job authoring capabilities for Amazon Redshift ETL and ELT workflows on the AWS Glue Studio visual editor. The new authoring experience gives you the ability to:

  • Get started faster with Amazon Redshift by directly browsing Amazon Redshift schemas and tables from the AWS Glue Studio visual interface
  • Author jobs flexibly with native Amazon Redshift SQL support as a source, or with custom preactions and postactions
  • Simplify common data loading operations into Amazon Redshift through new support for INSERT, TRUNCATE, DROP, and MERGE commands

With these enhancements, you can use existing transforms and connectors in AWS Glue Studio to quickly create data pipelines for Amazon Redshift. No-code users can complete end-to-end tasks using only the visual interface, SQL users can reuse their existing Amazon Redshift SQL within AWS Glue, and all users can tune their logic with custom actions on the visual editor.

In this post, we explore the new streamlined user interface and dive deeper into how to use these capabilities. To demonstrate these new capabilities, we showcase the following:

  • Passing a custom SQL JOIN statement to Amazon Redshift
  • Using the results to apply an AWS Glue Studio visual transform
  • Performing an APPEND on the results to load them into a destination table

Set up resources with AWS CloudFormation

To demonstrate the AWS Glue Studio visual editor experience with Amazon Redshift, we provide an AWS CloudFormation template for you to set up baseline resources quickly. The template creates the following resources for you:

  • An Amazon VPC, subnets, route tables, an internet gateway, and NAT gateways
  • An Amazon Redshift cluster
  • An AWS Identity and Access Management (IAM) role associated with the Amazon Redshift cluster
  • An IAM role for running the AWS Glue job
  • An Amazon Simple Storage Service (Amazon S3) bucket to be used as a temporary location for Amazon Redshift ETL
  • An AWS Secrets Manager secret that stores the user name and password for the Amazon Redshift cluster

Note that at the time of writing this post, Amazon Redshift MERGE is in preview, and the cluster created is a preview cluster.

To launch the CloudFormation stack, complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack and then choose With new resources (standard).
  2. For Template source, select Upload a template file, and upload the provided template.
  3. Choose Next.
  4. Enter a name for the CloudFormation stack, then choose Next.
  5. Acknowledge that this stack might create IAM resources for you, then choose Submit.
  6. After the CloudFormation stack is successfully created, follow the steps at https://docs.aws.amazon.com/redshift/latest/gsg/rs-gsg-create-sample-db.html to load the sample tickit data into the Redshift cluster you created.

Exploring Amazon Redshift reads

In this section, we go over the new read functionality in the AWS Glue Studio visual editor and demonstrate how we can run a custom SQL statement via the new UI.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select Visual with a blank canvas, because we’re authoring a job from scratch, then choose Create.
  3. In the blank canvas, choose the plus sign to add an Amazon Redshift node of type Source.

When you close the node selector, you should see an Amazon Redshift source node on the canvas, along with the data source properties.

You can choose from two methods of accessing your Amazon Redshift data:

  • Direct data connection – This new method allows you to establish a connection to your Amazon Redshift sources without the need to catalog them
  • Glue Data Catalog tables – This method requires you to have already crawled or generated your Amazon Redshift tables in the AWS Glue Data Catalog

For this post, we use the Direct data connection option.

  1. For Redshift access type, select Direct data connection.
  2. For Redshift connection, choose your AWS Glue Connection redshift-demo-blog-connection created in the CloudFormation stack.

Specifying the connection automatically configures all the network related details along with the name of the database you wish to connect to.

The UI then presents a choice on how you’d like to access the data from within your selected Amazon Redshift cluster’s database:

  • Choose a single table – This option lets you select a single schema, and a single table from your database. You can browse through all of your available schemas and tables right from the AWS Glue Studio visual editor itself, which makes choosing your source table much easier.
  • Enter a custom query – If you’re looking to perform your ETL on a subset of data from your Amazon Redshift tables, you can author an Amazon Redshift query from the AWS Glue Studio UI. This query will be passed to the connected Amazon Redshift cluster, and the returned query result will be available in downstream transformations in AWS Glue Studio.

For the purposes of this post, we write our own custom query that joins data from the preloaded event table and venue table.

  1. Select Enter a custom query and enter the following query into the query editor:
select venue.venueid from event, venue where event.venueid = venue.venueid and event.starttime between '2008-01-01 14:00:00' and '2008-01-01 15:00:00' and venue.venueseats = 0

The intent of this query is to gather the venueid of venues that hosted an event between 2008-01-01 14:00:00 and 2008-01-01 15:00:00 and have venueseats = 0. If we run a similar query from the Amazon Redshift Query Editor, we can see that there are actually five such venues within that time frame. Later, we merge this result back into Amazon Redshift and remove these matching rows from the destination table.

  1. Choose Infer schema, which allows the AWS Glue Studio visual editor to understand the schema from the returned columns from your query.

You can see the schema on the Output schema tab.

  1. Under Performance and security, for S3 staging directory, choose the S3 temporary directory location created by the CloudFormation stack (RedshiftS3TempPath).
  2. For IAM role, choose the IAM role specified by RedshiftIamRoleARN in the CloudFormation stack.

Now we’re going to add a transform to drop duplicate rows from our join result. This will ensure that the MERGE operation in the following steps won’t have conflicting keys when performing the operation.

  1. Add a Drop Duplicates transform node after the Amazon Redshift source node, then choose it to view the node properties.
  2. On the Transform tab, for Drop duplicates, select Match specific keys.
  3. For Keys to match rows, choose venueid.

In this section, we defined the steps to read the output of a custom JOIN query. We then dropped the duplicate records from the returned value. In the next section, we explore the write path on the same job.

Exploring Amazon Redshift writes

Now we go over the enhancements for writing to Amazon Redshift as a destination. This section goes over all the simplified options for writing to Amazon Redshift, but highlights the new Amazon Redshift MERGE capabilities for the purposes of this post.

The MERGE operator offers great flexibility for conditionally merging rows from a source into a destination table. MERGE is powerful because it simplifies operations that traditionally were only achievable by using multiple insert, update, or delete statements separately. Within AWS Glue Studio, particularly with the custom MERGE option, you can define a more complex matching condition to handle finding the records to update.

  1. From the canvas page of the job used in the previous section, select Amazon Redshift to add an Amazon Redshift node of type Target.

When you close the selector, you should see your Amazon Redshift target node added on the AWS Glue Studio canvas, along with the available options.

  1. For Redshift access type, select Direct data connection.

Similar to the Amazon Redshift source node, the Direct data connection method allows you to write directly to your Amazon Redshift tables without needing to have them cataloged within the AWS Glue Data Catalog.

  1. For Redshift connection, choose your AWS Glue connection redshift-demo-blog-connection created in the CloudFormation stack.
  2. For Schema, choose public.
  3. For Table, choose the venue table as the destination Amazon Redshift table where we will store the merged data.
  4. Choose MERGE data into target table.

This selection provides the user with two options:

  • Choose keys and simple actions – This is a user-friendly version of the MERGE operation. You simply specify the matching keys, and choose what happens to the rows that match the key (update them or delete them) or don’t have any matches (insert them).
  • Enter custom MERGE statement – This option provides the most flexibility. You can enter your own custom logic for MERGE.

For this post, we use the simple actions method for performing a MERGE operation.

  1. For Handling of data and target table, select MERGE data into target table, and then select Choose keys and simple actions.
  2. For Matching Keys, select venueid.

This field becomes our MERGE condition for matching keys.

  1. For When matched, select Delete record in the table.
  2. For When not matched, select Insert source data as a new row into the table.

With these selections, we’ve configured the AWS Glue job to run a MERGE statement on Amazon Redshift while inserting our data. Moreover, for performing this MERGE operation, we use venueid as the key (you can select multiple keys). If there is a key match with the destination table’s record, we delete that record. Otherwise, we insert the record into the destination table.

  1. Navigate to the Job details tab.
  2. For Name, enter a name for the job.
  3. For the IAM Role drop down, select the RedshiftIamRole role that was created via the CloudFormation template.
  4. Choose Save.

  5. Choose Run and wait for the job to finish.

You can track its progress on the Runs tab.

  1. After the run reaches a successful state, navigate back to the Amazon Redshift Query Editor.
  2. Run the same query again to discover that those rows have been deleted in accordance with our MERGE configuration.

In this section, we configured an Amazon Redshift target node to write a MERGE statement to conditionally update records in our destination Amazon Redshift table. We then saved and ran the AWS Glue job, and saw the effect of the MERGE statement on our destination Amazon Redshift table.

Other available write options

In addition to MERGE, the AWS Glue Studio visual editor’s Amazon Redshift destination node also supports a number of other common operations:

  • APPEND – Appending to your target table performs an insert into the selected table without updating any of the existing records (if there are duplicates, both records are retained). In cases where you want to update existing rows in addition to adding new rows (often referred to as an UPSERT operation), you can select the Also update existing records in target table option. Note that both APPEND only and UPSERT (APPEND with UPDATE) are simpler subsets of the MERGE functionality discussed earlier.
  • TRUNCATE – The TRUNCATE option clears all the data in the existing table but retains the existing table schema, followed by an APPEND of all new data to the empty table. This option is often used when the full dataset needs to be refreshed and downstream services or tools depend on the table schema being consistent. For example, every night an Amazon Redshift table needs to be fully updated with the latest customer information that will be consumed by an Amazon QuickSight dashboard. In this case, the ETL developer would choose TRUNCATE to ensure the data is fully refreshed but the table schema is guaranteed not to change.
  • DROP – This option is used when the full dataset needs to be refreshed and the downstream services or tools that depend on the schema can handle possible schema changes without breaking.

How write operations are being handled on the backend

The Amazon Redshift connector supports two parameters called preactions and postactions. These parameters allow you to run SQL statements that will be passed on to the Amazon Redshift data warehouse before and after the actual write operation is carried out by Spark.

On the Script tab on the AWS Glue Studio page, we can see what SQL statements are being run.

Use a custom implementation for writing data into Amazon Redshift

In the event that the provided presets require more customization, or your use case requires more advanced implementations for writing to Amazon Redshift, AWS Glue Studio also allows you to freely select which preactions and postactions can be run when writing to Amazon Redshift.

To show an example, we create an Amazon Redshift datashare as a preaction, then perform the cleaning up of the same datashare as a postaction via AWS Glue Studio.

Note: This section is not run as part of the preceding walkthrough and is provided only as an example.

  1. Choose the Amazon Redshift data target node.
  2. On the Data target properties tab, expand the Custom Redshift parameters section.
  3. For the parameters, add the following:
    1. Parameter preactions with value BEGIN; CREATE DATASHARE ds1; END
    2. Parameter postactions with value BEGIN; DROP DATASHARE ds1; END

As you can see, we can specify multiple Amazon Redshift statements as part of both the preactions and postactions parameters. Remember that these statements override any existing preactions or postactions with your specified actions, as reflected in the generated script (a sketch of which follows).
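
The original post displays the generated script as a screenshot. As an illustration only, the following minimal sketch shows how the preactions and postactions parameters typically appear in a script generated for an Amazon Redshift target in AWS Glue 4.0. The connection name matches this post’s CloudFormation stack, but the temporary directory, table, and upstream data are placeholders, and exact option names can differ between AWS Glue versions.

import sys
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Stand-in for the upstream source and transform nodes generated by AWS Glue Studio
upstream_df = spark.createDataFrame([(1, "venue-a")], ["venueid", "venuename"])
upstream_node = DynamicFrame.fromDF(upstream_df, glueContext, "upstream_node")

# Write to Amazon Redshift; preactions and postactions run in Amazon Redshift
# immediately before and after the load performed by Spark
glueContext.write_dynamic_frame.from_options(
    frame=upstream_node,
    connection_type="redshift",
    connection_options={
        "redshiftTmpDir": "s3://example-bucket/redshift-temp/",  # placeholder
        "useConnectionProperties": "true",
        "connectionName": "redshift-demo-blog-connection",
        "dbtable": "public.venue",
        "preactions": "BEGIN; CREATE DATASHARE ds1; END;",
        "postactions": "BEGIN; DROP DATASHARE ds1; END;",
    },
)

job.commit()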

Cleanup

To avoid additional costs, make sure to delete any unnecessary resources and files:

  • Empty the S3 temporary bucket by deleting its contents
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. Make sure to empty the S3 bucket before you delete the bucket.

Conclusion

In this post, we went over the new AWS Glue Studio visual options for performing reads and writes from Amazon Redshift. We also saw the simplicity with which you can browse your Amazon Redshift tables right from the AWS Glue Studio visual editor UI, and how to run your own custom SQL statements against your Amazon Redshift sources. We then explored how to perform simple ETL loading tasks against Amazon Redshift with just a few clicks, and showcased the new Amazon Redshift MERGE statement.

To dive deeper into the new Amazon Redshift integrations for the AWS Glue Studio visual editor, check out Connecting to Redshift in AWS Glue Studio.


About the Authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Sean Ma is a Principal Product Manager on the AWS Glue team. He has an 18+ year track record of innovating and delivering enterprise products that unlock the power of data for users. Outside of work, Sean enjoys scuba diving and college football.

Get maximum value out of your cloud data warehouse with Amazon Redshift

Post Syndicated from Sana Ahmed original https://aws.amazon.com/blogs/big-data/get-maximum-value-out-of-your-cloud-data-warehouse-with-amazon-redshift/

Every day, customers are challenged with how to manage their growing data volumes and operational costs to unlock the value of data for timely insights and innovation, while maintaining consistent performance. Data creation, consumption, and storage are predicted to grow to 175 zettabytes by 2025, as forecasted in the 2022 IDC Global DataSphere report.

As data workloads grow, costs to scale and manage data usage with the right governance typically increase as well. So how do organizational leaders drive their business forward with high performance, controlled costs, and high security? With the right analytics approach, this is possible.

In this post, we look at three key challenges that customers face with growing data and how a modern data warehouse and analytics system like Amazon Redshift can meet these challenges across industries and segments.

Building an optimal data system

As data grows at an extraordinary rate, data proliferation across your data stores, data warehouse, and data lakes can become a challenge. Different departments within an organization can place data in a data lake or within their data warehouse depending on the type of data and usage patterns of that department. Teams may place their unstructured data like social media feeds within their Amazon Simple Storage Service (Amazon S3) data lake and historical structured data within their Amazon Redshift data warehouse. Teams need access to both the data lake and the data warehouse to work seamlessly for best insights, requiring an optimal data infrastructure that can scale almost infinitely to accommodate a growing number of concurrent data users without impacting performance—all while keeping costs under control.

A quintessential example of a company managing analytics on billions of data points across the data lake and the warehouse in a mission-critical business environment is Nasdaq, an American stock exchange. Within 2 years of migrating to Amazon Redshift, Nasdaq was managing 30–70 billion records per day, worth over 4 terabytes.

With Amazon Redshift, Nasdaq was able to query their warehouse and use Amazon Redshift Spectrum, a capability to query the data quickly in place without data loading, from their S3 data lakes. Nasdaq minimized time to insights with the ability to query 15 terabytes of data on Amazon S3 immediately without any extra data loading after writing data to Amazon S3. This performance innovation allows Nasdaq to have a multi-use data lake between teams.

Robert Hunt, Vice President of Software Engineering for Nasdaq, shared, “We have to both load and consume the 30 billion records in a time period between market close and the following morning. Data loading delayed the delivery of our reports. We needed to be able to write or load data into our data storage solution very quickly without interfering with the reading and querying of the data at the same time.”

Nasdaq’s massive data growth meant they needed to evolve their data architecture to keep up. They built their foundation of a new data lake on Amazon S3 so they could deliver analytics using Amazon Redshift as a compute layer. Nasdaq’s peak volume of daily data ingestion reached 113 billion records, and they completed data loading for reporting 5 hours faster while running 32% faster queries.

Enabling newer personas with data warehousing and analytics

Another challenge is enabling newer data users and personas with powerful analytics to meet business goals and perform critical decision-making. Where traditionally it was the data engineer and the database administrator who set up and managed the warehouse, today line of business data analysts, data scientists, and developers are all using the data warehouse to get to near-real-time business decision-making.
These personas, who don’t have specialized data management or data engineering skills, don’t want to be concerned with managing the capacity of their analytics systems to handle unpredictable or spiky data workloads, or to wait for IT to optimize for cost and capacity. Customers want to get started with analytics on large amounts of data instantly and scale analytics quickly and cost-effectively without infrastructure management.

Take the case of mobile gaming company Playrix. They were able to use Amazon Redshift Serverless to serve their key stakeholders with dashboards with financial data for quick decision-making.

Igor Ivanov, Technical Director of Playrix, stated, “Amazon Redshift Serverless is great for achieving the on-demand high performance that we need for massive queries.”

Playrix had a two-fold business goal, including marketing to its end-users (game players) with near-real-time data while also analyzing their historical data for the past 4–5 years. In seeking a solution, Playrix wanted to avoid disrupting other technical processes while also increasing cost savings. The company migrated to Redshift Serverless and scaled up to handle more complicated analytics on 600 TB from the past 5 years, all without storing two copies of the data or disrupting other analytics jobs. With Redshift Serverless, Playrix achieved a more flexible architecture and saved an overall 20% in costs of its marketing stack, decreasing its cost of customer acquisition.

“With no overhead and infrastructure management,” Ivanov shared, “we now have more time for experimenting, developing solutions, and planning new research.”

Breaking down data silos

Organizations need to easily access and analyze diverse types of structured and unstructured data, including log files, clickstreams, voice, and video. However, these wide-ranging data types are typically stored in silos across multiple data stores. To unlock the true potential of the data, organizations must break down these silos to unify and normalize all types of data and ensure that the right people have access to the right data.

Data unification can get expensive fast, with time and cost spent on building complex, custom extract, transform, load (ETL) pipelines that move or copy data from system to system. If not done right, you can end up with data latency issues, inaccuracies, and potential security and data governance risks. Instead, teams are looking for ways to share transactionally consistent, live, first-party and third-party data with each other or their end customers, without data movement or data copying.

Stripe, a payment processing platform for businesses, is an Amazon Redshift customer and a partner with thousands of end customers who require access to Stripe data for their applications. Stripe built the Stripe Data Pipeline, a solution for Stripe customers to access Stripe datasets within their Amazon Redshift data warehouses, without having to build, maintain, or scale custom ETL jobs. The Stripe Data Pipeline is powered by the data sharing capability of Amazon Redshift. Customers get a single source of truth, with low-latency data access, to speed up financial close and get better insights, analyzing best-performing payment methods, fraud by location, and more. Cutting down data engineering time and effort to access unified data creates new business opportunities from comprehensive insights and saves costs.

A modern data architecture with Amazon Redshift

These stories about harnessing maximum value from siloed data across the organization and applying powerful analytics for business insights in a cost-efficient way are possible because of AWS’s approach to a modern data architecture for their customers. Within this architecture, AWS’s data warehousing solution Amazon Redshift is a fully managed petabyte scale system, deeply integrated with AWS database, analytics, and machine learning (ML) services. Tens of thousands of customers use Amazon Redshift every day to run data warehousing and analytics in the cloud and process exabytes of data for business insights. Customers looking for a highly performing, cost-optimized cloud data warehouse solution choose Amazon Redshift for the following reasons:

  • Its leadership in price-performance
  • The ability to break through data silos for meaningful insights
  • Easy analytics capabilities that cut down data engineering and administrative requirements
  • Security and reliability features that are offered out of the box, at no additional cost

The price-performance in a cloud data warehouse benchmark metric is simply defined as the cost to perform a particular workload. Knowing how much your data warehouse is going to cost and how performance changes as your user base and data processing increases is crucial for planning, budgeting, and decision-making around choosing the best data warehouse.

Amazon Redshift is able to attain the best price-performance for customers (up to five times better than other cloud data warehouses) by optimizing the code for AWS hardware, high-performance and power-efficient compute hardware, new compression and caching algorithms, and autonomics (ML-based optimizations) within the warehouse to abstract the administrative activities away from the user, saving time and improving performance. Flexible pricing options such as pay-as-you-go with Redshift Serverless, separation of storage and compute scaling, and 1–3-year compute reservations with heavy discounts keep prices low.

The native integrations in Amazon Redshift with databases, data lakes, streaming data services, and ML services, employing zero-ETL approaches help you access data in place without data movement and easily ingest data into the warehouse without building complex pipelines. This keeps data engineering costs low and expands analytics for more users.

For example, the integration in Amazon Redshift with Amazon SageMaker allows data analysts to stay within the data warehouse and create, train, and build ML models in SQL with no need for ETL jobs or learning new languages for ML (see Jobcase Scales ML Workflows to Support Billions of Daily Predictions Using Amazon Redshift ML for an example). Every week, over 80 billion predictions happen in the warehouse with Amazon Redshift ML.

Finally, customers don’t have to pay more to secure their critical data assets. Security features offer comprehensive identity management with data encryption, granular access controls at row and column level, and data masking abilities to protect sensitive data and authorizations for the right users or groups. These features are available out of the box, within the standard pricing model.

Conclusion

Overall, customers who choose Amazon Redshift innovate in a new reality where the data warehouse scales up and down automatically as workloads change, and maximizes the value of data for all cornerstones of their business.

For market leaders like Nasdaq, they are able to ingest billions of data points daily for trading and selling at high volume and velocity, all in time for proper billing and trading the following business day. For customers like Playrix, choosing Redshift Serverless means marketing to customers with comprehensive analytics in near-real time without getting bogged down by maintenance and overhead. For Stripe, it also means taking the complexity and TCO out of ETL, removing silos and unifying data.

Although data will continue to grow at unprecedented amounts, your bottom line doesn’t need to suffer. While organizational leaders face the pressures of solving for cost optimization in all types of economic environments, Amazon Redshift gives market leaders a space to innovate without compromising their data value, performance, and budgets of their cloud data warehouse.

Learn more about maximizing the value of your data with a modern data warehouse like Amazon Redshift. For more information about the price-performance leadership of Amazon Redshift and to review benchmarks against other vendors, see Amazon Redshift continues its price-performance leadership. Additionally, you can optimize costs using a variety of performance and cost levers, including Amazon Redshift’s flexible pricing models, which cover pay-as-you-go pricing for variable workloads, free trials, and reservations for steady state workloads.


About the authors

Sana Ahmed is a Sr. Product Marketing Manager for Amazon Redshift. She is passionate about people, products, and problem-solving with product marketing. As a product marketer, she has taken 50+ products to market and worked at various companies including Sprinklr, PayPal, and Facebook. Her hobbies include tennis, museum-hopping, and fun conversations with friends and family.

Sunaina AbdulSalah leads product marketing for Amazon Redshift. She focuses on educating customers about the impact of data warehousing and analytics and sharing AWS customer stories. She has a deep background in marketing and GTM functions in the B2B technology and cloud computing domains. Outside of work, she spends time with her family and friends and enjoys traveling.

Automate discovery of data relationships using ML and Amazon Neptune graph technology

Post Syndicated from Moira Lennox original https://aws.amazon.com/blogs/big-data/automate-discovery-of-data-relationships-using-ml-and-amazon-neptune-graph-technology/

Data mesh is a new approach to data management. Companies across industries are using a data mesh to decentralize data management to improve data agility and get value from data. However, when a data producer shares data products on a data mesh self-serve web portal, it’s neither intuitive nor easy for a data consumer to know which data products they can join to create new insights. This is especially true in a large enterprise with thousands of data products.

This post shows how to use machine learning (ML) and Amazon Neptune to create automated recommendations to join data products and display those recommendations alongside the existing data products. This allows data consumers to easily identify new datasets and provides agility and innovation without spending hours doing analysis and research.

Background

A successful data-driven organization recognizes data as a key enabler to increase and sustain innovation. The data mesh approach follows what is called a distributed system architecture. The goal of a data product is to solve the long-standing issues of data silos and data quality. Independent data products often only have value if you can connect them, join them, and correlate them to create a higher order data product that creates additional insights. A modern data architecture is critical in order to become a data-driven organization. It allows stakeholders to manage and work with data products across the organization, enhancing the pace and scale of innovation.

Solution overview

A data mesh architecture starts to solve for the decoupled architecture by decoupling the data infrastructure from the application infrastructure, which is a common challenge in traditional data architectures. It focuses on decentralized ownership, domain design, data products, and self-serve data infrastructure. This allows for a new way of thinking and new organizational elements—namely, a modern data community.

However, today’s data mesh platform contains largely independent data products. Even with well-documented data products, knowing how to connect or join data products is a time-consuming job. Data consumers spend hours, days, or months to understand and analyze the data. Identifying links or relationships between data products is critical to create value from the data mesh and enable a data-driven organization.

The solution in this post illustrates an approach to solving these challenges. It uses a fictional insurance company with several data products shared on their data mesh marketplace. The following figure shows the sample data products used in our solution.

Suppose a consumer is browsing the customer data product in the data mesh marketplace. The consumer wonders if the customer data could be linked to claim, policy, or encounter data. Because these data products come from different lines of business (LOBs) or silos, it’s hard to know. A consumer would have to review each data product and do the necessary analysis and research to know this with any certainty.

To solve this problem, our solution uses ML and Neptune to create recommendations for the data consumer. The solution generates a list of data products, product attributes, and the associated probability scores to show join ability. This reduces the time to discover, analyze, and create new insights.

We use Valentine, a data science algorithm for comparing datasets, to improve data product recommendations. Neptune, the managed AWS graph database service, stores information about explicit connections between datasets, improving the recommendations.

Example use case

Let’s walk through a concrete example. Suppose a consumer is browsing the Customer data product in the data mesh marketplace. Customer is similar to the Policy and Encounter data products, but these products come from different silos. Their similarity to the Customer is hard to gauge. To expedite the consumer’s work, the mesh recommends how the Policy and Encounter products can be connected to the Customer product.

Let’s consider two cases. First, is Customer similar to Claim? The following is a sample of the data in each product.

Intuitively, these two products have lots of overlap. Every Cust_Nbr in Claim has a corresponding Customer_ID in Customer. There is no foreign key constraint in Claim that assures us it points to Customer. We think there is enough similarity to infer a join relationship.

The data science algorithm Valentine is an effective tool for this. Valentine is presented in the paper Valentine: Evaluating Matching Techniques for Dataset Discovery (2021, Koutras et al.). Valentine determines if two datasets are joinable or unionable. We focus on the former. Two datasets are joinable if a record from one dataset has a link to a record in the other dataset using one or more columns. Valentine addresses the use case where data is messy: there is no foreign key constraint in place, and data doesn’t match perfectly between datasets. Valentine looks for similarities, and its findings are probabilistic. It scores its proposed matches.

This solution uses an implementation of Valentine available in the following GitHub repo. The first step is to load each data product from its source into a Pandas data frame. If the data is large, load a representative subset of it, at most a few million records. Pass the frames to the valentine_match() function and select the matching method. We use COMA, one of several methods that Valentine supports. The function’s result indicates the similarity of columns and the score. In this case, it tells us that the Customer_ID for Customer matches the Cust_Nbr for Claim, with a very high score. We then instruct the data mesh to recommend Claim to the consumer browsing Customer.
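
The following is a minimal sketch of that comparison using the open-source valentine package from the GitHub repository mentioned above. The file names are placeholders, and the package’s exact module layout and constructor arguments may differ between versions.

import pandas as pd
from valentine import valentine_match
from valentine.algorithms import Coma

# Load a representative subset of each data product (a few million records at most)
customer_df = pd.read_csv("customer_sample.csv")   # placeholder file name
claim_df = pd.read_csv("claim_sample.csv")         # placeholder file name

# COMA is one of several matching methods Valentine supports
matcher = Coma(use_instances=True)

# Returns candidate column pairs with a similarity score
matches = valentine_match(customer_df, claim_df, matcher, "Customer", "Claim")

# Keys are ((table, column), (table, column)) pairs mapped to a score
for ((tbl_a, col_a), (tbl_b, col_b)), score in matches.items():
    print(f"{tbl_a}.{col_a} <-> {tbl_b}.{col_b}: {score:.2f}")

In this example, a high score for Customer.Customer_ID and Claim.Cust_Nbr is what would drive the recommendation to the consumer.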

A graph database isn’t required to recommend Claim; the two products could be directly compared. But let’s consider Encounter. Is Customer similar to Encounter? This case is more complicated. Many encounters in the Encounter product don’t link to a customer. An encounter occurs when someone contacts the contact center, which could be by phone or email. The party may or may not be a customer, and if they are a customer, we may not know their customer ID during this encounter. Additionally, sometimes the phone or email they use isn’t the same as the one from a customer record in the Customer product.

In the following sample encounter set, encounters 1 and 2 match to Customer_ID 4. Note that encounter 2’s inbound_email doesn’t exactly match the inbound_email in that customer’s record in the Customer product. Encounter 3 has no Customer_ID, but its inbound_email matches the customer with ID 8. Encounter 4 appears to refer to the customer with ID 8, but the email doesn’t match, and no Customer_ID is given. Encounter 5 only has Inbound_Phone, but that matches the customer with ID 1. Encounter 6 only has an Inbound_Phone, and it doesn’t appear to match any of the customers we’ve listed so far.

We don’t have a strong enough comparison to infer similarity.

But we know more about the customer than the Customer product tells us. In the Neptune database, we maintain a knowledge graph that combines multiple products and links them through relationships. A knowledge graph allows us to combine data from different sources to gain a better understanding of a specific problem domain. In Neptune, we combine the Customer product data with an additional data product: Sales Opportunity. We ingest each product from its source into the knowledge graph and model a hasSalesOpportunity relationship between Customer and SalesOpportunity resources. The following figure shows these resources, their attributes, and their relationship.

With the AWS SDK for Pandas, we combine this data by running a query against the Neptune graph. We use a graph query language (such as SPARQL) to wrangle a representative subset of customer and sales opportunity data into a Pandas data frame (shown as Enhanced Customer View in the following figure). In the following example, we enhance customers 7 and 8 with alternate phone or email contact data from sales opportunities.
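
As a rough sketch of that step (not the exact code or graph model from the solution), the following uses the AWS SDK for pandas (awswrangler) to run a SPARQL query against Neptune and load the result into a data frame. The endpoint, RDF namespace, and property names are illustrative assumptions.

import awswrangler as wr

# Placeholder Neptune endpoint and port
client = wr.neptune.connect(
    "my-neptune-cluster.cluster-xxxxx.us-east-1.neptune.amazonaws.com",
    8182,
    iam_enabled=False,
)

# Illustrative SPARQL: pull each customer together with alternate contact
# details from any linked sales opportunity
query = """
PREFIX ex: <http://example.com/insurance#>
SELECT ?customerId ?email ?phone ?oppEmail ?oppPhone
WHERE {
    ?customer ex:customerId ?customerId .
    OPTIONAL { ?customer ex:inboundEmail ?email . }
    OPTIONAL { ?customer ex:inboundPhone ?phone . }
    OPTIONAL {
        ?customer ex:hasSalesOpportunity ?opp .
        OPTIONAL { ?opp ex:contactEmail ?oppEmail . }
        OPTIONAL { ?opp ex:contactPhone ?oppPhone . }
    }
}
"""

enhanced_customer_df = wr.neptune.execute_sparql(client, query)
print(enhanced_customer_df.head())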

We pass that frame to Valentine and compare it to Encounter. This time, two additional encounters match a customer.

The score meets our threshold, and is high enough to share with the consumer as a possible match. To the customer browsing Customer in the mesh marketplace, we present the recommendation of Encounter, along with scoring details to support the recommendation. With this recommendation, the consumer can explore the Encounter product with greater confidence.

Conclusion

Data-driven organizations are transitioning to a data product way of thinking. Utilizing strategies like data mesh generates value on a large scale. We took this a step further by creating a blueprint to create smart recommendations by linking similar data products using graph technology and ML. In this post, we showed how an organization can augment a data catalog with additional metadata by using ML and Neptune with an automated process.

This solution solves the interoperability and linkage problem for data products. Additionally, it gives organizations real-time insights, agility, and innovation without spending time on data analysis and research. This approach creates a truly connected ecosystem with simplified access to delight your data consumers. The current solution is platform agnostic; however, in a future post we will show how to implement this using data.all (open-source software) and Amazon DataZone.

To learn more about ML in Neptune, refer to Amazon Neptune ML for machine learning on graphs. You can also explore Neptune notebooks demonstrating ML and data science for graphs. For more information about the data mesh architecture, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue. To learn more about Amazon DataZone and how you can share, search, and discover data at scale across organizational boundaries, see Amazon DataZone.


About the Authors


Moira Lennox is a Senior Data Strategy Technical Specialist for AWS with 27 years’ experience helping companies innovate and modernize their data strategies to achieve new heights and allow for strategic decision-making. She has experience working in large enterprises and technology providers, in both business and technical roles across multiple industries, including healthcare, life sciences, financial services, communications, digital entertainment, energy, and manufacturing.

Joel Farvault is Principal Specialist SA Analytics for AWS with 25 years’ experience working on enterprise architecture, data strategy, and analytics, mainly in the financial services industry. Joel has led data transformation projects on fraud analytics, claims automation, and data governance.

Mike Havey is a Solutions Architect for AWS with over 25 years of experience building enterprise applications. Mike is the author of two books and numerous articles, which you can find on his Amazon author page.

Configure SAML federation for Amazon OpenSearch Serverless with AWS IAM Identity Center

Post Syndicated from Utkarsh Agarwal original https://aws.amazon.com/blogs/big-data/configure-saml-federation-for-amazon-opensearch-serverless-with-aws-iam-identity-center/

Amazon OpenSearch Serverless is a serverless option of Amazon OpenSearch Service that makes it easy for you to run large-scale search and analytics workloads without having to configure, manage, or scale OpenSearch clusters. It automatically provisions and scales the underlying resources to deliver fast data ingestion and query responses for even the most demanding and unpredictable workloads. With OpenSearch Serverless, you can configure SAML to enable users to access data through OpenSearch Dashboards using an external SAML identity provider (IdP).

AWS IAM Identity Center (Successor to AWS Single Sign-On) helps you securely create or connect your workforce identities and manage their access centrally across AWS accounts and applications, OpenSearch Dashboards being one of them.

In this post, we show you how to configure SAML authentication for OpenSearch Dashboards using IAM Identity Center as its IdP.

Solution overview

The following diagram illustrates how the solution allows users or groups to authenticate into OpenSearch Dashboards using single sign-on (SSO) with IAM Identity Center using its built-in directory as the identity source.

The workflow steps are as follows:

  1. A user accesses the OpenSearch Dashboard URL in their browser and chooses the SAML provider.
  2. OpenSearch Serverless redirects the login to the specified IdP.
  3. The IdP provides a login form for the user to specify the credentials for authentication.
  4. After the user is authenticated successfully, a SAML assertion is sent back to OpenSearch Serverless.

OpenSearch Serverless validates the SAML assertion, and the user logs in to OpenSearch Dashboards.

Prerequisites

To get started, you must have an active OpenSearch Serverless collection. Refer to Creating and managing Amazon OpenSearch Serverless collections to learn more about creating a collection. Furthermore, you must have the correct AWS Identity and Access Management (IAM) permissions for configuring SAML authentication along with relevant IAM permissions for configuring the data access policy.

IAM Identity Center should be enabled, and you should have the relevant IAM permissions to create an application in IAM Identity Center and create and manage users and groups.

Create and configure the application in IAM Identity Center

To set up your application in IAM Identity Center, complete the following steps:

  1. On the IAM Identity Center dashboard, choose Applications in the navigation pane.
  2. Choose Add application.
  3. For Custom application, select Add custom SAML 2.0 application.
  4. Choose Next.
  5. Under Configure application, enter a name and description for the application.
  6. Under IAM Identity Center metadata, choose Download under IAM Identity Center SAML metadata file.

We use this metadata file to create a SAML provider under OpenSearch Serverless. It contains the public certificate used to verify the signature of the IAM Identity Center SAML assertions.

  1. Under Application properties, leave Application start URL and Relay state blank.
  2. For Session duration, choose 1 hour (the default value).

Note that the session duration you configure in this step takes precedence over the OpenSearch Dashboards timeout setting specified in the configuration of the SAML provider details on the OpenSearch Serverless end.

  1. Under Application metadata, select Manually type your metadata values.
  2. For Application ACS URL, enter your URL using the format https://collection.<REGION>.aoss.amazonaws.com/_saml/acs. For example, we enter https://collection.us-east-1.aoss.amazonaws.com/_saml/acs for this post.
  3. For Application SAML audience, enter your service provider in the format aws:opensearch:<aws account id>.
  4. Choose Submit.

Now you modify the attribute settings. The attribute mappings you configure here become part of the SAML assertion that is sent to the application.

  1. On the Actions menu, choose Edit attribute mappings.
  2. Configure Subject to map to ${user:email}, with the format unspecified.

Using ${user:email} here ensures that the email address for the user in IAM Identity Center is passed in the <NameId> tag of the SAML response.

  1. Choose Save changes.

Now we assign a user to the application.

  1. Create a user in IAM Identity Center to use to log in to OpenSearch Dashboards.

Alternatively, you can use an existing user.

  1. On the IAM Identity Center console, navigate to your application, choose Assign Users, and select the users you would like to assign.

You have now created a custom SAML application. Next, you will configure the SAML provider in OpenSearch Serverless.

Create a SAML provider

The SAML provider you create in this step can be assigned to any collection in the same Region. Complete the following steps:

  1. On the OpenSearch Service console, under Serverless in the navigation pane, choose SAML authentication under Security.
  2. Choose Create SAML provider.
  3. Enter a name and description for your SAML provider.
  4. Enter the metadata from your IdP that you downloaded earlier.
  5. Under Additional settings, you can optionally add custom user ID and group attributes. We leave these settings blank for now.
  6. Choose Create a SAML provider.

You have now configured a SAML provider for OpenSearch Serverless. Next, we walk you through configuring the data access policy for accessing collections.
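
If you prefer to script this step instead of using the console, the following minimal Boto3 sketch creates the same kind of SAML security configuration. The provider name, metadata file path, and session timeout are placeholders, and the group attribute is only needed if you follow the group-based setup described later in this post.

import boto3

aoss = boto3.client("opensearchserverless", region_name="us-east-1")

# Metadata XML downloaded from the IAM Identity Center custom SAML application
with open("identity-center-saml-metadata.xml") as f:   # placeholder path
    idp_metadata = f.read()

response = aoss.create_security_config(
    name="identity-center-saml-provider",               # placeholder name
    type="saml",
    description="SAML provider backed by IAM Identity Center",
    samlOptions={
        "metadata": idp_metadata,
        # Optional; 'group' matches the attribute mapping used for
        # group-based access later in this post
        "groupAttribute": "group",
        "sessionTimeout": 60,   # minutes
    },
)
print(response)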

Create the data access policy

In this section, you set up data access policies for OpenSearch Serverless and allow access to the users. Complete the following steps:

  1. On the OpenSearch Service console, under Serverless in the navigation pane, choose Data access policies under Security.
  2. Choose Create access policy.
  3. Enter a name and description for your access policy.
  4. For Policy definition method, select Visual Editor.
  5. In the Rules section, enter a rule name.
  6. Under Select principals, for Add principals, choose SAML users and groups.
  7. For SAML provider name, choose the SAML provider you created earlier.
  8. Specify the user in the format user/<email>.

The value of the email address should match the email address in IAM Identity Center.

  1. Choose Save.
  2. Choose Grant and specify the permissions.

You can configure what access you want to provide for the specific user at the collection level and specific indexes at the index pattern level.

You should select the access the user needs based on the least privilege model. Refer to Supported policy permissions and Supported OpenSearch API operations and permissions to set up more granular access for your users.

  1. Choose Save and configure any additional rules, if required.

You can now review and edit your configuration if needed.

  1. Choose Create to create the data access policy.

Now you have the data access policy that will allow the users to perform the allowed actions on OpenSearch Dashboards.
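
You can also create an equivalent data access policy programmatically. The following Boto3 sketch is illustrative only: the collection name, account ID, SAML provider name, email address, and granted permissions are placeholders, and you should confirm the principal format and permission names against the OpenSearch Serverless documentation before using them.

import json
import boto3

aoss = boto3.client("opensearchserverless", region_name="us-east-1")

# Placeholders: collection name, account ID, SAML provider name, and user email
policy_document = [
    {
        "Description": "Allow a SAML user to work with the demo collection",
        "Rules": [
            {
                "ResourceType": "collection",
                "Resource": ["collection/demo-collection"],
                "Permission": ["aoss:DescribeCollectionItems"],
            },
            {
                "ResourceType": "index",
                "Resource": ["index/demo-collection/*"],
                "Permission": ["aoss:ReadDocument", "aoss:DescribeIndex"],
            },
        ],
        # For group-based access (see the section on groups later in this post),
        # the principal takes the form saml/<account-id>/<provider-name>/group/<group-id>
        "Principal": ["saml/123456789012/identity-center-saml-provider/user/jane.doe@example.com"],
    }
]

response = aoss.create_access_policy(
    name="demo-collection-saml-access",
    type="data",
    description="Data access policy for SAML users",
    policy=json.dumps(policy_document),
)
print(response)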

Access OpenSearch Dashboards

To sign in to OpenSearch Dashboards, complete the following steps:

  1. On the OpenSearch Service dashboard, under Serverless in the navigation pane, choose Dashboard.
  2. Locate your dashboard and copy the OpenSearch Dashboards URL (in the format <collection-endpoint>/_dashboards).
  3. Enter this URL into a new browser tab.
  4. On the OpenSearch login page, choose your IdP and specify your SSO credentials.
  5. Choose Login.

Configure SAML authentication using groups in IAM Identity Center

Groups can help you organize your users and permissions in a coherent way. With groups, you can add multiple users from the IdP, and then use groupid as the identifier in the data access policy. For more information, refer to Add groups and Add users to groups.

To configure group access to OpenSearch Dashboards, complete the following steps:

  1. On the IAM Identity Center console, navigate to your application.
  2. In the Attribute mappings section, add an additional attribute named group and map it to ${user:groups}, with the format unspecified.
  3. Choose Save changes.
  4. For the SAML provider in OpenSearch Serverless, under Additional settings, for Group attribute, enter group.
  5. For the data access policy, create a new rule or add an additional principal in the previous rule.
  6. Choose the SAML provider name and enter group/<GroupId>.

You can fetch the value for the group ID by navigating to the Group section on the IAM Identity Center console.

Clean up

If you don’t want to continue using the solution, be sure to delete the resources you created:

  1. On the IAM Identity Center console, remove the application.
  2. On the OpenSearch Service console, under Serverless, delete the following resources:
    1. Delete your collection.
    2. Delete the data access policy.
    3. Delete the SAML provider.

Conclusion

In this post, you learned how to set up IAM Identity Center as an IdP to access OpenSearch Dashboards using SAML as SSO. You also learned on how to set up users and groups within IAM Identity Center and control the access of users and groups for OpenSearch Dashboards. For more details, refer to SAML authentication for Amazon OpenSearch Serverless.

Stay tuned for a series of posts focusing on the various options available for you to build effective log analytics and search solutions using OpenSearch Serverless. You can also refer to the Getting started with Amazon OpenSearch Serverless workshop to know more about OpenSearch Serverless.

If you have feedback about this post, submit it in the comments section. If you have questions about this post, start a new thread on the OpenSearch Service forum or contact AWS Support.


About the Authors

Utkarsh Agarwal is a Cloud Support Engineer in the Support Engineering team at Amazon Web Services. He specializes in Amazon OpenSearch Service. He provides guidance and technical assistance to customers, enabling them to build scalable, highly available, and secure solutions in the AWS Cloud. In his free time, he enjoys watching movies, TV series, and of course cricket. Lately, he is also attempting to master the art of cooking – the taste buds are excited, but the kitchen might disagree.

Ravi Bhatane is a software engineer with Amazon OpenSearch Serverless Service. He is passionate about security, distributed systems, and building scalable services. When he’s not coding, Ravi enjoys photography and exploring new hiking trails with his friends.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

How CyberSolutions built a scalable data pipeline using Amazon EMR Serverless and the AWS Data Lab

Post Syndicated from Constantin Scoarță original https://aws.amazon.com/blogs/big-data/how-cybersolutions-built-a-scalable-data-pipeline-using-amazon-emr-serverless-and-the-aws-data-lab/

This post is co-written by Constantin Scoarță and Horațiu Măiereanu from CyberSolutions Tech.

CyberSolutions is one of the leading ecommerce enablers in Germany. We design, implement, maintain, and optimize award-winning ecommerce platforms end to end. Our solutions are based on best-in-class software like SAP Hybris and Adobe Experience Manager, and complemented by unique services that help automate the pricing and sourcing processes.

We have built data pipelines to process, aggregate, and clean our data for our forecasting service. With the growing interest in our services, we wanted to scale our batch-based data pipeline to process more historical data on a daily basis and yet remain performant, cost-efficient, and predictable. To meet our requirements, we have been exploring the use of Amazon EMR Serverless as a potential solution.

To accelerate our initiative, we worked with the AWS Data Lab team. They offer joint engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics initiatives. We chose to work through a Build Lab, which is a 2–5-day intensive build with a technical customer team.

In this post, we share how we engaged with the AWS Data Lab program to build a scalable and performant data pipeline using EMR Serverless.

Use case

Our forecasting and recommendation algorithm is fed with historical data, which needs to be curated, cleaned, and aggregated. Our solution was based on AWS Glue workflows orchestrating a set of AWS Glue jobs, which worked fine for our requirements. However, as our use case developed, it required more computations and bigger datasets, resulting in unpredictable performance and cost.

This pipeline performs daily extracts from our data warehouse and a few other systems, curates the data, and computes some aggregations (such as daily averages). These are consumed by our internal tools to generate recommendations. Prior to the engagement, the pipeline was processing 28 days' worth of historical data in approximately 70 minutes. We wanted to extend that to 100 days and 365 days of data without having to extend the extraction window or increase the configured resources.

Solution overview

While working with the Data Lab team, we decided to structure our efforts into two approaches. As a short-term improvement, we were looking into optimizing the existing pipeline based on AWS Glue extract, transform, and load (ETL) jobs, orchestrated via AWS Glue workflows. However, for the mid-term to long-term, we looked at EMR Serverless to run our forecasting data pipeline.

EMR Serverless is an option in Amazon EMR that makes it easy and cost-effective for data engineers and analysts to run petabyte-scale data analytics in the cloud. With EMR Serverless, we could run applications built using open-source frameworks such as Apache Spark (as in our case) without having to configure, manage, optimize, or secure clusters. The following factors influenced our decision to use EMR Serverless:

  • Our pipeline had minimal dependency on the AWS Glue context and its features, instead running native Apache Spark
  • EMR Serverless offers configurable drivers and workers
  • With EMR Serverless, we were able to take advantage of its cost tracking feature for applications
  • The need for managing our own Spark History Server was eliminated because EMR Serverless automatically creates a monitoring Spark UI for each job

Therefore, we planned the lab activities to be categorized as follows:

  • Improve the existing code to be more performant and scalable
  • Create an EMR Serverless application and adapt the pipeline
  • Run the entire pipeline with different date intervals

The following solution architecture depicts the high-level components we worked with during the Build Lab.

In the following sections, we dive into the lab implementation in more detail.

Improve the existing code

After examining our code decisions, we identified a step in our pipeline that consumed the most time and resources, and we decided to focus on improving it. Our target job for this optimization was the “Create Moving Average” job, which involves computing various aggregations such as averages, medians, and sums on a moving window. Initially, this step took around 4.7 minutes to process an interval of 28 days. However, running the job for larger datasets proved to be challenging – it didn’t scale well and even resulted in errors in some cases.

While reviewing our code, we focused on several areas, including checking data frames at certain steps to ensure that they contained content before proceeding. Initially, we used the count() API to achieve this, but we discovered that head() was a better alternative because it returns the first n rows only and is faster than count() for large input data. With this change, we were able to save around 15 seconds when processing 28 days’ worth of data. Additionally, we optimized our output writing by using coalesce() instead of repartition().
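
The following is a minimal PySpark sketch of this pattern; the function name, output path, and file count are illustrative placeholders rather than our production code.

from pyspark.sql import DataFrame

def write_if_not_empty(df: DataFrame, output_path: str, num_files: int = 10) -> None:
    # head(1) fetches at most one row, avoiding the full scan that count() triggers
    if len(df.head(1)) == 0:
        print("Input data frame is empty, skipping write")
        return
    # coalesce() reduces the number of output partitions without a full shuffle
    df.coalesce(num_files).write.mode("overwrite").parquet(output_path)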

These changes managed to shave off some time, down to 4 minutes per run. However, we could achieve better performance by calling cache() on the data frames before performing the aggregations, which materializes the data frame in memory when the next action runs so that subsequent aggregations reuse it. Additionally, we used unpersist() to free up the executors' memory after we were done with the mentioned aggregations. This led to a runtime of approximately 3.5 minutes for this job.
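
The sketch below illustrates the cache()/unpersist() pattern on a hypothetical 28-day moving window; the column names, window size, and output path are assumptions for illustration only.

from pyspark.sql import DataFrame, Window
from pyspark.sql import functions as F

def add_moving_aggregates(df: DataFrame, output_path: str) -> DataFrame:
    # Mark the frame for in-memory reuse; it is materialized by the first action
    # that touches it, so the window aggregations below don't recompute upstream reads
    df.cache()

    window = Window.partitionBy("item_id").orderBy("day").rowsBetween(-27, 0)
    result = (
        df.withColumn("avg_28d", F.avg("value").over(window))
          .withColumn("sum_28d", F.sum("value").over(window))
    )

    # Writing is an action, so it materializes the result while the cache is
    # populated; afterwards we release the executors' memory for later steps
    result.write.mode("overwrite").parquet(output_path)
    df.unpersist()
    return result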

Following the successful code improvements, we managed to extend the data input to 100 days, 1 year, and 3 years. For this specific job, coalesce() wasn't avoiding the shuffle operation and caused uneven data distribution per executor, so we switched back to repartition(). By the end, we managed to get successful runs in 4.7, 12, and 57 minutes, using the same number of workers in AWS Glue (10 standard workers).

Adapt code to EMR Serverless

To observe if running the same job in EMR Serverless would yield better results, we configured an application that uses a comparable number of executors as in AWS Glue jobs. In the job configurations, we used 2 cores and 6 GB of memory for the driver and 20 executors with 4 cores and 16 GB of memory. However, we didn’t use additional ephemeral storage (by default, workers come with free 20 GB).

By the time we had the Build Lab, AWS Glue supported Apache Spark 3.1.1; however, we opted to use Spark 3.2.0 (Amazon EMR version 6.6.0) instead. Additionally, during the Build Lab, only x86_64 EMR Serverless applications were available, although it now also supports arm64-based architecture.

We adapted the code utilizing AWS Glue context to work with native Apache Spark. For instance, we needed to overwrite existing partitions and sync updates with the AWS Glue Data Catalog, especially when old partitions were replaced and new ones were added. We achieved this by setting spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC") and using an MSCK REPAIR query to sync the relevant table. Similarly, we replaced the read and write operations to rely on Apache Spark APIs.
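
A minimal sketch of that pattern might look like the following; the S3 paths, database, table, and partition column are placeholders, and it assumes the job is configured to use the AWS Glue Data Catalog as its metastore.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("create-moving-average").getOrCreate()

# Overwrite only the partitions present in the output instead of the whole table
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")

df = spark.read.parquet("s3://example-bucket/curated/daily/")  # placeholder input

(df.write
   .mode("overwrite")
   .partitionBy("ingest_date")
   .parquet("s3://example-bucket/aggregated/moving_average/"))  # placeholder output

# Sync added or replaced partitions with the AWS Glue Data Catalog
spark.sql("MSCK REPAIR TABLE example_db.moving_average")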

During the tests, we intentionally disabled the fine-grained auto scaling feature of EMR Serverless while running jobs, in order to observe how the code would perform with the same number of workers but different date intervals. We achieved that by setting spark.dynamicAllocation.enabled to false (the default is true).
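
For illustration, the following Boto3 sketch submits such a job run with the driver and executor sizing described earlier and with dynamic allocation disabled; the application ID, role ARN, and script location are placeholders.

import boto3

client = boto3.client("emr-serverless")

response = client.start_job_run(
    applicationId="00example123456",  # placeholder EMR Serverless application ID
    executionRoleArn="arn:aws:iam::111122223333:role/EMRServerlessJobRole",
    jobDriver={
        "sparkSubmit": {
            "entryPoint": "s3://example-bucket/scripts/create_moving_average.py",
            "sparkSubmitParameters": (
                "--conf spark.driver.cores=2 "
                "--conf spark.driver.memory=6g "
                "--conf spark.executor.instances=20 "
                "--conf spark.executor.cores=4 "
                "--conf spark.executor.memory=16g "
                "--conf spark.dynamicAllocation.enabled=false"
            ),
        }
    },
)
print(response["jobRunId"])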

For the same code, number of workers, and data inputs, we managed to get better performance results with EMR Serverless, which were 2.5, 2.9, 6, and 16 minutes for 28 days, 100 days, 1 year, and 3 years, respectively.

Run the entire pipeline with different date intervals

Because the code for our jobs was implemented in a modular fashion, we were able to quickly test all of them with EMR Serverless and then link them together to orchestrate the pipeline via Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Regarding performance, our previous pipeline using AWS Glue took around 70 minutes to run with our regular workload. However, our new pipeline, powered by Amazon MWAA-backed EMR Serverless, achieved similar results in approximately 60 minutes. Although this is a notable improvement, the most significant benefit was our ability to scale up to process larger amounts of data using the same number of workers. For instance, processing 1 year’s worth of data only took around 107 minutes to complete.

Conclusion and key takeaways

In this post, we outlined the approach taken by the CyberSolutions team in conjunction with the AWS Data Lab to create a high-performing and scalable demand forecasting pipeline. By using optimized Apache Spark jobs on customizable EMR Serverless workers, we were able to surpass the performance of our previous workflow. Specifically, the new setup resulted in 50–72% better performance for most jobs when processing 100 days of data, resulting in an overall cost savings of around 38%.

EMR Serverless applications’ features helped us have better control over cost. For example, we configured the pre-initialized capacity, which resulted in job start times of 1–4 seconds. And we set up the application behavior to start with the first submitted job and automatically stop after a configurable idle time.

As a next step, we are actively testing AWS Graviton2-based EMR applications, which come with more performance gains and lower cost.


About the Authors

 Constantin Scoarță is a Software Engineer at CyberSolutions Tech. He is mainly focused on building data cleaning and forecasting pipelines. In his spare time, he enjoys hiking, cycling, and skiing.

Horațiu Măiereanu is the Head of Python Development at CyberSolutions Tech. His team builds smart microservices for ecommerce retailers to help them improve and automate their workloads. In his free time, he likes hiking and traveling with his family and friends.

Ahmed Ewis is a Solutions Architect at the AWS Data Lab. He helps AWS customers design and build scalable data platforms using AWS database and analytics services. Outside of work, Ahmed enjoys playing with his child and cooking.

Reference guide to build inventory management and forecasting solutions on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-build-inventory-management-and-forecasting-solutions-on-aws/

Inventory management is a critical function for any business that deals with physical products. The primary challenge businesses face with inventory management is balancing the cost of holding inventory with the need to ensure that products are available when customers demand them.

The consequences of poor inventory management can be severe. Overstocking can lead to increased holding costs and waste, while understocking can result in lost sales, reduced customer satisfaction, and damage to the business’s reputation. Inefficient inventory management can also tie up valuable resources, including capital and warehouse space, and can impact profitability.

Forecasting is another critical component of effective inventory management. Accurately predicting demand for products allows businesses to optimize inventory levels, minimize stockouts, and reduce holding costs. However, forecasting can be a complex process, and inaccurate predictions can lead to missed opportunities and lost revenue.

To address these challenges, businesses need an inventory management and forecasting solution that can provide real-time insights into inventory levels, demand trends, and customer behavior. Such a solution should use the latest technologies, including Internet of Things (IoT) sensors, cloud computing, and machine learning (ML), to provide accurate, timely, and actionable data. By implementing such a solution, businesses can improve their inventory management processes, reduce holding costs, increase revenue, and enhance customer satisfaction.

In this post, we discuss how to streamline inventory management forecasting systems with AWS managed analytics, AI/ML, and database services.

Solution overview

In today’s highly competitive business landscape, it’s essential for retailers to optimize their inventory management processes to maximize profitability and improve customer satisfaction. With the proliferation of IoT devices and the abundance of data generated by them, it has become possible to collect real-time data on inventory levels, customer behavior, and other key metrics.

To take advantage of this data and build an effective inventory management and forecasting solution, retailers can use a range of AWS services. By collecting data from store sensors using AWS IoT Core, ingesting it into Amazon Aurora Serverless using AWS Lambda, and transforming it with AWS Glue from the database into an Amazon Simple Storage Service (Amazon S3) data lake, retailers can gain deep insights into their inventory and customer behavior.

With Amazon Athena, retailers can analyze this data to identify trends, patterns, and anomalies, and use Amazon ElastiCache for customer-facing applications with reduced latency. Additionally, by building a point of sales application on Amazon QuickSight, retailers can embed customer 360 views into the application to provide personalized shopping experiences and drive customer loyalty.

Finally, we can use Amazon SageMaker to build forecasting models that can predict inventory demand and optimize stock levels.

With these AWS services, retailers can build an end-to-end inventory management and forecasting solution that provides real-time insights into inventory levels and customer behavior, enabling them to make informed decisions that drive business growth and customer satisfaction.

The following diagram illustrates a sample architecture.

With the appropriate AWS services, your inventory management and forecasting system can have optimized collection, storage, processing, and analysis of data from multiple sources. The solution includes the following components.

Data ingestion and storage

Retail businesses have event-driven data that requires action from downstream processes. It’s critical for an inventory management application to handle the data ingestion and storage for changing demands.

The data ingestion process is typically triggered by an event such as an order being placed, kicking off the inventory management workflow, which requires actions from backend services. Developers are responsible for the operational overhead of trying to maintain the data ingestion load from an event-driven application.

The volume and velocity of data can change in the retail industry each day. Events like Black Friday or a new campaign can create volatile demand in what is required to process and store the inventory data. Serverless services designed to scale to businesses’ needs help reduce the architectural and operational challenges that are driven from high-demand retail applications.

Understanding the scaling challenges that occur when inventory demand spikes, we can deploy Lambda, a serverless, event-driven compute service, to trigger the data ingestion process. As inventory events occur like purchases or returns, Lambda automatically scales compute resources to meet the volume of incoming data.

After Lambda responds to the inventory action request, the updated data is stored in Aurora Serverless. Aurora Serverless is a serverless relational database that is designed to scale to the application’s needs. When peak loads hit during events like Black Friday, Aurora Serverless deploys only the database capacity necessary to meet the workload.
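
As a sketch of this ingestion path, the following Lambda handler writes a single inventory event into Aurora Serverless through the RDS Data API; the cluster ARN, secret ARN, database name, and table layout are assumptions for illustration, and your function could equally use a native database driver.

import os
import boto3

# Assumed environment configuration -- set these on the Lambda function
rds_data = boto3.client("rds-data")
CLUSTER_ARN = os.environ["AURORA_CLUSTER_ARN"]
SECRET_ARN = os.environ["AURORA_SECRET_ARN"]

def lambda_handler(event, context):
    # An IoT rule or API call delivers one inventory event per invocation;
    # the event keys below are placeholders for your own payload shape
    sql = (
        "INSERT INTO inventory_events (sku, store_id, quantity_delta, event_type) "
        "VALUES (:sku, :store_id, :quantity_delta, :event_type)"
    )
    params = [
        {"name": "sku", "value": {"stringValue": event["sku"]}},
        {"name": "store_id", "value": {"stringValue": event["store_id"]}},
        {"name": "quantity_delta", "value": {"longValue": int(event["quantity_delta"])}},
        {"name": "event_type", "value": {"stringValue": event["event_type"]}},
    ]
    rds_data.execute_statement(
        resourceArn=CLUSTER_ARN,
        secretArn=SECRET_ARN,
        database="inventory",
        sql=sql,
        parameters=params,
    )
    return {"status": "stored"}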

Inventory management applications have ever-changing demands. Deploying serverless services to handle the ingestion and storage of data will not only optimize cost but also reduce the operational overhead for developers, freeing up bandwidth for other critical business needs.

Data performance

Customer-facing applications require low latency to maintain positive user experiences with microsecond response times. ElastiCache, a fully managed, in-memory database, delivers high-performance data retrieval to users.

In-memory caching provided by ElastiCache is used to improve latency and throughput for the read-heavy applications that online retailers experience. By storing critical pieces of data in memory, such as commonly accessed product information, application performance improves. Product information is an ideal candidate for a cached store because the data stays relatively static.

Functionality is often added to retail applications to retrieve trending products. Trending products can be cycled through the cache depending on customer access patterns. ElastiCache manages the real-time application data caching, allowing your customers to experience microsecond response times while supporting high-throughput handling of hundreds of millions of operations per second.
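
The following is a minimal cache-aside sketch using the redis-py client against ElastiCache for Redis; the endpoint, key scheme, TTL, and the load_from_db callback are placeholders you would replace with your own lookup logic.

import json
import redis

# Placeholder endpoint -- use your ElastiCache for Redis configuration endpoint
cache = redis.Redis(host="my-cache.abc123.use1.cache.amazonaws.com", port=6379)

def get_product(product_id: str, load_from_db) -> dict:
    key = f"product:{product_id}"
    cached = cache.get(key)
    if cached is not None:
        return json.loads(cached)                 # cache hit: in-memory lookup
    product = load_from_db(product_id)            # cache miss: fall back to the database
    cache.setex(key, 3600, json.dumps(product))   # keep the entry warm for an hour
    return product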

Data transformation

Data transformation is essential in inventory management and forecasting solutions for both data analysis around sales and inventory, as well as ML for forecasting. This is because raw data from various sources can contain inconsistencies, errors, and missing values that may distort the analysis and forecast results.

In the inventory management and forecasting solution, AWS Glue is recommended for data transformation. The tool addresses issues such as cleaning, restructuring, and consolidating data into a standard format that can be easily analyzed. As a result of the transformation, businesses can obtain a more precise understanding of inventory, sales trends, and customer behavior, influencing data-driven decisions to optimize inventory management and sales strategies. Furthermore, high-quality data is crucial for ML algorithms to make accurate forecasts.

By transforming data, organizations can enhance the accuracy and dependability of their forecasting models, ultimately leading to improved inventory management and cost savings.

Data analysis

Data analysis has become increasingly important for businesses because it allows leaders to make informed operational decisions. However, analyzing large volumes of data can be a time-consuming and resource-intensive task. This is where Athena comes in. With Athena, businesses can easily query historical sales and inventory data stored in S3 data lakes and combine it with real-time transactional data from Aurora Serverless databases.

The federated capabilities of Athena allow businesses to generate insights by combining datasets without the need to build ETL (extract, transform, and load) pipelines, saving time and resources. This enables businesses to quickly gain a comprehensive understanding of their inventory and sales trends, which can be used to optimize inventory management and forecasting, ultimately improving operations and increasing profitability.
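
For illustration, the following Boto3 sketch starts an Athena query that joins historical sales in the S3 data lake with live stock levels from a federated Aurora source; the database, table, and output location names are placeholders, and aurora_catalog assumes you have registered an Athena federated connector for the Aurora database.

import boto3

athena = boto3.client("athena")

# Placeholder names: retail_lake (Glue database over the S3 data lake),
# aurora_catalog (a registered Athena federated data source), and the
# S3 output location for query results.
response = athena.start_query_execution(
    QueryString="""
        SELECT s.sku, SUM(s.units_sold) AS units_28d, MAX(i.on_hand) AS on_hand
        FROM sales_history s
        JOIN aurora_catalog.inventory.current_stock i ON s.sku = i.sku
        WHERE s.sale_date >= date_add('day', -28, current_date)
        GROUP BY s.sku
    """,
    QueryExecutionContext={"Database": "retail_lake"},
    ResultConfiguration={"OutputLocation": "s3://example-bucket/athena-results/"},
)
print(response["QueryExecutionId"])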

With Athena’s ease of use and powerful capabilities, businesses can quickly analyze their data and gain valuable insights, driving growth and success without the need for complex ETL pipelines.

Forecasting

Inventory forecasting is an important aspect of inventory management for businesses that deal with physical products. Accurately predicting demand for products can help optimize inventory levels, reduce costs, and improve customer satisfaction. ML can help simplify and improve inventory forecasting by making more accurate predictions based on historical data.

SageMaker is a powerful ML platform that you can use to build, train, and deploy ML models for a wide range of applications, including inventory forecasting. In this solution, we use SageMaker to build and train an ML model for inventory forecasting, covering the basic concepts of ML, the data preparation process, model training and evaluation, and deploying the model for use in a production environment.

The solution also introduces the concept of hierarchical forecasting, which involves generating coherent forecasts that maintain the relationships within the hierarchy or reconciling incoherent forecasts. The workshop provides a step-by-step process for using the training capabilities of SageMaker to carry out hierarchical forecasting using synthetic retail data and the scikit-hts package. The FBProphet model was used along with bottom-up and top-down hierarchical aggregation and disaggregation methods. We used Amazon SageMaker Experiments to train multiple models, and the best model was picked out of the four trained models.

Although the approach was demonstrated on a synthetic retail dataset, you can use the provided code with any time series dataset that exhibits a similar hierarchical structure.
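
As a rough sketch of the bottom-level forecasting step only (the scikit-hts reconciliation covered in the workshop is not shown), the following assumes a daily sales series for a single item; note that the package is imported as prophet in recent releases and fbprophet in older ones.

import pandas as pd
from prophet import Prophet  # fbprophet in older releases

# Placeholder daily series for one bottom-level node (for example, one item per store)
history = pd.DataFrame({
    "ds": pd.date_range("2022-01-01", periods=365, freq="D"),
    "y": [100 + (i % 7) * 5 for i in range(365)],  # replace with actual units sold
})

model = Prophet()
model.fit(history)

future = model.make_future_dataframe(periods=28)   # forecast the next 28 days
forecast = model.predict(future)
print(forecast[["ds", "yhat", "yhat_lower", "yhat_upper"]].tail())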

Security and authentication

The solution takes advantage of the scalability, reliability, and security of AWS services to provide a comprehensive inventory management and forecasting solution that can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. By incorporating user authentication with Amazon Cognito and Amazon API Gateway, the solution ensures that the system is secure and accessible only by authorized users.

Next steps

The next step to build an inventory management and forecasting solution on AWS would be to go through the Inventory Management workshop. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end inventory management solution. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that make up an inventory management system.

Conclusion

In conclusion, building an inventory management and forecasting solution on AWS can help businesses optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction. With AWS services like IoT Core, Lambda, Aurora Serverless, AWS Glue, Athena, ElastiCache, QuickSight, SageMaker, and Amazon Cognito, businesses can use scalable, reliable, and secure technologies to collect, store, process, and analyze data from various sources.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, an inventory management and forecasting solution on AWS can provide businesses with the insights and tools they need to make data-driven decisions and stay competitive in a constantly evolving retail landscape.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on databases and enterprise applications, helping customers architect highly available and scalable solutions.

Navnit Shukla is an AWS Specialist Solution Architect, Analytics, and is passionate about helping customers uncover insights from their data. He has been building solutions to help organizations make data-driven decisions.

Vetri Natarajan is a Specialist Solutions Architect for Amazon QuickSight. Vetri has 15 years of experience implementing enterprise business intelligence (BI) solutions and greenfield data products. Vetri specializes in integration of BI solutions with business applications and enabling data-driven decisions.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS platform and specializes in Data Analytics domain.

Push Amazon EMR step logs from Amazon EC2 instances to Amazon CloudWatch logs

Post Syndicated from Nausheen Sayed original https://aws.amazon.com/blogs/big-data/push-amazon-emr-step-logs-from-amazon-ec2-instances-to-amazon-cloudwatch-logs/

Amazon EMR is a big data service offered by AWS to run Apache Spark and other open-source applications on AWS to build scalable data pipelines in a cost-effective manner. Monitoring the logs generated from the jobs deployed on EMR clusters is essential to help detect critical issues in real time and identify root causes quickly.

Pushing those logs into Amazon CloudWatch enables you to centralize and drive actionable intelligence from your logs to address operational issues without needing to provision servers or manage software. You can instantly begin writing queries with aggregations, filters, and regular expressions. In addition, you can visualize time series data, drill down into individual log events, and export query results to CloudWatch dashboards.
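
For example, once the step logs are in CloudWatch, a CloudWatch Logs Insights query such as the following sketch could count error lines over time; the log group name is a placeholder for the group created later in this walkthrough.

import time
import boto3

logs = boto3.client("logs")

# Placeholder log group -- use the /aws/emr/master/<instance-id> group created later
query_id = logs.start_query(
    logGroupName="/aws/emr/master/i-0123456789abcdef0",
    startTime=int(time.time()) - 3600,
    endTime=int(time.time()),
    queryString="""
        fields @timestamp, @message
        | filter @message like /ERROR/
        | stats count() as errors by bin(5m)
    """,
)["queryId"]

# Poll until the query finishes, then print the aggregated results
while True:
    result = logs.get_query_results(queryId=query_id)
    if result["status"] in ("Complete", "Failed", "Cancelled"):
        break
    time.sleep(1)
print(result["results"])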

To ingest logs that are persisted on the Amazon Elastic Compute Cloud (Amazon EC2) instances of an EMR cluster into CloudWatch, you can use the CloudWatch agent. This provides a simple way to push logs from an EC2 instance to CloudWatch.

The CloudWatch agent is a software package that autonomously and continuously runs on your servers. You can install and configure the CloudWatch agent to collect system and application logs from EC2 instances, on-premises hosts, and containerized applications. CloudWatch processes and stores the logs collected by the CloudWatch agent, which further helps with the performance and health monitoring of your infrastructure and applications.
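
The bootstrap script shown later in this post fetches the agent configuration from the Systems Manager parameter AmazonCloudWatch-Config.json. The following sketch shows what storing such a configuration with Boto3 might look like; the step log file paths and the log group and stream names are assumptions modeled on the log groups described later, so verify them for your EMR release.

import json
import boto3

# Assumed locations of EMR step logs on the primary node and the log group/stream
# naming used later in this post -- verify both for your EMR release label
agent_config = {
    "logs": {
        "logs_collected": {
            "files": {
                "collect_list": [
                    {
                        "file_path": "/mnt/var/log/hadoop/steps/*/stdout",
                        "log_group_name": "/aws/emr/master/{instance_id}",
                        "log_stream_name": "step-stdout",
                    },
                    {
                        "file_path": "/mnt/var/log/hadoop/steps/*/stderr",
                        "log_group_name": "/aws/emr/master/{instance_id}",
                        "log_stream_name": "step-stderr",
                    },
                    {
                        "file_path": "/mnt/var/log/hadoop/steps/*/controller",
                        "log_group_name": "/aws/emr/master/{instance_id}",
                        "log_stream_name": "step-controller",
                    },
                ]
            }
        }
    }
}

ssm = boto3.client("ssm")
ssm.put_parameter(
    Name="AmazonCloudWatch-Config.json",
    Type="String",
    Value=json.dumps(agent_config),
    Overwrite=True,
)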

In this post, we create an EMR cluster and centralize the EMR step logs of the jobs in CloudWatch. This will make it easier for you to manage your EMR cluster, troubleshoot issues, and monitor performance. This solution is particularly helpful if you want to use CloudWatch to collect and visualize real-time logs, metrics, and event data, streamlining your infrastructure and application maintenance.

Overview of solution

The solution presented in this post is based on a specific configuration where the EMR step concurrency level is set to 1. This means that only one step is run at a time on the cluster. It’s important to note that if the EMR step concurrency level is set to a value greater than 1, the solution may not work as expected. We highly recommend verifying your EMR step concurrency configuration before implementing the solution presented in this post.

The following diagram illustrates the solution architecture.

Solution Architecture Diagram

The workflow includes the following steps:

  1. Users start an Apache Spark EMR job, creating a step on the EMR cluster. Using Apache Spark, the workload is distributed across the different nodes of the EMR cluster.
  2. In each node (EC2 instance) of the cluster, a CloudWatch agent watches different logs directories, capturing new entries in the log files and pushing them to CloudWatch.
  3. Users can view the step logs accessing the different log groups from the CloudWatch console. The step logs written by Amazon EMR are as follows:
    • controller — Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.
    • stderr — The standard error channel of Spark while it processes the step.
    • stdout — The standard output channel of Spark while it processes the step.

We provide an AWS CloudFormation template in this post as a general guide. The template demonstrates how to configure a CloudWatch agent on Amazon EMR to push Spark logs to CloudWatch. You can review and customize it as needed to include your Amazon EMR security configurations. As a best practice, we recommend including your Amazon EMR security configurations in the template to encrypt data in transit.

You should also be aware that some of the resources deployed by this stack incur costs when they remain in use.

In the next sections, we go through the following steps:

  1. Create and upload the bootstrap script to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use the CloudFormation template to create the required resources (an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster).
  3. Monitor the Spark logs on the CloudWatch console.

Prerequisites

This post assumes that you have the following:

Create and upload the bootstrap script to an S3 bucket

For more information, see Uploading objects and Installing and running the CloudWatch agent on your servers.

To create and the upload the bootstrap script, complete the following steps:

  1. Create a local file named bootstrap_cloudwatch_agent.sh with the following content:
    #!/bin/bash
    
    echo -e 'Installing CloudWatch Agent... \n'
    sudo rpm -Uvh --force https://s3.amazonaws.com/amazoncloudwatch-agent/amazon_linux/amd64/latest/amazon-cloudwatch-agent.rpm
    
    echo -e 'Starting CloudWatch Agent... \n'
    sudo amazon-cloudwatch-agent-ctl -a fetch-config -m ec2 -c ssm:AmazonCloudWatch-Config.json -s

  2. On the Amazon S3 console, choose your S3 bucket.
  3. On the Objects tab, choose Upload.
  4. Choose Add files, then choose the bootstrap script.
  5. Choose Upload, then choose the file name: bootstrap_cloudwatch_agent.sh.
  6. Choose Copy S3 URI. We use this value in a later step.

Provision resources with the CloudFormation template

Choose Launch Stack to launch a CloudFormation stack in your account and deploy the template:

This template creates an IAM role, IAM instance profile, Systems Manager parameter, and EMR cluster. The cluster starts the Spark PI estimation example application. You will be billed for the AWS resources used if you create a stack from this template.

The CloudFormation wizard will ask you to modify or provide these parameters:

  • InstanceType – The type of instance for all instance groups. The default is m4.xlarge.
  • InstanceCountCore – The number of instances in the core instance group. The default is 2.
  • EMRReleaseLabel – The Amazon EMR release label you want to use. The default is emr-6.9.0.
  • BootstrapScriptPath – The S3 path of your CloudWatch agent installation bootstrap script that you copied earlier.
  • Subnet – The EC2 subnet where the cluster launches. You must provide this parameter.
  • EC2KeyPairName – An optional EC2 keypair for connecting to cluster nodes, as an alternative to Session Manager.

Monitor the log streams

After the CloudFormation stack deploys successfully, on the CloudWatch console, choose Log groups in the navigation pane. Then filter the log groups by the prefix /aws/emr/master.

The ID in the log group corresponds to the EC2 instance ID of the EMR primary node. If you have multiple EMR clusters, you can use this ID to identify a particular EMR cluster, based on the primary node ID.

In the log group, you will find the three different log streams.

The log streams contain the following information:

  • step-stdout – The standard output channel of Spark while it processes the step.
  • step-stderr – The standard error channel of Spark while it processes the step.
  • step-controller – Information about the processing of the step. If your step fails while loading, you can find the stack trace in this log.

Clean up

To avoid future charges in your account, delete the resources you created in this walkthrough. The EMR cluster will incur charges as long as the cluster is active, so stop it when you’re done.

  1. On the CloudFormation console, in the navigation pane, choose Stacks.
  2. Choose the stack you launched (EMR-CloudWatch-Demo), then choose Delete.
  3. Empty the S3 bucket you created.
  4. Delete the S3 bucket you created.

Conclusion

Now that you have completed the steps in this walkthrough, you have the CloudWatch agent running on your cluster hosts and configured to push EMR step logs to CloudWatch. With this feature, you can effectively monitor the health and performance of your Spark jobs running on Amazon EMR, detecting critical issues in real time and identifying root causes quickly.

You can package and deploy this solution through a CloudFormation template like this example template, which creates the IAM instance profile role, Systems Manager parameter, and EMR cluster.

To take this further, consider using these logs in CloudWatch alarms for alerts on a log group-metric filter. You could collect them with other alarms into a composite alarm or configure alarm actions such as sending Amazon Simple Notification Service (Amazon SNS) notifications to trigger event-driven processes such as AWS Lambda functions.


About the Author

Ennio Pastore is a Senior Data Architect on the AWS Data Lab team. He is an enthusiast of everything related to new technologies that have a positive impact on businesses and general livelihood. Ennio has over 10 years of experience in data analytics. He helps companies define and implement data platforms across industries, such as telecommunications, banking, gaming, retail, and insurance.

Implement column-level encryption to protect sensitive data in Amazon Redshift with AWS Glue and AWS Lambda user-defined functions

Post Syndicated from Aaron Chong original https://aws.amazon.com/blogs/big-data/implement-column-level-encryption-to-protect-sensitive-data-in-amazon-redshift-with-aws-glue-and-aws-lambda-user-defined-functions/

Amazon Redshift is a massively parallel processing (MPP), fully managed petabyte-scale data warehouse that makes it simple and cost-effective to analyze all your data using existing business intelligence tools.

When businesses are modernizing their data warehousing solutions to Amazon Redshift, implementing additional data protection mechanisms for sensitive data, such as personally identifiable information (PII) or protected health information (PHI), is a common requirement, especially for those in highly regulated industries with strict data security and privacy mandates. Amazon Redshift provides role-based access control, row-level security, column-level security, and dynamic data masking, along with other database security features to enable organizations to enforce fine-grained data security.

Security-sensitive applications often require column-level (or field-level) encryption to enforce fine-grained protection of sensitive data on top of the default server-side encryption (namely data encryption at rest). In other words, sensitive data should be always encrypted on disk and remain encrypted in memory, until users with proper permissions request to decrypt the data. Column-level encryption provides an additional layer of security to protect your sensitive data throughout system processing so that only certain users or applications can access it. This encryption ensures that only authorized principals that need the data, and have the required credentials to decrypt it, are able to do so.

In this post, we demonstrate how you can implement your own column-level encryption mechanism in Amazon Redshift using AWS Glue to encrypt sensitive data before loading data into Amazon Redshift, and using AWS Lambda as a user-defined function (UDF) in Amazon Redshift to decrypt the data using standard SQL statements. Lambda UDFs can be written in any of the programming languages supported by Lambda, such as Java, Go, PowerShell, Node.js, C#, Python, Ruby, or a custom runtime. You can use Lambda UDFs in any SQL statement such as SELECT, UPDATE, INSERT, or DELETE, and in any clause of the SQL statements where scalar functions are allowed.

Solution overview

The following diagram describes the solution architecture.

Architecture Diagram

To illustrate how to set up this architecture, we walk you through the following steps:

  1. We upload a sample data file containing synthetic PII data to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. A sample 256-bit data encryption key is generated and securely stored using AWS Secrets Manager.
  3. An AWS Glue job reads the data file from the S3 bucket, retrieves the data encryption key from Secrets Manager, performs data encryption for the PII columns, and loads the processed dataset into an Amazon Redshift table.
  4. We create a Lambda function to reference the same data encryption key from Secrets Manager, and implement data decryption logic for the received payload data.
  5. The Lambda function is registered as a Lambda UDF with a proper AWS Identity and Access Management (IAM) role that the Amazon Redshift cluster is authorized to assume.
  6. We can validate the data decryption functionality by issuing sample queries using Amazon Redshift Query Editor v2.0. You may optionally choose to test it with your own SQL client or business intelligence tools.

Prerequisites

To deploy the solution, make sure to complete the following prerequisites:

  • Have an AWS account. For this post, you configure the required AWS resources using AWS CloudFormation in the us-east-2 Region.
  • Have an IAM user with permissions to manage AWS resources including Amazon S3, AWS Glue, Amazon Redshift, Secrets Manager, Lambda, and AWS Cloud9.

Deploy the solution using AWS CloudFormation

Provision the required AWS resources using a CloudFormation template by completing the following steps:

  1. Sign in to your AWS account.
  2. Choose Launch Stack:
    Launch Button
  3. Navigate to an AWS Region (for example, us-east-2).
  4. For Stack name, enter a name for the stack or leave as default (aws-blog-redshift-column-level-encryption).
  5. For RedshiftMasterUsername, enter a user name for the admin user account of the Amazon Redshift cluster or leave as default (master).
  6. For RedshiftMasterUserPassword, enter a strong password for the admin user account of the Amazon Redshift cluster.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create stack.
    Create CloudFormation stack

The CloudFormation stack creation process takes around 5–10 minutes to complete.

  1. When the stack creation is complete, on the stack Outputs tab, record the values of the following:
    1. AWSCloud9IDE
    2. AmazonS3BucketForDataUpload
    3. IAMRoleForRedshiftLambdaUDF
    4. LambdaFunctionName

CloudFormation stack output

Upload the sample data file to Amazon S3

To test the column-level encryption capability, you can download the sample synthetic data generated by Mockaroo. The sample dataset contains synthetic PII and sensitive fields such as phone number, email address, and credit card number. In this post, we demonstrate how to encrypt the credit card number field, but you can apply the same method to other PII fields according to your own requirements.

Sample synthetic data

An AWS Cloud9 instance is provisioned for you during the CloudFormation stack setup. You may access the instance from the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.

CloudFormation stack output for AWSCloud9IDE

On the AWS Cloud9 terminal, copy the sample dataset to your S3 bucket by running the following command:

S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-2274/pii-sample-dataset.csv s3://$S3_BUCKET/

Upload sample dataset to S3

Generate a secret and secure it using Secrets Manager

We generate a 256-bit secret to be used as the data encryption key. Complete the following steps:

  1. Create a new file in the AWS Cloud9 environment.
    Create new file in Cloud9
  2. Enter the following code snippet. We use the cryptography package to create a secret, and use the AWS SDK for Python (Boto3) to securely store the secret value with Secrets Manager:
    from cryptography.fernet import Fernet
    import boto3
    import base64
    
    key = Fernet.generate_key()
    client = boto3.client('secretsmanager')
    
    response = client.create_secret(
        Name='data-encryption-key',
        SecretBinary=base64.urlsafe_b64decode(key)
    )
    
    print(response['ARN'])

  3. Save the file with the file name generate_secret.py (or any desired name ending with .py).
    Save file in Cloud9
  4. Install the required packages by running the following pip install command in the terminal:
    pip install --user boto3
    pip install --user cryptography

  5. Run the Python script via the following command to generate the secret:
    python generate_secret.py

    Run Python script

Create a target table in Amazon Redshift

A single-node Amazon Redshift cluster is provisioned for you during the CloudFormation stack setup. To create the target table for storing the dataset with encrypted PII columns, complete the following steps:

  1. On the Amazon Redshift console, navigate to the list of provisioned clusters, and choose your cluster.
    Amazon Redshift console
  2. To connect to the cluster, on the Query data drop-down menu, choose Query in query editor v2.
    Connect with Query Editor v2
  3. If this is the first time you’re using the Amazon Redshift Query Editor V2, accept the default setting by choosing Configure account.
    Configure account
  4. To connect to the cluster, choose the cluster name.
    Connect to Amazon Redshift cluster
  5. For Database, enter demodb.
  6. For User name, enter master.
  7. For Password, enter your password.

You may need to change the user name and password according to your CloudFormation settings.

  1. Choose Create connection.
    Create Amazon Redshift connection
  2. In the query editor, run the following DDL command to create a table named pii_table:
    CREATE TABLE pii_table(
      id BIGINT,
      full_name VARCHAR(50),
      gender VARCHAR(10),
      job_title VARCHAR(50),
      spoken_language VARCHAR(50),
      contact_phone_number VARCHAR(20),
      email_address VARCHAR(50),
      registered_credit_card VARCHAR(50)
    );

We recommend using the smallest possible column size as a best practice, and you may need to modify these table definitions per your specific use case. Creating columns much larger than necessary will have an impact on the size of data tables and affect query performance.

Create Amazon Redshift table

Create the source and destination Data Catalog tables in AWS Glue

The CloudFormation stack provisioned two AWS Glue data crawlers: one for the Amazon S3 data source and one for the Amazon Redshift data source. To run the crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
    AWS Glue Crawlers
  2. Select the crawler named glue-s3-crawler, then choose Run crawler to trigger the crawler job.
    Run Amazon S3 crawler job
  3. Select the crawler named glue-redshift-crawler, then choose Run crawler.
    Run Amazon Redshift crawler job

When the crawlers are complete, navigate to the Tables page to verify your results. You should see two tables registered under the demodb database.

AWS Glue database tables

Author an AWS Glue ETL job to perform data encryption

An AWS Glue job is provisioned for you as part of the CloudFormation stack setup, but the extract, transform, and load (ETL) script has not been created. We create and upload the ETL script to the /glue-script folder under the provisioned S3 bucket in order to run the AWS Glue job.

  1. Return to your AWS Cloud9 environment either via the AWS Cloud9 console, or by visiting the URL obtained from the CloudFormation stack output with the key AWSCloud9IDE.
    CloudFormation stack output for AWSCloud9IDE

We use the Miscreant package for implementing a deterministic encryption using the AES-SIV encryption algorithm, which means that for any given plain text value, the generated encrypted value will be always the same. The benefit of using this encryption approach is to allow for point lookups, equality joins, grouping, and indexing on encrypted columns. However, you should also be aware of the potential security implication when applying deterministic encryption to low-cardinality data, such as gender, boolean values, and status flags.

  1. Create a new file in the AWS Cloud9 environment and enter the following code snippet:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrameCollection
    from awsglue.dynamicframe import DynamicFrame
    
    import boto3
    import base64
    from miscreant.aes.siv import SIV
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import StringType
    
    args = getResolvedOptions(sys.argv, ["JOB_NAME", "SecretName", "InputTable"])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    # retrieve the data encryption key from Secrets Manager
    secret_name = args["SecretName"]
    
    sm_client = boto3.client('secretsmanager')
    get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
    data_encryption_key = get_secret_value_response['SecretBinary']
    siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic
    
    # define the data encryption function
    def pii_encrypt(value):
        if value is None:
            value = ""
        ciphertext = siv.seal(value.encode())
        return base64.b64encode(ciphertext).decode('utf-8')
    
    # register the data encryption function as Spark SQL UDF   
    udf_pii_encrypt = udf(lambda z: pii_encrypt(z), StringType())
    
    # define the Glue Custom Transform function
    def Encrypt_PII (glueContext, dfc) -> DynamicFrameCollection:
        newdf = dfc.select(list(dfc.keys())[0]).toDF()
        
        # PII fields to be encrypted
        pii_col_list = ["registered_credit_card"]
    
        for pii_col_name in pii_col_list:
            newdf = newdf.withColumn(pii_col_name, udf_pii_encrypt(col(pii_col_name)))
    
        encrypteddyc = DynamicFrame.fromDF(newdf, glueContext, "encrypted_data")
        return (DynamicFrameCollection({"CustomTransform0": encrypteddyc}, glueContext))
    
    # Script generated for node S3 bucket
    S3bucket_node1 = glueContext.create_dynamic_frame.from_catalog(
        database="demodb",
        table_name=args["InputTable"],
        transformation_ctx="S3bucket_node1",
    )
    
    # Script generated for node ApplyMapping
    ApplyMapping_node2 = ApplyMapping.apply(
        frame=S3bucket_node1,
        mappings=[
            ("id", "long", "id", "long"),
            ("full_name", "string", "full_name", "string"),
            ("gender", "string", "gender", "string"),
            ("job_title", "string", "job_title", "string"),
            ("spoken_language", "string", "spoken_language", "string"),
            ("contact_phone_number", "string", "contact_phone_number", "string"),
            ("email_address", "string", "email_address", "string"),
            ("registered_credit_card", "long", "registered_credit_card", "string"),
        ],
        transformation_ctx="ApplyMapping_node2",
    )
    
    # Custom Transform
    Customtransform_node = Encrypt_PII(glueContext, DynamicFrameCollection({"ApplyMapping_node2": ApplyMapping_node2}, glueContext))
    
    # Script generated for node Redshift Cluster
    RedshiftCluster_node3 = glueContext.write_dynamic_frame.from_catalog(
        frame=Customtransform_node,
        database="demodb",
        table_name="demodb_public_pii_table",
        redshift_tmp_dir=args["TempDir"],
        transformation_ctx="RedshiftCluster_node3",
    )
    
    job.commit()

  2. Save the script with the file name pii-data-encryption.py.
    Save file in Cloud9
  3. Copy the script to the desired S3 bucket location by running the following command:
    S3_BUCKET=$(aws s3 ls| awk '{print $3}'| grep awsblog-pii-data-input-)
    aws s3 cp pii-data-encryption.py s3://$S3_BUCKET/glue-script/pii-data-encryption.py

    Upload AWS Glue script to S3

  4. To verify the script is uploaded successfully, navigate to the Jobs page on the AWS Glue console. You should be able to find a job named pii-data-encryption-job.
    AWS Glue console
  5. Choose Run to trigger the AWS Glue job. It will first read the source data from the S3 bucket registered in the AWS Glue Data Catalog, then apply column mappings to transform the data into the expected data types, perform the PII field encryption, and finally load the encrypted data into the target Redshift table. The whole process should be completed within 5 minutes for this sample dataset. You can switch to the Runs tab to monitor the job status.
    Monitor AWS Glue job

Configure a Lambda function to perform data decryption

A Lambda function with the data decryption logic is deployed for you during the CloudFormation stack setup. You can find the function on the Lambda console.

AWS Lambda console

The following is the Python code used in the Lambda function:

import boto3
import os
import json
import base64
import logging
from miscreant.aes.siv import SIV

logger = logging.getLogger()
logger.setLevel(logging.INFO)

secret_name = os.environ['DATA_ENCRYPT_KEY']

sm_client = boto3.client('secretsmanager')
get_secret_value_response = sm_client.get_secret_value(SecretId = secret_name)
data_encryption_key = get_secret_value_response['SecretBinary']

siv = SIV(data_encryption_key)  # Without nonce, the encryption becomes deterministic

# define lambda function logic
def lambda_handler(event, context):
    ret = dict()
    res = []
    for argument in event['arguments']:
        encrypted_value = argument[0]
        try:
            de_val = siv.open(base64.b64decode(encrypted_value)) # perform decryption
        except:
            de_val = encrypted_value
            logger.warning('Decryption for value failed: ' + str(encrypted_value)) 
        res.append(json.dumps(de_val.decode('utf-8')))

    ret['success'] = True
    ret['results'] = res

    return json.dumps(ret) # return decrypted results

If you want to deploy the Lambda function on your own, make sure to include the Miscreant package in your deployment package.

Register a Lambda UDF in Amazon Redshift

You can create Lambda UDFs that use custom functions defined in Lambda as part of your SQL queries. Lambda UDFs are managed in Lambda, and you can control the access privileges to invoke these UDFs in Amazon Redshift.

  1. Navigate back to the Amazon Redshift Query Editor V2 to register the Lambda UDF.
  2. Use the CREATE EXTERNAL FUNCTION command and provide an IAM role that the Amazon Redshift cluster is authorized to assume and make calls to Lambda:
    CREATE OR REPLACE EXTERNAL FUNCTION pii_decrypt (value varchar(max))
    RETURNS varchar STABLE
    LAMBDA '<--Replace-with-your-lambda-function-name-->'
    IAM_ROLE '<--Replace-with-your-redshift-lambda-iam-role-arn-->';

You can find the Lambda name and Amazon Redshift IAM role on the CloudFormation stack Outputs tab:

  • LambdaFunctionName
  • IAMRoleForRedshiftLambdaUDF

CloudFormation stack output
Create External Function in Amazon Redshift

Validate the column-level encryption functionality in Amazon Redshift

By default, permission to run new Lambda UDFs is granted to PUBLIC. To restrict usage of the newly created UDF, revoke the permission from PUBLIC and then grant the privilege to specific users or groups. To learn more about Lambda UDF security and privileges, see Managing Lambda UDF security and privileges.

You must be a superuser or have the sys:secadmin role to run the following SQL statements:

GRANT SELECT ON "demodb"."public"."pii_table" TO PUBLIC;
CREATE USER regular_user WITH PASSWORD '1234Test!';
CREATE USER privileged_user WITH PASSWORD '1234Test!';
REVOKE EXECUTE ON FUNCTION pii_decrypt(varchar) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION pii_decrypt(varchar) TO privileged_user;

First, we run a SELECT statement to verify that our highly sensitive data field, in this case the registered_credit_card column, is now encrypted in the Amazon Redshift table:

SELECT * FROM "demodb"."public"."pii_table";

Select statement

Regular database users who have not been granted permission to use the Lambda UDF will see a permission denied error when they try to use the pii_decrypt() function:

SET SESSION AUTHORIZATION regular_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

Permission denied

Privileged database users who have been granted permission to use the Lambda UDF for decrypting the data can issue a SQL statement using the pii_decrypt() function:

SET SESSION AUTHORIZATION privileged_user;
SELECT *, pii_decrypt(registered_credit_card) AS decrypted_credit_card FROM "demodb"."public"."pii_table";

The original registered_credit_card values can be successfully retrieved, as shown in the decrypted_credit_card column.

Decrypted results

Cleaning up

To avoid incurring future charges, make sure to clean up all the AWS resources that you created as part of this post.

You can delete the CloudFormation stack on the AWS CloudFormation console or via the AWS Command Line Interface (AWS CLI). The default stack name is aws-blog-redshift-column-level-encryption.
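
If you prefer to script the cleanup, the following Boto3 sketch deletes the stack and waits for the deletion to complete, assuming you kept the default stack name.

import boto3

cloudformation = boto3.client("cloudformation")

# Default stack name from this walkthrough; adjust it if you renamed the stack
STACK_NAME = "aws-blog-redshift-column-level-encryption"

cloudformation.delete_stack(StackName=STACK_NAME)
cloudformation.get_waiter("stack_delete_complete").wait(StackName=STACK_NAME)
print("Stack deleted")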

Conclusion

In this post, we demonstrated how to implement a custom column-level encryption solution for Amazon Redshift, which provides an additional layer of protection for sensitive data stored on the cloud data warehouse. The CloudFormation template gives you an easy way to set up the data pipeline, which you can further customize for your specific business scenarios. You can also modify the AWS Glue ETL code to encrypt multiple data fields at the same time, and to use different data encryption keys for different columns for enhanced data security. With this solution, you can limit the occasions where human actors can access sensitive data stored in plain text on the data warehouse.

You can learn more about this solution and the source code by visiting the GitHub repository. To learn more about how to use Amazon Redshift UDFs to solve different business problems, refer to Example uses of user-defined functions (UDFs) and Amazon Redshift UDFs.


About the Author

Aaron Chong is an Enterprise Solutions Architect at Amazon Web Services Hong Kong. He specializes in the data analytics domain, and works with a wide range of customers to build big data analytics platforms, modernize data engineering practices, and advocate AI/ML democratization.

Create threshold alerts on tables and pivot tables in Amazon QuickSight

Post Syndicated from Lillie Atkins original https://aws.amazon.com/blogs/big-data/create-threshold-alerts-on-tables-and-pivot-tables-in-amazon-quicksight/

Amazon QuickSight previously launched threshold alerts on KPIs and gauge charts. Now, QuickSight supports creating threshold alerts on tables and pivot tables—our most popular visual types. This allows readers and authors to track goals or key performance indicators (KPIs) and be notified via email when they are met. These alerts allow readers and authors to relax and rely on notifications for when their attention is needed. In this post, we share how to create threshold alerts on tables or pivot tables to track important metrics.

Background information

Threshold alerts are a QuickSight Enterprise Edition feature and are available for dashboards consumed on the QuickSight website. Threshold alerts aren't yet available in embedded QuickSight dashboards or on the mobile app.

Alerts are created based on the visual at that point in time and are not affected by potential future changes to the visual’s design. This means the visual can be changed or deleted and the alert continues to work as long as the data in the dataset remains valid. In addition, you can create multiple alerts off of one visual, and rename them as appropriate.

Finally, alerts respect row-level security (RLS) and column-level security (CLS) rules.

Set up an alert on a table or pivot table

Threshold alerts are configured for dashboards. On a dashboard, there are three different ways to create an alert on a table or pivot table.

First, you can create an alert directly from a pivot table or table. Click the cell you would like to create an alert on (if another action is enabled on the visual, you may have to right-click for this option to show). The cell must contain a numeric value; dates and strings don’t allow for creation of alerts. Then choose Create Alert to start creating the alert.

Let’s assume you want to track the profit coming from online purchases for auto-related merchandise being shipped first class. Choose the appropriate cell and then choose Create Alert.

Create Alert

You’re presented with the creation pane for alerts. The only difference from KPIs or gauge visual alerts is that here you’ll find the other dimensions in the row that you’re creating the alert on. This will help you identify what value from the table you have selected, because there can be duplicates of the numeric values.

In the following screenshot, the value to track is profit, which currently is $437.39. This is the value that will be compared to the threshold you set. You will also see the dimensions being used to define this alert, which are taken from the row of the table. In this case, the Category is Auto, the Segment is Online, and the Ship Mode is First Class.

Now that you have checked that the value is correct, you can update the name of the alert (automatically filled with the name of the visual it is created off of), set the condition (Is above, Is below, or Is equal to), and pick the threshold value, notification frequency, and whether you want to be emailed when there is no data.

In the following example, the alert has been configured so that you will receive an email when the profit is above the threshold of $1,000. You’ve also left the notification frequency at Daily at most and haven’t requested to be emailed when there is no data.

If you have a date field, you will also see an option to control the date. This automatically sets the date field to the most recent period of whatever aggregation you’re looking at, such as hour, day, week, or month. However, you can override this to use the specific date applied to the value you selected, if you prefer.

The following is an example where the data is aggregated by week, so Latest Week is selected rather than the historical Week of Jan 4, 2015.

When you’re happy with the alert, choose Save, and the Manage alerts pane loads.

The Create Alert button is also at the bottom of the pane. This is the second way you can start creating an alert off of a table or pivot table.

You can also get to this pane from the upper right alert button on the dashboard.

Create Alert through the icon on dashboard

If you have no alerts, this will automatically drop you into the creation pane. There you will be asked to select a visual that supports alerts to begin creating an alert. If you already have alerts (as previously demonstrated), then all you need to do is choose Create Alert.

Then select a visual and choose Next.

You’re prompted to select a cell if you have picked a table or pivot table visual.

Then you repeat the same steps as creating off a cell within a table or pivot table.

Finally, you can start creating an alert from the bell icon on the pivot table or table. This is the third way to create an alert.

bell icon

You’ll be prompted to select a cell from the table, and the creation pane appears.

After you choose the cell that you want to track, you start the creation process just like the first two examples.

Update and delete alerts

To update or delete an alert, you need to navigate back to the Manage alerts pane. You get there from the bell icon on the top right corner of the dashboard.

Create Alert through the icon on dashboard

You can then choose the options menu (three dots) on the alert you want to manage. Three options appear: Edit alert, View history (to view recent times the alert has breached and notified you), and Delete.

Notifications

You’ll receive an email when your alert breaches the rule you set. The following is an example of what that looks like (this alert has been adjusted to trigger when profit is over $100 and to notify as frequently as possible).

notification if alert is breached

The current profit breach is highlighted and the historical numbers are shown along with the date and time of the recorded breaches. You can also navigate to the dashboard by choosing View Dashboard.

Evaluate alerts

The evaluation schedule for threshold alerts is based on the dataset. For SPICE datasets, alert rules are checked against the data after a successful data refresh. With datasets querying your data sources directly, alerts are evaluated daily at a random time between 6:00 PM and 8:00 AM. This is based on the time zone of the AWS Region your dataset was created in. Dataset owners can set up their own schedules for checking alerts and increase the frequency up to hourly (to learn more, refer to Working with threshold alerts in Amazon QuickSight).

Restrict alerts

The admin for the QuickSight account can restrict who has access to set threshold alerts through custom permissions. For more information, see the section Customizing user permissions in Embed multi-tenant analytics in applications with Amazon QuickSight.

Pricing

Threshold alerts are billed for each evaluation, and follow the familiar pricing used for anomaly detection, starting at $0.50 per 1,000 evaluations. For example, if you set up an alert on a SPICE dataset that refreshes daily, you have 30 evaluations of the alert rule in a month, which costs 30 * $0.50 / 1,000 = $0.015 for the month. For more information, refer to Amazon QuickSight Pricing.
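
As a quick sanity check of that arithmetic, the following tiny Python calculation (illustrative only) restates the published rate:

# Reproduces the pricing example above: $0.50 per 1,000 evaluations,
# evaluated once per daily SPICE refresh over a 30-day month.
evaluations_per_month = 30
price_per_evaluation = 0.50 / 1000   # $0.50 per 1,000 evaluations

monthly_cost = evaluations_per_month * price_per_evaluation
print(f"${monthly_cost:.3f} per month")   # prints $0.015 per month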

Conclusion

In this post, you learned how to create threshold alerts on tables and pivot tables within QuickSight dashboards so that you can track important metrics. For more information about how to create threshold alerts on KPIs or gauge charts, refer to Create threshold-based alerts in Amazon QuickSight. Additional information is available in the Amazon QuickSight User Guide.


About the Author

Lillie Atkins is a Product Manager for Amazon QuickSight, Amazon Web Services’ cloud-native, fully managed BI service.

Generic orchestration framework for data warehousing workloads using Amazon Redshift RSQL

Post Syndicated from Akhil B original https://aws.amazon.com/blogs/big-data/generic-orchestration-framework-for-data-warehousing-workloads-using-amazon-redshift-rsql/

Tens of thousands of customers run business-critical workloads on Amazon Redshift, AWS’s fast, petabyte-scale cloud data warehouse delivering the best price-performance. With Amazon Redshift, you can query data across your data warehouse, operational data stores, and data lake using standard SQL. You can also integrate AWS services like Amazon EMR, Amazon Athena, Amazon SageMaker, AWS Glue, AWS Lake Formation, and Amazon Kinesis to take advantage of all of the analytic capabilities in the AWS Cloud.

Amazon Redshift RSQL is a native command-line client for interacting with Amazon Redshift clusters and databases. You can connect to an Amazon Redshift cluster, describe database objects, query data, and view query results in various output formats. You can use Amazon Redshift RSQL to replace existing extract, transform, load (ETL) and automation scripts, such as Teradata BTEQ scripts. You can wrap Amazon Redshift RSQL statements within a shell script to replicate existing functionality in the on-premises systems. Amazon Redshift RSQL is available for Linux, Windows, and macOS operating systems.

This post explains how you can create a generic configuration-driven orchestration framework using AWS Step Functions, Amazon Elastic Compute Cloud (Amazon EC2), AWS Lambda, Amazon DynamoDB, and AWS Systems Manager to orchestrate RSQL-based ETL workloads. If you’re migrating from legacy data warehouse workloads to Amazon Redshift, you can use this methodology to orchestrate your data warehousing workloads.

Solution overview

Customers migrating from legacy data warehouses to Amazon Redshift may have a significant investment in proprietary scripts like Basic Teradata Query (BTEQ) scripting for database automation, ETL, or other tasks. You can now use the AWS Schema Conversion Tool (AWS SCT) to automatically convert proprietary scripts like BTEQ scripts to Amazon Redshift RSQL scripts. The converted scripts run on Amazon Redshift with little to no changes. To learn about new options for database scripting, refer to Accelerate your data warehouse migration to Amazon Redshift – Part 4.

During such migrations, you may also want to modernize your current on-premises, third-party orchestration tools with a cloud-native framework to replicate and enhance your current orchestration capability. Orchestrating data warehouse workloads includes scheduling the jobs, checking if the pre-conditions have been met, running the business logic embedded within RSQL, monitoring the status of the jobs, and alerting if there are any failures.

This solution allows on-premises customers to migrate to a cloud-native orchestration framework that uses AWS serverless services such as Step Functions, Lambda, DynamoDB, and Systems Manager to run the Amazon Redshift RSQL jobs deployed on a persistent EC2 instance. You can also deploy the solution for greenfield implementations. In addition to meeting functional requirements, this solution also provides full auditing, logging, and monitoring of all ETL and ELT processes that are run.

To ensure high availability and resilience, you can use multiple EC2 instances that are a part of an auto scaling group along with Amazon Elastic File System (Amazon EFS) to deploy and run the RSQL jobs. When using auto scaling groups, you can install RSQL onto the EC2 instance as a part of the bootstrap script. You can also deploy the Amazon Redshift RSQL scripts onto the EC2 instance using AWS CodePipeline and AWS CodeDeploy. For more details, refer to Auto Scaling groups, the Amazon EFS User Guide, and Integrating CodeDeploy with Amazon EC2 Auto Scaling.

The following diagram illustrates the architecture of the orchestration framework.

Architecture Diagram

The key components of the framework are as follows:

  1. Amazon EventBridge is used as the ETL workflow scheduler, and it triggers a Lambda function at a preset schedule.
  2. The function queries a DynamoDB table for the configuration associated with the RSQL job, along with the status, run mode, and restart information for that job.
  3. After receiving the configuration, the function triggers a Step Functions state machine by passing the configuration details (a minimal sketch of this trigger flow follows this list).
  4. Step Functions starts running different stages (like configuration iteration, run type check, and more) of the workflow.
  5. Step Functions uses the Systems Manager SendCommand API to trigger the RSQL job and goes into a paused state with TaskToken. The RSQL scripts are persisted on an EC2 instance and are wrapped in a shell script. Systems Manager runs an AWS-RunShellScript SSM document to run the RSQL job on the EC2 instance.
  6. The RSQL job performs ETL and ELT operations on the Amazon Redshift cluster. When it’s complete, it returns a success/failure code and status message back to the calling shell script.
  7. The shell script calls a custom Python module with the success/failure code, status message, and the callback TaskToken that was received from Step Functions. The Python module logs the RSQL job status in the DynamoDB job audit table, and exports logs to the Amazon CloudWatch log group.
  8. The Python module then performs a SendTaskSuccess or SendTaskFailure API call based on the RSQL job run status. Based on the status of the RSQL job, Step Functions either resumes the flow or stops with failure.
  9. Step Functions logs the workflow status (success or failure) in the DynamoDB workflow audit table.
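
To make steps 2 and 3 more concrete, the following minimal Python sketch shows what such a trigger Lambda function could look like: it reads the workflow configuration from the DynamoDB configuration table and starts the primary state machine. The handler shape, environment variable names, and key schema are illustrative assumptions; the actual implementation lives in the GitHub repository.

import json
import os

import boto3

dynamodb = boto3.resource("dynamodb")
sfn = boto3.client("stepfunctions")

# Assumed names; the deployed resources may differ
CONFIG_TABLE = os.environ.get("CONFIG_TABLE", "rsql-blog-rsql-config-table")
STATE_MACHINE_ARN = os.environ["STATE_MACHINE_ARN"]  # ARN of the primary state machine


def handler(event, context):
    # Step 2: fetch the workflow configuration (stages, run mode, scripts)
    table = dynamodb.Table(CONFIG_TABLE)
    config = table.get_item(Key={"workflow_id": event["workflow_id"]})["Item"]

    # Step 3: start the primary state machine with the configuration as input
    response = sfn.start_execution(
        stateMachineArn=STATE_MACHINE_ARN,
        name=event["workflow_execution_id"],
        input=json.dumps(config, default=str),
    )
    return {"executionArn": response["executionArn"]}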

Prerequisites

You should have the following prerequisites:

Deploy AWS CDK stacks

Complete the following steps to deploy your resources using the AWS CDK:

  1. Clone the GitHub repo:
    git clone https://github.com/aws-samples/amazon-redshift-rsql-orchestration-framework.git

  2. Update the following environment parameters in cdk.json (this file can be found in the infra directory):
    1. ec2_instance_id – The EC2 instance ID on which RSQL jobs are deployed
    2. redshift_secret_id – The name of the Secrets Manager key that stores the Amazon Redshift database credentials
    3. rsql_script_path – The absolute directory path in the EC2 instance where the RSQL jobs are stored
    4. rsql_log_path – The absolute directory path in the EC2 instance used for storing the RSQL job logs
    5. rsql_script_wrapper – The absolute directory path of the RSQL wrapper script (rsql_trigger.sh) on the EC2 instance.

    The following is a sample cdk.json file after being populated with the parameters:

        "environment": {
          "ec2_instance_id" : "i-xxxx",
          "redshift_secret_id" : "blog-secret",
          "rsql_script_path" : "/home/ec2-user/blog_test/rsql_scripts/",
          "rsql_log_path" : "/home/ec2-user/blog_test/logs/",
          "rsql_script_wrapper" : "/home/ec2-user/blog_test/instance_code/rsql_trigger.sh"
        }
    

  3. Deploy the AWS CDK stack with the following code:
    cd amazon-redshift-rsql-orchestration-framework/lambdas/lambda-layer/
    sh zip_lambda_layer.sh                        # package the Lambda layer
    cd ../../infra/
    python3 -m venv .venv                         # create a virtual environment
    source .venv/bin/activate                     # activate it
    pip install -r requirements.txt               # install the AWS CDK app dependencies
    cdk bootstrap <AWS Account ID>/<AWS Region>   # bootstrap the target account and Region
    cdk deploy --all                              # deploy all stacks

Let’s look at the resources the AWS CDK stack deploys in more detail.

CloudWatch log group

A CloudWatch log group (/ops/rsql-logs/) is created, which is used to store, monitor, and access log files from EC2 instances and other sources.

The log group is used to store the RSQL job run logs. For each RSQL script, all the stdout and stderr logs are stored as a log stream within this log group.
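
As a rough illustration of how a job’s output could end up here, the following hedged Python sketch writes captured stdout/stderr text into the log group as a new log stream. The export_job_logs helper and the stream naming convention are assumptions for illustration, not the framework’s actual code.

import time

import boto3

logs = boto3.client("logs")
LOG_GROUP = "/ops/rsql-logs/"


def export_job_logs(script_name, log_text):
    # One log stream per RSQL script run (naming convention is an assumption)
    stream_name = f"{script_name}-{int(time.time())}"
    logs.create_log_stream(logGroupName=LOG_GROUP, logStreamName=stream_name)

    events = [
        {"timestamp": int(time.time() * 1000), "message": line}
        for line in log_text.splitlines()
        if line.strip()
    ]
    if events:
        logs.put_log_events(
            logGroupName=LOG_GROUP, logStreamName=stream_name, logEvents=events
        )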

DynamoDB configuration table

The DynamoDB configuration table (rsql-blog-rsql-config-table) is the basic building block of this solution. It stores all the RSQL jobs, their restart information, the run mode (sequential or parallel), and the sequence in which the jobs are to be run.

The table has the following structure:

  • workflow_id – The identifier for the RSQL-based ETL workflow.
  • workflow_description – The description for the RSQL-based ETL workflow.
  • workflow_stages – The sequence of stages within a workflow.
  • execution_type – The type of run for RSQL jobs (sequential or parallel).
  • stage_description – The description for the stage.
  • scripts – The list of RSQL scripts to be run. The RSQL scripts must be placed in the location defined in a later step.

The following is an example of an entry in the configuration table. You can see the workflow_id is blog_test_workflow and the description is Test Workflow for Blog.

It has three stages that are triggered in the following order: Schema & Table Creation Stage, Data Insertion Stage 1, and Data Insertion Stage 2. The stage Schema & Table Creation Stage has two RSQL jobs running sequentially, and Data Insertion Stage 1 and Data Insertion Stage 2 each have two jobs running in parallel.

{
	"workflow_id": "blog_test_workflow",
	"workflow_description": "Test Workflow for Blog",
	"workflow_stages": [{
			"execution_flag": "y",
			"execution_type": "sequential",
			"scripts": [
				"rsql_blog_script_1.sh",
				"rsql_blog_script_2.sh"
			],
			"stage_description": "Schema & Table Creation Stage"
		},
		{
			"execution_flag": "y",
			"execution_type": "parallel",
			"scripts": [
				"rsql_blog_script_3.sh",
				"rsql_blog_script_4.sh"
			],
			"stage_description": "Data Insertion Stage 1"
		},
		{
			"execution_flag": "y",
			"execution_type": "parallel",
			"scripts": [
				"rsql_blog_script_5.sh",
				"rsql_blog_script_6.sh"
			],
			"stage_description": "Data Insertion Stage 2"
		}
	]
}

DynamoDB audit tables

The audit tables store the run details for each RSQL job within the ETL workflow with a unique identifier for monitoring and reporting purposes. There are two audit tables because one stores the audit information at the RSQL job level and the other stores it at the workflow level.

The job audit table (rsql-blog-rsql-job-audit-table) has the following structure:

  • job_name – The name of the RSQL script
  • workflow_execution_id – The run ID for the workflow
  • execution_start_ts – The start timestamp for the RSQL job
  • execution_end_ts – The end timestamp for the RSQL job
  • execution_status – The run status of the RSQL job (Running, Completed, Failed)
  • instance_id – The EC2 instance ID on which the RSQL job is run
  • ssm_command_id – The Systems Manager command ID used to trigger the RSQL job
  • workflow_id – The workflow_id under which the RSQL job is run

The workflow audit table (rsql-blog-rsql-workflow-audit-table) has the following structure:

  • workflow_execution_id – The run ID for the workflow
  • workflow_id – The identifier for a particular workflow
  • execution_start_ts – The start timestamp for the workflow
  • execution_status – The run status of the workflow or state machine (Running, Completed, Failed)
  • rsql_jobs – The list of RSQL scripts that are a part of the workflow
  • execution_end_ts – The end timestamp for the workflow

Lambda functions

The AWS CDK creates the Lambda functions that retrieve the config data from the DynamoDB config table, update the audit details in DynamoDB, trigger the RSQL scripts on the EC2 instance, and iterate through each stage. The following is a list of the functions:

  • rsql-blog-master-iterator-lambda
  • rsql-blog-parallel-load-check-lambda
  • rsql-blog-sequential-iterator-lambda
  • rsql-blog-rsql-invoke-lambda
  • rsql-blog-update-audit-ddb-lambda

Step Functions state machines

This solution implements a Step Functions callback task integration pattern that enables Step Functions workflows to send a token to an external system via multiple AWS services.

The AWS CDK deploys the following state machines:

  • RSQLParallelStateMachine – The parallel state machine is triggered if the execution_type for a stage in the configuration table is set to parallel. The Lambda function with a callback token is triggered in parallel for each of the RSQL scripts using a Map state.
  • RSQLSequentialStateMachine – The sequential state machine is triggered if the execution_type for a stage in the configuration table is set to sequential. This state machine uses an iterator design pattern to run each RSQL job within the stage as per the sequence mentioned in the configuration.
  • RSQLMasterStatemachine – The primary state machine iterates through each stage and triggers different state machines based on the run mode (sequential or parallel) using a Choice state.

Move the RSQL script and instance code

Copy the instance_code and rsql_scripts directories (present in the GitHub repo) to the EC2 instance. Make sure the framework directory within instance_code is copied as well.

The following screenshots show that the instance_code and rsql_scripts directories are copied to the same parent folder on the EC2 instance.

Instance Code Scripts Image
Instance Code EC2 Copy Image
RSQL Script Image
RSQL Script EC2 Copy Image

RSQL script run workflow

To further illustrate the mechanism to run the RSQL scripts, see the following diagram.

RSQL Script Workflow Diagram

The Lambda function, which gets the configuration details from the configuration DynamoDB table, triggers the Step Functions workflow, which performs the following steps:

  1. A Lambda function defined as a workflow step receives the Step Functions TaskToken and configuration details.
  2. The TaskToken and configuration details are passed onto the EC2 instance using the Systems Manager SendCommand API call (see the sketch after this list). After the Lambda function is run, the workflow branch goes into a paused state and waits for a callback token.
  3. The RSQL scripts are run on the EC2 instance, which perform ETL and ELT on Amazon Redshift. After the scripts are run, the RSQL script passes the completion status and TaskToken to a Python script. This Python script is embedded within the RSQL script.
  4. The Python script updates the RSQL job status (success/failure) in the job audit DynamoDB table. It also exports the RSQL job logs to the CloudWatch log group.
  5. The Python script passes the RSQL job status (success/failure) and the status message back to the Step Functions workflow along with TaskToken using the SendTaskSuccess or SendTaskFailure API call.
  6. Depending on the job status received, Step Functions either resumes the workflow or stops the workflow.
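
The following Python sketch, under stated assumptions, illustrates the two halves of this callback pattern: the invoke Lambda sending the command with the TaskToken (steps 1 and 2), and the Python module on the EC2 instance reporting the result back and updating the audit record (steps 4 and 5). Function names, the audit table key schema, and the command string are illustrative; refer to the GitHub repository for the actual code.

import boto3

ssm = boto3.client("ssm")
sfn = boto3.client("stepfunctions")
dynamodb = boto3.resource("dynamodb")


def invoke_rsql_job(instance_id, wrapper_path, script_name, task_token):
    """Step 2: run the RSQL wrapper script on the EC2 instance via Systems Manager."""
    response = ssm.send_command(
        InstanceIds=[instance_id],
        DocumentName="AWS-RunShellScript",
        # The TaskToken travels with the command so the job can return it later
        Parameters={"commands": [f"sh {wrapper_path} {script_name} '{task_token}'"]},
    )
    return response["Command"]["CommandId"]


def report_job_status(job_name, workflow_execution_id, task_token, succeeded, message):
    """Steps 4 and 5: record the result in the job audit table and release the workflow."""
    # The key schema shown here is an assumption for illustration
    audit_table = dynamodb.Table("rsql-blog-rsql-job-audit-table")
    audit_table.update_item(
        Key={"job_name": job_name, "workflow_execution_id": workflow_execution_id},
        UpdateExpression="SET execution_status = :s",
        ExpressionAttributeValues={":s": "Completed" if succeeded else "Failed"},
    )
    if succeeded:
        sfn.send_task_success(taskToken=task_token, output='{"status": "Completed"}')
    else:
        sfn.send_task_failure(taskToken=task_token, error="RSQLJobFailed", cause=message)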

If EC2 auto scaling groups are used, then you can use the Systems Manager SendCommand to ensure resilience and high availability by specifying one or more EC2 instances (that are a part of the auto scaling group). For more information, refer to Run commands at scale.

When multiple EC2 instances are used, set the MaxConcurrency parameter of the SendCommand API call to 1, which makes sure that the RSQL job is triggered on only one EC2 instance at a time. For further details, refer to Using concurrency controls.
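
As a rough sketch (the tag value and wrapper path below are assumptions), such a call could target the instances in the auto scaling group by tag and cap the concurrency as follows:

import boto3

ssm = boto3.client("ssm")

response = ssm.send_command(
    # Auto scaling groups tag their instances with aws:autoscaling:groupName;
    # the group name "rsql-asg" is an assumed example
    Targets=[{"Key": "tag:aws:autoscaling:groupName", "Values": ["rsql-asg"]}],
    DocumentName="AWS-RunShellScript",
    Parameters={"commands": ["sh /home/ec2-user/blog_test/instance_code/rsql_trigger.sh"]},
    MaxConcurrency="1",  # run on at most one targeted instance at a time
    MaxErrors="0",
)
print(response["Command"]["CommandId"])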

Run the orchestration framework

To run the orchestration framework, complete the following steps:

  1. On the DynamoDB console, navigate to the configuration table and insert the configuration details provided earlier. For instructions on how to insert the example JSON configuration details, refer to Write data to a table using the console or AWS CLI.
    DynamoDB Config Insertion
  2. On the Lambda console, open the rsql-blog-rsql-workflow-trigger-lambda function and choose Test.
    Workflow Trigger Lambda Function
  3. Add the test event similar to the following code and choose Test:
    {
    	"workflow_id": "blog_test_workflow",
    	"workflow_execution_id": "demo_test_26"
    }

    Workflow Trigger Lambda function Payload

  4. On the Step Functions console, navigate to the rsql-master-state-machine state machine to open its details page.
    RSQL Master Step Function
  5. Choose Edit, then choose Workflow Studio New. The following screenshot shows the primary state machine.
    RSQL Master Step Function Flow
  6. Choose Cancel to leave Workflow Studio, then choose Cancel again to leave edit mode. You’re directed back to the details page.
    RSQL Master Step Function Details
  7. On the Executions tab, choose the latest run.
    RSQL Master Step Function Execution
  8. From the Graph view, you can check the status of each state by choosing it. Every state that uses an external resource has a link to it on the Details tab.
    RSQL Master Step Function Execution Graph
  9. The orchestration framework runs the ETL load, which consists of the following sample RSQL scripts:
    • rsql_blog_script_1.sh – Creates a schema rsql_blog within the database
    • rsql_blog_script_2.sh – Creates a table blog_table within the rsql_blog schema
    • rsql_blog_script_3.sh – Inserts one row into blog_table
    • rsql_blog_script_4.sh – Inserts one row into blog_table
    • rsql_blog_script_5.sh – Inserts one row into blog_table
    • rsql_blog_script_6.sh – Inserts one row into blog_table

You need to replace these RSQL scripts with the RSQL scripts developed for your workloads by inserting the relevant configuration details into the configuration DynamoDB table (rsql-blog-rsql-config-table).

Validation

After you run the framework, you’ll find a schema (called rsql_blog) with one table (called blog_table) created. This table consists of four rows.

RSQL Execution Table

You can check the logs of the RSQL job in the CloudWatch log group (/ops/rsql-logs/) and also the run status of the workflow in the workflow audit DynamoDB table (rsql-blog-rsql-workflow-audit-table).

RSQL Script CloudWatch Logs
RSQL Workflow Audit Record

Clean up

To avoid ongoing charges for the resources that you created, delete them. AWS CDK deletes all resources except data resources such as DynamoDB tables.

  • First, delete all AWS CDK stacks
    cdk destroy --all

  • On the DynamoDB console, select the following tables and delete them:
    • rsql-blog-rsql-config-table
    • rsql-blog-rsql-job-audit-table
    • rsql-blog-rsql-workflow-audit-table

Conclusion

You can use Amazon Redshift RSQL, Systems Manager, EC2 instances, and Step Functions to create a modern and cost-effective orchestration framework for ETL workflows. There is no overhead to create and manage different state machines for each of your ETL workflows. In this post, we demonstrated how to use this configuration-based generic orchestration framework to trigger complex RSQL-based ETL workflows.

You can also trigger an email notification through Amazon Simple Notification Service (Amazon SNS) within the state machine to notify the operations team of the completion status of the ETL process. Furthermore, you can achieve an event-driven ETL orchestration framework by using EventBridge to start the workflow trigger Lambda function.


About the Authors

Akhil is a Data Analytics Consultant at AWS Professional Services. He helps customers design & build scalable data analytics solutions and migrate data pipelines and data warehouses to AWS. In his spare time, he loves travelling, playing games and watching movies.


Ramesh Raghupathy is a Senior Data Architect with WWCO ProServe at AWS. He works with AWS customers to architect, deploy, and migrate to data warehouses and data lakes on the AWS Cloud. While not at work, Ramesh enjoys traveling, spending time with family, and yoga.

Raza Hafeez is a Senior Data Architect within the Shared Delivery Practice of AWS Professional Services. He has over 12 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Dipal Mahajan is a Lead Consultant with Amazon Web Services based out of India, where he guides global customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings extensive experience on Software Development, Architecture and Analytics from industries like finance, telecom, retail and healthcare.