Tag Archives: Analytics

Build a serverless streaming pipeline with Amazon MSK Serverless, Amazon MSK Connect, and MongoDB Atlas

Post Syndicated from Igor Alekseev original https://aws.amazon.com/blogs/big-data/build-a-serverless-streaming-pipeline-with-amazon-msk-serverless-amazon-msk-connect-and-mongodb-atlas/

This post was cowritten with Babu Srinivasan and Robert Walters from MongoDB.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed, highly available Apache Kafka service. Amazon MSK makes it easy to ingest and process streaming data in real time and use that data easily within the AWS ecosystem. With Amazon MSK Serverless, you can automatically provision and manage required resources to provide on-demand streaming capacity and storage for your applications.

Amazon MSK also supports integration of data sources such as MongoDB Atlas via Amazon MSK Connect. MSK Connect allows serverless integration of MongoDB data with Amazon MSK using the MongoDB Connector for Apache Kafka.

MongoDB Atlas Serverless provides database services that dynamically scale up and down with data size and throughput—and the cost scales accordingly. It’s best suited for applications with variable demands to be managed with minimal configuration. It provides high performance and reliability with automated upgrade, encryption, security, metrics, and backup features built in with the MongoDB Atlas infrastructure.

MSK Serverless is a type of cluster for Amazon MSK. Just like MongoDB Atlas Serverless, MSK Serverless automatically provisions and scales compute and storage resources. You can now create end-to-end serverless workflows. You can build a serverless streaming pipeline with serverless ingestion using MSK Serverless and serverless storage using MongoDB Atlas. In addition, MSK Connect now supports private DNS hostnames. This allows Serverless MSK instances to connect to Serverless MongoDB clusters via AWS PrivateLink, providing you with secure connectivity between platforms.

If you’re interested in using a non-serverless cluster, refer to Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

This post demonstrates how to implement a serverless streaming pipeline with MSK Serverless, MSK Connect, and MongoDB Atlas.

Solution overview

The following diagram illustrates our solution architecture.

Data flow between AWS MSK and MongoDB Atlas

The data flow starts with an Amazon Elastic Compute Cloud (Amazon EC2) client instance that writes records to an MSK topic. As data arrives, an instance of the MongoDB Connector for Apache Kafka writes the data to a collection in the MongoDB Atlas Serverless cluster. For secure connectivity between the two platforms, an AWS PrivateLink connection is created between the MongoDB Atlas cluster and the VPC containing the MSK instance.

This post walks you through the following steps:

  1. Create the serverless MSK cluster.
  2. Create the MongoDB Atlas Serverless cluster.
  3. Configure the MSK plugin.
  4. Create the EC2 client.
  5. Configure an MSK topic.
  6. Configure the MongoDB Connector for Apache Kafka as a sink.

Configure the serverless MSK cluster

To create a serverless MSK cluster, complete the following steps:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Creation method, select Custom create.
  4. For Cluster name, enter MongoDBMSKCluster.
  5. For Cluster type¸ select Serverless.
  6. Choose Next.Serverless MSK Cluster creation UI
  7. On the Networking page, specify your VPC, Availability Zones, and corresponding subnets.
  8. Note the Availability Zones and subnets to use later.Cluster settings showing VPC and Subnets
  9. Choose Next.
  10. Choose Create cluster.

When the cluster is available, its status becomes Active.

Cluster Available for Use

Create the MongoDB Atlas Serverless cluster

To create a MongoDB Atlas cluster, follow the Getting Started with Atlas tutorial. Note that for the purposes of this post, you need to create a serverless instance.

Create new cluster dialog

After the cluster is created, configure an AWS private endpoint with the following steps:

  1. On the Security menu, choose Network Access.Network Access location in the Security menu
  2. On the Private Endpoint tab, choose Serverless Instance.
    Serverless Instance network access
  3. Choose Create new endpoint.
  4. For Serverless Instance, choose the instance you just created.
  5. Choose Confirm.Create Private Endpoint UI
  6. Provide your VPC endpoint configuration and choose Next.VPC Endpoint Configuration UI
  7. When creating the AWS PrivateLink resource, make sure you specify the exact same VPC and subnets that you used earlier when creating the networking configuration for the serverless MSK instance.
  8. Choose Next.VPC Endpoint Subnet Configuration UI
  9. Follow the instructions on the Finalize page, then choose Confirm after your VPC endpoint is created.

Upon success, the new private endpoint will show up in the list, as shown in the following screenshot.

Network Access Confirmation Page

Configure the MSK Plugin

Next, we create a custom plugin in Amazon MSK using the MongoDB Connector for Apache Kafka. The connector needs to be uploaded to an Amazon Simple Storage Service (Amazon S3) bucket before you can create the plugin. To download the MongoDB Connector for Apache Kafka, refer to Download a Connector JAR File.

  1. On the Amazon MSK console, choose Customized plugins in the navigation pane.
  2. Choose Create custom plugin.
  3. For S3 URI, enter the S3 location of the downloaded connector.
  4. Choose Create custom plugin.

MSK plugin details

Configure an EC2 client

Next, let’s configure an EC2 instance. We use this instance to create the topic and insert data into the topic. For instructions, refer to the section Configure an EC2 client in the post Integrating MongoDB with Amazon Managed Streaming for Apache Kafka (MSK).

Create a topic on the MSK cluster

To create a Kafka topic, we need to install the Kafka CLI first.

  1. On the client EC2 instance, first install Java:

sudo yum install java-1.8.0

  1. Next, run the following command to download Apache Kafka:

wget https://archive.apache.org/dist/kafka/2.6.2/kafka_2.12-2.6.2.tgz

  1. Unpack the tar file using the following command:

tar -xzf kafka_2.12-2.6.2.tgz

The distribution of Kafka includes a bin folder with tools that can be used to manage topics.

  1. Go to the kafka_2.12-2.6.2 directory and issue the following command to create a Kafka topic on the serverless MSK cluster:

bin/kafka-topics.sh --create --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --command-config=bin/client.properties --partitions 2

You can copy the bootstrap server endpoint on the View Client Information page for your serverless MSK cluster.

Bootstrap Server Connection Page

You can configure IAM authentication by following these instructions.

Configure the sink connector

Now, let’s configure a sink connector to send the data to the MongoDB Atlas Serverless instance.

  1. On the Amazon MSK console, choose Connectors in the navigation pane.
  2. Choose Create connector.
  3. Select the plugin you created earlier.
  4. Choose Next.Sink Connector UI
  5. Select the serverless MSK instance that you created earlier.
  6. Enter your connection configuration as the following code:
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
key.converter.schema.enable=false
value.converter.schema.enable=false
database=MongoDBMSKDemo
collection=Sink
tasks.max=1
topics=MongoDBMSKDemo.Source
connection.uri=(MongoDB Atlas Connection String Gos Here) 
value.converter=org.apache.kafka.connect.storage.StringConverter 
key.converter=org.apache.kafka.connect.storage.StringConverter

Make sure that the connection to the MongoDB Atlas Serverless instance is through AWS PrivateLink. For more information, refer to Connecting Applications Securely to a MongoDB Atlas Data Plane with AWS PrivateLink.

  1. In the Access Permissions section, create an AWS Identity and Access Management (IAM) role with the required trust policy.
  2. Choose Next.IAM Role configuration
  3. Specify Amazon CloudWatch Logs as your log delivery option.
  4. Complete your connector.

When the connector status changes to Active, the pipeline is ready.

Connector Confirmation Page

Insert data into the MSK topic

On your EC2 client, insert data into the MSK topic using the kafka-console-producer as follows:

bin/kafka-console-producer.sh --topic sandbox_sync2 --bootstrap-server <BOOTSTRAP SERVER> --producer.config=bin/client.properties

To verify that data successfully flows from the Kafka topic to the serverless MongoDB cluster, we use the MongoDB Atlas UI.

MongoDB Atlas Browse Collections UI

If you run into any issues, be sure to check the log files. In this example, we used CloudWatch to read the events that were generated from Amazon MSK and the MongoDB Connector for Apache Kafka.

CloudWatch Logs UI

Clean up

To avoid incurring future charges, clean up the resources you created. First, delete the MSK cluster, connector, and EC2 instance:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Select your cluster and on the Actions menu, choose Delete.
  3. Choose Connectors in the navigation pane.
  4. Select your connector and choose Delete.
  5. Choose Customized plugins in the navigation pane.
  6. Select your plugin and choose Delete.
  7. On the Amazon EC2 console, choose Instances in the navigation pane.
  8. Choose the instance you created.
  9. Choose Instance state, then choose Terminate instance.
  10. On the Amazon VPC console, choose Endpoints in the navigation pane.
  11. Select the endpoint you created and on the Actions menu, choose Delete VPC endpoints.

Now you can delete the Atlas cluster and AWS PrivateLink:

  1. Log in to the Atlas cluster console.
  2. Navigate to the serverless cluster to be deleted.
  3. On the options drop-down menu, choose Terminate.
  4. Navigate to the Network Access section.
  5. Choose the private endpoint.
  6. Select the serverless instance.
  7. On the options drop-down menu, choose Terminate.Endpoint Termination UI

Summary

In this post, we showed you how to build a serverless streaming ingestion pipeline using MSK Serverless and MongoDB Atlas Serverless. With MSK Serverless, you can automatically provision and manage required resources on an as-needed basis. We used a MongoDB connector deployed on MSK Connect to seamlessly integrate the two services, and used an EC2 client to send sample data to the MSK topic. MSK Connect now supports Private DNS hostnames, enabling you to use private domain names between the services. In this post, the connector used the default DNS servers of the VPC to resolve the Availability Zone-specific private DNS name. This AWS PrivateLink configuration allowed secure and private connectivity between the MSK Serverless instance and the MongoDB Atlas Serverless instance.

To continue your learning, check out the following resources:


About the Authors

Igor Alekseev is a Senior Partner Solution Architect at AWS in Data and Analytics domain. In his role Igor is working with strategic partners helping them build complex, AWS-optimized architectures. Prior joining AWS, as a Data/Solution Architect he implemented many projects in Big Data domain, including several data lakes in Hadoop ecosystem. As a Data Engineer he was involved in applying AI/ML to fraud detection and office automation.

Kiran Matty is a Principal Product Manager with Amazon Web Services (AWS) and works with the Amazon Managed Streaming for Apache Kafka (Amazon MSK) team based out of Palo Alto, California. He is passionate about building performant streaming and analytical services that help enterprises realize their critical use cases.

 Babu Srinivasan is a Senior Partner Solutions Architect at MongoDB. In his current role, he is working with AWS to build the technical integrations and reference architectures for the AWS and MongoDB solutions. He has more than two decades of experience in Database and Cloud technologies . He is passionate about providing technical solutions to customers working with multiple Global System Integrators(GSIs) across multiple geographies.

Robert Walters is currently a Senior Product Manager at MongoDB. Previous to MongoDB, Rob spent 17 years at Microsoft working in various roles, including program management on the SQL Server team, consulting, and technical pre-sales. Rob has co-authored three patents for technologies used within SQL Server and was the lead author of several technical books on SQL Server. Rob is currently an active blogger on MongoDB Blogs.

How NETSCOUT built a global DDoS awareness platform with Amazon OpenSearch Service

Post Syndicated from Hardik Modi original https://aws.amazon.com/blogs/big-data/how-netscout-built-a-global-ddos-awareness-platform-with-amazon-opensearch-service/

This post was co-written with Hardik Modi, AVP, Threat and Migitation Products at NETSCOUT.

NETSCOUT Omnis Threat Horizon is a global cybersecurity awareness platform providing users with highly contextualized visibility into “over the horizon” threat activity on the global DDoS (Distributed Denial of Service) landscape—threats that could be impacting their industry, their customers, or their suppliers. It allows visitors to create custom profiles and understand DDoS activity that is being observed in near-real time through NETSCOUT’s ATLAS visibility platform. Users can create free accounts to create customized profiles that lead to a map-based visualization (as in the following screenshot) as well as tailored summary reporting. DDoS attacks can be impactful to services delivered over the internet. Visibility of this nature is key to anyone who wishes to understand what is happening on the threat landscape. Omnis Threat Horizon has been generally available since August 2019.

NETSCOUT Omnis Threat Horizon: Real-Time DDoS Attack Map

To provide continuous visibility at a low per-user cost (to enable a free service), the NETSCOUT development team chose a series of AWS technologies to power the collection, storage, analysis, warehousing, user authentication, and delivery of the application. In particular, they chose Amazon OpenSearch Service as the core analytics engine. They store all processed attack records in OpenSearch Service.

This post discusses the challenges and design patterns NETSCOUT used on its path to representing the details of roughly 10 million annual DDoS attacks in near-real time.

Background

NETSCOUT, through its Arbor product line, is a long-time provider of solutions for network visibility and DDoS mitigation for service providers and enterprises. Since 2007, NETSCOUT has operated a program called ATLAS, in which customers can opt to share anonymized data about the DDoS attacks they are observing on their network. As this program has matured, NETSCOUT has comprehensive visibility into the DDoS attack landscape—both the number and nature of attacks. This visibility informs and improves their products, allowing them to share analysis findings in the form of papers, blog posts, and a biannual threat report. Since NETSCOUT started collecting and analyzing data in the current form in September 2012, they have observed 96 million attacks, allowing them to perform considerable analysis of trends across regions and verticals, as well as understand the vectors used and sizes of attacks.

Omnis Threat Horizon is a solution to display this information to a broader audience—essentially anyone interested in the threat landscape, and specifically the DDoS attack trends at any given time. In addition to providing real-time maps, the solution allows the user to go back in time to observe visually or in summary form what might have been happening at a given time.

They wanted to make sure that the visual elements and application was responsive globally, both in terms of representing real-time data as well as showing historical information. Furthermore, they wanted to keep the incremental cost per user as low as possible, in order to be able to provide this service for free globally.

Solution overview

The following diagram illustrates the solution architecture.

One of the objectives behind the chosen solution was to utilize native AWS services in every instance possible. Furthermore, they chose to break component functionality into their own microservices, and make consistent use of this through the solution.

Individual monitoring sensors deliver data to Amazon Simple Storage Service (Amazon S3) on an hourly basis. As new entries are received, Amazon Simple Notification Service (Amazon SNS) notifications are delivered, resulting in processing of the data. Successive microservices are responsible for:

  • Parsing
  • Running algorithms to identify and separate spurious entries
  • Deduplication
  • Scoring
  • Confidence

After this processing, each attack is represented as a separate document in the OpenSearch Service domain. As of writing this post, NETSCOUT has approximately 96 million attacks in the cluster, all of which can be represented in some form in the maps and reports in Omnis Threat Horizon.

The data is organized in hourly bin files, and served to the application via Amazon CloudFront.

Lessons learned related to Elasticsearch

On previous projects, NETSCOUT tried Apache Cassandra, a popular NoSQL open-source database, and considered it inadequate for aggregation queries. While developing Horizon, they chose Elasticsearch to get access to more powerful aggregation query capabilities with significantly less developer time.

They started with a self-managed instance, but faced the following issues:

  • Considerable expenditure of person hours simply to manage the infrastructure
  • Each version upgrade was an involved process, requiring a lot of planning and still posing technical challenges along the way
  • No auto scaling and big aggregation queries could break Elasticsearch

After a few cycles of powering through this, they moved to OpenSearch Service to overcome these challenges.

Outcome

NETSCOUT saw the following benefits from this architecture:

  • Fast processing of attack data – The time from when attack data is received to when it is available in the data store is of the order of seconds, allowing them to provide near-real-time visibility in the solution.
  • Lower management overhead – The data store grows consistently, and by using a managed service, teams avoid having to perform tasks related to cluster management. This was a big pain point with previous solutions adopted involving the same technology.
  • Scalable architecture – It’s possible to add new capabilities into the pipeline as requirements emerge, without rearchitecting other components.

Conclusion

With OpenSearch Service, NETSCOUT has been able to build a resilient data store for the attack data they capture. As a result of architectural choices made and the underlying AWS services, they’re able to provide visibility into their data at small incremental costs, allowing them to provide a global visibility platform at no cost to the end-user.

With the most experience, the most reliable, scalable, and secure cloud, and the most comprehensive set of services and solutions, AWS is the best place to unlock value from your data and turn it into insight.


About the Authors

Hardik-Modi
Hardik Modi is AVP, Threat and Migitation Products at NETSCOUT. In this role, he oversees the teams responsible for mitigation products as well as the creation of security content for NETSCOUTs products, enabling best-in-class protection for users, as well as the continuous delivery and publication of impactful research across the DDoS and Intrusion landscapes.

Sujatha-Kuppuraju
Sujatha Kuppuraju is a Principal Solutions Architect at Amazon Web Services (AWS). She engages with customers to create innovative solutions that address customer business problems and accelerate the adoption of AWS services.

Mike-Arruda
Mike Arruda is a Senior Technical Account Manager at AWS, based in the New England area. He works with AWS Enterprise customers, supporting their success in adopting best practices and helping them achieve their desired business outcomes with AWS.

Build highly available streams with Amazon Kinesis Data Streams

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/build-highly-available-streams-with-amazon-kinesis-data-streams/

Many use cases are moving towards a real-time data strategy due to demand for real-time insights, low-latency response times, and the ability to adapt to the changing needs of end-users. For this type of workload, you can use Amazon Kinesis Data Streams to seamlessly provision, store, write, and read data in a streaming fashion. With Kinesis Data Streams, there are no servers to manage, and you can scale your stream to handle any additional throughput as it comes in.

Kinesis Data Streams offers 99.9% availability in a single AWS Region. For even higher availability, there are several strategies to explore within the streaming layer. This post compares and contrasts different strategies for creating a highly available Kinesis data stream in case of service interruptions, delays, or outages in the primary Region of operation.

Considerations for high availability

Before we dive into our example use case, there are several considerations to keep in mind when designing a highly available Kinesis Data Streams workload that relate to the business need for a particular pipeline:

  • Recovery Time Objective (RTO) is defined by the organization. RTO is the maximum acceptable delay between the interruption of service and restoration of service. This determines what is considered an acceptable time window when service is unavailable.
  • Recovery Point Objective (RPO) is defined by the organization. RPO is the maximum acceptable amount of time since the last data recovery point. This determines what is considered an acceptable loss of data between the last recovery point and the interruption of service.

In a general sense, the lower your values for RPO and RTO, the more expensive the overall solution becomes. This is because the solution needs to account for minimizing both data loss and service unavailability by having multiple instances of the service up and running in multiple Regions. This is why a big piece of high availability is the replication of data flowing through a workload. In our case, the data is replicated across Regions of Kinesis Data Streams. Conversely, the higher the RPO and RTO values are, the more complexity you introduce into your failover mechanism. This is due to the fact that the cost savings you realize by not standing up multiple instances across multiple Regions are offset by the orchestration needed to spin up these instances in the event of an outage.

In this post, we are only covering failover of a Kinesis data stream. In use cases where higher availability is required across the entire data pipeline, having failover architectures for every component (Amazon API Gateway, AWS Lambda, Amazon DynamoDB) is strongly encouraged.

The simplest approach to high availability is to start a new instance of producers, consumers, and data streams in a new Region upon service unavailability detection. The benefit here is primarily cost, but your RPO and RTO values will be higher as a result.

We cover the following strategies for highly available Kinesis Data Streams:

  • Warm standy – An architecture in which there is active replication of data from the Kinesis data stream in Region A to Region B. Consumers of the data stream are running in both Regions at all times. Recommended for use cases that can’t withstand extended downtime past their replication lag.
  • Cold standby – Active replication of data from the data stream in Region A to Region B, but consumers of the data stream in Region B are spun up when an outage in Region A is detected. Recommended for use cases that can afford some downtime as infrastructure is spun up in secondary Region. In this scenario, RPO will be similar to the warm standby strategy; however, RTO will increase.

For high availability purposes, these use cases need to replicate the data across Regions in a way that allows consumers and producers of the data stream to fail over quickly upon detection of a service unavailability and utilize the secondary Region’s stream. Let’s take an example architecture to further explain these DR strategies. We use API Gateway and Lambda to publish stock ticker information to a Kinesis data stream. The data is then retrieved by another Lambda consumer to save durably into DynamoDB for querying, alerting, and reporting. The following diagram illustrates this architecture.

the primary architecture for the post--showcasing data coming from a mobile phone to API Gateway, then AWS Lambda, then Kinesis Data Streams, Lambda again and finally publishing to a DynamoDB Table

We use this architecture with an example use case requiring the streaming workload to be highly available in the event of a Region outage. The customer can withstand an RTO of 15 minutes during an outage, because they refresh end-users’ dashboards on a 15-minute interval. The customer is sensitive to downtime and data loss, because their data will be used for historical auditing purposes, operational metrics, and dashboards for end-users. Downtime for this customer means that data isn’t able to be persisted in their database from their streaming layer, and therefore unavailable to any consuming application. For this use case, data can be retried up to 5 minutes from our Lambda function before failing over to the new Region. Consumers are considered unavailable when the stream is unavailable, and can scale up in the secondary Region to account for any backlog of events.

How might we approach making a Kinesis data stream highly available for this use case?

Warm standby pattern

The following architecture diagram illustrates the warm standby high availability pattern for Kinesis Data Streams.

warm standby pattern showcasing data being replicated between a kinesis data stream in one region to another

image showcasing the warm standby failover--where data from first lambda begins replicating to secondary region KDA

The warm standby architectural pattern involves running a Kinesis data stream both in the primary and secondary Region, along with consumers and downstream destinations of the primary Region’s streaming layer being replicated as well. Sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. We dive into details of how to achieve this in the client failover section of this post. Data is replicated across Regions from the data stream in the primary Region to the secondary Region. This is done instead of having the sources publish to both Regions to avoid any consistency issues between the streams in the two Regions.

Although this architectural pattern gives very high availability, it’s also the most expensive option because we’re duplicating virtually the entire streaming layer across two Regions. For business use cases that can’t withstand extended data loss or withstand downtime, this may be the best option for them. From an RTO perspective, this architectural pattern ensures there will be no downtime. There is some nuance in the RPO metric in that it depends heavily on the replication lag. In the event of the primary stream becoming unavailable, whatever data hasn’t yet been replicated may be unavailable in the secondary Region. This data won’t be considered lost, but may be unavailable for consumption until the primary stream becomes available again. This method also can result in events being out of order.

For business needs that can’t tolerate this level of record unavailability, consider retaining data on the producer for the purposes of publishing to an available stream when available, or rewinding against the source for the producer if possible so that data stuck in the primary Region can be resent to the secondary stream upon failover. We cover this consideration in the client failover section of this post.

Cold standby pattern

The following architecture diagram illustrates the cold standby high availability pattern for Kinesis Data Streams.

active passive pattern for kinesis data streams

The cold standby architectural pattern involves running a data stream both in the primary and secondary Region, and spinning up the downstream resources like a stream consumer and destination for streams when a service interruption is detected—passive mode. Just like the warm standby pattern, sources are configured to automatically fail over to the secondary Region in the case of service unavailability in the first Region. Likewise, data is replicated across Regions from the data stream in the primary Region to the secondary Region.

The primary benefit this architectural pattern provides is cost efficiency. By not running consumers at all times, this effectively reduces your costs significantly compared to the warm standby pattern. However, this pattern may introduce some data unavailability for downstream systems while the secondary Region infrastructure is provisioned. Additionally, depending on replication lag, some records may be unavailable, as discussed in the warm standby pattern. It should be noted that depending on how long it takes to spin up resources, it may take some time for consumers to reprocess the data in the secondary Region, and latency can be introduced when failing over. Our implementation assumes a minimal replication lag and that downstream systems have the ability to reprocess a configurable amount of data to catch up to the tip of the stream. We discuss approaches to spinning these resources up in the client failover section, but one possible approach to this would be using an AWS CloudFormation template that spins these resources up on service unavailability detection.

For business needs that can tolerate some level of data unavailability and can accept interruptions while the new infrastructure in the secondary Region is spun up, this is an option to consider both from a cost perspective and an RPO/RTO perspective. The complexity of spinning up resources upon detecting service unavailability is offset by the lower cost of the overall solution.

Which pattern makes sense for our use case?

Let’s revisit the use case described earlier to identify which of the strategies best meets our needs. We can extract the pieces of information from the customer’s problem statement to identify that they need a high availability architecture that:

  • Can’t withstand extended amounts of data loss
  • Must resume operations within 15 minutes of service interruption identification

This criterion tells us that their RPO is close to zero, and their RTO is 15 minutes. From here, we can determine that the cold standby architecture with data replication provides us limited data loss, and the maximum downtime will be determined by the time it takes to provision consumers and downstream destinations in the secondary Region.

Let’s dive deeper into the implementation details of each of the core phases of high availability, including an implementation guide for our use case.

Launch AWS CloudFormation resources

If you want to follow along with our code samples, you can launch the following CloudFormation stack and follow the instructions in order to simulate the cold standby architecture referenced in this post.

Launch Stack

For purposes of the Kinesis Data Streams high availability setup demo, we use us-west-2 as the primary Region and us-east-2 as the failover Region. While deploying this solution in your own account, you can choose your own primary and failover Regions.

  1. Deploy the supplied CloudFormation template in failover Region us-east-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

  1. Deploy the supplied CloudFormation template in primary Region us-west-2.

Make sure you specify us-east-2 as the value for the FailoverRegion parameter in the CloudFormation template.

In steps 1 and 2, we deployed the following resources in the primary and failover Regions:

  1. KDS-HA-Stream – AWS::Kinesis::Stream (primary and failover Region)
  2. KDS-HA-ProducerLambda – AWS::Lambda::Function (primary Region)
  3. KDS-HA-ConsumerLambda – AWS::Lambda::Function (primary and failover Region)
  4. KDS-HA-ReplicationAgentLambda – AWS::Lambda::Function (primary Region)
  5. KDS-HA-FailoverLambda – AWS::Lambda::Function (primary Region)
  6. ticker-prices – AWS::DynamoDB::GlobalTable (primary and failover Region)

The KDS-HA-Stream Kinesis data stream is deployed in both Regions. An enhanced fan-out consumer of the KDS-HA-Stream stream KDS-HA-ReplicationAgentLambda in the primary Region is responsible for replicating messages to the data stream in the failover Region.

KDS-HA-ConsumerLambda is a Lambda function consuming messages out of the KDS-HA-Stream stream and persisting data into a DynamoDB table after preprocessing.

You can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that last_updated_region attribute shows us-west-2 as its value because it’s the primary Region.

Replication

When deciding how to replicate data from a data stream in Region A to a data stream in Region B, there are several strategies that involve a consumer reading data off of the primary stream and sending that data cross-Region to the secondary data stream. This would act as a replicator service, responsible for copying the data between the two streams, maintaining a relatively low latency to replicate and ensuring data isn’t lost during this replication.

Because replication off of a shared throughput data stream could impact the flow of data in a production workload, we recommend using the enhanced fan-out feature of Kinesis Data Streams consumers to ensure replication doesn’t have an impact on consumption latency.

The replication strategy implemented in this post features asynchronous replication, meaning that the replication process doesn’t block any standard data flow in the primary stream. Synchronous replication would be a safer approach to guarantee replication and avoid data loss; however, this isn’t possible without a service-side implementation.

The following image shows a timeline of data flow for the cold standby architecture, with data being replicated as soon as it’s published.

Lambda replication

Lambda can treat a Kinesis data stream as an event source, which will funnel events from your data stream into a Lambda function. This Lambda function then receives and forwards these events across Regions to your data stream in a secondary Region. Lambda functions allow you to utilize best streaming practices such as retries of records that encounter errors, bisect on error functionality, and using the Lambda parallelization factor; using more instances of your Lambda function than you have available shards can help process records faster.

This Lambda function is at the crux of the architecture for high availability; it’s responsible solely for sending data across Regions, and it also has the best capability to monitor the replication progress. Important metrics to monitor for Lambda replication include IteratorAge, which indicates how old the last record in the batch was when it finished processing. A high IteratorAge value indicates that the Lambda function is falling behind and therefore is not keeping up with data ingestion for replication purposes. A high IteratorAge can lead to a higher RPO and the higher likelihood of data unavailability when a passive failover happens.

We use the following sample Lambda function in our CloudFormation template to replicate data across Regions:

import json
import boto3
import random
import os
import base64


def lambda_handler(event, context):
    client = boto3.client("kinesis", region_name=os.environ["FAILOVER_REGION"])
    records = []

    for record in event["Records"]:
        records.append(
            {
                "PartitionKey": record["kinesis"]["partitionKey"],
                "Data": base64.b64decode(record["kinesis"]["data"]).decode("utf-8"),
            }
        )
    response = client.put_records(Records=records, StreamName="KDS-HA-Stream")
    if response["FailedRecordCount"] > 0:
        print("Failed replicating data: " + json.dumps(response))
        raise Exception("Failed replicating data!")

The Lambda replicator in the CloudFormation template is configured to read from the data stream in the primary Region.

The following code contains the necessary AWS Identity and Access Management (IAM) permissions for Lambda, giving access for the Lambda function to assume this role. All actions are permitted on data streams and DynamoDB. In the principal of least privilege, it’s recommended to restrict this to the necessary streams in a production environment.

      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'kinesis:DescribeStream'
                  - 'kinesis:DescribeStreamSummary'
                  - 'kinesis:GetRecords'
                  - 'kinesis:GetShardIterator'
                  - 'kinesis:ListShards'
                  - 'kinesis:ListStreams'
                  - 'kinesis:SubscribeToShard'
                  - 'kinesis:PutRecords'
                Resource:
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream'
                  - 'arn:aws:kinesis:*:*:stream/KDS-HA-Stream/consumer/KDS-HA-Stream-EFO-Consumer:*'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/CloudWatchLogsFullAccess'
 

Health check

A generalized strategy for determining when to consider our data stream unavailable involves the use of Amazon CloudWatch metrics. We use the metrics coming off of our Lambda producer and consumer in order to assess the availability of our data stream. While producing to a data stream, an error might appear as one of the following responses back from the data stream: PutRecord or PutRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503 error. When consuming from a data stream, an error might appear as one of the following responses back from the data stream: SubscribeToShard.Success or GetRecords returns an AmazonKinesisException 500 or AmazonKinesisException 503.

We can calculate our effective error rate based on PutRecord.Success and GetRecord.Success. An average error rate of 1% or higher over a time window of 5 minutes, for example, could indicate that there is an issue with the data stream, and we may want to fail over. In our CloudFormation template, this error rate threshold as well as time window are configurable, but by default we check for an error rate of 1% in the last 5 minutes to trigger a failover of our clients.

Client failover

When a data stream is deemed to be unreachable, we must now take action to keep our system available and reachable for clients on both ends of the interaction. This means for producers following the cold standby high availability architecture, we change the destination stream where the producer was writing. If high availability and failover of data producers and consumers isn’t a requirement of a given use case, a different architecture would be a better fit.

Prior to failover, the producer may have been delivering data to a stream in Region A, but we now automatically update the destination to be the stream in Region B. For different clients, the methodology of updating the producer will be different, but for ours, we store the active destination for producers in the Lambda environment variables from AWS CloudFormation and update our Lambda functions dynamically on health check failure scenarios.

For our use case, we use the maximum consumer lag time (iteratorAge) plus some buffer to influence the starting position of the failover consumer. This allows us to ensure that the consumer in the secondary Region doesn’t skip records that haven’t been processed in the originating Region, but some data overlap may occur. Note that some duplicates in the downstream system may be introduced, and having an idempotent sink or some method of handling duplicates must be implemented in order to avoid duplicate-related isssues.

In the case where data is successfully written to a data stream but is unable to be consumed from the stream, the data will not be replicated and therefore be unavailable in the second Region. The data will be durably stored in the primary data stream until it comes back online and can be read from. Note that if the stream is unavailable for a longer period of time than your total data retention period on the data stream, this data will be lost. Data retention for Kinesis Data Streams can be retrospectively increased up to 1 year.

For consumers in a cold standby architecture, upon failure detection, the consumer will be disabled or shut down, and the same consumer instance will be spun up in the secondary Region to consume from the secondary data stream. On the consumer side, we assume that the consumer application is stateless in our provided solution. If your application requires state, you can migrate or preload the application state via Amazon Simple Storage Service (Amazon S3) or a database. For a stateless application, the most important aspect of failover is the starting position.

In the following timeline, we can see that at some point, the stream in Region A was deemed unreachable.

The consumer application in Region A was reading data at time t10, and when it fails over to the secondary Region (B), it reads starting at t5 (5 minutes before the current iteratorAgeMilliseconds). This ensures that data isn’t skipped by the consumer application. Keep in mind that there may be some overlap in records in the downstream destinations.

In the provided cold standby AWS CloudFormation example, we can manually trigger a failover with the AWS Command Line Interface (AWS CLI). In the following code, we manually fail over to us-east-2:

aws lambda invoke --function-name KDS-HA-FailoverLambda --cli-binary-format raw-in-base64-out --payload '{}' response.json --region us-west-2

After a few minutes, you can inspect the content of the ticker-prices DynamoDB table in the primary and failover Region. Note that the last_updated_region attribute shows us-east-2 as its value because it’s failed over to the us-east-2 Region.

Failback

After an outage or service unavailability is deemed to be resolved, the next logical step is to reorient your clients back to their original operating Regions. Although it may be tempting to automate this procedure, a manual failback approach during off-business hours when minimal production disruption will take place makes more sense.

In the following images, we can visualize the timeline with which consumer applications are failed back to the original Region.

The producer switches back to the original Region, and we wait for the consumer in Region B to reach 0 lag. At this point, the consumer application in Region B is disabled, and replication to Region B is resumed. We have now returned to our normal state of processing messages as shown in the replication section of this post.

In our AWS CloudFormation setup, we perform a failback with the following steps:

  1. Re-enable the event source mapping and start consuming messages from the primary Region at the latest position:
aws lambda create-event-source-mapping --function-name KDS-HA-ConsumerLambda --batch-size 100 --event-source-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --starting-position LATEST --region us-west-2
  1. Switch the producer back to the primary Region:
aws lambda update-function-configuration --function-name KDS-HA-ProducerLambda --environment "Variables={INPUT_STREAM=KDS-HA-Stream,PRODUCING_TO_REGION=us-west-2}" --region us-west-2
  1. In the failover Region (us-east-2), wait for your data stream’s GetRecords max iterator age (in milliseconds) CloudWatch metric to report 0 as a value. We’re waiting for the consumer Lambda function to catch up with all produced messages.
  2. Stop consuming messages from the failover Region.
  3. Run the following AWS CLI command and grab the UUID from the response, which we use to delete the existing event source mapping. Make sure you’re picking event source mapping for the Lambda function KDS-HA-ConsumerLambda.
aws lambda list-event-source-mappings --region us-east-2
aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-east-2
  1. Restart the replication agent in the primary Region.
  2. Run following AWS CLI command, and capture ConsumerARN from the response:
aws kinesis list-stream-consumers --stream-arn arn:aws:kinesis:us-west-2:{{accountId}}:stream/KDS-HA-Stream --region us-west-2
aws lambda create-event-source-mapping --function-name KDS-HA-ReplicationAgentLambda --batch-size 100 --event-source-arn {{ConsumerARN}} --starting-position LATEST --region us-west-2

When this is complete, you can observe the same data stream metrics—the number of records in and out per second, consumer lag metrics, and number of errors as described in the health check section of this post—to ensure that each of the components has resumed processing data in the original Region. We can also take note of the data landing in DynamoDB, which displays which Region data is being updated from in order to determine the success of our failback procedure.

We recommend for any streaming workload that can’t withstand extended data loss or downtime to implement some form of cross-Region high availability in the unlikely event of service unavailability. These recommendations can help you determine which pattern is right for your use case.

Clean up

To avoid incurring future charges, complete the following steps:

  1. Delete the CloudFormation stack from primary Region us-west-2.
  2. Delete the CloudFormation stack from failover Region us-east-2.
  3. List all event source mappings in primary Region us-west-2 using the aws lambda list-event-source-mappings --region us-west-2 command and note the UUIDs of the event source mappings tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions.
  4. Delete event source mappings in primary Region us-west-2 tied to the KDS-HA-ConsumerLambda and KDS-HA-ReplicationAgentLambda Lambda functions using the aws lambda delete-event-source-mapping --uuid {{UUID}} --region us-west-2 command and UUIDs noted in the previous step.

Conclusion

Building highly available Kinesis data streams across multiple Regions is multi-faceted, and all aspects of your RPO, RTO, and operational costs need to be carefully considered. The code and architecture discussed in this post is one of many different architectural patterns you can choose for your workloads, so make sure to choose the appropriate architecture based on the criteria for your specific requirements.

To learn more about Kinesis Data Streams, we have a getting started guide as well as a workshop to walk through all the integrations with Kinesis Data Streams. You can also contact your AWS Solutions Architects, who can be of assistance alongside your high availability journey.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 7 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. In the past, Jeremy has supported and built systems that stream in terabytes of data per day, and process complex machine learning algorithms in real time. At AWS, he is a Senior Streaming Specialist Solutions Architect supporting both Amazon MSK and Amazon Kinesis.

Pratik Patel is a Sr Technical Account Manager and streaming analytics specialist. He works with AWS customers and provides ongoing support and technical guidance to help plan and build solutions using best practices, and proactively helps keep customer’s AWS environments operationally healthy.

Improve the performance of Apache Iceberg’s metadata file operations using Amazon FSx for Lustre on Amazon EMR

Post Syndicated from Rajarshi Sarkar original https://aws.amazon.com/blogs/big-data/improve-the-performance-of-apache-icebergs-metadata-file-operations-using-amazon-fsx-for-lustre-on-amazon-emr/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3), and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. With Amazon EMR 6.5+, you can use Apache Spark on EMR clusters with the Iceberg table format.

Iceberg helps data engineers manage complex challenges such as continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency on tables between multiple applications where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to track changes to a table over time
  • Issue time travel queries to query historical data and verify changes between updates
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volume changes without relying on physical directories
  • Roll back tables to prior versions to quickly correct issues and return tables to a known good state
  • Perform advanced planning and filtering in high-performance queries on large datasets

In this post, we show you how to improve the performance of Iceberg’s metadata file operations using Amazon FSx for Lustre and Amazon EMR.

Performance of metadata file operations in Iceberg

The catalog, metadata layer, and data layer of Iceberg are outlined in the following diagram.

Iceberg maintains metadata across multiple small files (metadata file, manifest list, and manifest files) to effectively prune data, filter data, read the correct snapshot, merge delta files, and more. Although Iceberg has implemented fast scan planning to make sure that metadata file operations don’t take a large amount of time, the time taken is slightly high for object storage like Amazon S3 because it has a higher read/write latency.

In use cases like a high-throughput streaming application writing data into an S3 data lake in near-real time, snapshots are produced in microbatches at a very fast rate, resulting in a high number of snapshot files and causing degradation in performance of metadata file operations.

As shown in the following architecture diagram, the EMR cluster consumes from Kafka and writes to an Iceberg table, which uses Amazon S3 as storage and AWS Glue as the catalog.

In this post, we dive deep into how to improve query performance by caching metadata files in a low-latency file system like FSx for Lustre.

Overview of solution

FSx for Lustre makes it easy and cost-effective to launch and run the high-performance Lustre file system. You use it for workloads where speed matters, such as high throughput streaming writes, machine learning, high performance computing (HPC), video processing, and financial modelling. You can also link the FSx for Lustre file system to an S3 bucket, if required. FSx for Lustre offers multiple deployment options, including the following:

  • Scratch file systems, which are designed for temporary storage and short-term processing of data. Data isn’t replicated and doesn’t persist if a file server fails. Use scratch file systems when you need cost-optimized storage for short-term, processing-heavy workloads.
  • Persistent file systems, which are designed for long-term storage and workloads. The file servers are highly available, and data is automatically replicated within the same Availability Zone in which the file system is located. The data volumes attached to the file servers are replicated independently from the file servers to which they are attached.

The use case with Iceberg’s metadata files is related to caching, and the workloads are short-running (a few hours), so the scratch file system can be considered as a viable deployment option. A Scratch-2 file system with 200 MB/s/TiB of throughput is sufficient for our needs because Iceberg’s metadata files are small in size and we don’t expect a very high number of parallel connections.

You can use FSx for Lustre as a cache for the metadata files (on top of an S3 location) to offer better performance in terms of metadata file operations. To read/write files, Iceberg provides a capability to load a custom FileIO dynamically during runtime. You can pass the FSxForLustreS3FileIO reference using a Spark configuration, which takes care of reading/writing to appropriate file systems (FSx for Lustre for reads and Amazon S3 for writes). By enabling the catalog properties lustre.mount.path, lustre.file.system.path, and data.repository.path, Iceberg resolves the S3 path to FSx for Lustre path at runtime.

As shown in the following architecture diagram, the EMR cluster consumes from Kafka and writes to an Iceberg table that uses Amazon S3 as storage and AWS Glue as the catalog. Metadata reads are redirected to FSx for Lustre, which updates asynchronously.

Pricing and performance

We took a sample dataset across 100, 1,000, and 10,000 snapshots, and could observe up to 8.78 times speedup in metadata file operations and up to 1.26 times speedup in query time. Note that the benefit was observed for tables with a higher number of snapshots. The environment components used in this benchmark are listed in the following table.

Iceberg Version Spark Version Cluster Version Master Workers
0.14.1-amzn-0 3.3.0-amzn-1 Amazon EMR 6.9.0 m5.8xlarge 15 x m5.8xlarge

The following graph compares the speedup for each amount of snapshots.

You can calculate the price using the AWS Pricing Calculator. The estimated monthly cost of an FSx for Lustre file system (scratch deployment type) in the US East (N. Virginia) Region with 1.2 TB storage capacity and 200 MBps/TiB per unit storage throughput is $336.38.

The overall benefit is significant considering the low cost incurred. The performance gain in terms of metadata file operations can help you achieve low-latency read for high-throughput streaming workloads.

Prerequisites

For this walkthrough, you need the following prerequisites:

Create an FSx for Lustre file system

In this section, we walk through the steps to create your FSx for Lustre file system via the FSx for Lustre console. To use the AWS Command Line Interface (AWS CLI), refer to create-file-system.

  1. On the Amazon FSx console, create a new file system.
  2. For File system options, select Amazon FSx for Lustre.
  3. Choose Next.
  4. For File system name¸ enter an optional name.
  5. For Deployment and storage type, select Scratch, SSD, because it’s designed for short-term storage and workloads.
  6. For Throughput per unit of storage, select 200 MB/s/TiB.You can choose the storage capacity according to your use case. A Scratch-2 file system with 200 MB/s/TiB of throughput is sufficient for our needs because Iceberg’s metadata files are small in size and we don’t expect a very high number of parallel connections.
  7. Enter an appropriate VPC, security group, and subnet.Make sure that the security group has the appropriate inbound and outbound rules enabled to access the FSx for Lustre file system from Amazon EMR.
  8. In the Data Repository Import/Export section, select Import data from and export data to S3.
  9. Select Update my file and directory listing as objects are added to, changed in, or deleted from my S3 bucket to keep the file system listing updated.
  10. For Import bucket, enter the S3 bucket to store the Iceberg metadata.
  11. Choose Next and verify the summary of the file system, then choose Create File System.

When the file system is created, you can view the DNS name and mount name.

Create an EMR cluster with FSx for Lustre mounted

This section shows how to create an Iceberg table using Spark, though we can use other engines as well. To create your EMR cluster with FSx for Lustre mounted, complete the following steps:

  1. On the Amazon EMR console, create an EMR cluster (6.9.0 or above) with Iceberg installed. For instructions, refer to Use a cluster with Iceberg installed.

To use the AWS CLI, refer to create-cluster.

  1. Keep the network (VPC) and EC2 subnet the same as the ones you used when creating the FSx for Lustre file system.
  2. Create a bootstrap script and upload it to an S3 bucket that is accessible to EMR.Refer to the following bootstrap script to mount FSx for Lustre in an EMR cluster (the file system gets mounted in the /mnt/fsx path of the cluster). The file system DNS name and the mount name can be found in the file system summary details.
    sudo amazon-linux-extras install -y lustre2.10
    sudo mkdir -p /mnt/fsx
    sudo mount -t lustre -o noatime <Lustre-File-System-DNS-Name>@tcp:/<mount-Name> /mnt/fsx
    sudo ln -s /mnt/fsx /lustre
    sudo chmod -R 755 /mnt/fsx
    sudo chmod -R 755 /lustre

  3. Add the bootstrap action script to the EMR cluster.
  4. Specify your EC2 key pair.
  5. Choose Create cluster.
  6. When the EMR cluster is running, SSH into the cluster and launch the spark-sql using the following code:
    spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
        --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/warehouse/sample_table \
        --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
        --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.FSxForLustreS3FileIO \
        --conf spark.sql.catalog.my_catalog.lustre.mount.path=file:///mnt/fsx \
        --conf spark.sql.catalog.my_catalog.lustre.file.system.path=/warehouse/sample_table/metadata \
        --conf spark.sql.catalog.my_catalog.data.repository.path=s3://<bucket>/warehouse/sample_table/metadata

Note the following:

    • At a Spark session level, the catalog properties io-impl, lustre.mount.path, lustre.file.system.path, and data.repository.path have been set.
    • io-impl sets a custom FileIO implementation that resolves the FSx for Lustre location (from the S3 location) during reads. lustre.mount.path is the local mount path in the EMR cluster, lustre.file.system.path is the FSx for Lustre file system path, and data.repository.path is the S3 data repository path, which is linked to the FSx for Lustre file system path. After all these properties are provided, the data.repository.path is resolved to the concatenation of lustre.mount.path and lustre.file.system.path during reads. Note that FSx for Lustre is eventually consistent after an update in Amazon S3. So, in case FSx for Lustre is catching up with the S3 updates, the FileIO will fall back to appropriate S3 paths.
    • If write.metadata.path is configured, make sure that the path doesn’t contain any trailing slashes and data.repository.path is equivalent to write.metadata.path.
  1. Create the Iceberg database and table using the following queries:
    spark-sql> CREATE DATABASE IF NOT EXISTS my_catalog.db_iceberg;
    
    spark-sql> CREATE TABLE IF NOT EXISTS my_catalog.db_iceberg.sample_table (id int, data string)
    USING iceberg
    LOCATION 's3://<bucket>/warehouse/sample_table';

    Note that to migrate an existing table to use FSx for Lustre, you must create the FSx for Lustre file system, mount the same while starting the EMR cluster, and start the Spark session as highlighted in the previous step. The Amazon S3 listing of the existing table is updated in the FSx for Lustre file system eventually.

  2. Insert the data into the table using an INSERT INTO query and then query the same:
    spark-sql> INSERT INTO my_catalog.db_iceberg.sample_table VALUES (1, 'a'), (2, 'b'), (3, 'c');
    
    spark-sql> SELECT * FROM my_catalog.db_iceberg.sample_table;

  3. You can now view the metadata files in the local FSx mount, which is also linked to the S3 bucket s3://<bucket>/warehouse/sample_table/metadata/:
    $ ls -ltr /mnt/fsx/warehouse/sample_table/metadata/
    total 3
    -rw-r--r-- 1 hadoop hadoop 1289 Mar 22 12:01 00000-40c0ef36-5a4c-4b3e-ba88-68398581c1a8.metadata.json
    -rw-r--r-- 1 hadoop hadoop 1289 Mar 22 12:01 00000-0b2b7ee5-4167-4fab-9527-d79d05c0a864.metadata.json
    -rw-r--r-- 1 hadoop hadoop 3727 Mar 22 12:03 snap-8127395396703547805-1-9a1cf328-db72-4cac-ad61-018048c3c470.avro
    -rw-r--r-- 1 hadoop hadoop 5853 Mar 22 12:03 9a1cf328-db72-4cac-ad61-018048c3c470-m0.avro
    -rw-r--r-- 1 hadoop hadoop 2188 Mar 22 12:03 00001-100c61d1-51c4-4337-8641-a6ed5ed9802e.metadata.json

  4. You can view the metadata files in Amazon S3:
    [hadoop@ip-172-31-17-161 ~]$ aws s3 ls s3://<bucket>/warehouse/sample_table/metadata/
    2022-03-24 05:29:46         20 .00000-0b2b7ee5-4167-4fab-9527-d79d05c0a864.metadata.json.crc
    2022-03-24 05:29:45         20 .00000-40c0ef36-5a4c-4b3e-ba88-68398581c1a8.metadata.json.crc
    2022-03-24 05:29:45         28 .00001-100c61d1-51c4-4337-8641-a6ed5ed9802e.metadata.json.crc
    2022-03-24 05:29:46         56 .9a1cf328-db72-4cac-ad61-018048c3c470-m0.avro.crc
    2022-03-24 05:29:45         40 .snap-8127395396703547805-1-9a1cf328-db72-4cac-ad61-018048c3c470.avro.crc
    2022-03-24 05:29:45       1289 00000-0b2b7ee5-4167-4fab-9527-d79d05c0a864.metadata.json
    2022-03-24 05:29:46       1289 00000-40c0ef36-5a4c-4b3e-ba88-68398581c1a8.metadata.json
    2022-03-24 05:29:45       2188 00001-100c61d1-51c4-4337-8641-a6ed5ed9802e.metadata.json
    2022-03-24 05:29:45       5853 9a1cf328-db72-4cac-ad61-018048c3c470-m0.avro
    2022-03-24 05:29:46       3727 snap-8127395396703547805-1-9a1cf328-db72-4cac-ad61-018048c3c470.avro

  5. You can also view the data files in Amazon S3:
    $ aws s3 ls s3://<bucket>/warehouse/sample_table/data/
    2022-03-22 12:03:04        619 00000-0-2a0e3499-189e-42c3-8c86-df47c91b1a11-00001.parquet
    2022-03-22 12:03:04        619 00001-1-b3e34418-30cf-4a81-80c7-04d2fc089435-00001.parquet

Clean up

When you’re done exploring the solution, complete the following steps to clean up the resources:

  1. Drop the Iceberg table.
  2. Delete the EMR cluster.
  3. Delete the FSx for Lustre file system.
  4. If any orphan files are present, empty the S3 bucket.
  5. Delete the EC2 key pair.
  6. Delete the VPC.

Conclusion

In this post, we demonstrated how to create an FSx for Lustre file system and an EMR cluster with the file system mounted. We observed the performance gain in terms of Iceberg metadata file operations and then cleaned up so as not to incur any additional charges.

Using FSx for Lustre with Iceberg on Amazon EMR allows you to gain significant performance in terms of metadata file operations. We observed 6.33–8.78 times speedup in metadata file operations and 1.06–1.26 times speedup in query time for Iceberg tables with 100, 1,000, and 10,000 snapshots. Note that this approach reduces the time for metadata file operations and not for the data operations. The overall performance gain would be dependent on the number of metadata files, size of each metadata files, amount of data that is being processed, and so on.


About the Author

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR. He works on cutting-edge features of Amazon EMR and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies and hang out with friends.

Amazon QuickSight AWS re:Invent recap 2022

Post Syndicated from Mia Heard original https://aws.amazon.com/blogs/big-data/amazon-quicksight-aws-reinvent-recap-2022/

AWS re:Invent is a learning conference hosted by AWS for the global cloud computing community. Re:Invent was held at the end of 2022 in Las Vegas, Nevada, from November 28 to December 2.

Amazon QuickSight powers data-driven organizations with unified business intelligence (BI) at hyperscale. This post walks you through a full recap of QuickSight at this year’s re:Invent, including key launch announcements, sessions available virtually, and additional resources for continued learning.

Launch announcements

AWS Announces Five New Capabilities for Amazon QuickSight

This press release covers five new QuickSight capabilities to help you streamline business intelligence operations, using the most popular serverless BI service built for the cloud.

New analytical questions available in Amazon QuickSight Q: “Why” and “Forecast”

QuickSight announces support for two new question types that simplify and scale complex analytical tasks using natural language: “forecast” and “why.”

Announcing Automated Data Preparation for Amazon QuickSight Q

Automated data preparation utilizes machine learning (ML) to infer semantic information about data and adds it to datasets as metadata about the columns (fields), making it faster for you to prepare data in order to support natural language questions.

New Amazon QuickSight API Capabilities to Accelerate Your BI Transformation

New QuickSight API capabilities allow programmatic creation and management of dashboards, analysis, and templates.

Create and Share Operational Reports at Scale with Amazon QuickSight Paginated Reports

This feature allows you to create and share highly formatted, personalized reports containing business-critical data to hundreds of thousands of end-users—without any infrastructure setup or maintenance, up-front licensing, or long-term commitments.

Keynotes

Adam Selipsky, Chief Executive Officer of Amazon Web Services

Watch Adam Selipsky, Chief Executive Officer of Amazon Web Services, as he looks at the ways that forward-thinking builders are transforming industries and even our future, powered by AWS. He highlights innovations in data, infrastructure, and more that are helping customers achieve their goals faster, take advantage of untapped potential, and create a better future with AWS.

Swami Sivasubramanian, Vice President of AWS Data and Machine Learning

Watch Swami Sivasubramanian, Vice President of AWS Data and Machine Learning, as he reveals the latest AWS innovations that can help you transform your company’s data into meaningful insights and actions for your business. In this keynote, several speakers discuss the key components of a future-proof data strategy and how to empower your organization to drive the next wave of modern invention with data. Hear from leading AWS customers who are using data to bring new experiences to life for their customers.

Leadership sessions

Unlock the value of your data with AWS analytics

Data fuels digital transformation and drives effective business decisions. To survive in an ever-changing world, organizations are turning to data to derive insights, create new experiences, and reinvent themselves so they can remain relevant today and in the future. AWS offers analytics services that allow organizations to gain faster and deeper insights from all their data. In this session, G2 Krishnamoorthy, VP of AWS Analytics, addresses the current state of analytics on AWS, covers the latest service innovations around data, and highlights customer successes with AWS analytics. Also, learn from organizations like FINRA and more who have turned to AWS for their digital transformation journey.

Reinvent how you derive value from your data with Amazon QuickSight

In this session, learn how you can use AWS-native business analytics to provide your users with ML-powered interactive dashboards, natural language query (NLQ), and embedded analytics to provide insights to users at scale, when and where they need it. Watch this session to also learn more about how Amazon uses QuickSight internally.

Breakout sessions

What’s New with Amazon QuickSight?

This session covers all of QuickSight’s newly launched capabilities, including paginated reporting in the cloud, 1 billion rows of data with SPICE, assets as code, and new Amazon QuickSight Q capabilities, including ML-powered semantic inferencing, forecasting, new question types, and more.

Differentiate your apps with Amazon QuickSight embedded analytics

Watch this session to learn how to enable new monetization opportunities and grow your business with QuickSight embedded analytics. Discover how you can differentiate your end-user experience by embedding data visualizations, dashboards, and ML-powered natural language query into your applications at scale with no infrastructure to manage. Hear from customers Guardian Life and Showpad and learn more about their QuickSight use cases.

Migrate to cloud-native business analytics with Amazon QuickSight

Legacy BI systems can hurt agile decision-making in the modern organization, with expensive licensing, outdated capabilities, and expensive infrastructure management. In this session, discover how migrating your BI to the cloud with cloud-native, fully managed business analytics capabilities from QuickSight can help you overcome these challenges. Learn how you can use QuickSight’s interactive dashboards and reporting capabilities to provide insights to every user in the organization, lowering your costs and enabling better decision-making. Watch this session to also learn more about Siemens’s QuickSight use case.

Get clarity on your data in seconds with Amazon QuickSight Q

Amazon QuickSight Q is an ML–powered natural language capability that empowers business users to ask questions about all of their data using everyday business language and get answers in seconds. Q interprets questions to understand their intent and generates an answer instantly in the form of a visual without requiring authors to create graphics, dashboards, or analyses. In this session, the QuickSight Q team provides an overview and demonstration of Q in action. Watch this session to also learn more about NASDAQ’s QuickSight use case.

Optimize your AWS cost and usage with Cloud Intelligence Dashboards

Do your engineers know how much they’re spending? Do you have insight into the details of your cost and usage on AWS? Are you taking advantage of all your cost optimization opportunities? Attend this session to learn how organizations are using the Cloud Intelligence Dashboards to start their FinOps journeys and create cost-aware cultures within their organizations. Dive deep into specific use cases and learn how you can use these insights to drive and measure your cost optimization efforts. Discover how unit economics, resource-level visibility, and periodic spend updates make it possible for FinOps practitioners, developers, and business executives to come together to make smarter decisions. Watch this session to also learn more about Dolby laboratories’ QuickSight use case.

Useful resources

With the QuickSight re:Invent breakout session recordings and additional resources, we hope that you learn how to dive deeper into your data with QuickSight. For continued learning, check out more information and resources via our website.


About the author

Mia Heard is a product marketing manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service.

Accelerate orchestration of an ELT process using AWS Step Functions and Amazon Redshift Data API

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/accelerate-orchestration-of-an-elt-process-using-aws-step-functions-and-amazon-redshift-data-api/

Extract, Load, and Transform (ELT) is a modern design strategy where raw data is first loaded into the data warehouse and then transformed with familiar Structured Query Language (SQL) semantics leveraging the power of massively parallel processing (MPP) architecture of the data warehouse. When you use an ELT pattern, you can also use your existing SQL workload while migrating from your on-premises data warehouse to Amazon Redshift. This eliminates the need to rewrite relational and complex SQL workloads into a new framework. With Amazon Redshift, you can load, transform, and enrich your data efficiently using familiar SQL with advanced and robust SQL support, simplicity, and seamless integration with your existing SQL tools. When you adopt an ELT pattern, a fully automated and highly scalable workflow orchestration mechanism will help to minimize the operational effort that you must invest in managing the pipelines. It also ensures the timely and accurate refresh of your data warehouse.

AWS Step Functions is a low-code, serverless, visual workflow service where you can orchestrate complex business workflows with an event-driven framework and easily develop repeatable and dependent processes. It can ensure that the long-running, multiple ELT jobs run in a specified order and complete successfully instead of manually orchestrating those jobs or maintaining a separate application.

Amazon DynamoDB is a fast, flexible NoSQL database service for single-digit millisecond performance at any scale.

This post explains how to use AWS Step Functions, Amazon DynamoDB, and Amazon Redshift Data API to orchestrate the different steps in your ELT workflow and process data within the Amazon Redshift data warehouse.

Solution overview

In this solution, we will orchestrate an ELT process using AWS Step Functions. As part of the ELT process, we will refresh the dimension and fact tables at regular intervals from staging tables, which ingest data from the source. We will maintain the current state of the ELT process (e.g., Running or Ready) in an audit table that will be maintained at Amazon DynamoDB. AWS Step Functions allows you to directly call the Data API from a state machine, reducing the complexity of running the ELT pipeline. For loading the dimensions and fact tables, we will be using Amazon Redshift Data API from AWS Lambda. We will use Amazon EventBridge for scheduling the state machine to run at a desired interval based on the customer’s SLA.

For a given ELT process, we will set up a JobID in a DynamoDB audit table and set the JobState as “Ready” before the state machine runs for the first time. The state machine performs the following steps:

  1. The first process in the Step Functions workflow is to pass the JobID as input to the process that is configured as JobID 101 in Step Functions and DynamoDB by default via the CloudFormation template.
  2. The next step is to fetch the current JobState for the given JobID by running a query against the DynamoDB audit table using Lambda Data API.
  3. If JobState is “Running,” then it indicates that the previous iteration is not completed yet, and the process should end.
  4. If the JobState is “Ready,” then it indicates that the previous iteration was completed successfully and the process is ready to start. So, the next step will be to update the DynamoDB audit table to change the JobState to “Running” and JobStart to the current time for the given JobID using DynamoDB Data API within a Lambda function.
  5. The next step will be to start the dimension table load from the staging table data within Amazon Redshift using Lambda Data API. In order to achieve that, we can either call a stored procedure using the Amazon Redshift Data API, or we can also run series of SQL statements synchronously using Amazon Redshift Data API within a Lambda function.
  6. In a typical data warehouse, multiple dimension tables are loaded in parallel at the same time before the fact table gets loaded. Using Parallel flow in Step Functions, we will load two dimension tables at the same time using Amazon Redshift Data API within a Lambda function.
  7. Once the load is completed for both the dimension tables, we will load the fact table as the next step using Amazon Redshift Data API within a Lambda function.
  8. As the load completes successfully, the last step would be to update the DynamoDB audit table to change the JobState to “Ready” and JobEnd to the current time for the given JobID, using DynamoDB Data API within a Lambda function.
    Solution Overview

Components and dependencies

The following architecture diagram highlights the end-to-end solution using AWS services:

Architecture Diagram

Before diving deeper into the code, let’s look at the components first:

  • AWS Step Functions – You can orchestrate a workflow by creating a State Machine to manage failures, retries, parallelization, and service integrations.
  • Amazon EventBridge – You can run your state machine on a daily schedule by creating a Rule in Amazon EventBridge.
  • AWS Lambda – You can trigger a Lambda function to run Data API either from Amazon Redshift or DynamoDB.
  • Amazon DynamoDB – Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB is extremely efficient in running updates, which improves the performance of metadata management for customers with strict SLAs.
  • Amazon Redshift – Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale.
  • Amazon Redshift Data API – You can access your Amazon Redshift database using the built-in Amazon Redshift Data API. Using this API, you can access Amazon Redshift data with web services–based applications, including AWS Lambda.
  • DynamoDB API – You can access your Amazon DynamoDB tables from a Lambda function by importing boto3.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  1. An AWS account.
  2. An Amazon Redshift cluster.
  3. An Amazon Redshift customizable IAM service role with the following policies:
    • AmazonS3ReadOnlyAccess
    • AmazonRedshiftFullAccess
  4. Above IAM role associated to the Amazon Redshift cluster.

Deploy the CloudFormation template

To set up the ETL orchestration demo, the steps are as follows:

  1. Sign in to the AWS Management Console.
  2. Click on Launch Stack.

    CreateStack-1
  3. Click Next.
  4. Enter a suitable name in Stack name.
  5. Provide the information for the Parameters as detailed in the following table.
CloudFormation template parameter Allowed values Description
RedshiftClusterIdentifier Amazon Redshift cluster identifier Enter the Amazon Redshift cluster identifier
DatabaseUserName Database user name in Amazon Redshift cluster Amazon Redshift database user name which has access to run SQL Script
DatabaseName Amazon Redshift database name Name of the Amazon Redshift primary database where SQL script would be run
RedshiftIAMRoleARN Valid IAM role ARN attached to Amazon Redshift cluster AWS IAM role ARN associated with the Amazon Redshift cluster

Create Stack 2

  1. Click Next and a new page appears. Accept the default values in the page and click Next. On the last page check the box to acknowledge resources might be created and click on Create stack.
    Create Stack 3
  2. Monitor the progress of the stack creation and wait until it is complete.
  3. The stack creation should complete approximately within 5 minutes.
  4. Navigate to Amazon Redshift console.
  5. Launch Amazon Redshift query editor v2 and connect to your cluster.
  6. Browse to the database name provided in the parameters while creating the cloudformation template e.g., dev, public schema and expand Tables. You should see the tables as shown below.
    Redshift Query Editor v2 1
  7. Validate the sample data by running the following SQL query and confirm the row count match above the screenshot.
select 'customer',count(*) from public.customer
union all
select 'fact_yearly_sale',count(*) from public.fact_yearly_sale
union all
select 'lineitem',count(*) from public.lineitem
union all
select 'nation',count(*) from public.nation
union all
select 'orders',count(*) from public.orders
union all
select 'supplier',count(*) from public.supplier

Run the ELT orchestration

  1. After you deploy the CloudFormation template, navigate to the stack detail page. On the Resources tab, choose the link for DynamoDBETLAuditTable to be redirected to the DynamoDB console.
  2. Navigate to Tables and click on table name beginning with <stackname>-DynamoDBETLAuditTable. In this demo, the stack name is DemoETLOrchestration, so the table name will begin with DemoETLOrchestration-DynamoDBETLAuditTable.
  3. It will expand the table. Click on Explore table items.
  4. Here you can see the current status of the job, which will be in Ready status.
    DynamoDB 1
  5. Navigate again to stack detail page on the CloudFormation console. On the Resources tab, choose the link for RedshiftETLStepFunction to be redirected to the Step Functions console.
    CFN Stack Resources
  6. Click Start Execution. When it successfully completes, all steps will be marked as green.
    Step Function Running
  7. While the job is running, navigate back to DemoETLOrchestration-DynamoDBETLAuditTable in the DynamoDB console screen. You will see JobState as Running with JobStart time.
    DynamoDB 2
  1. After Step Functions completes, JobState will be changed to Ready with JobStart and JobEnd time.
    DynamoDB 3

Handling failure

In the real world sometimes, the ELT process can fail due to unexpected data anomalies or object related issues. In that case, the step function execution will also fail with the failed step marked in red as shown in the screenshot below:
Step Function 2

Once you identify and fix the issue, please follow the below steps to restart the step function:

  1. Navigate to the DynamoDB table beginning with DemoETLOrchestration-DynamoDBETLAuditTable. Click on Explore table items and select the row with the specific JobID for the failed job.
  2. Go to Action and select Edit item to modify the JobState to Ready as shown below:
    DynamoDB 4
  3. Follow steps 5 and 6 under the “Run the ELT orchestration” section to restart execution of the step function.

Validate the ELT orchestration

The step function loads the dimension tables public.supplier and public.customer and the fact table public.fact_yearly_sale. To validate the orchestration, the process steps are as follows:

  1. Navigate to the Amazon Redshift console.
  2. Launch Amazon Redshift query editor v2 and connect to your cluster.
  3. Browse to the database name provided in the parameters while creating the cloud formation template e.g., dev, public schema.
  4. Validate the data loaded by Step Functions by running the following SQL query and confirm the row count to match as follows:
select 'customer',count(*) from public.customer
union all
select 'fact_yearly_sale',count(*) from public.fact_yearly_sale
union all
select 'supplier',count(*) from public.supplier

Redshift Query Editor v2 2

Schedule the ELT orchestration

The steps are as follows to schedule the Step Functions:

  1. Navigate to the Amazon EventBridge console and choose Create rule.
    Event Bridge 1
  1. Under Name, enter a meaningful name, for example, Trigger-Redshift-ELTStepFunction.
  2. Under Event bus, choose default.
  3. Under Rule Type, select Schedule.
  4. Click on Next.
    Event Bridge 2
  5. Under Schedule pattern, select A schedule that runs at a regular rate, such as every 10 minutes.
  6. Under Rate expression, enter Value as 5 and choose Unit as Minutes.
  7. Click on Next.
    Event Bridge 3
  8. Under Target types, choose AWS service.
  9. Under Select a Target, choose Step Functions state machine.
  10. Under State machine, choose the step function created by the CloudFormation template.
  11. Under Execution role, select Create a new role for this specific resource.
  12. Click on Next.
    Event Bridge 4
  13. Review the rule parameters and click on Create Rule.

After the rule has been created, it will automatically trigger the step function every 5 minutes to perform ELT processing in Amazon Redshift.

Clean up

Please note that deploying a CloudFormation template incurs cost. To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack by navigating to the AWS CloudFormation console, selecting the stack, and choosing Delete.

Conclusion

In this post, we described how to easily implement a modern, serverless, highly scalable, and cost-effective ELT workflow orchestration process in Amazon Redshift using AWS Step Functions, Amazon DynamoDB and Amazon Redshift Data API. As an alternate solution, you can also use Amazon Redshift for metadata management instead of using Amazon DynamoDB. As part of this demo, we show how a single job entry in DynamoDB gets updated for each run, but you can also modify the solution to maintain a separate audit table with the history of each run for each job, which would help with debugging or historical tracking purposes. Step Functions manage failures, retries, parallelization, service integrations, and observability so your developers can focus on higher-value business logic. Step Functions can integrate with Amazon SNS to send notifications in case of failure or success of the workflow. Please follow this AWS Step Functions documentation to implement the notification mechanism.


About the Authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Raks KhareRaks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 13 years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling
and cooking.

Add your own libraries and application dependencies to Spark and Hive on Amazon EMR Serverless with custom images

Post Syndicated from Veena Vasudevan original https://aws.amazon.com/blogs/big-data/add-your-own-libraries-and-application-dependencies-to-spark-and-hive-on-amazon-emr-serverless-with-custom-images/

Amazon EMR Serverless allows you to run open-source big data frameworks such as Apache Spark and Apache Hive without managing clusters and servers. Many customers who run Spark and Hive applications want to add their own libraries and dependencies to the application runtime. For example, you may want to add popular open-source extensions to Spark, or add a customized encryption-decryption module that is used by your application.

We are excited to announce a new capability that allows you to customize the runtime image used in EMR Serverless by adding custom libraries that your applications need to use. This feature enables you to do the following:

  • Maintain a set of version-controlled libraries that are reused and available for use in all your EMR Serverless jobs as part of the EMR Serverless runtime
  • Add popular extensions to open-source Spark and Hive frameworks such as pandas, NumPy, matplotlib, and more that you want your EMR serverless application to use
  • Use established CI/CD processes to build, test, and deploy your customized extension libraries to the EMR Serverless runtime
  • Apply established security processes, such as image scanning, to meet the compliance and governance requirements within your organization
  • Use a different version of a runtime component (for example the JDK runtime or the Python SDK runtime) than the version that is available by default with EMR Serverless

In this post, we demonstrate how to use this new feature.

Solution Overview

To use this capability, customize the EMR Serverless base image using Amazon Elastic Container Registry (Amazon ECR), which is a fully managed container registry that makes it easy for your developers to share and deploy container images. Amazon ECR eliminates the need to operate your own container repositories or worry about scaling the underlying infrastructure. After the custom image is pushed to the container registry, specify the custom image while creating your EMR Serverless applications.

The following diagram illustrates the steps involved in using custom images for your EMR Serverless applications.

In the following sections, we demonstrate using custom images with Amazon EMR Serverless to address three common use cases:

  • Add popular open-source Python libraries into the EMR Serverless runtime image
  • Use a different or newer version of the Java runtime for the EMR Serverless application
  • Install a Prometheus agent and customize the Spark runtime to push Spark JMX metrics to Amazon Managed Service for Prometheus, and visualize the metrics in a Grafana dashboard

General prerequisites

The following are the prerequisites to use custom images with EMR Serverless. Complete the following steps before proceeding with the subsequent steps:

  1. Create an AWS Identity and Access Management (IAM) role with IAM permissions for Amazon EMR Serverless applications, Amazon ECR permissions, and Amazon S3 permissions for the Amazon Simple Storage Service (Amazon S3) bucket aws-bigdata-blog and any S3 bucket in your account where you will store the application artifacts.
  2. Install or upgrade to the latest AWS Command Line Interface (AWS CLI) version and install the Docker service in an Amazon Linux 2 based Amazon Elastic Compute Cloud (Amazon EC2) instance. Attach the IAM role from the previous step for this EC2 instance.
  3. Select a base EMR Serverless image from the following public Amazon ECR repository. Run the following commands on the EC2 instance with Docker installed to verify that you are able to pull the base image from the public repository:
    # If docker is not started already, start the process
    $ sudo service docker start 
    
    # Check if you are able to pull the latest EMR 6.9.0 runtime base image 
    $ sudo docker pull public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest

  4. Log in to Amazon ECR with the following commands and create a repository called emr-serverless-ci-examples, providing your AWS account ID and Region:
    $ sudo aws ecr get-login-password --region <region> | sudo docker login --username AWS --password-stdin <your AWS account ID>.dkr.ecr.<region>.amazonaws.com
    
    $ aws ecr create-repository --repository-name emr-serverless-ci-examples --region <region>

  5. Provide IAM permissions to the EMR Serverless service principal for the Amazon ECR repository:
    1. On the Amazon ECR console, choose Permissions under Repositories in the navigation pane.
    2. Choose Edit policy JSON.
    3. Enter the following JSON and save:
      {
        "Version": "2012-10-17",
        "Statement": [
          {
            "Sid": "Emr Serverless Custom Image Support",
            "Effect": "Allow",
            "Principal": {
              "Service": "emr-serverless.amazonaws.com"
            },
            "Action": [
              "ecr:BatchGetImage",
              "ecr:DescribeImages",
              "ecr:GetDownloadUrlForLayer"
            ]
          }
        ]
      }

Make sure that the policy is updated on the Amazon ECR console.

For production workloads, we recommend adding a condition in the Amazon ECR policy to ensure only allowed EMR Serverless applications can get, describe, and download images from this repository. For more information, refer to Allow EMR Serverless to access the custom image repository.

In the next steps, we create and use custom images in our EMR Serverless applications for the three different use cases.

Use case 1: Run data science applications

One of the common applications of Spark on Amazon EMR is the ability to run data science and machine learning (ML) applications at scale. For large datasets, Spark includes SparkML, which offers common ML algorithms that can be used to train models in a distributed fashion. However, you often need to run many iterations of simple classifiers to fit for hyperparameter tuning, ensembles, and multi-class solutions over small-to-medium-sized data (100,000 to 1 million records). Spark is a great engine to run multiple iterations of such classifiers in parallel. In this example, we demonstrate this use case, where we use Spark to run multiple iterations of an XGBoost model to select the best parameters. The ability to include Python dependencies in the EMR Serverless image should make it easy to make the various dependencies (xgboost, sk-dist, pandas, numpy, and so on) available for the application.

Prerequisites

The EMR Serverless job runtime IAM role should be given permissions to your S3 bucket where you will be storing your PySpark file and application logs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessToS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<YOUR-BUCKET>",
                "arn:aws:s3:::<YOUR-BUCKET>/*"
            ]
        }
    ]
}

Create an image to install ML dependencies

We create a custom image from the base EMR Serverless image to install dependencies required by the SparkML application. Create the following Dockerfile in your EC2 instance that runs the docker process inside a new directory named datascience:

FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest

USER root

# python packages
RUN pip3 install boto3 pandas numpy
RUN pip3 install -U scikit-learn==0.23.2 scipy 
RUN pip3 install sk-dist
RUN pip3 install xgboost
RUN sed -i 's|import Parallel, delayed|import Parallel, delayed, logger|g' /usr/local/lib/python3.7/site-packages/skdist/distribute/search.py

# EMRS will run the image as hadoop
USER hadoop:hadoop

Build and push the image to the Amazon ECR repository emr-serverless-ci-examples, providing your AWS account ID and Region:

# Build the image locally. This command will take a minute or so to complete
sudo docker build -t local/emr-serverless-ci-ml /home/ec2-user/datascience/ --no-cache --pull
# Create tag for the local image
sudo docker tag local/emr-serverless-ci-ml:latest <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-ml
# Push the image to Amazon ECR. This command will take a few seconds to complete
sudo docker push <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-ml

Submit your Spark application

Create an EMR Serverless application with the custom image created in the previous step:

aws --region <region>  emr-serverless create-application \
    --release-label emr-6.9.0 \
    --type "SPARK" \
    --name data-science-with-ci \
    --image-configuration '{ "imageUri": "<your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-ml" }'

Make a note of the value of applicationId returned by the command.

After the application is created, we’re ready to submit our job. Copy the application file to your S3 bucket:

aws s3 cp s3://aws-bigdata-blog/artifacts/BDB-2771/code/emrserverless-xgboost-spark-example.py s3://<YOUR BUCKET>/<PREFIX>/emrserverless-xgboost-spark-example.py

Submit the Spark data science job. In the following command, provide the name of the S3 bucket and prefix where you stored your application file. Additionally, provide the applicationId value obtained from the create-application command and your EMR Serverless job runtime IAM role ARN.

aws emr-serverless start-job-run \
        --region <region> \
        --application-id <applicationId> \
        --execution-role-arn <jobRuntimeRole> \
        --job-driver '{
            "sparkSubmit": {
                "entryPoint": "s3://<YOUR BUCKET>/<PREFIX>/emrserverless-xgboost-spark-example.py"
            }
        }' \
        --configuration-overrides '{
              "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                  "logUri": "s3://<YOUR BUCKET>/emrserverless/logs"
                }
              }
            }'

After the Spark job succeeds, you can view the best model estimates from our application by viewing the Spark driver’s stdout logs. Navigate to Spark History Server, Executors, Driver, Logs, stdout.

Use case 2: Use a custom Java runtime environment

Another use case for custom images is the ability to use a custom Java version for your EMR Serverless applications. For example, if you’re using Java11 to compile and package your Java or Scala applications, and try to run them directly on EMR Serverless, it may lead to runtime errors because EMR Serverless uses Java 8 JRE by default. To make the runtime environments of your EMR Serverless applications compatible with your compile environment, you can use the custom images feature to install the Java version you are using to package your applications.

Prerequisites

An EMR Serverless job runtime IAM role should be given permissions to your S3 bucket where you will be storing your application JAR and logs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AccessToS3Buckets",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::<YOUR-BUCKET>",
                "arn:aws:s3:::<YOUR-BUCKET>/*"
            ]
        }
    ]
}

Create an image to install a custom Java version

We first create an image that will install a Java 11 runtime environment. Create the following Dockerfile in your EC2 instance inside a new directory named customjre:

FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest

USER root

# Install JDK 11
RUN amazon-linux-extras install java-openjdk11

# EMRS will run the image as hadoop
USER hadoop:hadoop

Build and push the image to the Amazon ECR repository emr-serverless-ci-examples, providing your AWS account ID and Region:

sudo docker build -t local/emr-serverless-ci-java11 /home/ec2-user/customjre/ --no-cache --pull
sudo docker tag local/emr-serverless-ci-java11:latest <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-java11
sudo docker push <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-java11

Submit your Spark application

Create an EMR Serverless application with the custom image created in the previous step:

aws --region <region>  emr-serverless create-application \
    --release-label emr-6.9.0 \
    --type "SPARK" \
    --name custom-jre-with-ci \
    --image-configuration '{ "imageUri": "<your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-java11" }'

Copy the application JAR to your S3 bucket:

aws s3 cp s3://aws-bigdata-blog/artifacts/BDB-2771/code/emrserverless-custom-images_2.12-1.0.jar s3://<YOUR BUCKET>/<PREFIX>/emrserverless-custom-images_2.12-1.0.jar

Submit a Spark Scala job that was compiled with Java11 JRE. This job also uses Java APIs that may produce different results for different versions of Java (for example: java.time.ZoneId). In the following command, provide the name of the S3 bucket and prefix where you stored your application JAR. Additionally, provide the applicationId value obtained from the create-application command and your EMR Serverless job runtime role ARN with IAM permissions mentioned in the prerequisites. Note that in the sparkSubmitParameters, we pass a custom Java version for our Spark driver and executor environments to instruct our job to use the Java11 runtime.

aws emr-serverless start-job-run \
        --region <region> \
        --application-id <applicationId> \
        --execution-role-arn <jobRuntimeRole> \
        --job-driver '{
            "sparkSubmit": {
                "entryPoint": "s3://<YOUR BUCKET>/<PREFIX>/emrserverless-custom-images_2.12-1.0.jar",
                "entryPointArguments": ["40000000"],
                "sparkSubmitParameters": "--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.16.0.8-1.amzn2.0.1.x86_64 --conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.16.0.8-1.amzn2.0.1.x86_64 --class emrserverless.customjre.SyntheticAnalysis"
            }
        }' \
        --configuration-overrides '{
              "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                  "logUri": "s3://<YOUR BUCKET>/emrserverless/logs"
                }
              }
            }'

You can also extend this use case to install and use a custom Python version for your PySpark applications.

Use case 3: Monitor Spark metrics in a single Grafana dashboard

Spark JMX telemetry provides a lot of fine-grained details about every stage of the Spark application, even at the JVM level. These insights can be used to tune and optimize the Spark applications to reduce job runtime and cost. Prometheus is a popular tool used for collecting, querying, and visualizing application and host metrics of several different processes. After the metrics are collected in Prometheus, we can query these metrics or use Grafana to build dashboards and visualize them. In this use case, we use Amazon Managed Prometheus to gather Spark driver and executor metrics from our EMR Serverless Spark application, and we use Grafana to visualize the collected metrics. The following screenshot is an example Grafana dashboard for an EMR Serverless Spark application.

Prerequisites

Complete the following prerequisite steps:

  1. Create a VPC, private subnet, and security group. The private subnet should have a NAT gateway or VPC S3 endpoint attached. The security group should allow outbound access to the HTTPS port 443 and should have a self-referencing inbound rule for all traffic.


    Both the private subnet and security group should be associated with the two Amazon Managed Prometheus VPC endpoint interfaces.
  2. On the Amazon Virtual Private Cloud (Amazon VPC) console, create two endpoints for Amazon Managed Prometheus and the Amazon Managed Prometheus workspace. Associate the endpoints to the VPC, private subnet, and security group to both endpoints. Optionally, provide a name tag for your endpoints and leave everything else as default.

  3. Create a new workspace on the Amazon Managed Prometheus console.
  4. Note the ARN and the values for Endpoint – remote write URL and Endpoint – query URL.
  5. Attach the following policy to your Amazon EMR Serverless job runtime IAM role to provide remote write access to your Prometheus workspace. Replace the ARN copied from the previous step in the Resource section of "Sid": "AccessToPrometheus". This role should also have permissions to your S3 bucket where you will be storing your application JAR and logs.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AccessToPrometheus",
                "Effect": "Allow",
                "Action": [
                    "aps:RemoteWrite"
                ],
                "Resource": "arn:aws:aps:<region>:<your AWS account>:workspace/<Workspace_ID>"
            }, {
                "Sid": "AccessToS3Buckets",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::<YOUR-BUCKET>",
                    "arn:aws:s3:::<YOUR-BUCKET>/*"
                ]
            }
        ]
    }

  6. Create an IAM user or role with permissions to create and query the Amazon Managed Prometheus workspace.

We use the same IAM user or role to authenticate in Grafana or query the Prometheus workspace.

Create an image to install the Prometheus agent

We create a custom image from the base EMR Serverless image to do the following:

  • Update the Spark metrics configuration to use PrometheusServlet to publish driver and executor JMX metrics in Prometheus format
  • Download and install the Prometheus agent
  • Upload the configuration YAML file to instruct the Prometheus agent to send the metrics to the Amazon Managed Prometheus workspace

Create the Prometheus config YAML file to scrape the driver, executor, and application metrics. You can run the following example commands on the EC2 instance.

  1. Copy the prometheus.yaml file from our S3 path:
    aws s3 cp s3://aws-bigdata-blog/artifacts/BDB-2771/prometheus-config/prometheus.yaml .

  2. Modify prometheus.yaml to replace the Region and value of the remote_write URL with the remote write URL obtained from the prerequisites:
    ## Replace your AMP workspace remote write URL 
    endpoint_url="https://aps-workspaces.<region>.amazonaws.com/workspaces/<ws-xxxxxxx-xxxx-xxxx-xxxx-xxxxxx>/api/v1/remote_write"
    
    ## Replace the remote write URL and region. Following is example for us-west-2 region. Modify the command for your region. 
    sed -i "s|region:.*|region: us-west-2|g" prometheus.yaml
    sed -i "s|url:.*|url: ${endpoint_url}|g" prometheus.yaml

  3. Upload the file to your own S3 bucket:
    aws s3 cp prometheus.yaml s3://<YOUR BUCKET>/<PREFIX>/

  4. Create the following Dockerfile inside a new directory named prometheus on the same EC2 instance that runs the Docker service. Provide the S3 path where you uploaded the prometheus.yaml file.
    # Pull base image
    FROM public.ecr.aws/emr-serverless/spark/emr-6.9.0:latest
    
    USER root
    
    # Install Prometheus agent
    RUN yum install -y wget && \
        wget https://github.com/prometheus/prometheus/releases/download/v2.26.0/prometheus-2.26.0.linux-amd64.tar.gz && \
        tar -xvf prometheus-2.26.0.linux-amd64.tar.gz && \
        rm -rf prometheus-2.26.0.linux-amd64.tar.gz && \
        cp prometheus-2.26.0.linux-amd64/prometheus /usr/local/bin/
    
    # Change Spark metrics configuration file to use PrometheusServlet
    RUN cp /etc/spark/conf.dist/metrics.properties.template /etc/spark/conf/metrics.properties && \
        echo -e '\
     *.sink.prometheusServlet.class=org.apache.spark.metrics.sink.PrometheusServlet\n\
     *.sink.prometheusServlet.path=/metrics/prometheus\n\
     master.sink.prometheusServlet.path=/metrics/master/prometheus\n\
     applications.sink.prometheusServlet.path=/metrics/applications/prometheus\n\
     ' >> /etc/spark/conf/metrics.properties
    
     # Copy the prometheus.yaml file locally. Change the value of bucket and prefix to where you stored your prometheus.yaml file
    RUN aws s3 cp s3://<YOUR BUCKET>/<PREFIX>/prometheus.yaml .
    
     # Create a script to start the prometheus agent in the background
    RUN echo -e '#!/bin/bash\n\
     nohup /usr/local/bin/prometheus --config.file=/home/hadoop/prometheus.yaml </dev/null >/dev/null 2>&1 &\n\
     echo "Started Prometheus agent"\n\
     ' >> /home/hadoop/start-prometheus-agent.sh && \ 
        chmod +x /home/hadoop/start-prometheus-agent.sh
    
     # EMRS will run the image as hadoop
    USER hadoop:hadoop

  5. Build the Dockerfile and push to Amazon ECR, providing your AWS account ID and Region:
    sudo docker build -t local/emr-serverless-ci-prometheus /home/ec2-user/prometheus/ --no-cache --pull
    sudo docker tag local/emr-serverless-ci-prometheus <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-prometheus
    sudo docker push <your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-prometheus
    

Submit the Spark application

After the Docker image has been pushed successfully, you can create the serverless Spark application with the custom image you created. We use the AWS CLI to submit Spark jobs with the custom image on EMR Serverless. Your AWS CLI has to be upgraded to the latest version to run the following commands.

  1. In the following AWS CLI command, provide your AWS account ID and Region. Additionally, provide the subnet and security group from the prerequisites in the network configuration. In order to successfully push metrics from EMR Serverless to Amazon Managed Prometheus, make sure that you are using the same VPC, subnet, and security group you created based on the prerequisites.
    aws emr-serverless create-application \
    --name monitor-spark-with-ci \
    --region <region> \
    --release-label emr-6.9.0 \
    --type SPARK \
    --network-configuration subnetIds=<subnet-xxxxxxx>,securityGroupIds=<sg-xxxxxxx> \
    --image-configuration '{ "imageUri": "<your AWS account ID>.dkr.ecr.<region>.amazonaws.com/emr-serverless-ci-examples:emr-serverless-ci-prometheus" }'

  2. Copy the application JAR to your S3 bucket:
    aws s3 cp s3://aws-bigdata-blog/artifacts/BDB-2771/code/emrserverless-custom-images_2.12-1.0.jar s3://<YOUR BUCKET>/<PREFIX>/emrserverless-custom-images_2.12-1.0.jar

  3. In the following command, provide the name of the S3 bucket and prefix where you stored your application JAR. Additionally, provide the applicationId value obtained from the create-application command and your EMR Serverless job runtime IAM role ARN from the prerequisites, with permissions to write to the Amazon Managed Prometheus workspace.
    aws emr-serverless start-job-run \
        --region <region> \
        --application-id <applicationId> \
        --execution-role-arn <jobRuntimeRole> \
        --job-driver '{
            "sparkSubmit": {
                "entryPoint": "s3://<YOUR BUCKET>/<PREFIX>/emrserverless-custom-images_2.12-1.0.jar",
                "entryPointArguments": ["40000000"],
                "sparkSubmitParameters": "--conf spark.ui.prometheus.enabled=true --conf spark.executor.processTreeMetrics.enabled=true --class emrserverless.prometheus.SyntheticAnalysis"
            }
        }' \
        --configuration-overrides '{
              "monitoringConfiguration": {
                "s3MonitoringConfiguration": {
                  "logUri": "s3://<YOUR BUCKET>/emrserverless/logs"
                }
              }
            }'
    

Inside this Spark application, we run the bash script in the image to start the Prometheus process. You will need to add the following lines to your Spark code after initiating the Spark session if you’re planning to use this image to monitor your own Spark application:

import scala.sys.process._
Seq("/home/hadoop/start-prometheus-agent.sh").!!

For PySpark applications, you can use the following code:

import os
os.system("/home/hadoop/start-prometheus-agent.sh")

Query Prometheus metrics and visualize in Grafana

About a minute after the job changes to Running status, you can query Prometheus metrics using awscurl.

  1. Replace the value of AMP_QUERY_ENDPOINT with the query URL you noted earlier, and provide the job run ID obtained after submitting the Spark job. Make sure that you’re using the credentials of an IAM user or role that has permissions to query the Prometheus workspace before running the commands.
    $ export AMP_QUERY_ENDPOINT="https://aps-workspaces.<region>.amazonaws.com/workspaces/<Workspace_ID>/api/v1/query"
    $ awscurl -X POST --region <region> \
                              --service aps "$AMP_QUERY_ENDPOINT?query=metrics_<jobRunId>_driver_ExecutorMetrics_TotalGCTime_Value{}"
    

    The following is example output from the query:

    {
        "status": "success",
        "data": {
            "resultType": "vector",
            "result": [{
                "metric": {
                    "__name__": "metrics_00f6bueadgb0lp09_driver_ExecutorMetrics_TotalGCTime_Value",
                    "instance": "localhost:4040",
                    "instance_type": "driver",
                    "job": "spark-driver",
                    "spark_cluster": "emrserverless",
                    "type": "gauges"
                },
                "value": [1671166922, "271"]
            }]
        }
    }

  2. Install Grafana on your local desktop and configure our AMP workspace as a data source.Grafana is a commonly used platform for visualizing Prometheus metrics.
  3. Before we start the Grafana server, enable AWS SIGv4 authentication in order to sign queries to AMP with IAM permissions.
    ## Enable SIGv4 auth 
    export AWS_SDK_LOAD_CONFIG=true 
    export GF_AUTH_SIGV4_AUTH_ENABLED=true

  4. In the same session, start the Grafana server. Note that the Grafana installation path may vary based on your OS configurations. Modify the command to start the Grafana server in case your installation path is different from /usr/local/. Also, make sure that you’re using the credentials of an IAM user or role that has permissions to query the Prometheus workspace before running the following commands
    ## Start Grafana server
    grafana-server --config=/usr/local/etc/grafana/grafana.ini \
      --homepath /usr/local/share/grafana \
      cfg:default.paths.logs=/usr/local/var/log/grafana \
      cfg:default.paths.data=/usr/local/var/lib/grafana \
      cfg:default.paths.plugins=/usr/local/var/lib/grafana/plugin

  5. Log in to Grafana and go on the data sources configuration page /datasources to add your AMP workspace as a data source.The URL should be without the /api/v1/query at the end. Enable SigV4 auth, then choose the appropriate Region and save.

When you explore the saved data source, you can see the metrics from the application we just submitted.

You can now visualize these metrics and create elaborate dashboards in Grafana.

Clean up

When you’re done running the examples, clean up the resources. You can use the following script to delete resources created in EMR Serverless, Amazon Managed Prometheus, and Amazon ECR. Pass the Region and optionally the Amazon Managed Prometheus workspace ID as arguments to the script. Note that this script will not remove EMR Serverless applications in Running status.

aws s3 cp s3://aws-bigdata-blog/artifacts/BDB-2771/cleanup/cleanup_resources.sh .
chmod +x cleanup_resources.sh
sh cleanup_resources.sh <region> <AMP Workspace ID> 

Conclusion

In this post, you learned how to use custom images with Amazon EMR Serverless to address some common use cases. For more information on how to build custom images or view sample Dockerfiles, see Customizing the EMR Serverless image and Custom Image Samples.


About the Author

Veena Vasudevan is a Senior Partner Solutions Architect and an Amazon EMR specialist at AWS focusing on Big Data and Analytics. She helps customers and partners build highly optimized, scalable, and secure solutions; modernize their architectures; and migrate their Big Data workloads to AWS.

Code conversion from Greenplum to Amazon Redshift: Handling arrays, dates, and regular expressions

Post Syndicated from Jagrit Shrestha original https://aws.amazon.com/blogs/big-data/code-conversion-from-greenplum-to-amazon-redshift-handling-arrays-dates-and-regular-expressions/

Amazon Redshift is a fully managed service for data lakes, data analytics, and data warehouses for startups, medium enterprises, and large enterprises. Amazon Redshift is used by tens of thousands of businesses around the globe for modernizing their data analytics platform.

Greenplum is an open-source, massively parallel database used for analytics, mostly for on-premises infrastructure. Greenplum is based on the PostgreSQL database engine.

Many customers have found migration to Amazon Redshift from Greenplum an attractive option instead of managing on-premises Greenplum for the following reasons:

Even though both Greenplum and Amazon Redshift use the open-source PostgreSQL database engine, migration still requires a lot of planning and manual intervention. This post covers the key functions and considerations while performing code conversion from Greenplum to Amazon Redshift. It is focused on the migration of procedures, functions, and views.

Solution overview

AWS Database Migration Service (AWS DMS) and the AWS Schema Conversion Tool (AWS SCT) can migrate most of the objects in a heterogeneous database migration from Greenplum to Amazon Redshift. But there are some situations where code conversion teams encounter errors and warnings for views, procedures, and functions while creating them in Amazon Redshift. To address this type of situation, manual conversion of the code is required.

The posts focuses on how to handle the following while migrating from Greenplum to Amazon Redshift:

  • Arrays
  • Dates and timestamps
  • Regular expressions (regex)

Please note that for this post, we use Greenplum 4.3 and Amazon Redshift PostgreSQL 8.2.

Working with array functions

The AWS SCT doesn’t convert array functions while migrating from Greenplum or PostgreSQL to Amazon Redshift. Developers need to extensively convert those functions manually. This post outlines the most common array functions:

  • ARRAY_UPPER
  • JSON_EXTACT_ARRAY_ELEMENT_TEXT and JSON_ARRAY_LENGTH
  • UNNEST ()
  • STRING_AGG()
  • ANY ARRAY()

ARRAY_UPPER()

This function returns the upper bound of an array. It can be used to extract the nth element from an array in PostgreSQL or Greenplum.

The Greenplum code is as follows:

With temp1 as
(
Select 'John' as FirstName, 'Smith' as LastName ,
array['"111-222-3333"','"101-201-3001"','"XXX-YYY-ZZZZ"','NULL'] as PhoneNumbers
union all
Select 'Bob' as FirstName, 'Haris' as LastName ,
array['222-333-4444','201-301-4001','AAA-BBB-CCCC'] as PhoneNumbers
union all
Select 'Mary' as FirstName, 'Jane' as LastName ,
array['333-444-5555','301-401-3001','DDD-EEE-FFFF'] as PhoneNumbers
)
Select Firstname, PhoneNumbers[ARRAY_UPPER(PhoneNumbers,1)]

There is no function to extract an element from an array in Amazon Redshift; however, there are two JSON functions that can be used for this purpose:

  • JSON_EXTRACT_ARRAY_ELEMENT_TEXT() – Returns a JSON array element in the outermost array of a JSON string
  • JSON_ARRAY_LENGTH() – Returns the number of elements in the outer array of a JSON string

See the following code:

With temp1 as
(
Select 'John' as FirstName, 'Smith' as LastName ,
array['"111-222-3333"','"101-201-3001"','"XXX-YYY-ZZZZ"'] as PhoneNumbers
union all
Select 'Bob' as FirstName, 'Haris' as LastName ,
array['"222-333-4444"','"201-301-4001"','"AAA-BBB-CCCC"'] as PhoneNumbers
union all
Select 'Mary' as FirstName, 'Jane' as LastName ,
array['"333-444-5555"','"301-401-3001"','"DDD-EEE-FFFF"'] as PhoneNumbers
)

Select
FirstName
,('['+array_to_string(phoneNumbers,',')+']') as JSONConvertedField
,JSON_EXTRACT_ARRAY_ELEMENT_TEXT
(
'['+array_to_string(phoneNumbers,',')+']'
,JSON_ARRAY_LENGTH('['+array_to_string(phoneNumbers,',')+']')-1
) as LastElementFromArray
from temp1

UNNEST()

UNNEST() is PostgreSQL’s system function for semi-structured data, expanding an array, or a combination of arrays to a set of rows. It is introduced to improve the database performance of thousands or records for inserts, updates, and deletes.

You can use UNNEST() for basic array, multiple arrays, and multiple arrays with different lengths.

Some of Amazon Redshift functions used to unnest arrays are split_part, json_extract_path_text, json_array_length, and json_extract_array_element_text.

In Greenplum, the UNNEST function is used to expand an array to a set of rows:

Select ‘A’,unnest(array([1,2])

Output
A 1
A 2

with temp1 as
(
Select 'John' as FirstName, 'Smith' as LastName ,
'111-222-3333' as Mobilephone,'101-201-3001' as HomePhone
union all
Select 'Bob' as FirstName, 'Haris' as LastName ,
'222-333-4444' as Mobilephone,'201-301-4001' as HomePhone
union all
Select 'Mary' as FirstName, 'Jane' as LastName ,
'333-444-5555' as Mobilephone,'301-401-3001' as HomePhone
)

select
FirstName
,LastName
,unnest(array[‘Mobile’::text,’HomePhone’::text]) as PhoneType
,unnest(array[MobilePhone::text,HomePhone::text]) as PhoneNumber
from
temp1
order by 1,2,3

Amazon Redshift doesn’t support the UNNEST function; you can use the following workaround:

with temp1 as
(
Select 'John' as FirstName, 'Smith' as LastName ,
'111-222-3333' as Mobilephone,'101-201-3001' as HomePhone
union all
Select 'Bob' as FirstName, 'Haris' as LastName ,
'222-333-4444' as Mobilephone,'201-301-4001' as HomePhone
union all
Select 'Mary' as FirstName, 'Jane' as LastName ,
'333-444-5555' as Mobilephone,'301-401-3001' as HomePhone
),
ns as
(
Select row_number() over(order by 1) as n from pg_tables
)

Select
FirstName
,LastName
,split_part('Mobile,Home',',',ns.n::int) as PhoneType
,split_part(MobilePhone|| '&&' || HomePhone, '&&', ns.n::int) as PhoneNumber
from
temp1, ns
where
ns.n<=regexp_count('Mobile,Home',',')+1
order by 1,2,3

When the element of array is in the form of array itself, use the JSON_EXTRACT_ARRAY_ELEMENT_TEXT() function and JSON_ARRAY_LENGTH:

with ns as
(
Select row_number() over(order by 1) as n from pg_tables
)

Select JSON_EXTRACT_ARRAY_ELEMENT_TEXT('["arrayelement1","arrayelement2"]',ns.n-1)
from ns
where
ns.n<=JSON_ARRAY_LENGTH('["arrayelement1","arrayelement2"]')

STRING_AGG()

The STRING_AGG() function is an aggregate function that concatenates a list of strings and places a separator between them. The function doesn’t add the separator at the end of the string. See the following code:

STRING_AGG ( expression, separator [order_by_clause] )

The Greenplum code is as follows:

with temp1 as
(
Select 'Finance'::text as Dept, 'John'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Finance'::text as Dept, 'John'::text as FirstName, 'Doe'::text as LastName
union all
Select 'Finance'::text as Dept, 'Mary'::text as FirstName, 'Jane'::text as LastName
union all
Select 'Marketing'::text as Dept, 'Bob'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Marketing'::text as Dept, 'Steve'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Account'::text as Dept, 'Phil'::text as FirstName, 'Adams'::text as LastName
union all
Select 'Account'::text as Dept, 'Jim'::text as FirstName, 'Smith'::text as LastName
)
Select dept,STRING_AGG(FirstName||' '||LastName,' ; ') as Employees from temp1 group by dept order by 1

The Amazon Redshift equivalent for the STRING_AGG() function is LISTAGG(). This aggregate function orders the rows for that group according to the ORDER BY expression, then concatenates the values into a single string:

LISTAGG(expression, separator [order_by_clause])

See the following code:

Create temporary Table temp1 as
(
Select 'Finance'::text as Dept, 'John'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Finance'::text as Dept, 'John'::text as FirstName, 'Doe'::text as LastName
union all
Select 'Finance'::text as Dept, 'Mary'::text as FirstName, 'Jane'::text as LastName
union all
Select 'Marketing'::text as Dept, 'Bob'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Marketing'::text as Dept, 'Steve'::text as FirstName, 'Smith'::text as LastName
union all
Select 'Account'::text as Dept, 'Phil'::text as FirstName, 'Adams'::text as LastName
union all
Select 'Account'::text as Dept, 'Jim'::text as FirstName, 'Smith'::text as LastName
)

Select dept,LISTAGG(FirstName||' '||LastName,' ; ') as Employees from temp1
group by dept
order by 1

ANY ARRAY()

The PostgreSQL ANY ARRAY() function evaluates and compare the left-hand expression to each element in array:

Select * from temp1 where DeptName = ANY ARRAY('10-F','20-F','30-F')

In Amazon Redshift, the evaluation can be achieved with an IN operator:

Select * from temp1 where DeptName IN ('10-F','20-F','30-F')

Working with date functions

In this section, we discuss calculating the difference between date_part for Greenplum and datediff for Amazon Redshift.

When the application needs to calculate the difference between the subfields of dates for Greenplum, it uses the function date_part, which allows you to retrieve subfields such as year, month, week, and day. In the following example queries, we calculate the number of completion_days by calculating the difference between originated_date and eco_date.

To calculate the difference between the subfields of the date, Amazon Redshift has the function datediff. The following queries show an example of how to calculate the completion_days as the difference between eco_date and orginated_date. DATEDIFF determines the number of date part boundaries that are crossed between the two expressions.

We compare the Greenplum and Amazon Redshift queries as follows:

  • Difference by year

The following Greenplum query returns 1 year between 2009-01-01 and 2009-12-31:

SELECT date_part(‘year’, TIMESTAMP ‘2009-01-01’) - date_part(‘year’, 2008-12-31’) as year;

The following Amazon Redshift query returns 1 year between 2009-01-01 and 2009-12-31:

SELECT datediff (year, ‘2008-12-31’ , ‘2009-01-01’ ) as year;
  • Difference by month

The following Greenplum query returns 1 month between 2009-01-01 and 2008-12-31:

SELECT (date_part(‘year’, ‘2009-01-01’ :: date) - date_part(‘year’, ‘2008-12-31’ :: date)) * 12 +<br />(date_part(‘month’, ‘2009-01-01’) - date_part(‘month’, ‘2008-12-31’ :: date)) as month;

The following Amazon Redshift query returns 1 month between 2009-01-01 and 2008-12-31:

SELECT datediff( month, ‘2008-12-31’ , ‘2009-01-01’ ) as month;
  • Difference by week

The following Greenplum query returns 0 weeks between 2009-01-01 and 2009-12-31:

SELECT date_part(‘week’, timestamp ‘2009-01-01’ ) - date_part(‘week’, timestamp ‘2008-12-31’) as week;

The following Amazon Redshift query returns 0 weeks between 2009-01-01 and 2009-12-31:

SELECT datediff( week, ‘2008-12-31’ , ‘2009-01-01’ ) as week;
  • Difference by day

The following Greenplum query returns 1 day:

SELECT date_part ('day', '2009-01-01 24:00:00' :: timestamp - '2008-12-31 24:00:00 :: timestamp) as day;

The following Amazon Redshift query returns 1 day:

SELECT datediff (day, ‘2008-12-31’, ‘2009-01-01’) as day;
  • Difference by hour

The following Greenplum query returns 1 hour:

SELECT date_part(‘hour’, ‘2009-01-01 22:56:10’ :: timestamp - ‘2008-12-31 21:54:55' :: timestamp)

The following Amazon Redshift query returns 1 hour:

SELECT datediff (hour, ‘2009-01-01 21:56:10’, ‘2009-01-01’ ) as hour;
  • Difference by minute

The following Greenplum query returns 3 minutes:

SELECT date_part(‘minute’, ‘2009-01-01 22:56:10’ :: timestamp - ‘2009-01-01 21:53:10’ :: timestamp) as minutes;

The following Amazon Redshift query returns 1 minute:

SELECT datediff(minute, ‘2009-01-01 21:56:10’, ‘2009-01-01 21:57:55’) as minute;
  • Difference by second

The following Greenplum query returns 40 seconds:

SELECT date_part(‘second’, ‘2009-01-01 22:56:50’ :: timestamp - ‘2009-01-01 21:53:10’ : : timestamp) as seconds;

The following Amazon Redshift query returns 45 seconds:

SELECT datediff(second, ‘2009-01-01 21:56:10’, ‘2009-01-01- 21:56:55’) as seconds;

Now let’s look at how we use Amazon Redshift to calculate days and weeks in seconds.

The following Amazon Redshift query displays 2 days:

SELECT datediff(second, ‘2008-12-30 21:56:10’, ‘2009-01-01- 21:56:55’)/(60*60*24) as days;

The following Amazon Redshift query displays 9 weeks:

SELECT datediff(second, ‘2008-10-30 21:56:10’, ‘2009-01-01- 21:56:55’)/(60*60*24*7) as weeks;

For Greenplum, the date subfields need to be in single quotes, whereas for Amazon Redshift, we can use date subfields such as year, month, week, day, minute, second without quotes. For Greenplum, we have to subtract the subfield from one part to another part, whereas for Amazon Redshift we can use commas to separate the two dates.

Extract ISOYEAR from date

ISOYEAR 8601 is a week-numbering year. It begins with the Monday of the week containing the 4th of January. So for the date of early January or late December, the ISO year may be different from the Gregorian year. ISO year has 52 or 53 full weeks (364 or 371 days). The extra week is called a leap week; a year with such a week is called a leap year.

The following Greenplum query displays the ISOYEAR 2020:

SELECT extract (ISOYEAR from ‘2019-12-30’ :: date) as ISOYEARS;

The following Amazon Redshift query displays the ISOYEAR 2020:

SELECT to_char(‘2019-12-30’ :: date, ‘IYYYY’) as ISOYEARS;

Function to generate_series()

Greenplum has adopted the PostgreSQL function generate_series(). But the generate_series function works differently with Amazon Redshift while retrieving records from the table because it’s a leader node-only function.

To display a series of numbers in Amazon Redshift, run the following query on the leader node. In this example, it displays 10 rows, numbered 1–10:

SELECT generate_series(1,10);

To display a series of days for a given date, use the following query. It extracts the day from the given date and subtracts 1, to display a series of numbers from 0–6:

SELECT generate_series(0, extract(day from date ‘2009-01-07’) :: int -1);

But for the queries fetching the record from the table, joining with another table’s row, and processing data at the compute node, it doesn’t work, and generates an error message with Invalid Operation. The following code is an example of a SQL statement that works for Greenplum but fails for Amazon Redshift:

SELECT column_1,
FROM table_1t1
JOIN table_2 t2
ON t2.code = t1.code
CROSS JOIN generate_series(1,12) gen(fiscal_month)
WHERE condition_1

For Amazon Redshift, the solution is to create a table to store the series data, and rewrite the code as follows:

SELECT column1,
FROM table_t1 t1
JOIN table_t2 t2
ON t2.code = t1.code
CROSS JOIN (select “number” as fiscal_month FROM table_t3 WHERE “number”<=12) gen
WHERE condition_1

Working with regular expressions (regex functions)

Amazon Redshift and Greenplum both support three conditions for pattern matching:

  • LIKE
  • SIMILAR TO
  • POSIX operators

In this post, we don’t discuss all of these pattern matching in detail. Instead, we discuss a few regex functions and regex escape characters that aren’t supported by Amazon Redshift.

Regexp_split_to_table function

The Regex_split_to_table function splits a string using a POSIX regular expression pattern as delimiter.

This function has the following syntax:

Regexp_split_to_table(string,pattern [,flags])

For Greenplum, we use the following query:

select regexp_split_to_table ('bat,cat,hat',’\,’) as regexp_split_table_GP

For Amazon Redshift, the regexp_split_to_table function has to be converted using the Amazon Redshift split_part function:

SELECT column1,
FROM table_t1 t1
JOIN table_t2 t2
ON t2.code = t1.code
CROSS JOIN (select “number” as fiscal_month FROM table_t3 WHERE “number”<=12) gen
WHERE condition_1

Another way to convert regexp_split_to_table is as follows:

SELECT column1,
FROM table_t1 t1
JOIN table_t2 t2
ON t2.code = t1.code
CROSS JOIN (select “number” as fiscal_month FROM table_t3 WHERE “number”<=12) gen
WHERE condition_1

Substring from regex expressions

Substring (the string from the regex pattern) extracts the substring or value matching the pattern that is passed on. If there is no match, null is returned. For more information, refer to Pattern Matching.

We use the following code in Greenplum:

create temp table data1 ( col1 varchar );
insert into data1 values ('hellohowareyou 12\687687abcd');
select substring( col1 from '[A-Za-z]+$') from data1;
from data1

We can use the regexp_substr function to convert this code to Amazon Redshift. It returns the characters extracted from a string by searching for a regular expression pattern. The syntax is as follows:

REGEXP_SUBSTR ( source_string, pattern [, position [, occurrence [, parameters ] ] ] )
select regexp_substr( col1, '[A-Za-z]+$') as substring_from_rs from data1

Key points while converting regular expression escapes

The Postgres escape character E doesn’t work in Amazon Redshift. Additionally, the following Greenplum regular expression constraints aren’t supported in Amazon Redshift:

  • \m – Matches only at the beginning of a word
  • \y – Matches only at the beginning or end of a word

For Amazon Redshift, use \\< and \\>, or [[:<:]] and [[:>:]] instead.

Use the following code for Greenplum:

select col1,
case
when (col1) ~ E '\\m[0-9]{2}[A-Z]{1}[0-9]{1}' then
regexp_replace(col1, E '([0-9]{2})([A-Z]{1})([0-9]{1})',E '\\2')
else 'nothing'
end as regex_test
from temp1123

Use the following code for Amazon Redshift:

select col1,
case
when (col1) ~ '\\<[0-9]{2}[A-Z]{1}[0-9]{1}>\\' then
regexp_replace(col1,'([0-9]{2})([A-Z]{1})([0-9]{1})','\\2')
else 'nothing'
end as regex_test
from temp1123

OR

select col1,
case
when (col1) ~ '[[:<:]][0-9]{2}[A-Z]{1}[0-9]{1}[[:>:]]' then
regexp_replace(col1,'([0-9]{2})([A-Z]{1})([0-9]{1}) (.*)','\\2')
else 'nothing'
end as regex_test
from temp1123

Conclusion

For heterogeneous database migration from Greenplum to the Amazon Redshift, you can use AWS DMS and the AWS SCT to migrate most of the database objects, such as tables, views, stored procedures, and functions.

There are some situations in which one function is used for the source environment, and the target environment doesn’t support the same function. In this case, manual conversion is required to produce the same results set and complete the database migration.

In some cases, use of a new window function supported by the target environment proves more efficient for analytical queries to process petabytes of data.

This post included several situations where manual code conversion is required, which also improves the code efficiency and make queries efficient.

If you have any questions or suggestions, please share your feedback.


About the Authors

Jagrit Shrestha is a Database consultant at Amazon Web Services (AWS). He works as a database specialist helping customers migrate their on-premises database workloads to AWS and provide technical guidance.

Ishwar Adhikary is a Database Consultant at Amazon Web Services (AWS). He works closely with customers to modernize their database and application infrastructures. His focus area is migration of relational databases from On-premise data center to AWS Cloud.

Shrenik Parekh works as a Database Consultants at Amazon Web Services (AWS). He is expertise in database migration assessment, database migration, modernizing database environment with purpose-built database using AWS cloud database services. He is also focused on AWS web services for data analytics. In his spare time, he loves hiking, yoga and other outdoor activities.

Santhosh Meenhallimath is a Data Architect at AWS. He works on building analytical solutions, building data lakes and migrate Database into AWS.

Build a search application with Amazon OpenSearch Serverless

Post Syndicated from Aish Gunasekar original https://aws.amazon.com/blogs/big-data/build-a-search-application-with-amazon-opensearch-serverless/

In this post, we demonstrate how to build a simple web-based search application using the recently announced Amazon OpenSearch Serverless, a serverless option for Amazon OpenSearch Service that makes it easy to run petabyte-scale search and analytics workloads without having to think about clusters. The benefit of using OpenSearch Serverless as a backend for your search application is that it automatically provisions and scales the underlying resources based on the search traffic demands, so you don’t have to worry about infrastructure management. You can simply focus on building your search application and analyzing the results. OpenSearch Serverless is powered by the open-source OpenSearch project, which consists of a search engine, and OpenSearch Dashboards, a visualization tool to analyze your search results.

Solution overview

There are many ways to build a search application. In our example, we create a simple Java script front end and call Amazon API Gateway, which triggers an AWS Lambda function upon receiving user queries. As shown in the following diagram, API Gateway acts as a broker between the front end and the OpenSearch Serverless collection. When the user queries the front-end webpage, API Gateway passes requests to the Python Lambda function, which runs the queries on the OpenSearch Serverless collection and returns the search results.

To get started with the search application, you must first upload the relevant dataset, a movie catalog in this case, to the OpenSearch collection and index them to make them searchable.

Create a collection in OpenSearch Serverless

A collection in OpenSearch Serverless is a logical grouping of one or more indexes that represent a workload. You can create a collection using the AWS Management Console or AWS Software Development Kit (AWS SDK). Follow the steps in Preview: Amazon OpenSearch Serverless – Run Search and Analytics Workloads without Managing Clusters to create and configure a collection in OpenSearch Serverless.

Create an index and ingest data

After your collection is created and active, you can upload the movie data to an index in this collection. Indexes hold documents, and each document in this example represents a movie record. Documents are comparable to rows in the database table. Each document (the movie record) consists of 10 fields that are typically searched for in a movie catalog, like the director, actor, release date, genre, title, or plot of the movie. The following is a sample movie JSON document:

{
"directors": ["David Yates"],
"release_date": "2011-07-07T00:00:00Z",
"rating": 8.1,
"genres": ["Adventure", "Family", "Fantasy", "Mystery"],
"plot": "Harry, Ron and Hermione search for Voldemort's remaining Horcruxes in their effort to destroy the Dark Lord.",
"title": "Harry Potter and the Deathly Hallows: Part 2",
"rank": 131,
"running_time_secs": 7800,
"actors": ["Daniel Radcliffe", "Emma Watson", "Rupert Grint"],
"year": 2011
}

For the search catalog, you can upload the sample-movies.bulk dataset sourced from the Internet Movies Database (IMDb). OpenSearch Serverless offers the same ingestion pipeline and clients to ingest the data as OpenSearch Service, such as Fluentd, Logstash, and Postman. Alternatively, you can use the OpenSearch Dashboards Dev Tools to ingest and search the data without configuring any additional pipelines. To do so, log in to OpenSearch Dashboards using your SAML credentials and choose Dev tools.

To create a new index, use the PUT command followed by the index name:

PUT movies-index

A confirmation message is displayed upon successful creation of your index.

After the index is created, you can ingest documents into the index. OpenSearch provides the option to ingest multiple documents in one request using the _bulk request. Enter POST /_bulk in the left pane as shown in the following screenshot, then copy and paste the contents of the sample-movies.bulk file you downloaded earlier.

You have successfully created the movies index and uploaded 1,500 records into the catalog! Now let’s integrate the movie catalog with your search application.

Integrate the Lambda function with an OpenSearch Serverless endpoint

In this step, you create a Lambda function that queries the movie catalog in OpenSearch Serverless and returns the result. For more information, see our tutorial on creating a Lambda function for connecting to and querying an OpenSearch Service domain. You can reuse the same code by replacing the parameters to align to OpenSearch Serverless’s requirements. Replace <my-region> with your corresponding region (for example, us-west-2), use aoss instead of es for service, replace <hostname> with the OpenSearch collection endpoint, and <index-name> with your index (in this case, movies-index).

The following is a snippet of the Lambda code. You can find the complete code in the tutorial.

import boto3
import json
import requests
from requests_aws4auth import AWS4Auth

region = '<my-region>'
service = 'aoss'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)

host = '<hostname>' 
# The OpenSearch collection endpoint 
index = '<index-name>'
url = host + '/' + index + '/_search'

# Lambda execution starts here
def Lambda_handler(event, context):

This Lambda function returns a list of movies based on a search string (such as movie title, director, or actor) provided by the user.

Next, you need to configure the permissions in OpenSearch Serverless’s data access policy to let the Lambda function access the collection.

  1. On the Lambda console, navigate to your function.
  2. On the Configuration tab, in the Permissions section, under Execution role, copy the value for Role name.
  3. Add this role name as one of the principals of your movie-search collection’s data access policy.

Principals can be AWS Identity and Access Management (IAM) users, role ARNs, or SAML identities. These principals must be within the current AWS account.

After you add the role name as a principal, you can see the role ARN updated in your rule, as show in the following screenshot.

Now you can grant collection and index permissions to this principal.

For more details about data access policies, refer to Data access control for Amazon OpenSearch Serverless. Skipping this step or not running it correctly will result in permission errors, and your Lambda code won’t be able to query the movie catalog.

Configure API Gateway

API Gateway acts as a front door for applications to access the code running on Lambda. To create, configure, and deploy the API for the GET method, refer to the steps in the tutorial. For API Gateway to pass the requests to the Lambda function, configure it as a trigger to invoke the Lambda function.

The next step is to integrate it with the front end.

Test the web application

To build the front-end UI, you can download the following sample JavaScript web service. Open the scripts/search.js file and update the apigatewayendpoint variable to point to your API Gateway endpoint:

var apigatewayendpoint = 'https://kxxxxxxzzz.execute-api.us-west-2.amazonaws.com/opensearch-api-test/';
// Update this variable to point to your API Gateway endpoint.

You can access the front-end application by opening index.html in your browser. When the user runs a query on the front-end application, it calls API Gateway and Lambda to serve up the content hosted in the OpenSearch Serverless collection.

When you search the movie catalog, the Lambda function runs the following query:

    # Put the user query into the query DSL for more accurate search results.
    # Note that certain fields are boosted (^).
    query = {
        "size": 25,
        "query": {
            "multi_match": {
                "query": event['queryStringParameters']['q'],
                "fields": ["title", "plot", "actors"]
            }
        }
    }

The query returns documents based on a provided query string. Let’s look at the parameters used in the query:

  • size – The size parameter is the maximum number of documents to return. In this case, a maximum of 25 results is returned.
  • multi_match – You use a match query when matching larger pieces of text, especially when you’re using OpenSearch’s relevance to sort your results. With a multi_match query, you can query across multiple fields specified in the query.
  • fields – The list of fields you are querying.

In a search for “Harry Potter,” the document with the matching term both in the title and plot fields appears higher than other documents with the matching term only in the title field.

Congratulations! You have configured and deployed a search application fronted by API Gateway, running Lambda functions for the queries served by OpenSearch Serverless.

Clean up

To avoid unwanted charges, delete the OpenSearch Service collection, Lambda function, and API Gateway that you created.

Conclusion

In this post, you learned how to build a simple search application using OpenSearch Serverless. With OpenSearch Serverless, you don’t have to worry about managing the underlying infrastructure. OpenSearch Serverless supports the same ingestion and query APIs as the OpenSearch Project. You can quickly get started by ingesting the data into your OpenSearch Service collection, and then perform searches on the data using your web interface.

In subsequent posts, we dive deeper into many other search queries and features that you can use to make your search application even more effective.

We would love to hear how you are building your search applications today. If you’re just getting started with OpenSearch Serverless, we recommend getting hands-on with the Getting started with Amazon OpenSearch Serverless workshop.


About the authors

Aish Gunasekar is a Specialist Solutions architect with a focus on Amazon OpenSearch Service. Her passion at AWS is to help customers design highly scalable architectures and help them in their cloud adoption journey. Outside of work, she enjoys hiking and baking.

Pavani Baddepudi is a senior product manager working in search services at AWS. Her interests include distributed systems, networking, and security.

Graph service platform

Post Syndicated from Grab Tech original https://engineering.grab.com/graph-service-platform

Introduction

In earlier articles of this series, we covered the importance of graph networks, graph concepts, how graph visualisation makes fraud investigations easier and more effective, and how graphs for fraud detection work. In this article, we elaborate on the need for a graph service platform and how it works.

In the present age, data linkages can generate significant business value. Whether we want to learn about the relationships between users in online social networks, between users and products in e-commerce, or understand credit relationships in financial networks, the capability to understand and analyse large amounts of highly interrelated data is becoming more important to businesses.

As the amount of consumer data grows, the GrabDefence team must continuously enhance fraud detection on mobile devices to proactively identify the presence of fraudulent or malicious users. Even simple financial transactions between users must be monitored for transaction loops and money laundering. To preemptively detect such scenarios, we need a graph service platform to help discover data linkages. 

Background

As mentioned in an earlier article, a graph is a model representation of the association of entities and holds knowledge in a structured way by marginalising entities and relationships. In other words, graphs hold a natural interpretability of linked data and graph technology plays an important role. Since the early days, large tech companies started to create their own graph technology infrastructure, which is used for things like social relationship mining, web search, and sorting and recommendation systems with great commercial success.

As graph technology was developed, the amount of data gathered from graphs started to grow as well, leading to a need for graph databases. Graph databases1 are used to store, manipulate, and access graph data on the basis of graph models. It is similar to the relational database with the feature of Online Transactional Processing (OLTP), which supports transactions, persistence, and other features.

A key concept of graphs is the edge or relationship between entities. The graph relates the data items in the store to a collection of nodes and edges, the edges representing the relationships between the nodes. These relationships allow data in the store to be linked directly and retrieved with one operation.

With graph databases, relationships between data can be queried fast as they are perpetually stored in the database. Additionally, relationships can be intuitively visualised using graph databases, making them useful for heavily interconnected data. To have real-time graph search capabilities, we must leverage the graph service platform and graph databases.

Architecture details

Graph services with graph databases are Platforms as a Service (PaaS) that encapsulate the underlying implementation of graph technology and support easier discovery of data association relationships with graph technologies.

They also provide universal graph operation APIs and service management for users. This means that users do not need to build graph runtime environments independently and can explore the value of data with graph service directly.

Fig. 1 Graph service platform system architecture

As shown in Fig. 1, the system can be divided into four layers:

  1. Storage backend – Different forms of data (for example, CSV files) are stored in Amazon S3, graph data stores in Neptune and meta configuration stores in DynamoDB.
  2. Driver – Contains drivers such as Gremlin, Neptune, S3, and DynamoDB.
  3. Service – Manages clusters, instances, databases etc, provides management API, includes schema and data load management, graph operation logic, and other graph algorithms.
  4. RESTful APIs – Currently supports the standard and uniform formats provided by the system, the Management API, Search API for OLTP, and Analysis API for online analytical processing (OLAP).

How it works

Graph flow

Fig. 2 Graph flow

CSV files stored in Amazon S3 are processed by extract, transform, and load (ETL) tools to generate graph data. This data is then managed by an Amazon Neptune DB cluster, which can only be accessed by users through graph service. Graph service converts user requests into asynchronous interactions with Neptune Cluster, which returns the results to users.

When users launch data load tasks, graph service synchronises the entity and attribute information with the CSV file in S3, and the schema stored in DynamoDB. The data is only imported into Neptune if there are no inconsistencies.

The most important component in the system is the graph service, which provides RESTful APIs for two scenarios: graph search for real-time streams and graph analysis for batch processing. At the same time, the graph service manages clusters, databases, instances, users, tasks, and meta configurations stored in DynamoDB, which implements features of service monitor and data loading offline or stream ingress online.

Use case in fraud detection

In Grab’s mobility business, we have come across situations where multiple accounts use shared physical devices to maximise their earning potential. With the graph capabilities provided by the graph service platform, we can clearly see the connections between multiple accounts and shared devices.

Historical device and account data are stored in the graph service platform via offline data loading or online stream injection. If the device and account data exists in the graph service platform, we can find the adjacent account IDs or the shared device IDs by using the device ID or account ID respectively specified in the user request.

In our experience, fraudsters tend to share physical resources to maximise their revenue. The following image shows a device that is shared by many users. With our Graph Visualisation platform based on graph service, you can see exactly what this pattern looks like.

Fig 3. Example of a device being shared with many users

Data injection

Fig. 4 Data injection

Graph service also supports data injection features, including data load by request (task with a type of data load) and real-time stream write by Kafka.  

When connected to GrabDefence’s infrastructure, Confluent with Kafka is used as the streaming engine.  The purpose of using Kafka as a streaming write engine is two-fold: to provide primary user authentication and to relieve the pressure on Neptune.

Impact

Graph service supports data management of Labelled Property Graphs and provides the capability to add, delete, update, and get vertices, edges, and properties for some graph models. Graph traversal and searching relationships with RESTful APIs are also more convenient with graph service.

Businesses usually do not need to focus on the underlying data storage, just designing graph schemas for model definition according to their needs. With the graph service platform, platforms or systems can be built for personalised search, intelligent Q&A, financial fraud, etc.

For big organisations, extensive graph algorithms provide the power to mine various entity connectivity relationships in massive amounts of data. The growth and expansion of new businesses is driven by discovering the value of data.

What’s next?

Fig. 5 Graph-centric ecosystems

We are building an integrated graph ecosystem inside and outside Grab. The infrastructure and service, or APIs are key components in graph-centric ecosystems; they provide graph arithmetic and basic capabilities of graphs in relation to search, computing, analysis etc. Besides that, we will also consider incorporating applications such as risk prediction and fraud detection in order to serve our current business needs.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

References

How Novo Nordisk built a modern data architecture on AWS

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-a-modern-data-architecture-on-aws/

Novo Nordisk is a leading global pharmaceutical company, responsible for producing life-saving medicines that reach more than 34 million patients each day. They do this following their triple bottom line—that they must strive to be environmentally sustainable, socially sustainable, and financially sustainable. The combination of using AWS and data supports all these targets.

Data is pervasive throughout the entire value chain of Novo Nordisk. From foundational research, manufacturing lines, sales and marketing, clinical trials, pharmacovigilance, through patient-facing data-driven applications. Therefore, getting the foundation around how data is stored, safeguarded, and used in a way that provides the most value is one of the central drivers of improved business outcomes.

Together with AWS Professional Services, we’re building a data and analytics solution using a modern data architecture. The collaboration between Novo Nordisk and AWS Professional Services is a strategic and long-term close engagement, where developers from both organizations have worked together closely for years. The data and analytics environments are built around of the core tenets of the data mesh—decentralized domain ownership of data, data as a product, self-service data infrastructure, and federated computational governance. This enables the users of the environment to work with data in the way that drives the best business outcomes. We have combined this with elements from evolutionary architectures that will allow us to adapt different functionalities as AWS continuously develops new services and capabilities.

In this series of posts, you will learn how Novo Nordisk and AWS Professional Services built a data and analytics ecosystem to speed up innovation at petabyte scale:

  • In this first post, you will learn how the overall design has enabled the individual components to come together in a modular way. We dive deep into how we built a data management solution based on the data mesh architecture.
  • The second post discusses how we built a trust network between the systems that comprise the entire solution. We show how we use event-driven architectures, coupled with the use of attribute-based access controls, to ensure permission boundaries are respected at scale.
  • In the third post, we show how end-users can consume data from their tool of choice, without compromising data governance. This includes how to configure Okta, AWS Lake Formation, and Microsoft Power BI to enable SAML-based federated use of Amazon Athena for an enterprise business intelligence (BI) activity.

Pharma-compliant environment

As a pharmaceutical industry, GxP compliance is a mandate for Novo Nordisk. GxP is a general abbreviation for the “Good x Practice” quality guidelines and regulations defined by regulators such as European Medicines Agency, U.S. Food and Drug Administration, and others. These guidelines are designed to ensure that medicinal products are safe and effective for their intended use. In the context of a data environment, GxP compliance involves implementing integrity controls for data used to in decision making and processes and is used to guide how change management processes are implemented to continuously ensure compliance over time.

Because this data environment supports teams across the whole organization, each individual data owner must retain accountability on their data. Features were designed to provide data owners autonomy and transparency when managing their data, enabling them to take this responsibility. This includes the capability to handle personally identifiable information (PII) data and other sensitive workloads. To provide traceability on the environment, audit capabilities were added, which we describe more in this post.

Solution overview

The full solution is a sprawling landscape of independent services that work together to enable data and analytics with a decentralized data governance model at petabyte scale. Schematically, it can be represented as in the following figure.

Novo Nordisk Modern Data Architecture on AWS

The architecture is split into three independent layers: data management, virtualization, and consumption. The end-user sits in the consumption layer and works with their tool of choice. It’s meant to abstract as much of the AWS-native resources to application primitives. The consumption layer is integrated into the virtualization layer, which abstracts the access to data. The purpose of the virtualization layer is to translate between data consumption and data management solutions. The access to data is managed by what we refer to as data management solutions. We discuss one of our versatile data management solutions later in this post. Each layer in this architecture is independent of each other and instead only relies on well-defined interfaces.

Central to this architecture is that access is encapsulated in an AWS Identity and Access Management (IAM) role session. The data management layer focuses on providing the IAM role with the right permissions and governance, the virtualization layer provides access to the role, and the consumption layer abstracts the use of the roles in the tools of choice.

Technical architecture

Each of the three layers in the overall architecture has a distinct responsibility, but no singular implementation. Think of them as abstract classes. They can be implemented in concrete classes, and in our case they rely on foundational AWS services and capabilities. Let’s go through each of the three layers.

Data management layer

The data management layer is responsible for providing access to and governance of data. As illustrated in the following diagram, a minimal construct in the data management layer is the combination of an Amazon Simple Storage Service (Amazon S3) bucket and an IAM role that gives access to the S3 bucket. This construct can be expanded to include granular permission with Lake Formation, auditing with AWS CloudTrail, and security response capabilities from AWS Security Hub. The following diagram also shows that a single data management solution has no singular span. It can cross many AWS accounts and be comprised of any number of IAM role combinations.Data Mamangement Architecture

We have purposely not illustrated the trust policy of these roles in this figure, because those are a collaborative responsibility between the virtualization layer and the data management layer. We go into detail of how that works in the next post in this series. Data engineering professionals often interface directly with the data management layer, where they curate and prepare data for consumption.

Virtualization layer

The purpose of the virtualization layer is to keep track of who can do what. It doesn’t have any capabilities in itself, but translates the requirements from the data management ecosystems to the consumption layers and vice versa. It enables end-users on the consumption layer to access and manipulate data on one or more data management ecosystems, according to their permissions. This layer abstracts from end-users the technical details on data access, such as permission model, role assumptions, and storage location. It owns the interfaces to the other layers and enforces the logic of the abstraction. In the context of hexagonal architectures (see Developing evolutionary architecture with AWS Lambda), the interface layer plays the role of the domain logic, ports, and adapters. The other two layers are actors. The data management layer communicates the state of the layer to the virtualization layer and conversely receives information about the service landscape to trust. The virtualization layer architecture is shown in the following diagram.

Virtualization Layer Architecture

Consumption layer

The consumption layer is where the end-users of the data products are sitting. This can be data scientists, business intelligence analysts, or any third party that generates value from consuming the data. It’s important for this type of architecture that the consumption layer has a hook-based sign-in flow, where the authorization into the application can be modified at sign-in time. This is to translate the AWS-specific requirement into the target applications. After the session in the client-side application has successfully been started, it’s up to the application itself to instrument for data layer abstraction, because this will be application specific. And this is an additional important decoupling, where some responsibility is pushed to the decentralized units. Many modern software as a service (SaaS) applications support these built-in mechanisms, such as Databricks or Domino Data Lab, whereas more traditional client-side applications like RStudio Server have more limited native support for this. In the case where native support is missing, a translation down to the OS user session can be done to enable the abstraction. The consumption layer is shown schematically in the following diagram.

Consumption Layer Architecture

When using the consumption layer as intended, the users don’t know that the virtualization layer exists. The following diagram illustrates the data access patterns.

Data Access Patterns

Modularity

One of the main advantages of adopting the hexagonal architecture pattern, and delegating both the consuming layer and the data management layer to primary and secondary actors, means that they can be changed or replaced as new functionalities are released that require new solutions. This gives a hub-and-spoke type pattern, where many different types of producer/consumer type systems can be connected and work simultaneously in union. An example of this is that the current solution running in Novo Nordisk supports multiple, simultaneous data management solutions and are exposed in a homogenous way in the consuming layer. This includes both a data lake, the data mesh solution presented in this post, and several independent data management solutions. And these are exposed to multiple types of consuming applications, from custom managed, self-hosted applications, to SaaS offerings.

Data management ecosystem

To scale the usage of the data and increase the freedom, Novo Nordisk, jointly with AWS Professional Services, built a data management and governance environment, named Novo Nordisk Enterprise DataHub (NNEDH). NNEDH implements a decentralized distributed data architecture, and data management capabilities such as an enterprise business data catalog and data sharing workflow. NNEDH is an example of a data management ecosystem in the conceptual framework introduced earlier.

Decentralized architecture: From a centralized data lake to a distributed architecture

Novo Nordisk’s centralized data lake consists of 2.3 PB of data from more than 30 business data domains worldwide serving over 2000+ internal users throughout the value chain. It has been running successfully for several years. It is one of the data management ecosystems currently supported.

Within the centralized data architecture, data from each data domain is copied, stored, and processed in one central location: a central data lake hosted in one data storage. This pattern has challenges at scale because it retains the data ownership with the central team. At scale, this model slows down the journey toward a data-driven organization, because ownership of the data isn’t sufficiently anchored with the professionals closest to the domain.

The monolithic data lake architecture is shown in the following diagram.Monolithic Data Lake Architecture

Within the decentralized distributed data architecture, the data from each domain is kept within the domain on its own data storage and compute account. In this case, the data is kept close to domain experts, because they’re the ones who know their own data best and are ultimately the owner of any data products built around their data. They often work closely with business analysts to build the data product and therefore know what good data means to consumers of their data products. In this case, the data responsibility is also decentralized, where each domain has its own data owner, putting the accountability onto the true owners of the data. Nevertheless, this model might not work at small scale, for example an organization with only one business unit and tens of users, because it would introduce more overhead on the IT team to manage the organization data. It better suits large organizations, or small and medium ones that would like to grow and scale.

The Novo Nordisk data mesh architecture is shown in the following diagram.

Novo Nordisk Data Mesh Architecture

Data domains and data assets

To enable the scalability of data domains across the organization, it’s mandatory to have a standard permission model and data access pattern. This standard must not be too restrictive in such a way that it may be a blocker for specific use cases, but it should be standardized in such a way to use the same interface between the data management and virtualization layers.

The data domains on NNEDH are implemented by a construct called an environment. An environment is composed of at least one AWS account and one AWS Region. It’s a workplace where data domain teams can work and collaborate to build data products. It links the NNEDH control plane to the AWS accounts where the data and compute of the domain reside. The data access permissions are also defined at the environment level, managed by the owner of the data domain. The environments have three main components: a data management and governance layer, data assets, and optional blueprints for data processing.

For data management and governance, the data domains rely on Lake Formation, AWS Glue, and CloudTrail. The deployment method and setup of these components is standardized across data domains. This way, the NNEDH control plane can provide connectivity and management to data domains in a standardized way.

The data assets of each domain residing in an environment are organized in a dataset, which is a collection of related data used for building a data product. It includes technical metadata such as data format, size, and creation time, and business metadata such as the producer, data classification, and business definition. A data product can use one or several datasets. It is implemented through managed S3 buckets and the AWS Glue Data Catalog.

Data processing can be implemented in different ways. NNEDH provides blueprints for data pipelines with predefined connectivity to data assets to speed up the delivery of data products. Data domain users have the freedom to use any other compute capability on their domain, for example using AWS services not predefined on the blueprints or accessing the datasets from other analytics tools implemented in the consumption layer, as mentioned earlier in this post.

Data domain personas and roles

On NNEDH, the permission levels on data domains are managed through predefined personas, for example data owner, data stewards, developers, and readers. Each persona is associated with an IAM role that has a predefined permission level. These permissions are based on the typical needs of users on these roles. Nevertheless, to give more flexibility to data domains, these permissions can be customized and extended as needed.

The permissions associated with each persona are related only to actions allowed on the AWS account of the data domain. For the accountability on data assets, the data access to the assets is managed by specific resource policies instead of IAM roles. Only the owner of each dataset, or data stewards delegated by the owner, can grant or revoke data access.

On the dataset level, a required persona is the data owner. Typically, they work closely with one or many data stewards as data products managers. The data steward is the data subject matter expert of the data product domain, responsible for interpreting collected data and metadata to derive deep business insights and build the product. The data steward bridges between business users and technical teams on each data domain.

Enterprise business data catalog

To enable freedom and make the organization data assets discoverable, a web-based portal data catalog is implemented. It indexes in a single repository metadata from datasets built on data domains, breaking data silos across the organization. The data catalog enables data search and discovery across different domains, as well as automation and governance on data sharing.

The business data catalog implements data governance processes within the organization. It ensures the data ownership—someone in the organization is responsible for the data origin, definition, business attributes, relationships, and dependencies.

The central construct of a business data catalog is a dataset. It’s the search unit within the business catalog, having both technical and business metadata. To collect technical metadata from structured data, it relies on AWS Glue crawlers to recognize and extract data structures from the most popular data formats, including CSV, JSON, Avro, and Apache Parquet. It provides information such as data type, creation date, and format. The metadata can be enriched by business users by adding a description of the business context, tags, and data classification.

The dataset definition and related metadata are stored in an Amazon Aurora Serverless database and Amazon OpenSearch Service, enabling you to run textual queries on the data catalog.

Data sharing

NNEDH implements a data sharing workflow, enabling peer-to-peer data sharing across AWS accounts using Lake Formation. The workflow is as follows:

  1. A data consumer requests access to the dataset.
  2. The data owner grants access by approving the access request. They can delegate the approval of access requests to the data steward.
  3. Upon the approval of an access request, a new permission is added to the specific dataset in Lake Formation of the producer account.

The data sharing workflow is shown schematically in the following figure.

Data Sharing Workflow

Security and audit

The data in the Novo Nordisk data mesh lies in AWS accounts owned by Novo Nordisk business accounts. The configuration and the states of the data mesh are stored in Amazon Relational Database Service (Amazon RDS). The Novo Nordisk security architecture is shown in the following figure.

Novo Nordisk Distributed Security and Audit Architecture

Access and edits to the data in NNEDH needs to be logged for audit purposes. We need to be able to tell who modified data, when the modification happened, and what modifications were applied. In addition, we need to be able to answer why the modification was allowed by that person at that time.

To meet these requirements, we use the following components:

  • CloudTrail to log API calls. We specifically enable CloudTrail data event logging for S3 buckets and objects. By activating the logging, we can trace back any modification to any files in the data lake to the person who made the modification. We enforce usage of source identity for IAM role sessions to ensure user traceability.
  • We use Amazon RDS to store the configuration of the data mesh. We log queries against the RDS database. Together with CloudTrail, this log allows us to answer the question of why a modification to a file in Amazon S3 at a specific time by a specific person is possible.
  • Amazon CloudWatch to log activities across the mesh.

In addition to those logging mechanisms, the S3 buckets are created using the following properties:

  • The bucket is encrypted using server-side encryption with AWS Key Management Service (AWS KMS) and customer managed keys
  • Amazon S3 versioning is activated by default

Access to the data in NNEDH is controlled at the group level instead of individual users. The group corresponds to the group defined in the Novo Nordisk directory group. To keep track of the person who modified the data in the data lakes, we use the source identity mechanism explained in the post How to relate IAM role activity to corporate identity.

Conclusion

In this post, we showed how Novo Nordisk built a modern data architecture to speed up the delivery of data-driven use cases. It includes a distributed data architecture, to scale the usage to petabyte scale for over 2,000 internal users throughout the value chain, as well as a distributed security and audit architecture handling data accountability and traceability on the environment to meet their compliance requirements.

The next post in this series describes the implementation of distributed data governance and control at scale of Novo Nordisk’s modern data architecture.


About the Authors

Jonatan Selsing is former research scientist with a PhD in astrophysics that has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving full benefit of the advantages of cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Alessandro FiorAlessandro Fior is a Sr. Data Architect at AWS Professional Services. With over 10 years of experience delivering data and analytics solutions, he is passionate about designing and building modern and scalable data platforms that accelerate companies to get value from their data.

Kumari RamarKumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Create your own reusable visual transforms for AWS Glue Studio

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/create-your-own-reusable-visual-transforms-for-aws-glue-studio/

AWS Glue Studio has recently added the possibility of adding custom transforms that you can use to build visual jobs to use them in combination with the AWS Glue Studio components provided out of the box. You can now define custom visual transform by simply dropping a JSON file and a Python script onto Amazon S3, which defines the component and the processing logic, respectively.

Custom visual transform lets you define, reuse, and share business-specific ETL logic among your teams. With this new feature, data engineers can write reusable transforms for the AWS Glue visual job editor. Reusable transforms increase consistency between teams and help keep jobs up-to-date by minimizing duplicate effort and code.

In this blog post, I will show you a fictional use case that requires the creation of two custom transforms to illustrate what you can accomplish with this new feature. One component will generate synthetic data on the fly for testing purposes, and the other will prepare the data to store it partitioned.

Use case: Generate synthetic data on the fly

There are multiple reasons why you would want to have a component that generates synthetic data. Maybe the real data is heavily restricted or not yet available, or there is not enough quantity or variety at the moment to test performance. Or maybe using the real data imposes some cost or load to the real system, and we want to reduce its usage during development.

Using the new custom visual transforms framework, let’s create a component that builds synthetic data for fictional sales during a natural year.

Define the generator component

First, define the component by giving it a name, description, and parameters. In this case, use salesdata_generator for both the name and the function, with two parameters: how many rows to generate and for which year.

For the parameters, we define them both as int, and you can add a regex validation to make sure the parameters provided by the user are in the correct format.

There are further configuration options available; to learn more, refer to the AWS Glue User Guide.

This is how the component definition would look like. Save it as salesdata_generator.json. For convenience, we’ll match the name of the Python file, so it’s important to choose a name that doesn’t conflict with an existing Python module.
If the year is not specified, the script will default to last year.

{
  "name": "salesdata_generator",
  "displayName": "Synthetic Sales Data Generator",
  "description": "Generate synthetic order datasets for testing purposes.",
  "functionName": "salesdata_generator",
  "parameters": [
    {
      "name": "numSamples",
      "displayName": "Number of samples",
      "type": "int",
      "description": "Number of samples to generate"
    },
    {
      "name": "year",
      "displayName": "Year",
      "isOptional": true,
      "type": "int",
      "description": "Year for which generate data distributed randomly, by default last year",
      "validationRule": "^\\d{4}$",
      "validationMessage": "Please enter a valid year number"
    }
  ]
}

Implement the generator logic

Now, you need to create a Python script file with the implementation logic.
Save the following script as salesdata_generator.py. Notice the name is the same as the JSON, just with a different extension.

from awsglue import DynamicFrame
import pyspark.sql.functions as F
import datetime
import time

def salesdata_generator(self, numSamples, year=None):
    if not year:
        # Use last year
        year = datetime.datetime.now().year - 1
    
    year_start_ts = int(time.mktime((year,1,1,0,0,0,0,0,0)))
    year_end_ts = int(time.mktime((year + 1,1,1,0,0,0,0,0,0)))
    ts_range = year_end_ts - year_start_ts
    
    departments = ["bargain", "checkout", "food hall", "sports", "menswear", "womenwear", "health and beauty", "home"]
    dep_array = F.array(*[F.lit(x) for x in departments])
    dep_randomizer = (F.round(F.rand() * (len(departments) -1))).cast("int")

    df = self.glue_ctx.sparkSession.range(numSamples) \
      .withColumn("sale_date", F.from_unixtime(F.lit(year_start_ts) + F.rand() * ts_range)) \
      .withColumn("amount_dollars", F.round(F.rand() * 1000, 2)) \
      .withColumn("department", dep_array.getItem(dep_randomizer))  
    return DynamicFrame.fromDF(df, self.glue_ctx, "sales_synthetic_data")

DynamicFrame.salesdata_generator = salesdata_generator

The function salesdata_generator in the script receives the source DynamicFrame as “self”, and the parameters must match the definition in the JSON file. Notice the “year” is an optional parameter, so it has assigned a default function on call, which the function detects and replaces with the previous year. The function returns the transformed DynamicFrame. In this case, it’s not derived from the source one, which is the common case, but replaced by a new one.

The transform leverages Spark functions as well as Python libraries in order to implement this generator.
To keep things simple, this example only generates four columns, but we could do the same for many more by either hardcoding values, assigning them from a list, looking for some other input, or doing whatever makes sense to make the data realistic.

Deploy and using the generator transform

Now that we have both files ready, all we have to do is upload them on Amazon S3 under the following path.

s3://aws-glue-assets-<account id>-<region name>/transforms/

If AWS Glue has never been used in the account and Region, then that bucket might not exist and needs to be created. AWS Glue will automatically create this bucket when you create your first job.

You will need to manually create a folder called “transforms” in that bucket to upload the files into.

Once you have uploaded both files, the next time we open (or refresh) the page on AWS Glue Studio visual editor, the transform should be listed among the other transforms. You can search for it by name or description.

Because this is a transform and not a source, when we try to use the component, the UI will demand a parent node. You can use as a parent the real data source (so you can easily remove the generator and use the real data) or just use a placeholder. I’ll show you how:

  1. Go to the AWS Glue, and in the left menu, select Jobs under AWS Glue Studio.
  2. Leave the default options (Visual with a source and target and S3 source and destination), and choose Create.
  3. Give the job a name by editing Untitled job at the top left; for example, CustomTransformsDemo
  4. Go to the Job details tab and select a role with AWS Glue permissions as the IAM role. If no role is listed on the dropdown, then follow these instructions to create one.
    For this demo, you can also reduce Requested number of workers to 2 and Number of retries to 0 to minimize costs.
  5. Delete the Data target node S3 bucket at the bottom of the graph by selecting it and choosing Remove. We will restore it later when we need it.
  6. Edit the S3 source node by selecting it in the Data source properties tab and selecting source type S3 location.
    In the S3 URL box, enter a path that doesn’t exist on a bucket the role selected can access, for instance: s3://aws-glue-assets-<account id>-<region name>/file_that_doesnt_exist. Notice there is no trailing slash.
    Choose JSON as the data format with default settings; it doesn’t matter.
    You might get a warning that it cannot infer schema because the file doesn’t exist; that’s OK, we don’t need it.
  7. Now search for the transform by typing “synthetic” in the search box of transforms. Once the result appears (or you scroll and search it on the list), choose it so it is added to the job.
  8. Set the parent of the transform just added to be S3 bucket source in the Node properties tab. Then for the ApplyMapping node, replace the parent S3 bucket with transforms Synthetic Sales Data Generator. Notice this long name is coming from the displayName defined in the JSON file uploaded before.
  9. After these changes, your job diagram should look as follows (if you tried to save, there might be some warnings; that’s OK, we’ll complete the configuration next).
  10. Select the Synthetic Sales node and go to the Transform tab. Enter 10000 as the number of samples and leave the year by default, so it uses last year.
  11. Now we need the generated schema to be applied. This would be needed if we had a source that matches the generator schema.
    In the same node, select the tab Data preview and start a session. Once it is running, you should see sample synthetic data. Notice the sale dates are randomly distributed across the year.
  12. Now select the tab Output schema and choose Use datapreview schema That way, the four fields generated by the node will be propagated, and we can do the mapping based on this schema.
  13. Now we want to convert the generated sale_date timestamp into a date column, so we can use it to partition the output by day. Select the node ApplyMapping in the Transform tab. For the sale_date field, select date as the target type. This will truncate the timestamp to just the date.
  14. Now it’s a good time to save the job. It should let you save successfully.

Finally, we need to configure the sink. Follow these steps:

  1. With the ApplyMapping node selected, go to the Target dropdown and choose Amazon S3. The sink will be added to the ApplyMapping node. If you didn’t select the parent node before adding the sink, you can still set it in the Node details tab of the sink.
  2. Create an S3 bucket in the same Region as where the job will run. We’ll use it to store the output data, so we can clean up easily at the end. If you create it via the console, the default bucket config is OK.
    You can read more information about bucket creation on the Amazon S3 documentation 
  3. In the Data target properties tab, enter in S3 Target Location the URL of the bucket and some path and a trailing slash, for instance: s3://<your output bucket here>/output/
    Leave the rest with the default values provided.
  4. Choose Add partition key at the bottom and select the field sale_date.

We could create a partitioned table at the same time just by selecting the corresponding catalog update option. For simplicity, generate the partitioned files at this time without updating the catalog, which is the default option.

You can now save and then run the job.

Once the job has completed, after a couple of minutes (you can verify this in the Runs tab), explore the S3 target location entered above. You can use the Amazon S3 console or the AWS CLI. You will see files named like this: s3://<your output bucket here>/output/sale_date=<some date yyyy-mm-dd>/<filename>.

If you count the files, there should be close to but not more than 1,460 (depending on the year used and assuming you are using 2 G.1X workers and AWS Glue version 3.0)

Use case: Improve the data partitioning

In the previous section, you created a job using a custom visual component that produced synthetic data, did a small transformation on the date, and saved it partitioned on S3 by day.

You might be wondering why this job generated so many files for the synthetic data. This is not ideal, especially when they are as small as in this case. If this data was saved as a table with years of history, generating small files has a detrimental impact on tools that consume it, like Amazon Athena.

The reason for this is that when the generator calls the “range” function in Apache Spark without specifying a number of memory partitions (notice they are a different kind from the output partitions saved to S3), it defaults to the number of cores in the cluster, which in this example is just 4.

Because the dates are random, each memory partition is likely to contain rows representing all days of the year, so when the sink needs to split the dates into output directories to group the files, each memory partition needs to create one file for each day present, so you can have 4 * 365 (not in a leap year) is 1,460.

This example is a bit extreme, and normally data read from the source is not so spread over time. The issue can often be found when you add other dimensions, such as output partition columns.

Now you are going to build a component that optimizes this, trying to reduce the number of output files as much as possible: one per output directory.
Also, let’s imagine that on your team, you have the policy of generating S3 date partition separated by year, month, and day as strings, so the files can be selected efficiently whether using a table on top or not.

We don’t want individual users to have to deal with these optimizations and conventions individually but instead have a component they can just add to their jobs.

Define the repartitioner transform

For this new transform, create a separate JSON file, let’s call it repartition_date.json, where we define the new transform and the parameters it needs.

{
  "name": "repartition_date",
  "displayName": "Repartition by date",
  "description": "Split a date into partition columns and reorganize the data to save them as partitions.",
  "functionName": "repartition_date",
  "parameters": [
    {
      "name": "dateCol",
      "displayName": "Date column",
      "type": "str",
      "description": "Column with the date to split into year, month and day partitions. The column won't be removed"
    },
    {
      "name": "partitionCols",
      "displayName": "Partition columns",
      "type": "str",
      "isOptional": true,
      "description": "In addition to the year, month and day, you can specify additional columns to partition by, separated by commas"
    },
    {
      "name": "numPartitionsExpected",
      "displayName": "Number partitions expected",
      "isOptional": true,
      "type": "int",
      "description": "The number of partition column value combinations expected, if not specified the system will calculate it."
    }
  ]
}

Implement the transform logic

The script splits the date into multiple columns with leading zeros and then reorganizes the data in memory according to the output partitions. Save the code in a file named repartition_date.py:

from awsglue import DynamicFrame
import pyspark.sql.functions as F

def repartition_date(self, dateCol, partitionCols="", numPartitionsExpected=None):
    partition_list = partitionCols.split(",") if partitionCols else []
    partition_list += ["year", "month", "day"]
    
    date_col = F.col(dateCol)
    df = self.toDF()\
      .withColumn("year", F.year(date_col).cast("string"))\
      .withColumn("month", F.format_string("%02d", F.month(date_col)))\
      .withColumn("day", F.format_string("%02d", F.dayofmonth(date_col)))
    
    if not numPartitionsExpected:
        numPartitionsExpected = df.selectExpr(f"COUNT(DISTINCT {','.join(partition_list)})").collect()[0][0]
    
    # Reorganize the data so the partitions in memory are aligned when the file partitioning on s3
    # So each partition has the data for a combination of partition column values
    df = df.repartition(numPartitionsExpected, partition_list)    
    return DynamicFrame.fromDF(df, self.glue_ctx, self.name)

DynamicFrame.repartition_date = repartition_date

Upload the two new files onto the S3 transforms folder like you did for the previous transform.

Deploy and use the generator transform

Now edit the job to make use of the new component to generate a different output.
Refresh the page in the browser if the new transform is not listed.

  1. Select the generator transform and from the transforms dropdown, find Repartition by date and choose it; it should be added as a child of the generator.
    Now change the parent of the Data target node to the new node added and remove the ApplyMapping; we no longer need it.
  2. Repartition by date needs you to enter the column that contains the timestamp.
    Enter sale_date (the framework doesn’t yet allow field selection using a dropdown) and leave the other two as defaults.
  3. Now we need to update the output schema with the new date split fields. To do so, use the Data preview tab to check it’s working correctly (or start a session if the previous one has expired). Then in the Output schema, choose Use datapreview schema so the new fields get added. Notice the transform doesn’t remove the original column, but it could if you change it to do so.
  4. Finally, edit the S3 target to enter a different location so the folders don’t mix with the previous run, and it’s easier to compare and use. Change the path to /output2/.
    Remove the existing partition column and instead add year, month, and day.

Save and run the job. After one or two minutes, once it completes, examine the output files. They should be much closer to the optimal number of one per day, maybe two. Consider that in this example, we only have four partitions. In a real dataset, the number of files without this repartitioning would explode very easily.
Also, now the path follows the traditional date partition structure, for instance: output2/year=2021/month=09/day=01/run-AmazonS3_node1669816624410-4-part-r-00292

Notice that at the end of the file name is the partition number. While we now have more partitions, we have fewer output files because the data is organized in memory more aligned with the desired output.

The repartition transform has additional configuration options that we have left empty. You can now go ahead and try different values and see how they affect the output.
For instance, you can specify “department ” as “Partition columns” in the transform and then add it in the sink partition column list. Or you can enter a “Number of partitions expected” and see how it affects the runtime (it no longer needs to determine this at runtime) and the number of files produced as you enter a higher number, for instance, 3,000.

How this feature works under the hood

  1. Upon loading the AWS Glue Studio visual job authoring page, all your transforms stored in the aforementioned S3 bucket will be loaded in the UI. AWS Glue Studio will parse the JSON definition file to display transform metadata such as name, description, and list of parameters.
  2. Once the user is done creating and saving his job using custom visual transforms, AWS Glue Studio will generate the job script and update the Python library path (also referred as —extra-py-files job parameters) with the list of transform Python file S3 paths, separated by comma.
  3. Before running your script, AWS Glue will add all file paths stored in the —extra-py-files job parameters to the Python path, allowing your script to run all custom visual transform functions you defined.

Cleanup

In order to avoid running costs, if you don’t want to keep the generated files, you can empty and delete the output bucket created for this demo. You might also want to delete the AWS Glue job created.

Conclusion

In this post, you have seen how you can create your own reusable visual transforms and then use them in AWS Glue Studio to enhance your jobs and your team’s productivity.

You first created a component to use synthetically generated data on demand and then another transform to optimize the data for partitioning on Amazon S3.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Michael Benattar is a Senior Software Engineer on the AWS Glue Studio team. He has led the design and implementation of the custom visual transform feature.

AWS Specialist Insights Team uses Amazon QuickSight to provide operational insights across the AWS Worldwide Specialist Organization

Post Syndicated from David Adamson original https://aws.amazon.com/blogs/big-data/aws-specialist-insights-team-uses-amazon-quicksight-to-provide-operational-insights-across-the-aws-worldwide-specialist-organization/

The AWS Worldwide Specialist Organization (WWSO) is a team of go-to-market experts that support strategic services, customer segments, and verticals at AWS. Working together, the Specialist Insights Team (SIT) and the Finance, Analytics, Science, and Technology team (FAST) support WWSO in acquiring, securing, and delivering information and business insights at scale by working with the broader AWS community (Sales, Business Units, Finance) enabling data-driven decisions to be made.

SIT is made up of analysts who bring deep knowledge of the business intelligence (BI) stack to support the WWSO business. Some analysts work across multiple areas, whereas others are deeply knowledgeable in their specific verticals, but all are technically proficient in BI tools and methodologies. The team’s ability to combine technical and operational knowledge, in partnership with domain experts within WWSO, helps us build a common, standard data platform that can be used throughout AWS.

Untapped potential in data availability

One of the ongoing challenges for the team was how to turn the 2.1 PB of data inside the data lake into actionable business intelligence that can drive actions and verifiable outcomes. The resources needed to translate the data, analyze it, and succinctly articulate what the data shows had been a blocker of our ability to be agile and responsive to our customers.

After reviewing several vendor products and evaluating the pros and cons of each, Amazon QuickSight was chosen to replace our existing legacy BI solution. It not only satisfied all of the criteria necessary to provide actionable insights across WWSO business but allows us to scale securely across tens of thousands of users at AWS.

In this post, we discuss what influenced the decision to implement QuickSight, and will detail some of the benefits our team has seen since implementation.

Legacy tool deprecation

The legacy BI solution presented a number of challenges, starting with scaling, complex governance, and siloed reporting. This resulted in poor performance, cumbersome development processes, multiple versions of truth, and high costs. Ultimately, the legacy BI solution had significant barriers to widespread adoption, including long time to insights, lack of trust, low innovation, and return-on-investment (ROI) justification.

After the decision was made to deprecate the previous BI tool our team had been using to provide reports and insights to the organization, the team began to make preparations for the impending switch. We met with analysts across the specialist organization to gather feedback on what they’d like to see in the next iteration of reporting capabilities. Based on that feedback, and with guidance from our leadership teams, the following criteria needed to be met in our next BI tool:

  • Accessible insights – To ensure users with varying levels of technical aptitude could understand the information, the insights format needed to be easy to understand.
  • Speed – With millions of records, processing speed needed to be lightning fast, and we also didn’t want to invest a lot of time in technical implementation or user education training.
  • Cost – Being a frugal company, we needed to ensure that our BI solution would not only do what we needed it to do but that it wouldn’t blow up our budget.
  • Security – Built-in row-level security, and a custom solution developed internally, had the ability to give access to thousands of users across AWS.

Among other considerations that ultimately influenced the decision to use QuickSight was that it’s a fully managed service, which meant no need to maintain a separate server or manage any upgrades. Because our team handles sensitive data, security was also top of mind. QuickSight passed that test as well; we were able to implement fine-grained security measures and saw no trade-off in performance.

A simple, speedy, single source of truth

With such a wide variety of teams needing access to the data and insights our team provides, our BI solution needed to be user-friendly and intuitive without the need for extensive training or convoluted instructions. With millions of records used to generate insights on sales pipelines, revenue, headcount, etc., queries could become quite complex. To meet our first top priority for accessible insights, we were looking for a combination of easy-to-operate and easy-to-understand visualizations.

Once our QuickSight implementation was complete, near-real-time, actionable insights with informative visuals were just a few clicks away. We were impressed by how simple it was to get at-a-glance insights that told data-driven stories about the key performance indicators that were most important to our stakeholder community. For business-critical metrics, we’re able to set up alerts that trigger emails to owners when certain thresholds are met, providing peace of mind that nothing important will slip through the cracks.

With the goal of migrating 400+ dashboards from the legacy BI solution over to QuickSight successfully, there were three critical components that we had to get right. Not only did we need to have the right technology, we also needed to set up the right processes while also keeping change management—from a people perspective—top of mind.

This migration project provided us with an opportunity to standardize our data, ensuring that we have a uniform source of truth that enables efficiency, as well as governed access and self-service across the company. In the spirit of working smarter (not harder), we kickstarted the migration in parallel with the data standardization project.

We started by establishing clear organization goals for alignment and a solid plan from start to finish. Next steps were to focus on row-level security design and evolution to ensure that we can provide governance and security at scale. To ensure success, we first piloted migrating 35+ dashboards and 500+ users. We then established a core technical team whose focus was to be experts at QuickSight and migrate another 400+ dashboards, 4,000+ users, and 60,0000+ impressions. The technical team also trained other members of the team to bring everyone along on the change management journey. We were able to complete the migration in 18 months across thousands of users.

With the base in place, we shifted focus to move from foundational business metrics to machine learning (ML) based insights and outcomes to help drive data-driven actions.

The following screenshot shows an example of what one of our QuickSight dashboards looks like, though the numbers do not reflect real values; this is test data.

With speed being next on our list of key criteria, we were delighted to learn more about how QuickSight works. SPICE, an acronym for Super-fast, Parallel, In-memory Calculation Engine, is the robust engine that QuickSight uses to rapidly perform advanced calculations and serve data. The query runtimes and dashboard development speed were both appreciably faster in comparison to other data visualization tools we had used, where we’d need to wait for it to process every time we added a new calculation or a new field to the visual. The dashboard load times were also noticeably faster than the load times from our previous BI tool; most load in under 5 seconds, compared to several minutes with the previous BI tool.

Another benefit of having chosen QuickSight is that there has been a significant reduction in the number of disagreements over data definitions or questions about discrepancies between reports. With standardized SPICE datasets built in QuickSight, we can now offer data as a service to the organization, creating a single source of truth for our insights shared across the organization. This saved the organization hours of time investigating unanswered questions, enabling us to be more agile and responsive, which makes us better able to serve our customers.

Dashboards are just the beginning

We’re very happy with QuickSight’s performance and scalability, and we are very excited to improve and expand on the solid reporting foundation we’ve begun to build. Having driven adoption from 50 percent to 83 percent, as well as seeing a 215 percent growth in views and a 157 percent growth in users since migrating to QuickSight, it’s clear we made the right choice.

We were intrigued by a recent post by Amy Laresch and Kellie Burton, AWS Analytics sales team uses QuickSight Q to save hours creating monthly business reviews. Based on what we learned from that post, we also plan to test out and eventually implement Amazon QuickSight Q, a ML powered natural language capability that gives anyone in the organization the ability to ask business questions in natural language and receive accurate answers with relevant visualizations. We’re also considering integrations with Amazon SageMaker and Amazon Honeycode-built apps for write back.

To learn more, visit Amazon QuickSight.


About the Authors

David Adamson is the head of WWSO Insights. He is leading the team on the journey to a data driven organization that delivers insightful, actionable data products to WWSO and shared in partnership with the broader AWS organization. In his spare time, he likes to travel across the world and can be found in his back garden, weather permitting exploring and photography the night sky.

Yash Agrawal is an Analytics Lead at Amazon Web Services. Yash’s role is to define the analytics roadmap, develop standardized global dashboards & deliver insightful analytics solutions for stakeholders across AWS.

Addis Crooks-Jones is a Sr. Manager of Business Intelligence Engineering at Amazon Web Services Finance, Analytics and Science Team (FAST). She is responsible for partnering with business leaders in the World Wide Specialist Organization to build a culture of data  to support strategic initiatives. The technology solutions developed are used globally to drive decision making across AWS. When not thinking about new plans involving data, she like to be on adventures big and small involving food, art and all the fun people in her life.

Graham Gilvar is an Analytics Lead at Amazon Web Services. He builds and maintains centralized QuickSight dashboards, which enable stakeholders across all WWSO services to interlock and make data driven decisions. In his free time, he enjoys walking his dog, golfing, bowling, and playing hockey.

Shilpa Koogegowda is the Sales Ops Analyst at Amazon Web Services and has been part of the WWSO Insights team for the last two years. Her role involves building standardized metrics, dashboards and data products to provide data and insights to the customers.

Impact of Infrastructure failures on shard in Amazon OpenSearch Service

Post Syndicated from Bukhtawar Khan original https://aws.amazon.com/blogs/big-data/impact-of-infrastructure-failures-on-shard-in-amazon-opensearch-service/

Amazon OpenSearch Service is a managed service that makes it easy to secure, deploy, and operate OpenSearch and legacy Elasticsearch clusters at scale in the AWS Cloud. Amazon OpenSearch Service provisions all the resources for your cluster, launches it, and automatically detects and replaces failed nodes, reducing the overhead of self-managed infrastructures. The service makes it easy for you to perform interactive log analytics, real-time application monitoring, website searches, and more by offering the latest versions of OpenSearch, support for 19 versions of Elasticsearch (1.5 to 7.10 versions), and visualization capabilities powered by OpenSearch Dashboards and Kibana (1.5 to 7.10 versions).

In the latest service software release, we’ve updated the shard allocation logic to be load-aware so that when shards are redistributed in case of any node failures, the service disallows surviving nodes from getting overloaded by shards previously hosted on the failed node. This is especially important for Multi-AZ domains to provide consistent and predictable cluster performance.

If you would like more background on shard allocation logic in general, please see Demystifying Elasticsearch shard allocation.

The challenge

An Amazon OpenSearch Service domain is said to be “balanced” when the number of nodes are equally distributed across configured Availability Zones, and the total number of shards are distributed equally across all the available nodes without concentration of shards of any one index on any one node. Also, OpenSearch has a property called “Zone Awareness” that, when enabled, ensures that the primary shard and its corresponding replica are allocated in different Availability Zones. If you have more than one copy of data, having multiple Availability Zones provides better fault tolerance and availability. In the event, the domain is scaled out or scaled in or during the failure of node(s), OpenSearch automatically redistributes the shards between available nodes while obeying the allocation rules based on zone awareness.

While the shard-balancing process ensures that shards are evenly distributed across Availability Zones, in some cases, if there is an unexpected failure in a single zone, shards will get reallocated to the surviving nodes. This might result in the surviving nodes getting overwhelmed, impacting cluster stability.

For instance, if one node in a three-node cluster goes down, OpenSearch redistributes the unassigned shards, as shown in the following diagram. Here “P“ represents a primary shard copy, whereas “R“ represents a replica shard copy.

This behavior of the domain can be explained in two parts – during failure and during recovery.

During failure

A domain deployed across multiple Availability Zones can encounter multiple types of failures during its lifecycle.

Complete zone failure

A cluster may lose a single Availability Zone due to a variety of reasons and also all the nodes in that zone. Today, the service tries to place the lost nodes in the remaining healthy Availability Zones. The service also tries to re-create the lost shards in the remaining nodes while still following the allocation rules. This can result in some unintended consequences.

  • When the shards of the impacted zone are getting reallocated to healthy zones, they trigger shard recoveries that can increase the latencies as it consumes additional CPU cycles and network bandwidth.
  • For an n-AZ, n-copy setup, (n>1), the surviving n-1 Availability Zones are allocated with the nth shard copy, which can be undesirable as it can cause skewness in shard distribution, which can also result in unbalanced traffic across nodes. These nodes can get overloaded, leading to further failures.

Partial zone failure

In the event of a partial zone failure or when the domain loses only some of the nodes in an Availability Zone, Amazon OpenSearch Service tries to replace the failed nodes as quickly as possible. However, in case it takes too long to replace the nodes, OpenSearch tries to allocate the unassigned shards of that zone into the surviving nodes in the Availability Zone. If the service cannot replace the nodes in the impacted Availability Zone, it may allocate them in the other configured Availability Zone, which may further skew the distribution of shards both across and within the zone. This again has unintended consequences.

  • If the nodes on the domain do not have enough storage space to accommodate the additional shards, the domain can be write-blocked, impacting indexing operation.
  • Due to the skewed distribution of shards, the domain may also experience skewed traffic across the nodes, which can further increase the latencies or timeouts for read and write operations.

Recovery

Today, in order to maintain the desired node count of the domain, Amazon OpenSearch Service launches data nodes in the remaining healthy Availability Zones, similar to the scenarios described in the failure section above. In order to ensure proper node distribution across all the Availability Zones after such an incident, manual intervention was needed by AWS.

What’s changing

To improve the overall failure handling and minimizing the impact of failure on the domain health and performance, Amazon OpenSearch Service is performing the following changes:

  • Forced Zone Awareness: OpenSearch has a preexisting shard balancing configuration called forced awareness that is used to set the Availability Zones to which shards need to be allocated. For example, if you have an awareness attribute called zone and configure nodes in zone1 and zone2, you can use forced awareness to prevent OpenSearch from allocating replicas if only one zone is available:
cluster.routing.allocation.awareness.attributes: zone
cluster.routing.allocation.awareness.force.zone.values: zone1,zone2

With this example configuration, if you start two nodes with node.attr.zone set to zone1 and create an index with five shards and one replica, OpenSearch creates the index and allocates the five primary shards but no replicas. Replicas are only allocated once nodes with node.attr.zone set to zone2 are available.

Amazon OpenSearch Service will use the forced awareness configuration on Multi-AZ domains to ensure that shards are only allocated according to the rules of zone awareness. This would prevent the sudden increase in load on the nodes of the healthy Availability Zones.

  • Load-Aware Shard Allocation: Amazon OpenSearch Service will take into consideration factors like the provisioned capacity, actual capacity, and total shard copies to calculate if any node is overloaded with more shards based on expected average shards per node. It would prevent shard assignment when any node has allocated a shard count that goes beyond this limit.

Note that any unassigned primary copy would still be allowed on the overloaded node to prevent the cluster from any imminent data loss.

Similarly, to address the manual recovery issue (as described in the Recovery section above), Amazon OpenSearch Service is also making changes to its internal scaling component. With the newer changes in place, Amazon OpenSearch Service will not launch nodes in the remaining Availability Zones even when it goes through the previously described failure scenario.

Visualizing the current and new behavior

For example, an Amazon OpenSearch Service domain is configured with 3-AZ, 6 data nodes, 12 primary shards, and 24 replica shards. The domain is provisioned across AZ-1, AZ-2, and AZ-3, with two nodes in each of the zones.

Current shard allocation:
Total number of shards: 12 Primary + 24 Replica = 36 shards
Number of Availability Zones: 3
Number of shards per zone (zone awareness is true): 36/3 = 12
Number of nodes per Availability Zone: 2
Number of shards per node: 12/2 = 6

The following diagram provides a visual representation of the domain setup. The circles denote the count of shards allocated to the node. Amazon OpenSearch Service will allocate six shards per node.

During a partial zone failure, where one node in AZ-3 fails, the failed node is assigned to the remaining zone, and the shards in the zone are redistributed based on the available nodes. After the changes described above, the cluster will not create a new node or redistribute shards after the failure of the node.


In the diagram above, with the loss of one node in AZ-3, Amazon OpenSearch Service would try to launch the replacement capacity in the same zone. However, due to some outage, the zone might be impaired and would fail to launch the replacement. In such an event, the service tries to launch deficit capacity in another healthy zone, which might lead to zone imbalance across Availability Zones. Shards on the impacted zone get stuffed on the surviving node in the same zone. However, with the new behavior, the service would try to attempt launching capacity in the same zone but would avoid launching deficit capacity in other zones to avoid imbalance. The shard allocator would also ensure that the surviving nodes don’t get overloaded.


Similarly, in case all the nodes in AZ-3 are lost, or the AZ-3 becomes impaired, Amazon OpenSearch Service brings up the lost nodes in the remaining Availability Zone and also redistributes the shards on the nodes. However, after the new changes, Amazon OpenSearch Service will neither allocate nodes to the remaining zone or it will try to re-allocate lost shards to the remaining zone. Amazon OpenSearch Service will wait for the Recovery to happen and for the domain to return to the original configuration after recovery.

If your domain is not provisioned with enough capacity to withstand the loss of an Availability Zone, you may experience a drop in throughput for your domain. It is therefore strongly recommended to follow the best practices while sizing your domain, which means having enough resources provisioned to withstand the loss of a single Availability Zone failure.


Currently, once the domain recovers, the service requires manual intervention to balance capacity across Availability Zones, which also involves shard movements. However, with the new behaviour, there is no intervention needed during the recovery process because the capacity returns in the impacted zone and the shards are also automatically allocated to the recovered nodes. This ensures that there are no competing priorities on the remaining resources.

What you can expect

After you update your Amazon OpenSearch Service domain to the latest service software release, the domains that have been configured with best practices will have more predictable performance even after losing one or many data nodes in an Availability Zone. There will be reduced cases of shard overallocation in a node. It is a good practice to provision sufficient capacity to be able to tolerate a single zone failure

You might at times see a domain turning yellow during such unexpected failures since we won’t assign replica shards to overloaded nodes. However, this does not mean that there will be data loss in a well-configured domain. We will still make sure that all primaries are assigned during the outages. There is an automated recovery in place, which will take care of balancing the nodes in the domain and ensuring that the replicas are assigned once the failure recovers.

Update the service software of your Amazon OpenSearch Service domain to get these new changes applied to your domain. More details on the service software update process are in the Amazon OpenSearch Service documentation.

Conclusion

In this post we saw how Amazon OpenSearch Service recently improved the logic to distribute nodes and shards across Availability Zones during zonal outages.

This change will help the service to ensure more consistent and predictable performance during node or zonal failures. Domains won’t see any increased latencies or write blocks during processing writes and reads, which used to surface earlier at times due to over-allocation of shards on nodes.


About the authors

Bukhtawar Khan is a Senior Software Engineer working on Amazon OpenSearch Service. He is interested in distributed and autonomous systems. He is an active contributor to OpenSearch.

Anshu Agarwal is a Senior Software Engineer working on AWS OpenSearch at Amazon Web Services. She is passionate about solving problems related to building scalable and highly reliable systems.

Shourya Dutta Biswas is a Software Engineer working on AWS OpenSearch at Amazon Web Services. He is passionate about building highly resilient distributed systems.

Rishab Nahata is a Software Engineer working on OpenSearch at Amazon Web Services. He is fascinated about solving problems in distributed systems. He is active contributor to OpenSearch.

Ranjith Ramachandra is an Engineering Manager working on Amazon OpenSearch Service at Amazon Web Services.

Jon Handler is a Senior Principal Solutions Architect, specializing in AWS search technologies – Amazon CloudSearch, and Amazon OpenSearch Service. Based in Palo Alto, he helps a broad range of customers get their search and log analytics workloads deployed right and functioning well.

How to get best price performance from your Amazon Redshift Data Sharing deployment

Post Syndicated from BP Yau original https://aws.amazon.com/blogs/big-data/how-to-get-best-price-performance-from-your-amazon-redshift-data-sharing-deployment/

Amazon Redshift is a fast, scalable, secure, and fully-managed data warehouse that enables you to analyze all of your data using standard SQL easily and cost-effectively. Amazon Redshift Data Sharing allows customers to securely share live, transactionally consistent data in one Amazon Redshift cluster with another Amazon Redshift cluster across accounts and regions without needing to copy or move data from one cluster to another.

Amazon Redshift Data Sharing was initially launched in March 2021, and added support for cross-account data sharing was added in August 2021. The cross-region support became generally available in February 2022. This provides full flexibility and agility to share data across Redshift clusters in the same AWS account, different accounts, or different regions.

Amazon Redshift Data Sharing is used to fundamentally redefine Amazon Redshift deployment architectures into a hub-spoke, data mesh model to better meet performance SLAs, provide workload isolation, perform cross-group analytics, easily onboard new use cases, and most importantly do all of this without the complexity of data movement and data copies. Some of the most common questions asked during data sharing deployment are, “How big should my consumer clusters and producer clusters be?”, and “How do I get the best price performance for workload isolation?”. As workload characteristics like data size, ingestion rate, query pattern, and maintenance activities can impact data sharing performance, a continuous strategy to size both consumer and producer clusters to maximize the performance and minimize cost should be implemented. In this post, we provide a step-by-step approach to help you determine your producer and consumer clusters sizes for the best price performance based on your specific workload.

Generic consumer sizing guidance

The following steps show the generic strategy to size your producer and consumer clusters. You can use it as a starting point and modify accordingly to cater your specific use case scenario.

Size your producer cluster

You should always make sure that you properly size your producer cluster to get the performance that you need to meet your SLA. You can leverage the sizing calculator from the Amazon Redshift console to get a recommendation for the producer cluster based on the size of your data and query characteristic. Look for Help me choose on the console in AWS Regions that support RA3 node types to use this sizing calculator. Note that this is just an initial recommendation to get started, and you should test running your full workload on the initial size cluster and elastic resize the cluster up and down accordingly to get the best price performance.

Size and setup initial consumer cluster

You should always size your consumer cluster based on your compute needs. One way to get started is to follow the generic cluster sizing guide similar to the producer cluster above.

Setup Amazon Redshift data sharing

Setup data sharing from producer to consumer once you have both the producer and consumer cluster setup. Refer to this post for guidance on how to setup data sharing.

Test consumer only workload on initial consumer cluster

Test consumer only workload on the new initial consumer cluster. This can be done by pointing consumer applications, for example ETL tools, BI applications, and SQL clients, to the new consumer cluster and rerunning the workload to evaluate the performance against your requirements.

Test consumer only workload on different consumer cluster configurations

If the initial size consumer cluster meets or exceeds your workload performance requirements, then you can either continue to use this cluster configuration or you can test on smaller configurations to see if you can further reduce the cost and still get the performance that you need.

On the other hand, if the initial size consumer cluster fails to meet your workload performance requirements, then you can further test larger configurations to get the configuration that meets your SLA.

As a rule of thumb, size up the consumer cluster by 2x the initial cluster configuration incrementally until it meets your workload requirements.

Once you plan out what configuration you want to test, use elastic resize to resize the initial cluster to the target cluster configuration. After elastic resize is completed, perform the same workload test and evaluate the performance against your SLA. Select the configuration that meets your price performance target.

Test producer only workload on different producer cluster configurations

Once you move your consumer workload to the consumer cluster with the optimum price performance, there might be an opportunity to reduce the compute resource on the producer to save on costs.

To achieve this, you can rerun the producer only workload on 1/2x of the original producer size and evaluate the workload performance. Resizing the cluster up and down accordingly depends on the result, and then you select the minimum producer configuration that meets your workload performance requirements.

Re-evaluate after a full workload run over time

As Amazon Redshift continues evolving, and there are continuous performance and scalability improvement releases, data sharing performance will continue improving. Furthermore, numerous variables might impact the performance of data sharing queries. The following are just some examples:

  • Ingestion rate and amount of data change
  • Query pattern and characteristic
  • Workload changes
  • Concurrency
  • Maintenance activities, for example vacuum, analyze, and ATO

This is why you must re-evaluate the producer and consumer cluster sizing using the strategy above on occasion, especially after a full workload deployment, to gain the new best price performance from your cluster’s configuration.

Automated sizing solutions

If your environment involved more complex architecture, for example with multiple tools or applications (BI, ingestion or streaming, ETL, data science), then it might not feasible to use the manual method from the generic guidance above. Instead, you can leverage solutions in this section to automatically replay the workload from your production cluster on the test consumer and producer clusters to evaluate the performance.

Simple Replay utility will be leveraged as the automated solution to guide you through the process of getting the right producer and consumer clusters size for the best price performance.

Simple Replay is a tool for conducting a what-if analysis and evaluating how your workload performs in different scenarios. For example, you can use the tool to benchmark your actual workload on a new instance type like RA3, evaluate a new feature, or assess different cluster configurations. It also includes enhanced support for replaying data ingestion and export pipelines with COPY and UNLOAD statements. To get started and replay your workloads, download the tool from the Amazon Redshift GitHub repository.

Here we walk through the steps to extract your workload logs from the source production cluster and replay them in an isolated environment. This lets you perform a direct comparison between these Amazon Redshift clusters seamlessly and select the clusters configuration that best meet your price performance target.

The following diagram shows the solution architecture.

Architecutre for testing simple replay

Solution walkthrough

Follow these steps to go through the solution to size your consumer and producer clusters.

Size your production cluster

You should always make sure to properly size your existing production cluster to get the performance that you need to meet your workload requirements. You can leverage the sizing calculator from the Amazon Redshift console to get a recommendation on the production cluster based on the size of your data and query characteristic. Look for Help me choose on the console in AWS Regions that support RA3 node types to use this sizing calculator. Note that this is just an initial recommendation to get started. You should test running your full workload on the initial size cluster and elastic resize the cluster up and down accordingly to get the best price performance.

Identify the workload to be isolated

You might have different workloads running on your original cluster, but the first step is to identify the most critical workload to the business that we want to isolate. This is because we want to make sure that the new architecture can meet your workload requirements. This post is a good reference on a data sharing workload isolation use case that can help you decide which workload can be isolated.

Setup Simple Replay

Once you know your critical workload, you must enable audit logging in your production cluster where the critical workload identified above is running to capture query activities and store in Amazon Simple Storage Service (Amazon S3). Note that it may take up to three hours for the audit logs to be delivered to Amazon S3. Once the audit log is available, proceed to setup Simple Replay and then extract the critical workload from the audit log. Note that start_time and end_time could be used as parameters to filter out the critical workload if those workloads run in certain time periods, for example 9am to 11am. Otherwise it will extract all of the logged activities.

Baseline workload

Create a baseline cluster with the same configuration as the producer cluster by restoring from the production snapshot. The purpose of starting with the same configuration is to baseline the performance with an isolated environment.

Once the baseline cluster is available, replay the extracted workload in the baseline cluster. The output from this replay will be the baseline used to compare against subsequent replays on different consumer configurations.

Setup initial producer and consumer test clusters

Create a producer cluster with the same production cluster configuration by restoring from the production snapshot. Create a consumer cluster with the recommended initial consumer size from the previous guidance. Furthermore, setup data sharing between the producer and consumer.

Replay workload on initial producer and consumer

Replay the producer only workload on the initial size producer cluster. This can be achieved using the “Exclude” filter parameter to exclude consumer queries, for example the user that runs consumer queries.

Replay the consumer only workload on the initial size consumer cluster. This can be achieved using the “Include” filter parameter to exclude consumer queries, for example the user that runs consumer queries.

Evaluate the performance of these replays against the baseline and workload performance requirements.

Replay consumer workload on different configurations

If the initial size consumer cluster meets or exceeds your workload performance requirements, then you can either use this cluster configuration or you can follow these steps to test on smaller configurations to see if you can further reduce costs and still get the performance that you need.

Compare initial consumer performance results against your workload requirements:

  1. If the result exceeds your workload performance requirements, then you can reduce the size of the consumer cluster incrementally, starting with 1/2x, retry the replay and evaluate the performance, then resize up or down accordingly based on the result until it meets your workload requirements. The purpose is to get a sweet spot where you’re comfortable with the performance requirements and get the lowest price possible.
  2. If the result fails to meet your workload performance requirements, then you can increase the size of the cluster incrementally, starting with 2x the original size, retry the replay and evaluate the performance until it meets your workload performance requirements.

Replay producer workload on different configurations

Once you split your workloads out to consumer clusters, the load on the producer cluster should be reduced and you should evaluate your producer cluster’s workload performance to seek the opportunity to downsize to save on costs.

The steps are similar to consumer replay. Elastic resize the producer cluster incrementally starting with 1/2x the original size, replay the producer only workload and evaluate the performance, and then further resize up or down until it meets your workload performance requirements. The purpose is to get a sweet spot where you’re comfortable with the workload performance requirements and get the lowest price possible. Once you have the desired producer cluster configuration, retry replay consumer workloads on the consumer cluster to make sure that the performance wasn’t impacted by producer cluster configuration changes. Finally, you should replay both producer and consumer workloads concurrently to make sure that the performance is achieved in a full workload scenario.

Re-evaluate after a full workload run over time

Similar to the generic guidance, you should re-evaluate the producer and consumer clusters sizing using the previous strategy on occasion, especially after full workload deployment to gain the new best price performance from your cluster’s configuration.

Clean up

Running these sizing tests in your AWS account may have some cost implications because it provisions new Amazon Redshift clusters, which may be charged as on-demand instances if you don’t have Reserved Instances. When you complete your evaluations, we recommend deleting the Amazon Redshift clusters to save on costs. We also recommend pausing your clusters when they’re not in use.

Applying Amazon Redshift and data sharing best practices

Proper sizing of both your producer and consumer clusters will give you a good start to get the best price performance from your Amazon Redshift deployment. However, sizing isn’t the only factor that can maximize your performance. In this case, understanding and following best practices are equally important.

General Amazon Redshift performance tuning best practices are applicable to data sharing deployment. Make sure that your deployment follows these best practices.

There numerous data sharing specific best practices that you should follow to make sure that you maximize the performance. Refer to this post for more details.

Summary

There is no one-size-fits-all recommendation on producer and consumer cluster sizes. It varies by workloads and your performance SLA. The purpose of this post is to provide you with guidance for how you can evaluate your specific data sharing workload performance to determine both consumer and producer cluster sizes to get the best price performance. Consider testing your workloads on producer and consumer using simple replay before adopting it in production to get the best price performance.


About the Authors

BP Yau is a Sr Product Manager at AWS. He is passionate about helping customers architect big data solutions to process data at scale. Before AWS, he helped Amazon.com Supply Chain Optimization Technologies migrate its Oracle data warehouse to Amazon Redshift and build its next generation big data analytics platform using AWS technologies.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance and operational excellence at scale in their cloud journey. He has a keen interest in Data Analytics as well.

Run fault tolerant and cost-optimized Spark clusters using Amazon EMR on EKS and Amazon EC2 Spot Instances

Post Syndicated from Kinnar Kumar Sen original https://aws.amazon.com/blogs/big-data/run-fault-tolerant-and-cost-optimized-spark-clusters-using-amazon-emr-on-eks-and-amazon-ec2-spot-instances/

Amazon EMR on EKS is a deployment option in Amazon EMR that allows you to run Spark jobs on Amazon Elastic Kubernetes Service (Amazon EKS). Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances save you up to 90% over On-Demand Instances, and is a great way to cost optimize the Spark workloads running on Amazon EMR on EKS. Because Spot is an interruptible service, if we can move or reuse the intermediate shuffle files, it improves the overall stability and SLA of the job. The latest versions of Amazon EMR on EKS have integrated Spark features to enable this capability.

In this post, we discuss these features—Node Decommissioning and Persistent Volume Claim (PVC) reuse—and their impact on increasing the fault tolerance of Spark jobs on Amazon EMR on EKS when cost optimizing using EC2 Spot Instances.

Amazon EMR on EKS and Spot

EC2 Spot Instances are spare EC2 capacity provided at a steep discount of up to 90% over On-Demand prices. Spot Instances are a great choice for stateless and flexible workloads. The caveat with this discount and spare capacity is that Amazon EC2 can interrupt an instance with a proactive or reactive (2-minute) warning when it needs the capacity back. You can provision compute capacity in an EKS cluster using Spot Instances using a managed or self-managed node group and provide cost optimization for your workloads.

Amazon EMR on EKS uses Amazon EKS to run jobs with the EMR runtime for Apache Spark, which can be cost optimized by running the Spark executors on Spot. It provides up to 61% lower costs and up to 68% performance improvement for Spark workloads on Amazon EKS. The Spark application launches a driver and executors to run the computation. Spark is a semi-fault tolerant framework that is resilient to executor loss due to an interruption and therefore can run on EC2 Spot. On the other hand, when the driver is interrupted, the job fails. Hence, we recommend running drivers on on-demand instances. Some of the best practices for running Spark on Amazon EKS are applicable with Amazon EMR on EKS.

EC2 Spot instances also helps in cost optimization by improving the overall throughput of the job. This can be achieved by auto-scaling the cluster using Cluster Autoscaler (for managed nodegroups) or Karpenter.

Though Spark executors are resilient to Spot interruptions, the shuffle files and RDD data is lost when the executor gets killed. The lost shuffle files need to be recomputed, which increases the overall runtime of the job. Apache Spark has released two features (in versions 3.1 and 3.2) that addresses this issue. Amazon EMR on EKS released features such as node decommissioning (version 6.3) and PVC reuse (version 6.8) to simplify recovery and reuse shuffle files, which increases the overall resiliency of your application.

Node decommissioning

The node decommissioning feature works by preventing scheduling of new jobs on the nodes that are to be decommissioned. It also moves any shuffle files or cache present in those nodes to other executors (peers). If there are no other available executors, the shuffle files and cache are moved to a remote fallback storage.

Node Decommissioning

Fig 1 : Node Decommissioning

Let’s look at the decommission steps in more detail.

If one of the nodes that is running executors is interrupted, the executor starts the process of decommissioning and sends the message to the driver:

21/05/05 17:41:41 WARN KubernetesClusterSchedulerBackend$KubernetesDriverEndpoint: Received executor 7 decommissioned message
21/05/05 17:41:41 DEBUG TaskSetManager: Valid locality levels for TaskSet 2.0: NO_PREF, ANY
21/05/05 17:41:41 INFO KubernetesClusterSchedulerBackend: Decommission executors: 7
21/05/05 17:41:41 DEBUG TaskSchedulerImpl: parentName: , name: TaskSet_2.0, runningTasks: 10
21/05/05 17:41:41 INFO BlockManagerMasterEndpoint: Mark BlockManagers (BlockManagerId(7, 192.168.82.107, 39007, None)) as being decommissioning.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Decommission executor 1.
21/05/05 20:22:17 INFO CoarseGrainedExecutorBackend: Will exit when finished decommissioning
21/05/05 20:22:17 INFO BlockManager: Starting block manager decommissioning process...
21/05/05 20:22:17 DEBUG FileSystem: Looking for FS supporting s3a

The executor looks for RDD or shuffle files and tries to replicate or migrate those files. It first tries to find a peer executor. If successful, it will move the files to the peer executor:

22/06/07 20:41:38 INFO ShuffleStatus: Updating map output for 46 to BlockManagerId(4, 192.168.13.235, 34737, None)
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle data block update for 0 46, ignore.
22/06/07 20:41:38 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 46, updating.

However, if It is not able to find a peer executor, it will try to move the files to a fallback storage if available.

Fallback Storage

Fig 2: Fallback Storage

The executor is then decommissioned. When a new executor comes up, the shuffle files are reused:

22/06/07 20:42:50 INFO BasicExecutorFeatureStep: Adding decommission script to lifecycle
22/06/07 20:42:50 DEBUG ExecutorPodsAllocator: Requested executor with id 19 from Kubernetes.
22/06/07 20:42:50 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-bfd0a5813fd1b80f-exec-19, action ADDED
22/06/07 20:42:50 DEBUG BlockManagerMasterEndpoint: Received shuffle index block update for 0 52, updating.
22/06/07 20:42:50 INFO ShuffleStatus: Recover 52 BlockManagerId(fallback, remote, 7337, None)

The key advantage of this process is that it enables migrates blocks and shuffle data, thereby reducing recomputation, which adds to the overall resiliency of the system and reduces runtime. This process can be triggered by a Spot interruption signal (Sigterm) and node draining. Node draining  may happen due to high-priority task scheduling or independently.

When you use Amazon EMR on EKS with managed node groups/Karpenter, the Spot interruption handling is automated, wherein Amazon EKS gracefully drains and rebalances the Spot nodes to minimize application disruption when a Spot node is at elevated risk of interruption. If you’re using managed node groups/Karpenter, the decommission gets triggered when the nodes are getting drained and because it’s proactive, it gives you more time (at least 2 minutes) to move the files. In the case of self-managed node groups, we recommend installing the AWS Node Termination Handler to handle the interruption, and the decommission is triggered when the reactive (2-minute) notification is received. We recommend to use Karpenter with Spot Instances as it has faster node scheduling with early pod binding and binpacking to optimize the resource utilization.

The following code enables this configuration; more details are available on GitHub:

"spark.decommission.enabled": "true"
"spark.storage.decommission.rddBlocks.enabled": "true"
"spark.storage.decommission.shuffleBlocks.enabled" : "true"
"spark.storage.decommission.enabled": "true"
"spark.storage.decommission.fallbackStorage.path": "s3://<<bucket>>"

PVC reuse

Apache Spark enabled dynamic PVC in version 3.1, which is useful with dynamic allocation because we don’t have to pre-create the claims or volumes for the executors and delete them after completion. PVC enables true decoupling of data and processing when we’re running Spark jobs on Kubernetes, because we can use it as a local storage to spill in-process files too. The latest version of Amazon EMR 6.8 has integrated the PVC reuse feature of Spark, wherein if an executor is terminated due to EC2 Spot interruption or any other reason (JVM), then the PVC is not deleted but persisted and reattached to another executor. If there are shuffle files in that volume, then they are reused.

As with node decommission, this reduces the overall runtime because we don’t have to recompute the shuffle files. We also save the time required to request a new volume for an executor, and shuffle files can be reused without moving the files round.

The following diagram illustrates this workflow.

PVC Reuse

Fig 3: PVC Reuse

Let’s look at the steps in more detail.

If one or more of the nodes that are running executors is interrupted, the underlying pods get terminated and the driver gets the update. Note that the driver is the owner of the PVC of the executors, and they are not terminated. See the following code:

22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action MODIFIED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-6, action DELETED
22/06/15 23:25:07 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-3, action MODIFIED

The ExecutorPodsAllocator tries to allocate new executor pods to replace the ones terminated due to interruption. During the allocation, it figures out how many of the existing PVCs have files and can be reused:

22/06/15 23:25:23 INFO ExecutorPodsAllocator: Found 2 reusable PVCs from 10 PVCs

The ExecutorPodsAllocator requests for a pod and when it launches it, the PVC is reused. In the following example, the PVC from executor 6 is reused for new executor pod 11:

22/06/15 23:25:23 DEBUG ExecutorPodsAllocator: Requested executor with id 11 from Kubernetes.
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action ADDED
22/06/15 23:25:24 INFO KubernetesClientUtils: Spark configuration files loaded from Some(/usr/lib/spark/conf) : log4j.properties,spark-env.sh,hive-site.xml,metrics.properties
22/06/15 23:25:24 INFO BasicExecutorFeatureStep: Decommissioning not enabled, skipping shutdown script
22/06/15 23:25:24 DEBUG ExecutorPodsWatchSnapshotSource: Received executor pod update for pod named amazon-reviews-word-count-9ee82b8169a75183-exec-11, action MODIFIED
22/06/15 23:25:24 INFO ExecutorPodsAllocator: Reuse PersistentVolumeClaim amazon-reviews-word-count-9ee82b8169a75183-exec-6-pvc-0

The shuffle files, if present in the PVC are reused.

The key advantage of this technique is that it allows us to reuse pre-computed shuffle files in their original location, thereby reducing the time of the overall job run.

This works for both static and dynamic PVCs. Amazon EKS offers three different storage offerings, which can be encrypted too: Amazon Elastic Block Store (Amazon EBS), Amazon Elastic File System (Amazon EFS), and Amazon FSx for Lustre. We recommend using dynamic PVCs with Amazon EBS because with static PVCs, you would need to create multiple PVCs.

The following code enables this configuration; more details are available on GitHub:

"spark.kubernetes.driver.ownPersistentVolumeClaim": "true"
"spark.kubernetes.driver.reusePersistentVolumeClaim": "true"

For this to work, we need to enable PVC with Amazon EKS and mention the details in the Spark runtime configuration. For instructions, refer to How do I use persistent storage in Amazon EKS? The following code contains the Spark configuration details for using PVC as local storage; other details are available on GitHub:

"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.readOnly": "false"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.claimName": "OnDemand"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.storageClass": "spark-sc"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.options.sizeLimit": "10Gi"
"spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1.mount.path": "/var/data/spill"

Conclusion

With Amazon EMR on EKS (6.9) and the features discussed in this post, you can further reduce the overall runtime for Spark jobs when running with Spot Instances. This also improves the overall resiliency and flexibility of the job while cost optimizing the workload on EC2 Spot.

Try out the EMR on EKS workshop for improved performance when running Spark workloads on Kubernetes and cost optimize using EC2 Spot Instances.


About the Author

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.

Introducing native Delta Lake table support with AWS Glue crawlers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-native-delta-lake-table-support-with-aws-glue-crawlers/

Delta Lake is an open-source project that helps implement modern data lake architectures commonly built on Amazon S3 or other cloud storages. With Delta Lake, you can achieve ACID transactions, time travel queries, CDC, and other common use cases on the cloud. Delta Lake is available with multiple AWS services, such as AWS Glue Spark jobs, Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum.

AWS Glue includes Delta crawler, a capability that makes discovering datasets simpler by scanning Delta Lake transaction logs in Amazon Simple Storage Service (Amazon S3), extracting their schema, creating manifest files in Amazon S3, and automatically populating the AWS Glue Data Catalog, which keeps the metadata current.  The newly created AWS Glue Data Catalog table has format SymlinkTextInputFormat. Delta crawler creates a manifest file, which is a text file containing the list of data files that query engines such as Presto, Trino, or Athena can use to query the table rather than finding the files with the directory listing. A previous blog post demonstrated how it works. Manifest files needed to be regenerated on a periodic basis to include newer transactions in the original Delta Lake tables which resulted in expensive I/O operations, longer processing times, and increased storage footprint.

With today’s launch, Glue crawler is adding support for creating AWS Glue Data Catalog tables for native Delta Lake tables and does not require generating manifest files. This improves customer experience because now you don’t have to regenerate manifest files whenever a new partition becomes available or a table’s metadata changes. With the native Delta Lake tables and automatic schema evolution with no additional manual intervention, this reduces the time to insight by making newly ingested data quickly available for analysis with your preferred analytics and machine learning (ML) tools.

Amazon Athena SQL engine version 3 started supporting Delta Lake native connector. AWS Glue for Apache Spark also started supporting Delta Lake native connector in Glue version 3.0 and later. Amazon EMR started supporting Delta Lake in EMR release version 6.9.0 and later. It means that you can query the Delta transaction log directly in Amazon Athena, AWS Glue for Apache Spark, and Amazon EMR. It makes the experience of working with native Delta Lake tables seamless across the platforms.

This post demonstrates how AWS Glue crawlers work with native Delta Lake tables and describes typical use cases to query native Delta Lake tables.

How AWS Glue crawler works with native Delta Lake tables

Now AWS Glue crawler has two different options:

  • Native table: Create a native Delta Lake table definition on AWS Glue Data Catalog.
  • Symlink table: Create a symlink-based manifest table definition on AWS Glue Data Catalog from a Delta Lake table, and generate its symlink files on Amazon S3.

Native table

Native Delta Lake tables are accessible from Amazon Athena (engine version 3), AWS Glue for Apache Spark (Glue version 3.0 and later), Amazon EMR (release version 6.9.0 and later), and other platforms that support Delta Lake tables. With the native Delta Lake tables, you have the capabilities such as ACID transactions, all while needing to maintain just a single source of truth.

Symlink table

Symlink tables are a consistent snapshot of a native Delta Lake table, represented using the SymlinkTextInputFormat using parquet files. The symlink tables are accessible from Amazon Athena and Amazon Redshift Spectrum.

Since the symlink tables are a snapshot of the original native Delta Lake tables, you need to maintain both the original native Delta Lake tables and the symlink tables. When the data or schema in an original Delta Lake table is updated, the symlink tables in the AWS Glue Data Catalog may become out of sync. It means that you can still query the symlink table and get a consistent result, but the result of the table is at the previous point in time.

Crawl native Delta Lake tables using AWS Glue crawler

In this section, let’s go through how to crawl native Delta Lake tables using AWS Glue crawler.

Prerequisite

Here’s the prerequisite for this tutorial:

  1. Install and configure AWS Command Line Interface (AWS CLI).
  2. Create your S3 bucket if you do not have it.
  3. Create your IAM role for AWS Glue crawler if you do not have it.
  4. Run the following command to copy the sample Delta Lake table into your S3 bucket. (Replace your_s3_bucket with your S3 bucket name.)
$ aws s3 sync s3://aws-bigdata-blog/artifacts/delta-lake-crawler/sample_delta_table/ s3://your_s3_bucket/data/sample_delta_table

Create a Delta Lake crawler

A Delta Lake crawler can be created through the AWS Glue console, AWS Glue SDK, or AWS CLI. Specify a DeltaTarget with the following configurations:

  • DeltaTables – A list of S3 DeltaPaths where the Delta Lake tables are located. (Note that each path must be the parent of a _delta_log folder. If the Delta transaction log is located at s3://bucket/sample_delta_table/_delta_log, then the path s3://bucket/sample_delta_table/ should be provided.
  • WriteManifest – A Boolean value indicating whether or not the crawler should write the manifest files for each DeltaPath. This parameter is only applicable for Delta Lake tables created via manifest files
  • CreateNativeDeltaTable – A Boolean value indicating whether the crawler should create a native Delta Lake table. If set to False, the crawler would create a symlink table instead. Note that both WriteManifest and CreateNativeDeltaTable options can’t be set to True.
  • ConnectionName – An optional connection name stored in the Data Catalog that the crawler should use to access Delta Lake tables backed by a VPC.

In this instruction, create the crawler through the console. Complete the following steps to create a Delta Lake crawler:

  1. Open the AWS Glue console.
  2. Choose Crawlers.
  3. Choose Create crawler.
  4. For Name, enter delta-lake-native-crawler, and choose Next.
  5. Under Data sources, choose Add a data source.
  6. For Data source, select Delta Lake.
  7. For Include delta lake table path(s), enter s3://your_s3_bucket/data/sample_delta_table/.
  8. For Create tables for querying, choose Create Native tables,
  9. Choose Add a Delta Lake data source.
  10. Choose Next.
  11. For Existing IAM role, choose your IAM role, then choose Next.
  12. For Target database, choose Add database, then Add database dialog appears. For Database name, enter delta_lake_native, then choose Create. Choose Next.
  13. Choose Create crawler.
  14. The Delta Lake crawler can be triggered to run through the console or through the SDK or AWS CLI using the StartCrawl API. It could also be scheduled through the console to trigger the crawlers at specific times. In this instruction, run the crawler through the console.
  15. Select delta-lake-native-crawler, and choose Run.
  16. Wait for the crawler to complete.

After the crawler has run, you can see the Delta Lake table definition in the AWS Glue console:

You can also verify an AWS Glue table definition through the following AWS CLI command:

$ aws glue get-table --database delta_lake_native --name sample_delta_table
{
    "Table": {
        "Name": "sample_delta_table",
        "DatabaseName": "delta_lake_native",
        "Owner": "owner",
        "CreateTime": "2022-11-08T12:11:20+09:00",
        "UpdateTime": "2022-11-08T13:19:06+09:00",
        "LastAccessTime": "2022-11-08T13:19:06+09:00",
        "Retention": 0,
        "StorageDescriptor": {
            "Columns": [
                {
                    "Name": "product_id",
                    "Type": "string"
                },
                {
                    "Name": "product_name",
                    "Type": "string"
                },
                {
                    "Name": "price",
                    "Type": "bigint"
                },
                {
                    "Name": "currency",
                    "Type": "string"
                },
                {
                    "Name": "category",
                    "Type": "string"
                },
                {
                    "Name": "updated_at",
                    "Type": "double"
                }
            ],
            "Location": "s3://your_s3_bucket/data/sample_delta_table/",
            "AdditionalLocations": [],
            "InputFormat": "org.apache.hadoop.mapred.SequenceFileInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat",
            "Compressed": false,
            "NumberOfBuckets": -1,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "serialization.format": "1",
                    "path": "s3://your_s3_bucket/data/sample_delta_table/"
                }
            },
            "BucketColumns": [],
            "SortColumns": [],
            "Parameters": {
                "EXTERNAL": "true",
                "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
                "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
                "CrawlerSchemaSerializerVersion": "1.0",
                "CrawlerSchemaDeserializerVersion": "1.0",
                "spark.sql.partitionProvider": "catalog",
                "classification": "delta",
                "spark.sql.sources.schema.numParts": "1",
                "spark.sql.sources.provider": "delta",
                "delta.lastCommitTimestamp": "1653462383292",
                "delta.lastUpdateVersion": "6",
                "table_type": "delta"
            },
            "StoredAsSubDirectories": false
        },
        "PartitionKeys": [],
        "TableType": "EXTERNAL_TABLE",
        "Parameters": {
            "EXTERNAL": "true",
            "UPDATED_BY_CRAWLER": "delta-lake-native-connector",
            "spark.sql.sources.schema.part.0": "{\"type\":\"struct\",\"fields\":[{\"name\":\"product_id\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"product_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"price\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"CURRENCY\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"category\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"updated_at\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}",
            "CrawlerSchemaSerializerVersion": "1.0",
            "CrawlerSchemaDeserializerVersion": "1.0",
            "spark.sql.partitionProvider": "catalog",
            "classification": "delta",
            "spark.sql.sources.schema.numParts": "1",
            "spark.sql.sources.provider": "delta",
            "delta.lastCommitTimestamp": "1653462383292",
            "delta.lastUpdateVersion": "6",
            "table_type": "delta"
        },
        "CreatedBy": "arn:aws:sts::012345678901:assumed-role/AWSGlueServiceRole/AWS-Crawler",
        "IsRegisteredWithLakeFormation": false,
        "CatalogId": "012345678901",
        "IsRowFilteringEnabled": false,
        "VersionId": "1",
        "DatabaseId": "0bd458e335a2402c828108f267bc770c"
    }
}

After you create the table definition on AWS Glue Data Catalog, AWS analytics services such as Athena and AWS Glue Spark jobs are able to query the Delta Lake table.

Query Delta Lake tables using Amazon Athena

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale. You can use Athena to query your S3 data lake for use cases such as data exploration for machine learning (ML) and AI, business intelligence (BI) reporting, and ad hoc querying.

There are now two ways to use Delta Lake tables in Athena:

  • For native table: Use Athena’s newly launched native support for Delta Lake tables. You can learn more in Querying Delta Lake tables. This method no longer requires regenerating manifest files after every transaction. Data updates are available for queries in Athena as soon as they are performed in the original Delta Lake tables, and you get up to 40 percent improvement in query performance over querying manifest files. Since Athena optimizes data scans in native Delta Lake queries using statistics in Delta Lake files, you get the advantage of reduced cost for Athena queries. This post focuses on this approach.
  • For symlink table: Use SymlinkTextInputFormat to query symlink tables through manifest files generated from Delta Lake tables. This was previously the only manner in which Delta Lake table querying was supported via Athena and is no longer recommended when you use only Athena to query the Delta Lake tables.

To use the native Delta Lake connector in Athena, you need to use Athena engine version 3. If you are using an older engine version, change the engine version.

Complete following steps to start queries on Athena:

  1. Open the Amazon Athena console.
  2. Run the following query.
SELECT * FROM "delta_lake_native"."sample_delta_table" limit 10;

The following screenshot shows our output:

Query Delta Lake tables using AWS Glue for Apache Spark

AWS Glue for Apache Spark natively supports Delta Lake. AWS Glue version 3.0 (Apache Spark 3.1.1) supports Delta Lake 1.0.0, and AWS Glue version 4.0 (Apache Spark 3.3.0) supports Delta Lake 2.1.0. With this native support for Delta Lake, what you need for configuring Delta Lake is to provide a single job parameter --datalake-formats delta. There is no need to configure a separate connector for Delta Lake in AWS Marketplace. It reduces the configuration steps required to use these frameworks in AWS Glue for Apache Spark.

AWS Glue also provides a serverless notebook interface called AWS Glue Studio notebook to query and process data interactively. Complete the following steps to launch AWS Glue Studio notebook and query a Delta Lake table:

  1. On the AWS Glue console, choose Jobs in the navigation plane.
  2. Under Create job, select Jupyter Notebook.
  3. Choose Create a new notebook from scratch, and choose Create.
  4. For Job name, enter delta-sql.
  5. For IAM role,  choose your IAM role. If you don’t have your own role for the AWS Glue job, create it by following the steps documented in the AWS Glue Developer Guide.
  6. Choose Start notebook job.
  7. Copy and paste the following code to the first cell and run the cell.
    %glue_version 3.0
    %%configure
    {
      "--datalake-formats": "delta"
    }

  8. Run the existing cell containing the following code.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
      
    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

  9. Copy and paste the following code to the third cell and run the cell.
    %%sql
    SELECT * FROM `delta_lake_native`.`sample_delta_table` limit 10

The following screenshot shows our output:

Clean up

Now for the final step, cleaning up the resources:

  • Delete your data under your S3 path: s3://your_s3_bucket/data/sample_delta_table/.
  • Delete the AWS Glue crawler delta-lake-native-crawler.
  • Delete the AWS Glue database delta_lake_native.
  • Delete the AWS Glue notebook job delta-sql.

Conclusion

This post demonstrated how to crawl native Delta Lake tables using an AWS Glue crawler and how to query the crawled tables from Athena and Glue Spark jobs. Start using AWS Glue crawlers for your own native Delta Lake tables.

If you have comments or feedback, please feel free to leave them in the comments.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Kyle Duong is a Software Development Engineer on the AWS Glue and Lake Formation team. He is passionate about building big data technologies and distributed systems. In his free time, he enjoys cycling or playing basketball.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Getting started with AWS Glue Data Quality for ETL Pipelines

Post Syndicated from Deenbandhu Prasad original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-for-etl-pipelines/

Today, hundreds of thousands of customers use data lakes for analytics and machine learning. However, data engineers have to cleanse and prepare this data before it can be used. The underlying data has to be accurate and recent for customer to make confident business decisions. Otherwise, data consumers lose trust in the data and make suboptimal or incorrect decisions. It is a common task for data engineers to evaluate whether the data is accurate and recent or not. Today there are various data quality tools. However, common data quality tools usually require manual processes to monitor data quality.

AWS Glue Data Quality is a preview feature of AWS Glue that measures and monitors the data quality of Amazon Simple Storage Service (Amazon S3) data lakes and in AWS Glue extract, transform, and load (ETL) jobs. This is an open preview feature so it is already enabled in your account in the available Regions. You can easily define and measure the data quality checks in AWS Glue Studio console without writing codes. It simplifies your experience of managing data quality.

This post is Part 2 of a four-post series to explain how AWS Glue Data Quality works. Check out the previous post in this series:

Getting started with AWS Glue Data Quality

In this post, we show how to create an AWS Glue job that measures and monitors the data quality of a data pipeline. We also show how to take action based on the data quality results.

Solution overview

Let’s consider an example use case in which a data engineer needs to build a data pipeline to ingest the data from a raw zone to a curated zone in a data lake. As a data engineer, one of your key responsibilities—along with extracting, transforming, and loading data—is validating the quality of data. Identifying data quality issues upfront helps you prevent placing bad data in the curated zone and avoid arduous data corruption incidents.

In this post, you’ll learn how to easily set up built-in and custom data validation checks in your AWS Glue job to prevent bad data from corrupting the downstream high-quality data.

The dataset used for this post is synthetically generated; the following screenshot shows an example of the data.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket (gluedataqualitystudio-*).
  • The following prefixes and objects in the S3 bucket:
    • datalake/raw/customer/customer.csv
    • datalake/curated/customer/
    • scripts/
    • sparkHistoryLogs/
    • temporary/
  • AWS Identity and Access Management (IAM) users, roles, and policies. The IAM role (GlueDataQualityStudio-*) has permission to read and write from the S3 bucket.
  • AWS Lambda functions and IAM policies required by those functions to create and delete this stack.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:

  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack and wait for the stack creation step to complete.

Implement the solution

To start configuring your solution, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Visual with a blank canvas and choose Create.
  3. Choose the Job Details tab to configure the job.
  4. For Name, enter GlueDataQualityStudio.
  5. For IAM Role, choose the role starting with GlueDataQualityStudio-*.
  6. For Glue version, choose Glue 3.0.
  7. For Job bookmark, choose Disable. This allows you to run this job multiple times with the same input dataset.
  8. For Number of retries, enter 0.
  9. In the Advanced properties section, provide the S3 bucket created by the CloudFormation template (starting with gluedataqualitystudio-*).
  10. Choose Save.
  11. After the job is saved, choose the Visual tab and on the Source menu, choose Amazon S3.
  12. On the Data source properties – S3 tab, for S3 source type, select S3 location.
  13. Choose Browse S3 and navigate to prefix /datalake/raw/customer/ in the S3 bucket starting with gluedataqualitystudio-* .
  14. Choose Infer schema.
  15. On the Action menu, choose Evaluate Data Quality.
  16. Choose the Evaluate Data Quality node.

    On the Transform tab, you can now start building data quality rules. The first rule you create is to check if Customer_ID is unique and not null using the isPrimaryKey rule.
  17. On the Rule types tab of the DQDL rule builder, search for isprimarykey and choose the plus sign.
  18. On the Schema tab of the DQDL rule builder, choose the plus sign next to Customer_ID.
  19. In the rule editor, delete id.

    The next rule we add checks that the First_Name column value is present for all the rows.
  20. You can also enter the data quality rules directly in the rule editor. Add a comma (,) and enter IsComplete "First_Name", after the first rule.

    Next, you add a custom rule to validate that no row exists without Telephone or Email.
  21. Enter the following custom rule in the rule editor:
    CustomSql "select count(*) from primary where Telephone is null and Email is null" = 0


    The Evaluate Data Quality feature provides actions to manage the outcome of a job based on the job quality results.

  22. For this post, select Fail job when data quality fails and choose Fail job without loading target data actions. In the Data quality output setting section, choose Browse S3 and navigate to prefix dqresults in the S3 bucket starting with gluedataqualitystudio-*.
  23. On the Target menu, choose Amazon S3.
  24. Choose the Data target – S3 bucket node.
  25. On the Data target properties – S3 tab, for Format, choose Parquet, and for Compression Type, choose Snappy.
  26. For S3 Target Location, choose Browse S3 and navigate to the prefix /datalake/curated/customer/ in the S3 bucket starting with gluedataqualitystudio-*.
  27. Choose Save, then choose Run.
    You can view the job run details on the Runs tab. In our example, the job fails with the error message “AssertionError: The job failed due to failing DQ rules for node: <node>.”
    You can review the data quality result on the Data quality tab. In our example, the custom data quality validation failed because one of the rows in the dataset had no Telephone or Email value.Evaluate Data Quality results is also written to the S3 bucket in JSON format based on the data quality result location parameter of the node.
  28. Navigate to dqresults prefix under the S3 bucket starting gluedataqualitystudio-*. You will see that the data quality result is partitioned by date.

The following is the output of the JSON file. You can use this file output to build custom data quality visualization dashboards.

You can also monitor the Evaluate Data Quality node through Amazon CloudWatch metrics and set alarms to send notifications about data quality results. To learn more on how to set up CloudWatch alarms, refer to Using Amazon CloudWatch alarms.

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created:

  1. Delete the GlueDataQualityStudio job you created as part of this post.
  2. On the AWS CloudFormation console, delete the GlueDataQualityStudio stack.

Conclusion

AWS Glue Data Quality offers an easy way to measure and monitor the data quality of your ETL pipeline. In this post, you learned how to take necessary actions based on the data quality results, which helps you maintain high data standards and make confident business decisions.

To learn more about AWS Glue Data Quality, check out the documentation:


About the Authors

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architecture on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Yannis Mentekidis is a Senior Software Development Engineer on the AWS Glue team.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and machine learning to make data-driven business decisions. Data consumers lose trust in data if it is not accurate and recent, making data quality essential for undertaking optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, there are various tools available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

We are pleased to announce the public preview launch of AWS Glue Data Quality. You can access this feature today without requesting any additional access in the available Regions. AWS Glue Data Quality is a new preview feature of AWS Glue that measures and monitors the data quality of Amazon S3-based data lakes and in AWS Glue ETL jobs. It does not require any expertise in data engineering or coding. It simplifies your experience of monitoring and evaluating the quality of your data.

This is Part 1 of a four-part series of posts to explain how AWS Glue Data Quality works. Check out the next posts in the series:

Getting started with AWS Glue Data Quality

In this post, we will go over the simplicity of using the AWS Glue Data Quality feature by:

  1. Starting data quality recommendations and runs on your data in AWS Glue Data Catalog.
  2. Creating an Amazon CloudWatch alarm for getting notifications when data quality results are below a certain threshold.
  3. Analyzing your AWS Glue Data Quality run results through Amazon Athena.

Set up resources with AWS CloudFormation

The provided CloudFormation script creates the following resources for you:

  1. The IAM role required to run AWS Glue Data Quality runs
  2. An Amazon Simple Storage Service (Amazon S3) bucket to store the NYC Taxi dataset
  3. An S3 bucket to store and analyze the results of AWS Glue Data Quality runs
  4. An AWS Glue database and table created from the NYC Taxi dataset

Steps:

  1. Open the AWS CloudFormation console.
  2. Choose Create stack and then select With new resources (standard).
  3. For Template source, choose Upload a template File, and provide the above attached template file. Then choose Next.
  4. For Stack name, DataQualityDatabase, and DataQualityTable, leave as default. For DataQualityS3BucketName, enter the name of your S3 bucket. Then choose Next.
  5. On the final screen, make sure to acknowledge that this stack would create IAM resources for you, and choose Submit.
  6. Once the stack is successfully created, navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Start an AWS Glue Data Quality run on your data in AWS Glue Data Catalog

In this first section, we will generate data quality rule recommendations from the AWS Glue Data Quality service. Using these recommendations, we will then run a data quality task against our dataset to obtain an analysis of our data.

To get started, complete the following steps:

  1. Open AWS Glue console.
  2. Choose Tables under Data Catalog.
  3. Select the DataQualityTable table created via the CloudFormation stack.
  4. Select the Data quality tab.
  5. Choose Recommend ruleset.
  6. On the Recommend data quality rules page, check Save recommended rules as a ruleset. This will allow us to save the recommended rules in a ruleset automatically, for use in the next steps.
  7. For IAM Role, choose the IAM role that was created from the CloudFormation stack.
  8. For Additional configurations -optional, leave the default number of workers and timeout.
  9. Choose Recommend ruleset. This will start a data quality recommendation run, with the given number of workers.
  10. Wait for the ruleset to be completed.
  11. Once completed, navigate back to the Rulesets tab. You should see a successful recommendation run and a ruleset created.

Understand AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service and are based on the shape of your data. These recommendations automatically take into account aspects like RowCounts, Mean, Standard Deviation etc. of your data, and generate a set of rules, for you to use as a starting point.

The dataset used here was the NYC Taxi dataset. Based on this, the columns in this dataset, and the values of those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service automatically took into consideration all the columns of the dataset, and recommended 55 rules.

Some of these rules are:

  • “RowCount between <> and <> ” → Expect a count of number of rows based on the data it saw
  • “ColumnValues “VendorID” in [ ] → Expect the ”VendorID“ column to be within a specific set of values
  • IsComplete “VendorID” → Expect the “VendorID” to be a non-null value

How do I use the recommended AWS Glue Data Quality rules?

  1. From the Rulesets section, you should see your generated ruleset. Select the generated ruleset, and choose Evaluate ruleset.
    • If you didn’t check the box to Save recommended rules as a ruleset when you ran the recommendation, you can still click on the recommendation task run and copy the rules to create a new ruleset
  2. For Data quality actions under Data quality properties, select Publish metrics to Amazon CloudWatch. If this box isn’t checked, the data quality run will not publish metrics to Amazon CloudWatch.
  3. For IAM role, select the GlueDataQualityBlogRole created in the AWS CloudFormation stack.
  4. For Requested number of workers under Advanced properties, leave as default. 
  5. For Data quality results location, select the value of the GlueDataQualityResultsS3Bucket location that was created via the AWS CloudFormation stack
  6. Choose Evaluate ruleset.
  7. Once the run begins, you can see the status of the run on the Data quality results tab.
  8. After the run reaches a successful stage, select the completed data quality task run, and view the data quality results shown in Run results.

Our recommendation service suggested that we enforce 55 rules, based on the column values and the data within our NYC Taxi dataset. We then converted the collection of 55 rules into a RuleSet. Then, we ran a Data Quality Evaluation task run using our RuleSet against our dataset. In our results above, we see the status of each within the RuleSet.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.

Get Amazon SNS notifications for my failing data quality runs through Amazon CloudWatch alarms

Each AWS Glue Data Quality evaluation run from the Data Catalog, emits a pair of metrics named glue.data.quality.rules.passed (indicating a number of rules that passed) and glue.data.quality.rules.failed (indicating the number of failed rules) per data quality run. This emitted metric can be used to create alarms to alert users if a given data quality run falls below a threshold.

To get started with setting up an alarm that would send an email via an Amazon SNS notification, follow the steps below:

  1. Open Amazon CloudWatch console.
  2. Choose All metrics under Metrics. You will see an additional namespace under Custom namespaces titled Glue Data Quality.

Note: When starting an AWS Glue Data Quality run, make sure the Publish metrics to Amazon CloudWatch checkbox is enabled, as shown below. Otherwise, metrics for that particular run will not be published to Amazon CloudWatch.

  1. Under the Glue Data Quality namespace, you should be able to see metrics being emitted per table, per ruleset. For the purpose of our blog, we shall be using the glue.data.quality.rules.failed rule and alarm, if this value goes over 1 (indicating that, if we see a number of failed rule evaluations greater than 1, we would like to be notified).
  2. In order to create the alarm, choose All alarms under Alarms.
  3. Choose Create alarm.
  4. Choose Select metric.
  5. Select the glue.data.quality.rules.failed metric corresponding to the table you’ve created, then choose Select metric.
  6. Under the Specify metric and conditions tab, under the Metrics section:
    1. For Statistic, select Sum.
    2. For Period, select 1 minute.
  7. Under the Conditions section:
    1. For Threshold type, choose Static.
    2. For Whenever glue.data.quality.rules.failed is…, select Greater/Equal.
    3. For than…, enter 1 as the threshold value.
    4. Expand the Additional configurations dropdown and select Treat missing data as good

These selections imply that if the glue.data.quality.rules.failed metric emits a value greater than or equal to 1, we will trigger an alarm. However, if there is no data, we will treat it as acceptable.

  1. Choose Next.
  2. On Configure actions:
    1. For the Alarm state trigger section, select In alarm .
    2. For Send a notification to the following SNS topic, choose Create a new topic to send a notification via a new SNS topic.
    3. For Email endpoints that will receive the notification…, enter your email address. Choose Next.
  3. For Alarm name, enter myFirstDQAlarm, then choose Next.
  4. Finally, you should see a summary of all the selections on the Preview and create screen. Choose Create alarm at the bottom.
  5. You should now be able to see the alarm being created from the Amazon CloudWatch alarms dashboard.

In order to demonstrate AWS Glue Data Quality alarms, we are going to go over a real-world scenario where we have corrupted data being ingested, and how we could use the AWS Glue Data Quality service to get notified of this, using the alarm we created in the previous steps. For this purpose, we will use the provided file malformed_yellow_taxi.parquet that contains data that has been tweaked on purpose.

  1. Navigate to the S3 location DataQualityS3BucketName mentioned in the CloudFormation template supplied at the beginning of the blog post.
  2. Upload the malformed_yellow_tripdata.parquet file to this location. This will help us simulate a flow where we have a file with poor data quality coming into our data lakes via our ETL processes.
  3. Navigate to the AWS Glue Data Catalog console, select the demo_nyc_taxi_data_input that was created via the provided AWS CloudFormation template and then navigate to the Data quality tab.
  4. Select the RuleSet we had created in the first section. Then select Evaluate ruleset.
  5. From the Evaluate data quality screen:
    1. Check the box to Publish metrics to Amazon CloudWatch. This checkbox is needed to ensure that the failure metrics are emitted to Amazon CloudWatch.
    2. Select the IAM role created via the AWS CloudFormation template.
    3. Optionally, select an S3 location to publish your AWS Glue Data Quality results.
    4. Select Evaluate ruleset.
  6.  Navigate to the Data Quality results tab. You should now see two runs, one from the previous steps of this blog and one that we currently triggered. Wait for the current run to complete.
  7. As you see, we have a failed AWS Glue Data Quality run result, with only 52 of our original 55 rules passing. These failures are attributed to the new file we uploaded to S3.
  8. Navigate to the Amazon CloudWatch console and select the alarm we created at the beginning of this section.
  9. As you can see, we configured the alarm to fire every time the glue.data.quality.rules.failed metric crosses a threshold of 1. After the above AWS Glue Data Quality run, we see 3 rules failing, which triggered the alarm. Further, you also should have gotten an email detailing the alarm’s firing.

We have thus demonstrated an example where incoming malformed data, coming into our data lakes can be identified via the AWS Glue Data Quality rules, and subsequent alerting mechanisms can be created to notify appropriate personas.

Analyze your AWS Glue Data Quality run results through Amazon Athena

In scenarios where you have multiple AWS Glue Data Quality run results against a dataset, over a period of time, you might want to track the trends of the dataset’s quality over a period of time. To achieve this, we can export our AWS Glue Data Quality run results to S3, and use Amazon Athena to run analytical queries against the exported run. The results can then be further used in Amazon QuickSight to build dashboards to have a graphical representation of your data quality trends

In the third part of this post, we will see the steps needed to start tracking data on your dataset’s quality:

  1. For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the AWS CloudFormation stack.
  2. After each successful run, you should see a single JSONL file being exported to your selected S3 location, corresponding to that particular run.
  3. Open the Amazon Athena console.
  4. In the query editor, run the following CREATE TABLE statement (replace the <my_table_name> with a relevant value, and <GlueDataQualityResultsS3Bucket_from_cfn> section with the GlueDataQualityResultsS3Bucket value from the provided AWS CloudFormation template):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file')
    
    MSCK REPAIR TABLE `<my_table_name>`

  5. Once the above table is created, you should be able to run queries to analyze your data quality results.

For example, consider the following query that shows me the failed AWS Glue Data Quality runs against my table demo_nyc_taxi_data_input within a time window:

SELECT * from "<my_table_name>"
WHERE "outcome" = 'Failed'
AND "tablename" = '<my_source_table>'
AND "evaluationcompletedon" between
parse_datetime('2022-12-05 17:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2022-12-05 20:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the above query shows me details about all the runs with “outcome” = ‘Failed’ that ran against my NYC Taxi dataset table ( “tablename” = ‘demo_nyc_taxi_data_input’ ). The output also gives me information about the failure reason ( failurereason ) and the values it was evaluated against ( evaluatedmetrics ).

As you can see, we are able to get detailed information about our AWS Glue Data Quality runs, via the run results uploaded to S3, perform more detailed analysis and build dashboards on top of the data.

Clean up

  • Navigate to the Amazon Athena console and delete the table created for data quality analysis.
  • Navigate to the Amazon CloudWatch console and delete the alarms created.
  • If you deployed the sample CloudFormation stack, delete the CloudFormation stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  • If you have enabled your AWS Glue Data Quality runs to output to S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules using the AWS Glue Data Quality feature, into your AWS Glue Data Catalog tables. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Amazon Athena, and the process for setting up alarms via Amazon CloudWatch in order to notify users of failed data quality.

To dive into the AWS Glue Data Quality APIs, take a look at the AWS Glue Data Quality API documentation
To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge watching tv shows.

AWS Marketplace Seller Insights team uses Amazon QuickSight Embedded to empower sellers with actionable business insights

Post Syndicated from Snigdha Sahi original https://aws.amazon.com/blogs/big-data/aws-marketplace-seller-insights-team-uses-amazon-quicksight-embedded-to-empower-sellers-with-actionable-business-insights/

AWS Marketplace enables independent software vendors (ISVs), data providers, and consulting partners to sell software, services, and data to millions of AWS customers. Working in partnership with the AWS Partner Network (APN), AWS Marketplace helps ISVs and partners build, market and sell their AWS offerings by providing crucial business, technical, and marketing support.

The AWS Marketplace Seller Insights team helps AWS sellers scale their businesses by providing reports, data feeds, and insights. We share this data directly with sellers through the AWS Marketplace Management Portal (AMMP). In 2013, we launched the first version of reporting insights, delivered monthly via static CSV files. To improve the customer experience, the next iteration of reporting upgrades was launched in 2020 and provided more granular data, refreshed daily and delivered via feeds.

We’ve spent the past few months working closely with the QuickSight team and are excited to now offer AWS Marketplace sellers public preview access to two new Amazon QuickSight dashboards. QuickSight improves the reporting experience for AWS Marketplace sellers by making it easy for users to monitor key metrics, perform ad hoc analysis, and quickly get business insights from data at any time and on any device.

In this post, we discuss our decision to implement QuickSight Embedded, as well as some of the benefits to AWS Marketplace sellers.

Data agility is a key differentiator

Making informed decisions is a business-critical function for successful organizations. Being able to quickly view performance trends and adjust strategies accordingly can make all the difference between hitting or missing business goals. Investing time, effort, and resources in technical data integrations, or creating pivot tables and charts from downloaded raw data, means a slower response rate to analyze shifts in performance trends. The more agile we can be in providing fast, efficient access to key performance indicators, the better positioned AWS Marketplace sellers can be to take action based on what their data tells them.

After reviewing customer feedback on their reporting experience, several trends emerged in what would be most helpful to them. First, sellers wanted a visual representation of their data. Second, though some wanted data feeds available to integrate AWS Marketplace data into their own business intelligence tools, others wanted to be able to access and review data without needing to invest technical business intelligence bandwidth in integrating feeds to create more user-friendly reports. Finally, they wanted the ability to easily filter the data, as well as the option to download it. In researching options to provide the reporting experience AWS Marketplace sellers wanted, we found that QuickSight was a perfect fit.

Doing more with data

Billed Revenue and the Collections & Disbursements are two new QuickSight dashboards embedded directly into the AWS Marketplace Management Portal (AMMP), accessed via the Insights tab. These dashboards—pre-built with 10+ controls or filters, 15+ visualizations, and powered by daily refreshed data—provide a visual reporting experience for revenue recognition and disbursements tracking.

The following screenshot shows what the dashboards look like in the Finance operations section on the Insights tab within the AMMP.

The dashboards are divided into several sections:

  • Controls provides filters to refine your dashboard data.
  • Date Range Filter provides the ability to filter dashboard data based on the dates.
  • Metrics, Trends, and Breakdowns all provide detailed analytics to understand business performance.
  • Granular data provides the option to download the raw data from the dashboard by clicking on the table and choosing the Export to CSV option.

For quick help, sellers can select the Info button to view the side navigation panel with tips on how to use the dashboard. They can also reach out to our team by selecting Contact us from the Info panel.

Improving the customer experience

The Billed Revenue dashboard now reflects changes within 24 hours of a customer being billed or refunded—a major improvement over the 45 days it once took to access this data from the legacy AMMP Billed Revenue report. Similarly, the Collections & Disbursements dashboard provides disbursement information within 24 hours of AWS sending funds to sellers, whereas it used to take up to 4 days from the legacy AMMP Disbursement Report.

Our team’s decision to go with QuickSight was the direct result of a clear alignment between what our sellers told us they wanted and what QuickSight offered. With QuickSight Embedded dashboards, AWS Marketplace sellers now have a visual representation of their data that doesn’t require time or resources dedicated to technical integration implementations, and they can quickly and easily manipulate the data via filters, or they can download it to a CSV if that’s their preference. Embedded dashboards simplify the viewing, analyzing, and tracking of key business metrics and trends related to financial operations. Being able to easily show each seller only their relevant data (using row-level security) provides us with the flexibility we need, with the peace of mind of knowing everything is secure. AWS Marketplace data is hosted in an Amazon Redshift cluster; QuickSight’s seamless integration into Amazon Redshift made it a fantastic choice.

Data-driven decisions with QuickSight

Embedding these new QuickSight dashboards into the AMMP has enabled us to provide at-a-glance metrics to AWS Marketplace sellers in a faster, more efficient, and far more user-friendly way than ever before. QuickSight has made a significant impact on how quickly sellers can access their data, which helps them respond faster to shifting trends.

To learn more about how you can embed customized data visuals, interactive dashboards, and natural language querying into any application, visit Amazon QuickSight Embedded.


About the Authors

Snigdha Sahi is a Senior Product Manager (Technical) with Amazon Web Services (AWS) Marketplace. She has diverse work experience across product management, product development and management consulting. A technology enthusiast, she loves brainstorming and building products that are intuitive and easy to use. In her spare time, she enjoys hiking, solving Sudoku and listening to music.

Vincent Larchet is a Principal Software Development Engineer on the AWS Marketplace team based in Vancouver, BC. Outside of work, he is a passionate wood worker and DIYer.