Tag Archives: Amazon DynamoDB

Reduce costs and enable integrated SMS tracking with Braze URL shortening

Post Syndicated from Umesh Kalaspurkar original https://aws.amazon.com/blogs/architecture/reduce-costs-and-enable-integrated-sms-tracking-with-braze-url-shortening/

As competition grows fiercer, marketers need ways to ensure they reach each user with personalized content on their most critical channels. Short message/messaging service (SMS) is a key part of that effort, touching more than 5 billion people worldwide, with an impressive 82% open rate. However, SMS lacks the built-in engagement metrics supported by other channels.

To bridge this gap, leading customer engagement platform, Braze, recently built an in-house SMS link shortening solution using Amazon DynamoDB and Amazon DynamoDB Accelerator (DAX). It’s designed to handle up to 27 billion redirects per month, allowing marketers to automatically shorten SMS-related URLs. Alongside the Braze Intelligence Suite, you can use SMS click data in reporting functions and retargeting actions. Read on to learn how Braze created this feature and the impact it’s having on marketers and consumers alike.

SMS link shortening approach

Many Braze customers have used third-party SMS link shortening solutions in the past. However, this approach complicates the SMS composition process and isolates click metrics from Braze analytics. This makes it difficult to get a full picture of SMS performance.

Figure 1. Multiple approaches for shortening URLs

The following table compares all 3 approaches for their pros and cons.

Scenario                               | #1 – Unshortened URL in SMS | #2 – 3rd Party Shortener | #3 – Braze Link Shortening & Click Tracking
Low Character Count                    | X                           |                          |
Total Clicks                           | X                           |                          |
Ability to Retarget Users              | X                           | X                        |
Ability to Trigger Subsequent Messages | X                           | X                        |

With link shortening built in-house and more tightly integrated into the Braze platform, Braze can maintain more control over their roadmap priority. By developing the tool internally, Braze achieved a 90% reduction in ongoing expenses compared with the $400,000 annual expense associated with using an outside solution.

Braze SMS link shortening: Flow and architecture

Figure 2. SMS link shortening architecture

The following steps explain the link shortening architecture:

  1. First, customers initiate campaigns via the Braze Dashboard. Using this interface, they can also make requests to shorten URLs.
  2. The URL registration process is managed by a Kubernetes-deployed Go-based service. This service not only shortens the provided URL but also maintains reference data in Amazon DynamoDB.
  3. After processing, the dashboard receives the generated campaign details alongside the shortened URL.
  4. The fully refined campaign can be efficiently distributed to intended recipients through SMS channels.
  5. Upon a user’s interaction with the shortened URL, the request is directed to the URL redirect service. This redirection occurs through an Application Load Balancer.
  6. The redirect service processes links in messages, calls the service, and replaces links before sending to carriers.
  7. Asynchronous calls feed data to a Kafka queue for metrics, using the HTTP sink connector integrated with Braze systems.

The registration and redirect services are decoupled from the Braze platform so that they can be deployed and scaled independently, because their requirements differ. Both services run the same code, but each exposes different endpoints depending on the function of a given Kubernetes pod. This restricts internal access to the registration endpoint and permits independent scaling of the services, while still maintaining a fast response time.

Braze SMS link shortening: Scale

Right now, our customers use the Braze platform to send about 200 million SMS messages each month, with peak speeds of around 2,000 messages per second. Many of these messages contain one or more URLs that need to be shortened. In order to support the scalability of the link shortening feature and give us room to grow, we designed the service to handle 33 million URLs sent per month, and 3.25 million redirects per month. We assumed that we’d see up to 65 million database writes per month and 3.25 million reads per month in connection with the redirect service. This would require storage of 65 GB per month, with peaks of ~2,000 writes and 100 reads per second.
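As a rough cross-check of these figures, the short calculation below derives the average write rate and the implied item size from the monthly totals. It assumes a 30-day month and decimal gigabytes, neither of which is stated above, so treat the results as approximations.

```typescript
// Back-of-the-envelope check of the sizing figures quoted above.
// Assumptions (not from the source): a 30-day month and decimal gigabytes.
const SECONDS_PER_MONTH = 30 * 24 * 60 * 60; // 2,592,000 seconds

const monthlyWrites = 65e6;       // "up to 65 million database writes per month"
const monthlyStorageBytes = 65e9; // "storage of 65 GB per month"
const peakWritesPerSecond = 2000; // "peaks of ~2,000 writes ... per second"

// Average write rate if traffic were spread evenly across the month.
const averageWritesPerSecond = monthlyWrites / SECONDS_PER_MONTH;

// Implied average size of one stored item, in bytes.
const averageItemBytes = monthlyStorageBytes / monthlyWrites;

// How much burstier the peak is than the average.
const peakToAverageRatio = peakWritesPerSecond / averageWritesPerSecond;

console.log(averageWritesPerSecond.toFixed(1), averageItemBytes, peakToAverageRatio.toFixed(0));
```

Under those assumptions the average is roughly 25 writes per second at about 1 KB per item, so the quoted peak is around 80 times the average. A gap of that size is consistent with the emphasis on caching and autoscaling in the following paragraphs.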

With these needs in mind, we carried out testing and determined that Amazon DynamoDB made the most sense as the backend database for the redirect service. To determine this, we tested read and write performance and found that it exceeded our needs. Additionally, it was fully managed, thus requiring less maintenance expertise, and included DAX out of the box. Most clicks happen close to send, so leveraging DAX helps us smooth out the read and write load associated with the SMS link shortener.

Because we know how long we must keep the relevant written elements at write time, we’re able to use DynamoDB Time to Live (TTL) to effectively manage their lifecycle. Finally, we’re careful to evenly distribute partition keys to avoid hot partitions, and DynamoDB’s autoscaling capabilities make it possible for us to respond more efficiently to spikes in demand.
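To make the TTL idea concrete, here is a minimal sketch of how an item's expiry could be fixed at write time. The attribute names (id, destination, expiresAt) and the 30-day retention used in the usage line are hypothetical, not Braze's actual schema. What the sketch does rely on is that DynamoDB TTL reads a Number attribute holding Unix epoch time in seconds.

```typescript
// Sketch of an item whose lifetime is decided when it is written.
// Attribute names and retention period are hypothetical.
interface ShortLinkItem {
  id: string;          // partition key
  destination: string; // URL to redirect to
  expiresAt: number;   // Unix epoch time in seconds, the format DynamoDB TTL expects
}

const SECONDS_PER_DAY = 24 * 60 * 60;

function buildShortLinkItem(
  id: string,
  destination: string,
  createdAt: Date,
  retentionDays: number
): ShortLinkItem {
  const createdAtSeconds = Math.floor(createdAt.getTime() / 1000);
  return {
    id: id,
    destination: destination,
    expiresAt: createdAtSeconds + retentionDays * SECONDS_PER_DAY,
  };
}

// TTL deletes expired items in the background rather than at the exact
// expiry time, so a reader should still check the expiry itself.
function isExpired(item: ShortLinkItem, now: Date): boolean {
  return Math.floor(now.getTime() / 1000) >= item.expiresAt;
}

console.log(buildShortLinkItem("5xRmz", "https://example.com/sale", new Date(), 30));
```

The explicit isExpired check mirrors the validity and expiry checks that the redirect service performs before querying the database.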

Braze SMS link shortening: Flow

Figure 3. Braze SMS link shortening flow

  1. When the marketer initiates an SMS send, Braze checks its primary datastore (a MongoDB collection) to see if the link has already been shortened (see Figure 3). If it has, Braze re-uses that shortened link and continues the send. If it hasn’t, the registration process is initiated to generate a new site identifier that encodes the generation date and saves campaign information in DynamoDB via DAX.
    1. The response from the registration service is used to generate a short link (1a) for the SMS.
  2. A recipient gets an SMS containing a short link (2).
  3. The recipient taps the link (3). Braze redirects them to the destination URL and updates the campaign statistics to show that the link was tapped.
    1. Using Amazon Route 53’s latency-based routing, Braze directs the recipient to the nearest endpoint (Braze currently has North America and EU deployments), then inspects the link to ensure validity and that it hasn’t expired. If it passes those checks, the redirect service queries DynamoDB via DAX for information about the redirect (3a). Initial redirects are cached at send time, while later requests query the DAX cache.
    2. The user is redirected with a P99 redirect latency of less than 10 milliseconds (3b).
  4. Braze emits campaign-level metrics on redirects.

Braze builds each URL identifier, which serves as the partition key in the DynamoDB table, from a random number. We concatenate the generation date timestamp to the number, then Base66 encode the value. This results in a generated URL that looks like https://brz.ai/5xRmz, with “5xRmz” being the encoded URL identifier. The use of randomized partition keys helps avoid hot, overloaded partitions. Embedding the generation date lets us see when a given link was generated without querying the database. This helps us maintain performance and reduce costs by removing old links from the database. Other cost control measures include autoscaling and the use of DAX to avoid repeat reads of the same data. We also query DynamoDB directly against a hash key, avoiding scatter-gather queries.
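The identifier scheme can be sketched as follows. This is an illustration under stated assumptions, not Braze's implementation: the 66-character alphabet (the URL "unreserved" characters), the day-level date granularity, and the 2^20 slot reserved for the random part are all hypothetical choices made to keep the example small.

```typescript
// 66 characters: letters, digits, and "-", ".", "_", "~".
// The source does not give the alphabet; this choice is an assumption.
const BASE66_ALPHABET =
  "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-._~";

const MS_PER_DAY = 24 * 60 * 60 * 1000;
// Hypothetical layout: value = dayNumber * RANDOM_SPACE + randomPart.
const RANDOM_SPACE = 1048576; // 2^20

function base66Encode(value: number): string {
  if (value < 0 || Math.floor(value) !== value || value > 9007199254740991) {
    throw new RangeError("value must be a non-negative safe integer");
  }
  let remaining = value;
  let encoded = "";
  do {
    encoded = BASE66_ALPHABET.charAt(remaining % 66) + encoded;
    remaining = Math.floor(remaining / 66);
  } while (remaining > 0);
  return encoded;
}

function base66Decode(text: string): number {
  let value = 0;
  for (let i = 0; i < text.length; i++) {
    const digit = BASE66_ALPHABET.indexOf(text.charAt(i));
    if (digit < 0) {
      throw new Error("invalid character: " + text.charAt(i));
    }
    value = value * 66 + digit;
  }
  return value;
}

// Combine the generation day with a random part and encode the result.
function makeIdentifier(generatedAt: Date, randomPart: number): string {
  const dayNumber = Math.floor(generatedAt.getTime() / MS_PER_DAY);
  return base66Encode(dayNumber * RANDOM_SPACE + randomPart);
}

// Recover the generation day from the identifier alone, with no database read.
function generationDate(identifier: string): Date {
  const dayNumber = Math.floor(base66Decode(identifier) / RANDOM_SPACE);
  return new Date(dayNumber * MS_PER_DAY);
}

// Math.random() keeps the sketch short; it is not a recommendation for production.
function pickRandomPart(): number {
  return Math.floor(Math.random() * RANDOM_SPACE);
}

const exampleIdentifier = makeIdentifier(new Date(), pickRandomPart());
console.log(exampleIdentifier, generationDate(exampleIdentifier).toISOString());
```

Because the date can be decoded from the identifier, a redirect service built this way could reject an expired link before it touches DAX or DynamoDB.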

Braze link shortening feature results

Since its launch, SMS link shortening has been used by over 300 Braze customer companies in more than 700 million SMS messages. This includes 50% of the total SMS volume sent by Braze during last year’s Black Friday period. There has been a tangible reduction in the time it takes to build and send SMS. “The Motley Fool”, a financial media company, saved up to four hours of work per month while driving click rates of up to 15%. Another Braze client utilized multimedia messaging service (MMS) and link shortening to encourage users to shop during their “Smart Investment” campaign, rewarding users with additional store credit. Using the engagement data collected with Braze link shortening, they were able to offer engaged users unique messaging and follow-up offers. They retargeted users who did not interact with the message via other Braze messaging channels.

Conclusion

The Braze platform is designed to be both accessible to marketers and capable of supporting best-in-class cross-channel customer engagement. Our SMS link shortening feature, supported by AWS, enables marketers to provide an exceptional user experience and save time and money.

Implementing the transactional outbox pattern with Amazon EventBridge Pipes

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/implementing-the-transactional-outbox-pattern-with-amazon-eventbridge-pipes/

This post is written by Sayan Moitra, Associate Solutions Architect, and Sangram Sonawane, Senior Solutions Architect.

Microservice architecture is an architectural style that structures an application as a collection of loosely coupled and independently deployable services. Services must communicate with each other to exchange messages and perform business operations. Ensuring message reliability while maintaining loose coupling between services is crucial for building robust and scalable systems.

This blog demonstrates how to use Amazon DynamoDB, a fully managed serverless key-value NoSQL database, and Amazon EventBridge, a managed serverless event bus, to implement reliable messaging for microservices using the transactional outbox pattern.

Business operations can span across multiple systems or databases to maintain consistency and synchronization between them. One approach often used in distributed systems or architectures where data must be replicated across multiple locations or components is dual writes. In a dual write scenario, when a write operation is performed on one system or database, the same data or event also triggers another system in real-time or near real-time. This ensures that both systems always have the same data, minimizing data inconsistencies.

Dual writes can also introduce data integrity challenges in distributed systems. Failure to update the database or to send events to other downstream systems after an initial system update can lead to data loss and leave the application in an inconsistent state. One design approach to overcome this challenge is to combine dual writes with the transactional outbox pattern.

Challenges with dual writes

Consider an online food ordering application to illustrate the challenges with dual writes. Once the user submits the order, the order service updates the order status in a persistent data store. The order status update should also be sent to notify_restaurant and order_tracking services using a message bus for asynchronous communication. After successfully updating the order status in the database, the order service writes the event to the message bus. The order_service performs a dual write operation of updating the database and publishing the event details on the message bus for other services to read.

This approach works until publishing the event to the message bus fails. Publishing can fail for multiple reasons, such as a network error or a message bus outage. When a failure occurs, the notify_restaurant and order_tracking services are not notified of the order update event, leaving the system in an inconsistent state. Implementing the transactional outbox pattern with dual writes can help ensure reliable messaging between systems after a database update.

This illustration shows a sequence diagram for an online food ordering application and the challenges with dual writes:

Sequence diagram

Overview of the transactional outbox pattern

In the transactional outbox pattern, a second persistent data store is introduced to store the outgoing messages. In the online food order example, updating the database with order details and storing the event information in the outbox table becomes a single atomic transaction.

The transaction succeeds only when both the database write and the outbox table write succeed. Any failure to write to the outbox table rolls back the transaction. A separate process then reads the event from the outbox table and publishes it on the message bus. Once the message is available on the message bus, it can be read by the notify_restaurant and order_tracking services. Combining the transactional outbox pattern with dual writes provides data consistency across systems and reliable message delivery within the transactional context.
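The pattern can be illustrated with a small in-memory model. This is only a sketch: the class and method names are hypothetical, plain objects stand in for the database and the outbox table, and a callback stands in for the message bus. It shows the two properties described above, an all-or-nothing write and a separate relay step that can be retried.

```typescript
interface OutboxMessage {
  sequence: number;
  orderId: string;
  orderStatus: string;
  published: boolean;
}

class OrderDatabase {
  private orders: { [orderId: string]: string } = {};
  private outbox: OutboxMessage[] = [];

  // Updates the order and records the outgoing message as one unit:
  // either both writes are kept or neither is.
  updateOrder(orderId: string, orderStatus: string, simulateOutboxFailure: boolean = false): void {
    const previousStatus: string | undefined = this.orders[orderId];
    this.orders[orderId] = orderStatus;
    try {
      if (simulateOutboxFailure) {
        throw new Error("simulated outbox write failure");
      }
      this.outbox.push({
        sequence: this.outbox.length + 1,
        orderId: orderId,
        orderStatus: orderStatus,
        published: false,
      });
    } catch (error) {
      // Roll back the order write so the two stores never disagree.
      if (previousStatus !== undefined) {
        this.orders[orderId] = previousStatus;
      } else {
        delete this.orders[orderId];
      }
      throw error;
    }
  }

  // Separate relay step: publish pending messages in order and stop at the
  // first failure, so a later run retries from the same message.
  relay(publish: (message: OutboxMessage) => boolean): number {
    let publishedCount = 0;
    for (const message of this.outbox) {
      if (message.published) {
        continue;
      }
      if (!publish(message)) {
        break;
      }
      message.published = true;
      publishedCount++;
    }
    return publishedCount;
  }

  getStatus(orderId: string): string | undefined {
    return this.orders[orderId];
  }

  pendingCount(): number {
    return this.outbox.filter((message) => !message.published).length;
  }
}
```

If the bus is unavailable, relay returns 0 and the message stays pending, so the order update is never lost. It is delivered on a later relay run instead.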

The following illustration shows a sequence diagram for an online food ordering application with transactional outbox pattern for reliable message delivery.

Sequence diagram 2

Implementing the transactional outbox pattern

DynamoDB includes a feature called DynamoDB Streams, which captures a time-ordered sequence of item-level modifications in a DynamoDB table and stores this information in a log for up to 24 hours. Applications can access this log and view the data items as they appeared before and after they were modified, in near real time.

Whenever an application creates, updates, or deletes items in the table, DynamoDB Streams writes a stream record with the primary key attributes of the items that were modified. A stream record contains information about a data modification to a single item in a DynamoDB table. DynamoDB Streams writes stream records in near real time and these can be consumed for processing based on the contents. Enabling this feature removes the need to maintain a separate outbox table and lowers the management and operational overhead.
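For orientation, the sketch below models a trimmed-down stream record and turns it into an order status change. The eventName values and the Keys, OldImage, and NewImage fields follow the DynamoDB Streams record format, but the types cover only string and number attributes. The key attribute name "Id" is a hypothetical stand-in, and the before and after images are present only when the stream view type includes them.

```typescript
// Trimmed-down model of a DynamoDB stream record (string/number attributes only).
type AttributeValue = { S?: string; N?: string };
type AttributeMap = { [attribute: string]: AttributeValue };

interface StreamRecord {
  eventName: "INSERT" | "MODIFY" | "REMOVE";
  dynamodb: {
    Keys: AttributeMap;      // primary key attributes of the modified item
    OldImage?: AttributeMap; // item before the change, if the view type includes it
    NewImage?: AttributeMap; // item after the change, if the view type includes it
  };
}

interface OrderStatusChange {
  changeType: string;
  orderId: string | null;
  previousStatus: string | null;
  currentStatus: string | null;
}

// Reads a string attribute, returning null when the image or attribute is absent.
function readString(image: AttributeMap | undefined, attribute: string): string | null {
  if (!image) {
    return null;
  }
  const value = image[attribute];
  return value && value.S !== undefined ? value.S : null;
}

// "Id" is a hypothetical key name; "Status" matches the sample request body.
function toOrderStatusChange(record: StreamRecord): OrderStatusChange {
  return {
    changeType: record.eventName,
    orderId: readString(record.dynamodb.Keys, "Id"),
    previousStatus: readString(record.dynamodb.OldImage, "Status"),
    currentStatus: readString(record.dynamodb.NewImage, "Status"),
  };
}
```

Since each record already carries the before and after state of one item, the stream can play the role that a hand-maintained outbox table would otherwise fill.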

EventBridge Pipes connects event producers to consumers with options to transform, filter, and enrich messages. EventBridge Pipes can integrate with DynamoDB Streams to capture table events without writing any code. There is no need to write and maintain a separate process to read from the stream. EventBridge Pipes also supports retries, and any failed events can be routed to a dead-letter queue (DLQ) for further analysis and reprocessing.

EventBridge polls the shards in the DynamoDB stream for records and invokes pipes as soon as records are available. You can configure a pipe to read records only once a specified batch size has been gathered or the batch window expires. Pipes maintains the order of records from the data stream when sending that data to the destination. You can optionally filter or enhance these records before sending them to a target for processing.
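The batching rule can be pictured with a simplified model: a batch is closed when it reaches the batch size, or when the window that opened with its first record has run out. The function and field names below are hypothetical and the timing is idealized. This is a way to reason about the two settings, not a description of how the service is implemented.

```typescript
interface ArrivedRecord {
  sequenceNumber: number;
  arrivedAtMs: number; // arrival time in milliseconds
}

// Groups records, in arrival order, into batches closed by size or by window.
function groupIntoBatches(
  records: ArrivedRecord[],
  batchSize: number,
  batchWindowMs: number
): ArrivedRecord[][] {
  const batches: ArrivedRecord[][] = [];
  let current: ArrivedRecord[] = [];
  let windowStartMs = 0;
  for (const record of records) {
    // The window for the open batch ran out before this record arrived.
    if (current.length > 0 && record.arrivedAtMs - windowStartMs >= batchWindowMs) {
      batches.push(current);
      current = [];
    }
    if (current.length === 0) {
      windowStartMs = record.arrivedAtMs;
    }
    current.push(record);
    // The batch is full, so it is handed over immediately.
    if (current.length === batchSize) {
      batches.push(current);
      current = [];
    }
  }
  if (current.length > 0) {
    batches.push(current);
  }
  return batches;
}
```

Records are never reordered in this model, which matches the ordering guarantee described above. A larger batch size or window trades delivery latency for fewer invocations.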

Example overview

The following diagram illustrates the implementation of the transactional outbox pattern with DynamoDB Streams and EventBridge Pipes. Amazon API Gateway is used to trigger a DynamoDB operation via a POST request. A change in the DynamoDB table triggers an EventBridge event bus via Amazon EventBridge Pipes. This event bus invokes the Lambda functions through an SQS queue, depending on the filters applied.

Architecture overview

  1. In this sample implementation, Amazon API Gateway makes a POST call to the DynamoDB table for database updates. Amazon API Gateway supports CRUD operations for Amazon DynamoDB without the need of a compute layer for database calls.
  2. DynamoDB Streams is enabled on the table, which captures a time-ordered sequence of item-level modifications in the DynamoDB table in near real time.
  3. EventBridge Pipes integrates with DynamoDB Streams to capture the events and can optionally filter and enrich the data before it is sent to a supported target. In this example, events are sent to Amazon EventBridge, which acts as a message bus. This can be replaced with any of the supported targets as detailed in Amazon EventBridge Pipes targets. A DLQ can be configured to handle any failed events, which can be analyzed and retried.
  4. Consumers listening to the event bus receive messages. You can optionally fan out and deliver the events to multiple consumers and apply filters. You can configure a DLQ to handle any failures and retries.

Prerequisites

  1. AWS SAM CLI, version 1.85.0 or higher
  2. Python 3.10

Deploying the example application

  1. Clone the repository:
    git clone https://github.com/aws-samples/amazon-eventbridge-pipes-dynamodb-stream-transactional-outbox.git
  2. Change to the root directory of the project and run the following AWS SAM CLI commands:
    cd amazon-eventbridge-pipes-dynamodb-stream-transactional-outbox               
    sam build
    sam deploy --guided
    
  3. Enter the name for your stack during guided deployment. During the deploy process, select the default option for all the additional steps.
    SAM deployment
  4. The resources are deployed.

Testing the application

Once the deployment is complete, it provides the API Gateway URL in the output. You can test using that URL. To test the application, use Postman to make a POST call to API Gateway prod URL:

Postman

You can also test using the curl command:

curl -s --header "Content-Type: application/json" \
  --request POST \
  --data '{"Status":"Created"}' \
  <API_ENDPOINT>

This produces the following output:

Expected output

To verify that the order details are updated in the DynamoDB table, run the following command to perform a scan operation on the table:

aws dynamodb scan \
    --table-name <DynamoDB Table Name>

Handling failures

DynamoDB Streams captures a time-ordered sequence of item-level modifications in the DynamoDB table and stores this information in a log for up to 24 hours. If EventBridge is unable to read from the DynamoDB stream, for example because of a misconfiguration, the records remain available in the log for 24 hours. Once the integration is restored, EventBridge retrieves all undelivered records from the last 24 hours. For integration issues between EventBridge Pipes and the target application, all failed messages can be sent to the DLQ for reprocessing at a later time.
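The retry-then-dead-letter behavior for target failures can be sketched generically. The function below is a hypothetical in-memory model, not the EventBridge Pipes implementation: each message is attempted up to a fixed number of times, and anything still failing is set aside for later analysis instead of being dropped.

```typescript
interface DeliveryOutcome<T> {
  delivered: T[];
  deadLettered: T[]; // messages that exhausted their attempts
}

// Tries each message up to maxAttempts times, in order, and collects failures.
function deliverWithRetries<T>(
  messages: T[],
  send: (message: T) => boolean,
  maxAttempts: number
): DeliveryOutcome<T> {
  const outcome: DeliveryOutcome<T> = { delivered: [], deadLettered: [] };
  for (const message of messages) {
    let sent = false;
    for (let attempt = 1; attempt <= maxAttempts && !sent; attempt++) {
      sent = send(message);
    }
    if (sent) {
      outcome.delivered.push(message);
    } else {
      outcome.deadLettered.push(message);
    }
  }
  return outcome;
}
```

A real pipe would usually add a delay between attempts. The point of the sketch is that a persistently failing message ends up somewhere it can be inspected and replayed.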

Cleaning up

To clean up your AWS resources, run the following AWS SAM CLI command, answering “y” to all questions:

sam delete --stack-name <stack_name>

Conclusion

Reliable interservice communication is an important consideration in microservice design, especially when faced with dual writes. Combining the transactional outbox pattern with dual writes provides a robust way of improving message reliability.

This blog demonstrates an architecture pattern to tackle the challenge of dual writes by combining it with the transactional outbox pattern using DynamoDB and EventBridge Pipes. This solution provides a no-code approach with AWS Managed Services, reducing management and operational overhead.

For more serverless learning resources, visit Serverless Land.

Implementing automatic drift detection in CDK Pipelines using Amazon EventBridge

Post Syndicated from DAMODAR SHENVI WAGLE original https://aws.amazon.com/blogs/devops/implementing-automatic-drift-detection-in-cdk-pipelines-using-amazon-eventbridge/

The AWS Cloud Development Kit (AWS CDK) is a popular open source toolkit that allows developers to create their cloud infrastructure using high level programming languages. AWS CDK comes bundled with a construct called CDK Pipelines that makes it easy to set up continuous integration, delivery, and deployment with AWS CodePipeline. The CDK Pipelines construct does all the heavy lifting, such as setting up appropriate AWS IAM roles for deployment across regions and accounts, Amazon Simple Storage Service (Amazon S3) buckets to store build artifacts, and an AWS CodeBuild project to build, test, and deploy the app. The pipeline deploys a given CDK application as one or more AWS CloudFormation stacks.

With CloudFormation stacks, there is the possibility that someone can manually change the configuration of stack resources outside the purview of CloudFormation and the pipeline that deploys the stack. This causes the deployed resources to be inconsistent with the intent in the application, which is referred to as “drift”, a situation that can make the application’s behavior unpredictable. For example, when troubleshooting an application, if the application has drifted in production, it is difficult to reproduce the same behavior in a development environment. In other cases, it may introduce security vulnerabilities in the application. For example, an Amazon EC2 security group that was originally deployed to allow ingress traffic from a specific IP address might be opened up to allow traffic from all IP addresses.

CloudFormation offers a drift detection feature for stacks and stack resources to detect configuration changes that are made outside of CloudFormation. The stack/resource is considered as drifted if its configuration does not match the expected configuration defined in the CloudFormation template and by extension the CDK code that synthesized it.

In this blog post you will see how CloudFormation drift detection can be integrated as a pre-deployment validation step in CDK Pipelines using an event driven approach.

Services and frameworks used in the post include CloudFormation, CodeBuild, Amazon EventBridge, AWS Lambda, Amazon DynamoDB, S3, and AWS CDK.

Solution overview

Amazon EventBridge is a serverless AWS service that offers an agile mechanism for the developers to spin up loosely coupled, event driven applications at scale. EventBridge supports routing of events between services via an event bus. EventBridge out of the box supports a default event bus for each account which receives events from AWS services. Last year, CloudFormation added a new feature that enables event notifications for changes made to CloudFormation-based stacks and resources. These notifications are accessible through Amazon EventBridge, allowing users to monitor and react to changes in their CloudFormation infrastructure using event-driven workflows. Our solution leverages the drift detection events that are now supported by EventBridge. The following architecture diagram depicts the flow of events involved in successfully performing drift detection in CDK Pipelines.

Architecture diagram

The user starts the pipeline by checking code into an AWS CodeCommit repo, which acts as the pipeline source. We have configured drift detection in the pipeline as a custom step backed by a Lambda function. When the drift detection step invokes the provider Lambda function, it first starts the drift detection on the CloudFormation stack Demo Stack and then saves the drift_detection_id along with pipeline_job_id in a DynamoDB table. In the meantime, the pipeline waits for a response on the status of drift detection.

The EventBridge rules are set up to capture the drift detection state change events for Demo Stack that are received by the default event bus. The callback Lambda function is registered as the intended target for the rules. When drift detection completes, it triggers the EventBridge rule, which in turn invokes the callback Lambda function with the stack status as either DRIFTED or IN_SYNC. The callback Lambda function pulls the pipeline_job_id from DynamoDB and sends the appropriate status back to the pipeline, thus propelling the pipeline out of the wait state. If the stack is in the IN_SYNC status, the callback Lambda function sends a success status and the pipeline continues with the deployment. If the stack is in the DRIFTED status, the callback Lambda function sends a failure status back to the pipeline and the pipeline run ends in failure.
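The decision made by the callback function can be sketched as a pure function. The names below are hypothetical and a plain object stands in for the DynamoDB table. The status strings are CloudFormation's stack drift status values, such as DRIFTED and IN_SYNC. Treating any status other than IN_SYNC as a failure is an assumption of this sketch, since the post only describes the DRIFTED and in-sync cases.

```typescript
type StackDriftStatus = "DRIFTED" | "IN_SYNC" | "UNKNOWN" | "NOT_CHECKED";

interface PipelineDecision {
  pipelineJobId: string;
  succeeded: boolean;
  reason: string;
}

// Stand-in for the DynamoDB table: drift_detection_id -> pipeline_job_id.
type JobTable = { [driftDetectionId: string]: string };

function decidePipelineResult(
  jobs: JobTable,
  driftDetectionId: string,
  driftStatus: StackDriftStatus
): PipelineDecision | null {
  const pipelineJobId: string | undefined = jobs[driftDetectionId];
  if (pipelineJobId === undefined) {
    // The event belongs to a drift detection that this pipeline did not start.
    return null;
  }
  if (driftStatus === "IN_SYNC") {
    return { pipelineJobId: pipelineJobId, succeeded: true, reason: "stack matches its template" };
  }
  // Assumption: anything other than IN_SYNC blocks the deployment.
  return {
    pipelineJobId: pipelineJobId,
    succeeded: false,
    reason: "stack drift status is " + driftStatus,
  };
}
```

A real handler would pass the decision to CodePipeline through its PutJobSuccessResult or PutJobFailureResult API, which is what releases the waiting pipeline action.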

Solution Deep Dive

The solution deploys two stacks, as shown in the architecture diagram above:

  1. CDK Pipelines stack
  2. Pre-requisite stack

The CDK Pipelines stack defines a pipeline with a CodeCommit source and a drift detection step integrated into it. The pre-requisites stack deploys the following resources that are required by the CDK Pipelines stack.

  • A Lambda function that implements drift detection step
  • A DynamoDB table that holds drift_detection_id and pipeline_job_id
  • An EventBridge rule to capture the “CloudFormation Drift Detection Status Change” event
  • A callback lambda function that evaluates status of drift detection and sends status back to the pipeline by looking up the data captured in DynamoDB.

The pre-requisites stack is deployed first, followed by the CDK Pipelines stack.

Defining drift detection step

CDK Pipelines offers a mechanism to define your own step that requires custom implementation. A step corresponds to a custom action in CodePipeline, such as invoking a Lambda function. It can exist as a pre- or post-deployment action in a given stage of the pipeline. For example, your organization’s policies may require its CI/CD pipelines to run a security vulnerability scan as a prerequisite before deployment. You can build this as a custom step in your CDK Pipelines. In this post, you will use the same mechanism for adding the drift detection step in the pipeline.

You start by defining a class called DriftDetectionStep that extends Step and implements ICodePipelineActionFactory as shown in the following code snippet. The constructor accepts three parameters as inputs: stackName, account, and region. When the pipeline runs the step, it invokes the drift detection Lambda function with these parameters wrapped inside the userParameters variable. The function produceAction() adds the action that invokes the drift detection Lambda function to the pipeline stage.

Please note that the solution uses an SSM parameter to inject the lambda function ARN into the pipeline stack. So, we deploy the provider lambda function as part of pre-requisites stack before the pipeline stack and publish its ARN to the SSM parameter. The CDK code to deploy pre-requisites stack can be found here.

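// Imports assume AWS CDK v2 (aws-cdk-lib).
import * as codepipeline from 'aws-cdk-lib/aws-codepipeline';
import * as cpactions from 'aws-cdk-lib/aws-codepipeline-actions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as ssm from 'aws-cdk-lib/aws-ssm';
import * as pipelines from 'aws-cdk-lib/pipelines';
import {
    CodePipelineActionFactoryResult,
    ProduceActionOptions,
    Step,
} from 'aws-cdk-lib/pipelines';

// SSM_PARAM_DRIFT_DETECT_LAMBDA_ARN is a constant defined elsewhere in the project.
export class DriftDetectionStep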
    extends Step
    implements pipelines.ICodePipelineActionFactory
{
    constructor(
        private readonly stackName: string,
        private readonly account: string,
        private readonly region: string
    ) {
        super(`DriftDetectionStep-${stackName}`);
    }

    public produceAction(
        stage: codepipeline.IStage,
        options: ProduceActionOptions
    ): CodePipelineActionFactoryResult {
        // Define the configuration for the action that is added to the pipeline.
        stage.addAction(
            new cpactions.LambdaInvokeAction({
                actionName: options.actionName,
                runOrder: options.runOrder,
                lambda: lambda.Function.fromFunctionArn(
                    options.scope,
                    `InitiateDriftDetectLambda-${this.stackName}`,
                    ssm.StringParameter.valueForStringParameter(
                        options.scope,
                        SSM_PARAM_DRIFT_DETECT_LAMBDA_ARN
                    )
                ),
                // These are the parameters passed to the drift detection step implementation provider lambda
                userParameters: {
                    stackName: this.stackName,
                    account: this.account,
                    region: this.region,
                },
            })
        );
        return {
            runOrdersConsumed: 1,
        };
    }
}

Configuring drift detection step in CDK Pipelines

Here you will see how to integrate the previously defined drift detection step into CDK Pipelines. The pipeline has a stage called DemoStage as shown in the following code snippet. During the construction of DemoStage, we declare drift detection as the pre-deployment step. This makes sure that the pipeline always does the drift detection check prior to deployment.

Please note that for every stack defined in the stage, we add a dedicated step to perform drift detection by instantiating the class DriftDetectionStep detailed in the prior section. Thus, this solution scales with the number of stacks defined per stage.

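// Imports assume AWS CDK v2 (aws-cdk-lib).
import { StackProps } from 'aws-cdk-lib';
import * as codecommit from 'aws-cdk-lib/aws-codecommit';
import { CodePipeline, CodePipelineSource, ShellStep, Step } from 'aws-cdk-lib/pipelines';
import { Construct } from 'constructs';

// BaseStack, DemoStage, and DriftDetectionStep are defined elsewhere in the project.
export class PipelineStack extends BaseStack {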
    constructor(scope: Construct, id: string, props?: StackProps) {
        super(scope, id, props);

        const repo = new codecommit.Repository(this, 'DemoRepo', {
            repositoryName: `${this.node.tryGetContext('appName')}-repo`,
        });

        const pipeline = new CodePipeline(this, 'DemoPipeline', {
            synth: new ShellStep('synth', {
                input: CodePipelineSource.codeCommit(repo, 'main'),
                commands: ['./script-synth.sh'],
            }),
            crossAccountKeys: true,
            enableKeyRotation: true,
        });
        const demoStage = new DemoStage(this, 'DemoStage', {
            env: {
                account: this.account,
                region: this.region,
            },
        });
        const driftDetectionSteps: Step[] = [];
        for (const stackName of demoStage.stackNameList) {
            const step = new DriftDetectionStep(stackName, this.account, this.region);
            driftDetectionSteps.push(step);
        }
        pipeline.addStage(demoStage, {
            pre: driftDetectionSteps,
        });
    }
}

Demo

Here you will go through the deployment steps for the solution and see drift detection in action.

Deploy the pre-requisites stack

Clone the repo from the GitHub location here. Navigate to the cloned folder and run the script script-deploy.sh. You can find detailed instructions in README.md.

Deploy the CDK Pipelines stack

Clone the repo from the GitHub location here. Navigate to the cloned folder and run script script-deploy.sh. This deploys a pipeline with an empty CodeCommit repo as the source. The pipeline run ends up in failure, as shown below, because of the empty CodeCommit repo.

First run of the pipeline

Next, check in the code from the cloned repo into the CodeCommit source repo. You can find detailed instructions in README.md. This triggers the pipeline, which finishes successfully, as shown below.

Pipeline run after first check in

The pipeline deploys two stacks DemoStackA and DemoStackB. Each of these stacks creates an S3 bucket.

CloudFormation stacks deployed after first run of the pipeline

Demonstrate drift detection

Locate the S3 bucket created by DemoStackA under resources, navigate to the S3 bucket, and modify the tag aws-cdk:auto-delete-objects from true to false, as shown below.

DemoStackA resources

DemoStackA modify S3 tag

Now, go to the pipeline and trigger a new execution by clicking on Release Change.

Run pipeline via Release Change tab

The pipeline run will now end in failure at the pre-deployment drift detection step.

Pipeline run after Drift Detection failure

Cleanup

Please follow the steps below to clean up all the stacks.

  1. Navigate to S3 console and empty the buckets created by stacks DemoStackA and DemoStackB.
  2. Navigate to the CloudFormation console and delete stacks DemoStackA and DemoStackB, since deleting CDK Pipelines stack does not delete the application stacks that the pipeline deploys.
  3. Delete the CDK Pipelines stack cdk-drift-detect-demo-pipeline
  4. Delete the pre-requisites stack cdk-drift-detect-demo-drift-detection-prereq

Conclusion

In this post, I showed how to add a custom implementation step in CDK Pipelines. I also used that mechanism to integrate a drift detection check as a pre-deployment step. This allows us to validate the integrity of a CloudFormation Stack before its deployment. Since the validation is integrated into the pipeline, it is easier to manage the solution in one place as part of the overarching pipeline. Give the solution a try, and then see if you can incorporate it into your organization’s delivery pipelines.

About the author:

Damodar Shenvi Wagle is a Senior Cloud Application Architect at AWS Professional Services. His areas of expertise include architecting serverless solutions, CI/CD, and automation.

How SeatGeek uses AWS Serverless to control authorization, authentication, and rate-limiting in a multi-tenant SaaS application

Post Syndicated from Umesh Kalaspurkar original https://aws.amazon.com/blogs/architecture/how-seatgeek-uses-aws-to-control-authorization-authentication-and-rate-limiting-in-a-multi-tenant-saas-application/

SeatGeek is a ticketing platform for web and mobile users, offering ticket purchase and reselling for sports games, concerts, and theatrical productions. In 2022, SeatGeek had an average of 47 million daily tickets available, and their mobile app was downloaded 33+ million times.

Historically, SeatGeek used multiple identity and access tools internally. Applications were individually managing authorization, leading to increased overhead and a need for more standardization. SeatGeek sought to simplify the API provided to customers and partners by abstracting and standardizing the authorization layer. They were also looking to introduce centralized API rate-limiting to prevent noisy neighbor problems in their multi-tenant SaaS application.

In this blog, we will take you through SeatGeek’s journey and explore the solution architecture they’ve implemented. As of the publication of this post, many B2B customers have adopted this solution to query terabytes of business data.

Building multi-tenant SaaS environments

Multi-tenant SaaS environments allow highly performant and cost-efficient applications by sharing underlying resources across tenants. While this is a benefit, it is important to implement cross-tenant isolation practices to adhere to security, compliance, and performance objectives. With that, each tenant should only be able to access their authorized resources. Another consideration is the noisy neighbor problem that occurs when one of the tenants monopolizes excessive shared capacity, causing performance issues for other tenants.

Authentication, authorization, and rate-limiting are critical components of a secure and resilient multi-tenant environment. Without these mechanisms in place, there is a risk of unauthorized access, resource-hogging, and denial-of-service attacks, which can compromise the security and stability of the system. Validating access early in the workflow can help eliminate the need for individual applications to implement similar heavy-lifting validation techniques.

SeatGeek had several criteria for addressing these concerns:

  1. They wanted to use their existing Auth0 instance.
  2. SeatGeek did not want to introduce any additional infrastructure management overhead; plus, they preferred to use serverless services to “stitch” managed components together (with minimal effort) to implement their business requirements.
  3. They wanted this solution to scale as seamlessly as possible with demand and adoption increases; concurrently, SeatGeek did not want to pay for idle or over-provisioned resources.

Exploring the solution

The SeatGeek team used a combination of Amazon Web Services (AWS) serverless services to address the aforementioned criteria and achieve the desired business outcome. Amazon API Gateway serves APIs at the entry point to SeatGeek’s cloud environment. API Gateway allowed SeatGeek to use a custom AWS Lambda authorizer to integrate with Auth0 and to define throttling configurations for their tenants. Since all the services used in the solution are fully serverless, they do not require infrastructure management, scale up and down automatically on demand, and provide pay-as-you-go pricing.

SeatGeek created a set of tiered usage plans in API Gateway (bronze, silver, and gold) to introduce rate-limiting. Each usage plan had a pre-defined request-per-second rate limit configuration. A unique API key was created by API Gateway for each tenant. Amazon DynamoDB was used to store the association of existing tenant IDs (managed by Auth0) to API keys (managed by API Gateway). This allowed us to keep API key management transparent to SeatGeek’s tenants.
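To make the effect of a per-tenant request-per-second limit concrete, here is a minimal token bucket sketch, the algorithm that API Gateway documents for its throttling. The tier names come from the post, but the rate and burst numbers are made up for illustration, and the class and function names are hypothetical. This is a toy model of the idea, not API Gateway's implementation.

```python
from dataclasses import dataclass

# Hypothetical tier limits as (requests per second, burst size).
# The post does not state the real values.
USAGE_PLANS = {
    "bronze": (5.0, 10),
    "silver": (50.0, 100),
    "gold": (500.0, 1000),
}


@dataclass
class TokenBucket:
    rate: float      # tokens added per second
    capacity: int    # maximum burst size
    tokens: float    # tokens currently available
    updated: float   # time of the last refill, in seconds

    def allow(self, now: float) -> bool:
        """Refill for the elapsed time, then try to spend one token."""
        elapsed = max(0.0, now - self.updated)
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            return True
        return False  # the caller would answer with HTTP 429


def bucket_for(plan: str, now: float = 0.0) -> TokenBucket:
    """Create a full bucket for one tenant on the given usage plan."""
    rate, burst = USAGE_PLANS[plan]
    return TokenBucket(rate=rate, capacity=burst, tokens=float(burst), updated=now)


bucket = bucket_for("bronze")
# 12 requests at the same instant: the burst of 10 passes, the rest are throttled.
results = [bucket.allow(now=0.0) for _ in range(12)]
print(results.count(True), results.count(False))  # 10 2
```

Because each tenant gets its own bucket, one tenant exhausting its tokens has no effect on the tokens available to another tenant.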

Each new tenant goes through an onboarding workflow. This is an automated process managed with Terraform. During new tenant onboarding, SeatGeek creates a new tenant ID in Auth0 and a new API key in API Gateway, and stores the association between them in DynamoDB. Each API key is also associated with one of the usage plans.

Once onboarding completes, the new tenant can start invoking SeatGeek APIs (Figure 1).


Figure 1. SeatGeek’s fully serverless architecture

  1. Tenant authenticates with Auth0 using machine-to-machine authorization. Auth0 returns a JSON web token representing tenant authentication success. The token includes claims required for downstream authorization, such as tenant ID, expiration date, scopes, and signature.
  2. Tenant sends a request to the SeatGeek API. The request includes the token obtained in Step 1 and application-specific parameters, for example, retrieving the last 12 months of booking data.
  3. API Gateway extracts the token and passes it to Lambda authorizer.
  4. Lambda authorizer retrieves the token validation keys from Auth0. The keys are cached in the authorizer, so this happens only once for each authorizer launch environment. This allows token validation locally without calling Auth0 each time, reducing latency and preventing an excessive number of requests to Auth0.
  5. Lambda authorizer performs token validation, checking the token’s structure, expiration date, signature, audience, and subject. If validation succeeds, Lambda authorizer extracts the tenant ID from the token.
  6. Lambda authorizer uses the tenant ID extracted in Step 5 to retrieve the associated API key from DynamoDB and returns it to API Gateway.
  7. The API Gateway uses API key to check if the client making this particular request is above the rate-limit threshold, based on the usage plan associated with API key. If the rate limit is exceeded, HTTP 429 (“Too Many Requests”) is returned to the client. Otherwise, the request will be forwarded to the backend for further processing.
  8. Optionally, the backend can perform additional application-specific token validations.
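Steps 5 through 7 can be sketched as a small function. This is an illustration under several assumptions, not SeatGeek's code: signature verification is assumed to have happened already, a dictionary stands in for the DynamoDB table, and the claim name tenant_id, the audience URL, the tenant ID, and the API key value are all hypothetical. The usageIdentifierKey field is the documented way for a Lambda authorizer to hand an API key to API Gateway when the API key source is set to the authorizer.

```python
import time
from typing import Optional

# Stand-in for the DynamoDB table that maps tenant IDs to API keys.
# All identifiers and values below are made up for illustration.
TENANT_API_KEYS = {"tenant-123": "example-api-key-abc"}
EXPECTED_AUDIENCE = "https://api.example.com"


def policy(principal: str, effect: str, method_arn: str) -> dict:
    """Build the response shape a Lambda authorizer returns to API Gateway."""
    return {
        "principalId": principal,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {"Action": "execute-api:Invoke", "Effect": effect, "Resource": method_arn}
            ],
        },
    }


def authorize(claims: dict, method_arn: str, now: Optional[float] = None) -> dict:
    """Check already-verified token claims and attach the tenant's API key."""
    now = time.time() if now is None else now
    audience = claims.get("aud")
    audiences = audience if isinstance(audience, list) else [audience]
    tenant_id = claims.get("tenant_id")  # hypothetical claim name
    api_key = TENANT_API_KEYS.get(tenant_id)

    expired = claims.get("exp", 0) <= now
    if expired or EXPECTED_AUDIENCE not in audiences or api_key is None:
        return policy(tenant_id or "unknown", "Deny", method_arn)

    response = policy(tenant_id, "Allow", method_arn)
    # API Gateway meters the request against the usage plan tied to this key.
    response["usageIdentifierKey"] = api_key
    return response


ARN = "arn:aws:execute-api:us-east-1:123456789012:abc123/prod/GET/bookings"
good = authorize({"tenant_id": "tenant-123", "aud": EXPECTED_AUDIENCE, "exp": 2000}, ARN, now=1000)
stale = authorize({"tenant_id": "tenant-123", "aud": EXPECTED_AUDIENCE, "exp": 500}, ARN, now=1000)
print(good["policyDocument"]["Statement"][0]["Effect"])   # Allow
print(stale["policyDocument"]["Statement"][0]["Effect"])  # Deny
```

Returning the key from the authorizer is what keeps API keys invisible to tenants: they only ever present the Auth0 token.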

Architecture benefits

The architecture implemented by SeatGeek provides several benefits:

  • Centralized authorization: Using Auth0 with API Gateway and Lambda authorizer standardizes API authentication and removes the burden of individual applications having to implement authorization.
  • Multiple levels of caching: Each Lambda authorizer launch environment caches token validation keys in memory to validate tokens locally. This reduces token validation time and helps to avoid excessive traffic to Auth0. In addition, API Gateway can be configured with up to 5 minutes of caching for Lambda authorizer response, so the same token will not be revalidated in that timespan. This reduces overall cost and load on Lambda authorizer and DynamoDB.
  • Noisy neighbor prevention: Usage plans and rate limits prevent any particular tenant from monopolizing the shared resources and causing a negative performance impact for other tenants.
  • Simple management and reduced total cost of ownership: Using AWS serverless services removed the infrastructure maintenance overhead and allowed SeatGeek to deliver business value faster. It also ensured they didn’t pay for over-provisioned capacity, and their environment could scale up and down automatically and on demand.
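The first caching level mentioned above relies on the fact that module-level state in a Lambda function survives between invocations that reuse the same execution environment. A minimal sketch of the pattern follows. The function names are hypothetical, and the fetch callable stands in for the HTTPS call that would download the validation keys from Auth0.

```python
from typing import Callable, Dict, Optional

# Module-level state is created once per execution environment and is reused
# by every invocation that lands on that environment.
_cached_keys: Optional[Dict[str, str]] = None


def get_validation_keys(fetch: Callable[[], Dict[str, str]]) -> Dict[str, str]:
    """Return the cached keys, calling fetch only on the first use."""
    global _cached_keys
    if _cached_keys is None:
        _cached_keys = fetch()
    return _cached_keys


calls = []


def fake_fetch() -> Dict[str, str]:
    """Stand-in for the network call to the identity provider."""
    calls.append(1)
    return {"key-id-1": "public-key-material"}


for _ in range(3):
    keys = get_validation_keys(fake_fetch)

print(len(calls))  # 1
```

A production version would also need a way to refresh the cache when the identity provider rotates its keys, which this sketch leaves out.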

Conclusion

In this blog, we explored how SeatGeek used AWS serverless services, such as API Gateway, Lambda, and DynamoDB, to integrate with external identity provider Auth0, and implemented per-tenant rate limits with multi-tiered usage plans. Using AWS serverless services allowed SeatGeek to avoid undifferentiated heavy-lifting of infrastructure management and accelerate efforts to build a solution addressing business requirements.

Prime Day 2023 Powered by AWS – All the Numbers

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/prime-day-2023-powered-by-aws-all-the-numbers/

As part of my annual tradition to tell you about how AWS makes Prime Day possible, I am happy to be able to share some chart-topping metrics (check out my 2016, 2017, 2019, 2020, 2021, and 2022 posts for a look back).

This year I bought all kinds of stuff for my hobbies including a small drill press, filament for my 3D printer, and irrigation tools. I also bought some very nice Alphablock books for my grandkids. According to our official release, the first day of Prime Day was the single largest sales day ever on Amazon and for independent sellers, with more than 375 million items purchased.

Prime Day by the Numbers
As always, Prime Day was powered by AWS. Here are some of the most interesting and/or mind-blowing metrics:

Amazon Elastic Block Store (Amazon EBS) – The Amazon Prime Day event resulted in an incremental 163 petabytes of EBS storage capacity allocated, generating a peak of 15.35 trillion requests and 764 petabytes of data transfer per day. Compared to the previous year, Amazon increased peak EBS usage by only 7% yet delivered 35% more traffic per day due to efficiency efforts, including workload optimization using Amazon Elastic Compute Cloud (Amazon EC2) AWS Graviton-based instances.

AWS CloudTrail – AWS CloudTrail processed over 830 billion events in support of Prime Day 2023.

Amazon DynamoDB – DynamoDB powers multiple high-traffic Amazon properties and systems including Alexa, the Amazon.com sites, and all Amazon fulfillment centers. Over the course of Prime Day, these sources made trillions of calls to the DynamoDB API. DynamoDB maintained high availability while delivering single-digit millisecond responses and peaking at 126 million requests per second.

Amazon Aurora – On Prime Day, 5,835 database instances running the PostgreSQL-compatible and MySQL-compatible editions of Amazon Aurora processed 318 billion transactions, stored 2,140 terabytes of data, and transferred 836 terabytes of data.

Amazon Simple Email Service (SES) – Amazon SES sent 56% more emails for Amazon.com during Prime Day 2023 vs. 2022, delivering 99.8% of those emails to customers.

Amazon CloudFront – Amazon CloudFront handled a peak load of over 500 million HTTP requests per minute, for a total of over 1 trillion HTTP requests during Prime Day.

Amazon SQS – During Prime Day, Amazon SQS set a new traffic record by processing 86 million messages per second at peak. This is a 22% increase from Prime Day 2022, when SQS supported 70.5 million messages per second.

Amazon Elastic Compute Cloud (EC2) – During Prime Day 2023, Amazon used tens of millions of normalized AWS Graviton-based Amazon EC2 instances, 2.7x more than in 2022, to power over 2,600 services. By using more Graviton-based instances, Amazon was able to get the compute capacity needed while using up to 60% less energy.

Amazon Pinpoint – Amazon Pinpoint sent tens of millions of SMS messages to customers during Prime Day 2023 with a delivery success rate of 98.3%.

Prepare to Scale
Every year I reiterate the same message: rigorous preparation is key to the success of Prime Day and our other large-scale events. If you are preparing for a similar chart-topping event of your own, I strongly recommend that you take advantage of AWS Infrastructure Event Management (IEM). As part of an IEM engagement, my colleagues will provide you with architectural and operational guidance that will help you to execute your event with confidence!

Jeff;

Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-kinesis-data-streams-and-amazon-dynamodb/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it a widely used cloud data warehouse. You can run and scale analytics in seconds on all your data without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift streaming ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL (Structured Query Language) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss a solution that uses Amazon Redshift streaming ingestion to provide near-real-time analytics.

Overview of solution

We walk through an example pipeline to ingest data from an Amazon DynamoDB source table in near-real time using Kinesis Data Streams in combination with Amazon Redshift streaming ingestion. We also walk through using PartiQL in Amazon Redshift to unnest nested JSON documents and build fact and dimension tables that are used in your data warehouse refresh. The solution uses Kinesis Data Streams to capture item-level changes from an application DynamoDB table.

As shown in the following reference architecture, DynamoDB table data changes are streamed into Amazon Redshift through Kinesis Data Streams and Amazon Redshift streaming ingestion for near-real-time analytics dashboard visualization using Amazon QuickSight.

The process flow includes the following steps:

  1. Create a Kinesis data stream and turn on the data stream from DynamoDB to capture item-level changes in your DynamoDB table.
  2. Create a streaming materialized view in your Amazon Redshift cluster to consume live streaming data from the data stream.
  3. The streaming data gets ingested into a JSON payload. Use a combination of a PartiQL statement and dot notation to unnest the JSON document into data columns of a staging table in Amazon Redshift.
  4. Create fact and dimension tables in the Amazon Redshift cluster and keep loading the latest data at regular intervals from the staging table using transformation logic.
  5. Establish connectivity between a QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

Prerequisites

You must have the following:

Set up a Kinesis data stream

To configure your Kinesis data stream, complete the following steps:

  1. Create a Kinesis data stream called demo-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.

Configure the stream to capture changes from the DynamoDB table.

  2. On the DynamoDB console, choose Tables in the navigation pane.
  3. Open your table.
  4. On the Exports and streams tab, choose Turn on under Amazon Kinesis data stream details.
  5. For Destination Kinesis data stream, choose demo-data-stream.
  6. Choose Turn on stream.

Item-level changes in the DynamoDB table should now be flowing to the Kinesis data stream.

  7. To verify that data is entering the stream, on the Kinesis Data Streams console, open demo-data-stream.
  8. On the Monitoring tab, find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Steps 1 and 2 in Getting started with streaming ingestion from Amazon Kinesis Data Streams.
  2. Launch the Query Editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Amazon Redshift cluster for the next steps.
  3. Create an external schema:
CREATE EXTERNAL SCHEMA demo_schema
FROM KINESIS
IAM_ROLE { default | 'iam-role-arn' };
  4. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session or cluster level.
  5. Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
CREATE MATERIALIZED VIEW demo_stream_vw AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload    
    FROM demo_schema."demo-data-stream";
  6. Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
REFRESH MATERIALIZED VIEW demo_stream_vw;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions on how to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the Kinesis data stream to the payload column of the streaming materialized view demo_stream_vw:

{
  "awsRegion": "us-east-1",
  "eventID": "6d24680a-6d12-49e2-8a6b-86ffdc7306c1",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "sample-dynamoDB",
  "dynamodb": {
    "ApproximateCreationDateTime": 1657294923614,
    "Keys": {
      "pk": {
        "S": "CUSTOMER#CUST_123"
      },
      "sk": {
        "S": "TRANSACTION#2022-07-08T23:59:59Z#CUST_345"
      }
    },
    "NewImage": {
      "completionDateTime": {
        "S": "2022-07-08T23:59:59Z"
      },
      "OutofPockPercent": {
        "N": 50.00
      },
      "calculationRequirements": {
        "M": {
          "dependentIds": {
            "L": [
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              },
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_890"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              }
            ]
          }
        }
      },
      "Event": {
        "S": "SAMPLE"
      },
      "Provider": {
        "S": "PV-123"
      },
      "OutofPockAmount": {
        "N": 1000
      },
      "lastCalculationDateTime": {
        "S": "2022-07-08T00:00:00Z"
      },
      "sk": {
        "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
      },
      "OutofPockMax": {
        "N": 2000
      },
      "pk": {
        "S": "CUSTOMER#CUST_123"
      }
    },
    "SizeBytes": 694
  },
  "eventSource": "aws:dynamodb"
}

We can use dot notation to unnest the JSON document. But in addition to that, we should use a PartiQL statement to handle arrays if applicable. For example, in the preceding JSON document, there is an array under the element:

"dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L".

The following SQL query uses a combination of dot notation and a PartiQL statement to unnest the JSON document:

select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in "payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in "payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;

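The query unnests the JSON document into a flat result set, returning one row for each element of the dependentIds array (two rows for the preceding sample document).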
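For readers who find the join-on-an-array syntax unfamiliar, the same flattening can be sketched in Python. This is an analogy only, using a trimmed copy of the sample document. The function name is hypothetical, and as a simplification the sketch keeps the full sort key values for Transaction_ID and Dependent_ID and covers only a few of the columns.

```python
import json

# Trimmed copy of the sample payload shown earlier in this post.
PAYLOAD = json.loads("""
{
  "eventName": "INSERT",
  "dynamodb": {
    "Keys": {
      "pk": {"S": "CUSTOMER#CUST_123"},
      "sk": {"S": "TRANSACTION#2022-07-08T23:59:59Z#CUST_345"}
    },
    "NewImage": {
      "Provider": {"S": "PV-123"},
      "calculationRequirements": {"M": {"dependentIds": {"L": [
        {"M": {"sk": {"S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"}}},
        {"M": {"sk": {"S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_890"}}}
      ]}}}
    }
  }
}
""")


def unnest(payload: dict) -> list:
    """Return one flat row per dependent, or one row if there are none."""
    ddb = payload["dynamodb"]
    image = ddb.get("NewImage", {})
    dependents = (
        image.get("calculationRequirements", {})
        .get("M", {})
        .get("dependentIds", {})
        .get("L", [])
    )
    base = {
        # Text after the first '#', like substring(..., position('#' in ...) + 1).
        "Customer_ID": ddb["Keys"]["pk"]["S"].split("#", 1)[1],
        "Transaction_ID": ddb["Keys"]["sk"]["S"],
        "Provider": image.get("Provider", {}).get("S"),
        "Event_Name": payload["eventName"],
    }
    rows = [dict(base, Dependent_ID=d["M"]["sk"]["S"]) for d in dependents]
    # Left outer join behavior: keep the parent row even without dependents.
    return rows or [dict(base, Dependent_ID=None)]


for row in unnest(PAYLOAD):
    print(row["Customer_ID"], row["Dependent_ID"])
```

The scalar attributes are repeated on every row, which is why later steps group by Transaction_ID before loading the fact table.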

Precompute the result set using a materialized view

Optionally, to precompute and store the unnested result set from the preceding query, you can create a materialized view and schedule it to refresh at regular intervals. In this post, we maintain the preceding unnested data in a materialized view called mv_demo_super_unnest, which will be refreshed at regular intervals and used for further processing.

To capture the latest data from the DynamoDB table, the Amazon Redshift streaming materialized view needs to be refreshed at regular intervals, and then the incremental data should be transformed and loaded into the final fact and dimension tables. To avoid reprocessing the same data, a metadata table can be maintained in Amazon Redshift to keep track of each ELT process with status, start time, and end time, as explained in the following section.

Maintain an audit table in Amazon Redshift

The following is a sample DDL of a metadata table that is maintained for each process or job:

create table MetaData_ETL
(
JobName varchar(100),
StartDate timestamp, 
EndDate timestamp, 
Status varchar(50)
);

The following is a sample initial entry of the metadata audit table that can be maintained at job level. The insert statement is the initial entry for the ELT process to load the Customer_Transaction_Fact table:

insert into MetaData_ETL 
values
('Customer_Transaction_Fact_Load', current_timestamp, current_timestamp,'Ready' );
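The role of the audit table is easier to see in isolation. The following sketch models the idea of an incremental watermark in plain Python, with a variable standing in for the EndDate column. The function names and sample rows are hypothetical, and the sketch advances the watermark to the newest timestamp it processed, which is a design choice of this illustration.

```python
from datetime import datetime


def select_incremental(rows: list, watermark: datetime) -> list:
    """Keep only rows that arrived after the last successful load."""
    return [r for r in rows if r["approximate_arrival_timestamp"] > watermark]


def next_watermark(batch: list, watermark: datetime) -> datetime:
    """Advance to the newest processed timestamp; unchanged for an empty batch."""
    return max((r["approximate_arrival_timestamp"] for r in batch), default=watermark)


last_end = datetime(2022, 7, 8, 12, 0, 0)  # stand-in for MetaData_ETL.EndDate
rows = [
    {"Transaction_ID": "T1", "approximate_arrival_timestamp": datetime(2022, 7, 8, 11, 59, 0)},
    {"Transaction_ID": "T2", "approximate_arrival_timestamp": datetime(2022, 7, 8, 12, 0, 30)},
    {"Transaction_ID": "T3", "approximate_arrival_timestamp": datetime(2022, 7, 8, 12, 1, 0)},
]

batch = select_incremental(rows, last_end)
print([r["Transaction_ID"] for r in batch])  # ['T2', 'T3']

last_end = next_watermark(batch, last_end)
print(select_incremental(rows, last_end))    # []
```

Running the selection a second time returns nothing, which is the property that keeps a scheduled job from loading the same records twice.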

Build a fact table with the latest data

In this post, we demonstrate the loading of a fact table using specific transformation logic. We are skipping the dimension table load, which uses similar logic.

As a prerequisite, create the fact and dimension tables in a preferred schema. In the following example, we create the fact table Customer_Transaction_Fact in Amazon Redshift:

CREATE TABLE public.Customer_Transaction_Fact (
Transaction_ID character varying(500),
Customer_ID character varying(500),
OutofPocket_Percent numeric(5,2),
OutofPock_Amount integer,
OutofPocket_Max integer,
Provider character varying(500),
completion_datetime timestamp
);

Transform data using a stored procedure

We load this fact table from the unnested data using a stored procedure. For more information, refer to Creating stored procedures in Amazon Redshift.

Note that in this sample use case, we are using transformation logic to identify and load the latest value of each column for a customer transaction.

The stored procedure contains the following components:

  • In the first step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Running and StartDate as the current timestamp, which indicates that the fact load process is starting.
  • Refresh the materialized view mv_demo_super_unnest, which contains the unnested data.
  • In the following example, we load the fact table Customer_Transaction_Fact using the latest data from the streaming materialized view based on the column approximate_arrival_timestamp, which is available as a system column in the streaming materialized view. The value of approximate_arrival_timestamp is set when a Kinesis data stream successfully receives and stores a record.
  • The following logic in the stored procedure checks if the approximate_arrival_timestamp in mv_demo_super_unnest is greater than the EndDate timestamp in the MetaData_ETL audit table, so that it can only process the incremental data.
  • Additionally, while loading the fact table, we identify the latest non-null value of each column for every Transaction_ID, ordered by the approximate_arrival_timestamp column, using the rank and min functions.
  • The transformed data is loaded into the intermediate staging table.
  • The impacted records with the same Transaction_ID values are deleted and reloaded into the Customer_Transaction_Fact table from the staging table.
  • In the last step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Complete and EndDate as the current timestamp, which indicates that the fact load process has completed successfully.

See the following code:

CREATE OR REPLACE PROCEDURE SP_Customer_Transaction_Fact()
AS $$
BEGIN

set enable_case_sensitive_identifier to true;

--Update metadata audit table entry to indicate that the fact load process is running
update MetaData_ETL
set status = 'Running',
StartDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';

refresh materialized view mv_demo_super_unnest;

drop table if exists Customer_Transaction_Fact_Stg;

--Create latest record by Merging records based on approximate_arrival_timestamp
create table Customer_Transaction_Fact_Stg as
select 
m.Transaction_ID,
min(case when m.rank_Customer_ID =1 then m.Customer_ID end) Customer_ID,
min(case when m.rank_OutofPocket_Percent =1 then m.OutofPocket_Percent end) OutofPocket_Percent,
min(case when m.rank_OutofPock_Amount =1 then m.OutofPock_Amount end) OutofPock_Amount,
min(case when m.rank_OutofPocket_Max =1 then m.OutofPocket_Max end) OutofPocket_Max,
min(case when m.rank_Provider =1 then m.Provider end) Provider,
min(case when m.rank_Completion_DateTime =1 then m.Completion_DateTime end) Completion_DateTime
from
(
select *,
rank() over(partition by Transaction_ID order by case when mqp.Customer_ID is not null then 1 end, approximate_arrival_timestamp desc ) rank_Customer_ID,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Percent is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Percent,
rank() over(partition by Transaction_ID order by case when mqp.OutofPock_Amount is not null then 1 end, approximate_arrival_timestamp  desc )  rank_OutofPock_Amount,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Max is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Max,
rank() over(partition by Transaction_ID order by case when mqp.Provider is not null then 1 end, approximate_arrival_timestamp  desc ) rank_Provider,
rank() over(partition by Transaction_ID order by case when mqp.Completion_DateTime is not null then 1 end, approximate_arrival_timestamp desc )  rank_Completion_DateTime
from mv_demo_super_unnest mqp
where upper(mqp.event_Name) <> 'REMOVE' and mqp.approximate_arrival_timestamp > (select mde.EndDate from MetaData_ETL mde where mde.JobName = 'Customer_Transaction_Fact_Load') 
) m
group by m.Transaction_ID 
order by m.Transaction_ID
;

--Delete only impacted Transaction_ID from Fact table
delete from Customer_Transaction_Fact  
where Transaction_ID in ( select mqp.Transaction_ID from Customer_Transaction_Fact_Stg mqp);

--Insert latest records from staging table to Fact table
insert into Customer_Transaction_Fact
select * from Customer_Transaction_Fact_Stg; 

--Update metadata audit table entry to indicate that the fact load process is completed
update MetaData_ETL
set status = 'Complete',
EndDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';
END;
$$ LANGUAGE plpgsql;
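The rank and min combination in the staging query can be hard to read at first. The following Python sketch reproduces its intent on a tiny hypothetical data set: for each Transaction_ID, every column ends up with its most recent non-null value. Integer timestamps and the function name are simplifications for illustration, and ties between equal timestamps are not handled.

```python
def merge_latest(rows: list, key: str = "Transaction_ID",
                 ts: str = "approximate_arrival_timestamp") -> dict:
    """Collapse change events into one record per key, newest non-null wins."""
    merged = {}
    for row in sorted(rows, key=lambda r: r[ts]):  # oldest first
        target = merged.setdefault(row[key], {})
        for column, value in row.items():
            if column != ts and value is not None:
                target[column] = value  # a newer non-null value overwrites
    return merged


events = [
    {"Transaction_ID": "T1", "Provider": "PV-123", "OutofPock_Amount": 1000,
     "approximate_arrival_timestamp": 1},
    # A later event that changes the amount but carries no provider.
    {"Transaction_ID": "T1", "Provider": None, "OutofPock_Amount": 1500,
     "approximate_arrival_timestamp": 2},
    {"Transaction_ID": "T2", "Provider": "PV-456", "OutofPock_Amount": None,
     "approximate_arrival_timestamp": 3},
]

result = merge_latest(events)
print(result["T1"])  # Provider from the first event, amount from the second
```

A column that is null in every event for a transaction is simply absent from the merged record here, whereas the SQL version returns NULL for it.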

Additional considerations for implementation

There are several additional capabilities that you could utilize to modify this solution to meet your needs. Many customers utilize multiple AWS accounts, and it’s common that the Kinesis data stream may be in a different AWS account than the Amazon Redshift data warehouse. If this is the case, you can utilize an Amazon Redshift IAM role that assumes a role in the Kinesis data stream AWS account in order to read from the data stream. For more information, refer to Cross-account streaming ingestion for Amazon Redshift.

Another common use case is that you need to schedule the refresh of your Amazon Redshift data warehouse jobs so that the data warehouse’s data is continuously updated. To do this, you can utilize Amazon EventBridge to schedule the jobs in your data warehouse to run on a regular basis. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. Another option is to use Amazon Redshift Query Editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

If you have a requirement to load data from a DynamoDB table with existing data, refer to Loading data from DynamoDB into Amazon Redshift.

For more information on Amazon Redshift streaming ingestion capabilities, refer to Real-time analytics with Amazon Redshift streaming ingestion.

Clean up

To avoid unnecessary charges, clean up any resources that you built as part of this architecture that are no longer in use. This includes dropping the materialized view, stored procedure, external schema, and tables created as part of this post. Additionally, make sure you delete the DynamoDB table and delete the Kinesis data stream.

Conclusion

After following the solution in this post, you’re now able to build near-real-time analytics using Amazon Redshift streaming ingestion. We showed how you can ingest data from a DynamoDB source table using a Kinesis data stream in order to refresh your Amazon Redshift data warehouse. With the capabilities presented in this post, you should be able to increase the refresh rate of your Amazon Redshift data warehouse in order to provide the most up-to-date data in your data warehouse for your use case.


About the authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.

Dan Dressel is a Senior Analytics Specialist Solutions Architect at AWS. He is passionate about databases, analytics, machine learning, and architecting solutions. In his spare time, he enjoys spending time with family, nature walking, and playing foosball.

AWS Week in Review – Updates on Amazon FSx for NetApp ONTAP, AWS Lambda, eksctl, Karpenter, and More – July 17, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-week-in-review-updates-on-amazon-fsx-for-netapp-ontap-aws-lambda-eksctl-karpetner-and-more-july-17-2023/

Data Centered: Eastern Oregon is a five-part mini-documentary series that looks at the real-life impact of the more than $15 billion investment AWS has made in the local community, and at how the company supports jobs, generates economic growth, provides skills training and education, and unlocks opportunities for local businesses and suppliers.

Last week, I watched a new episode introducing the Data Center Technician training program offered by AWS to train people with little or no previous technical experience in the skills they need to work in data centers and other information technology (IT) roles. This video reminded me of my first days of cabling and transporting servers in data centers. Remember, there are still people behind cloud computing.

Last Week’s Launches
Here are some launches that got my attention:

Amazon FSx for NetApp ONTAP Updates – Jeff Barr introduced Amazon FSx for NetApp ONTAP support for SnapLock, an ONTAP feature that gives you the power to create volumes that provide write once read many (WORM) functionality for regulatory compliance and ransomware protection. In addition, FSx for NetApp ONTAP now supports IPSec encryption of data in transit and two additional monitoring and troubleshooting capabilities that you can use to monitor file system events and diagnose network connectivity.

AWS Lambda detects and stops recursive loops in Lambda functions – In certain scenarios, due to resource misconfiguration or code defects, a processed event might be sent back to the same service or resource that invoked the Lambda function. This can cause an unintended recursive loop and result in unintended usage and costs for customers. With this launch, Lambda will stop recursive invocations between Amazon SQS, Lambda, and Amazon SNS after 16 recursive calls. For more information, refer to our documentation or the launch blog post.

Amazon CloudFront support for 3072-bit RSA certificates – You can now associate your 3072-bit RSA certificates with CloudFront distributions to enhance communication security between clients and CloudFront edge locations. To get started, associate a 3072-bit RSA certificate with your CloudFront distribution using the console or APIs. There are no additional fees associated with this feature. For more information, please refer to the CloudFront Developer Guide.

Running GitHub Actions with AWS CodeBuild – Two weeks ago, AWS CodeBuild started to support GitHub Actions. You can now define GitHub Actions steps directly in the BuildSpec and run them alongside CodeBuild commands. Last week, the AWS DevOps Blog published a post about using the Liquibase GitHub Action to deploy changes to an Amazon Aurora database in a private subnet. You can learn how to integrate AWS CodeBuild with nearly 20,000 GitHub Actions developed by the open source community.

Amazon DynamoDB local version 2.0 – You can develop and test applications by running Amazon DynamoDB local in your local development environment without incurring any additional costs. The new 2.0 version allows Java developers to use DynamoDB local to work with Spring Boot 3 and frameworks such as Spring Framework 6 and Micronaut Framework 4 to build modernized, simplified, and lightweight cloud-native applications.

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Open Source Updates
Last week, we introduced new open source projects and significant roadmap contributions to the Jupyter community.

New joint maintainership between Weaveworks and AWS for eksctl – Now the eksctl open source project has been moved from the Weaveworks GitHub organization to a new top level GitHub organization—eksctl-io—that will be jointly maintained by Weaveworks and AWS moving forward. The eksctl project can now be found on GitHub.

Karpenter now supports Windows containers – Karpenter is an open source flexible, high-performance Kubernetes node provisioning and management solution that you can use to quickly scale Amazon EKS clusters. With the launch of version 0.29.0, Karpenter extends the automated node provisioning support to Windows containers running on EKS. Read this blog post for a step-by-step guide on how to get started with Karpenter for Windows node groups.

Updates in Amazon Aurora and Amazon OpenSearch Service – Following the announcement of updates to the PostgreSQL database in May by the open source community, we’ve updated Amazon Aurora PostgreSQL-Compatible Edition to support PostgreSQL 15.3, 14.8, 13.11, 12.15, and 11.20. These releases contain product improvements and bug fixes made by the PostgreSQL community, along with Aurora-specific improvements. You can also run OpenSearch version 2.7 in Amazon OpenSearch Service. With OpenSearch 2.7 (also released in May), we’ve made several improvements to observability, security analytics, index management, and geospatial capabilities in OpenSearch Service.

To learn about weekly updates for open source at AWS, check out the latest AWS open source newsletter by Ricardo.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

AWS Storage Day on August 9 – Join a one-day virtual event that will help you to better understand AWS storage services and make the most of your data. Register today.

AWS Global Summits – Sign up for the AWS Summit closest to your city: Hong Kong (July 20), New York City (July 26), Taiwan (August 2-3), São Paulo (August 3), and Mexico City (August 30).

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: Malaysia (July 22), Philippines (July 29-30), Colombia (August 12), and West Africa (August 19).

AWS re:Invent 2023 – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Registration is now open.

You can browse all upcoming AWS-led in-person and virtual events, and developer-focused events such as AWS DevDay.

Take the AWS Blog Customer Survey
We’re focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take our survey to share insights regarding your experience on the AWS Blog.

This survey is hosted by an external company. AWS handles your information as described in the AWS Privacy Notice. AWS will own the data gathered via this survey and will not share the information collected with survey respondents.

That’s all for this week. Check back next Monday for another Week in Review!

Channy

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

IBM Consulting creates innovative AWS solutions in French Hackathon

Post Syndicated from Diego Colombatto original https://aws.amazon.com/blogs/architecture/ibm-consulting-creates-innovative-aws-solutions-in-french-hackathon/

In March 2023, IBM Consulting delivered an Innovation Hackathon in France, aimed at designing and building new innovative solutions for real customer use cases using the AWS Cloud.

In this post, we briefly explore six of the solutions considered and demonstrate the AWS architectures created and implemented during the Hackathon.

Hackathon solutions

Solution 1: Optimize digital channels monitoring and management for Marketing

Monitoring the impact of a marketing campaign, such as customers’ and competitors’ reactions on digital media channels, can require a lot of effort. Digital campaign managers need this data to evaluate customer segment penetration and overall campaign effectiveness. Information can be collected via digital-channel API integrations or on the digital channel user interface (UI): digital-channel API integrations require frequent maintenance, while UI data collection can be labor-intensive.

On the AWS Cloud, IBM designed an augmented digital campaign manager solution, to assist digital campaign managers with digital-channel monitoring and management. This solution monitors social media APIs and, when APIs change, automatically updates the API integration, ensuring accurate information collection (Figure 1).

Figure 1. Optimize digital channels monitoring and management for Marketing

  1. Amazon Simple Storage Service (Amazon S3) and AWS Lambda are used to garner new digital estates, such as new social media APIs, and assess data quality.
  2. Amazon Kinesis Data Streams is used to decouple data ingestion from data query and storage.
  3. Lambda retrieves the required information from Amazon DynamoDB, such as the most relevant brands; natural language processing (NLP) is applied to the retrieved data, such as URL, bio, about, and verification status.
  4. Amazon S3 and Amazon CloudFront are used to present a dashboard where end-users can check, enrich, and validate collected data.
  5. When graph API calls detect an error/change, Lambda checks API documentation to update/correct the API call.
  6. A new Lambda function is generated with the updated API call.

Solution 2: 4th party logistics consulting service for a greener supply chain

Logistics companies have a wealth of trip data, both first- and third-party, and can use these data to provide new customer services, such as trip booking options optimized for carbon footprint, duration, or cost.

IBM designed an AWS solution (Figure 2) enabling the customer to book goods transport by selecting from different route options, combining transport modes, selecting departure-location, arrival, cargo weight and carbon emissions. Proposed options include the greenest, fastest, and cheapest routes. Additionally, the user can provide financial and time constraints.

Figure 2. Optimized transport booking architecture

  1. User connects to web-app UI, hosted on Amazon S3.
  2. Amazon API Gateway receives user requests from web app; requests are forwarded to Lambda.
  3. Lambda calculates the best trip options based on the user’s prerequisites, such as carbon emissions.
  4. Lambda estimates carbon emissions; estimates are combined with trip options at Step 3.
  5. Amazon Neptune graph database is used to efficiently store and query trip data.
  6. Different Lambda instances are used to ingest data from on-premises data sources and send customer bookings through the customer ordering system.
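
As a minimal sketch of how step 3 might pick the proposed options, assume each candidate route already carries an estimated carbon footprint, duration, and cost. The `Route` fields, the `best_options` helper, and the sample values are hypothetical and are not taken from the hackathon solution:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    name: str
    co2_kg: float    # estimated carbon emissions (assumed unit)
    hours: float     # estimated duration
    cost_eur: float  # estimated price (assumed currency)


def best_options(routes, max_cost=None, max_hours=None):
    """Return the greenest, fastest, and cheapest routes within the user's constraints."""
    feasible = [
        r for r in routes
        if (max_cost is None or r.cost_eur <= max_cost)
        and (max_hours is None or r.hours <= max_hours)
    ]
    if not feasible:
        return {}
    return {
        "greenest": min(feasible, key=lambda r: r.co2_kg),
        "fastest": min(feasible, key=lambda r: r.hours),
        "cheapest": min(feasible, key=lambda r: r.cost_eur),
    }


# Illustrative data only.
routes = [
    Route("rail+truck", co2_kg=120.0, hours=30.0, cost_eur=900.0),
    Route("truck", co2_kg=310.0, hours=18.0, cost_eur=750.0),
    Route("air+truck", co2_kg=980.0, hours=6.0, cost_eur=2400.0),
]
options = best_options(routes, max_cost=1000.0)
```

Filtering on the financial and time constraints before ranking keeps the three proposals consistent with what the user is able to book.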

Solution 3: Purchase order as a service

In the context of vendor-managed inventory and vendor-managed replenishment, inventory and logistics companies want to check on warehouse stock levels to identify the best available options for goods transport. Their objective is to optimize the availability of warehouse stock for order fulfillment; therefore, when a purchase order (PO) is received, required goods are identified as available in the correct warehouse, enabling swift delivery with minimal lead time and costs.

IBM designed an AWS PO as a service solution (Figure 3), using warehouse data to forecast customers’ future POs. Based on this forecast, the solution plans and optimizes warehouse goods availability and, hence, the logistics required for PO fulfillment.

Figure 3. Purchase order as a service AWS solution

  1. AWS Amplify provides a web/mobile UI where users can set constraints (such as warehouse capacity, minimum/maximum capacity) and check warehouse states and POs in progress. Additionally, the UI proposes possible optimized POs, which are automatically generated by the solution. If the user accepts one of these solution-generated POs, the user will benefit from optimized delivery time, costs, and carbon footprint.
  2. Lambda receives Amazon Forecast inferences and reads/writes PO information on Amazon DynamoDB.
  3. Forecast provides inferences regarding the most probable future POs. Forecast uses POs, warehouse data, and goods delivery data to automatically train a machine learning (ML) model that is used to generate forecast inferences.
  4. Amazon DynamoDB stores PO and warehouse information.
  5. Lambda pushes PO, warehouse, and goods delivery data from Amazon DynamoDB into Amazon S3. These data are used to retrain the Forecast ML model, to ensure high-quality forecast inferences.

Solution 4: Optimize environmental impact associated with engineers’ interventions for customer fiber connections

Telco companies that provide end users’ internet connections need engineers to execute field tasks, such as deploying, activating, and repairing subscribers’ lines. In this scenario, it’s important to identify the most efficient itinerary for each engineer.

IBM designed an AWS solution that automatically generates engineers’ itineraries that consider criteria such as mileage, carbon-emission generation, and electric-/thermal-vehicle availability.

The solution (Figure 4) provides:

  • Customer management teams with a mobile dashboard showing carbon-emissions estimates for all engineers’ journeys, both in-progress and planned
  • Engineers with a mobile application including an optimized itinerary and trip updates based on real-time traffic and unexpected events

Figure 4. AWS telco solution for greener customer service

  1. Management team and engineers connect to web/mobile application, respectively. Amazon Cognito provides authentication and authorization, Amazon S3 stores application static content, and API Gateway receives and forwards API requests.
  2. AWS Step Functions implements different workflows. Application logic is implemented in Lambda, which connects to DynamoDB to get trip data (current route and driver location); Amazon Location Service provides itineraries, and Amazon SageMaker ML model implements itinerary optimization engine.
  3. Independently from online users, trip data are periodically sent to API Gateway and stored in Amazon S3.
  4. SageMaker notebook periodically uses Amazon S3 data to re-train the trip optimization ML model with updated data.

Solution 5: Improve the effectiveness of customer SAP level 1 support by reducing response times for common information requests

Companies using SAP usually provide first-level support to their internal SAP users. SAP users engage the support (usually via ticketing system) to ask for help when facing SAP issues or to request additional information. A high number of information requests requires significant effort to retrieve and provide the available information on resources like SAP notes/documentation or similar support requests.

IBM designed an AWS solution (Figure 5), based on support request information, that can automatically provide a short list of most probable solutions with a confidence score.

Figure 5. SAP customer support solution

  1. Lambda receives ticket information, such as ticket number, business service, and description.
  2. Lambda processes ticket data and Amazon Translate translates text into the country’s native language and English.
  3. SageMaker ML model receives the question and provides the inference.
  4. If the inference has a high confidence score, Lambda provides it immediately as output.
  5. If the inference has a low confidence score, Amazon Kendra receives the question, searches automatically through indexed company information and provides the best answer available. Lambda then provides the answer as output.
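
The confidence-based routing in steps 3 to 5 can be sketched in a few lines of Python. This is only an illustration: `classify` and `search` are stand-ins for the SageMaker model and the Amazon Kendra index, and the 0.8 threshold is an assumed value that the post does not specify:

```python
def answer_ticket(question, classify, search, threshold=0.8):
    """Use the model's answer when it is confident, otherwise fall back to search."""
    answer, confidence = classify(question)
    if confidence >= threshold:
        return {"source": "model", "answer": answer, "confidence": confidence}
    return {"source": "search", "answer": search(question), "confidence": confidence}


# Hypothetical stand-ins so the sketch runs without any AWS service.
def fake_classify(question):
    known = {"how do i reset my password": ("See the password reset note.", 0.93)}
    return known.get(question.lower(), ("", 0.20))


def fake_search(question):
    return "Best match from the indexed company documentation."


confident = answer_ticket("How do I reset my password", fake_classify, fake_search)
fallback = answer_ticket("Why is my report empty", fake_classify, fake_search)
```

Passing the two back ends in as functions keeps the routing rule testable on its own, independent of how the model and the search index are hosted.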

Solution 6: Improve contact center customer experience providing faster and more accurate customer support

Insured customers often interact with insurer companies using contact centers, requesting information and services regarding their insurance policies.

IBM designed an AWS solution improving end-customer experience and contact center agent efficiency by providing automated customer-agent call/chat summarization. This enables:

  • The agent to quickly recall the customer need in following interactions
  • Contact center supervisor to quickly understand the objective of each case (intervening if necessary)
  • Insured customers to quickly have the information required, without repeating information already provided

Figure 6. Improving contact center customer experience

Summarization capability is provided by generative AI, leveraging large language models (LLMs) on SageMaker.

  1. A pretrained LLM from Hugging Face is stored on Amazon S3.
  2. The LLM is fine-tuned and trained using Amazon SageMaker.
  3. The LLM is made available as a SageMaker API endpoint, ready to provide inferences.
  4. The insured user contacts customer support; the user request goes through a voice bot or chatbot, then reaches Amazon Connect.
  5. Lambda queries the LLM. The inference provided by the LLM is sent to an Amazon Connect instance, where it is enriched with knowledge-based search, using Amazon Connect Wisdom.
  6. If the user–agent conversation was a voice interaction (like a phone call), then the call recording is transcribed using Amazon Transcribe. Then, Lambda is called for summarization.

Conclusion

In this blog post, we have explored how IBM Consulting delivered an Innovation Hackathon in France. During the Hackathon, IBM worked backward from real customer use cases, designing and building innovative solutions using AWS services.

Reduce archive cost with serverless data archiving

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/reduce-archive-cost-with-serverless-data-archiving/

For regulatory reasons, decommissioning core business systems in financial services and insurance (FSI) markets requires data to remain accessible years after the application is retired. Traditionally, FSI companies either outsourced data archiving to third-party service providers, which maintained application replicas, or purchased vendor software to query and visualize archival data.

In this blog post, we present a more cost-efficient option with serverless data archiving on Amazon Web Services (AWS). In our experience, you can build your own cloud-native solution on Amazon Simple Storage Service (Amazon S3) at one-fifth of the price of third-party alternatives. If you are retiring legacy core business systems, consider serverless data archiving for cost-savings while keeping regulatory compliance.

Serverless data archiving and retrieval

Modern archiving solutions follow the principles of modern applications:

  • Serverless-first development, to reduce management overhead.
  • Cloud-native, to leverage native capabilities of AWS services, such as backup or disaster recovery, to avoid custom build.
  • Consumption-based pricing, since data archival is consumed irregularly.
  • Speed of delivery, as both implementation and archive operations need to be performed quickly to fulfill regulatory compliance.
  • Flexible data retention policies can be enforced in an automated manner.

AWS Storage and Analytics services offer the necessary building blocks for a modern serverless archiving and retrieval solution.

Data archiving can be implemented on top of Amazon S3 and AWS Glue.

  1. Amazon S3 storage tiers enable different data retention policies and retrieval service level agreements (SLAs). You can migrate data to Amazon S3 using AWS Database Migration Service; otherwise, consider another data transfer service, such as AWS DataSync or AWS Snowball.
  2. AWS Glue crawlers automatically infer both database and table schemas from your data in Amazon S3 and store the associated metadata in the AWS Glue Data Catalog.
  3. Amazon CloudWatch monitors the execution of AWS Glue crawlers and notifies of failures.

Figure 1 provides an overview of the solution.

Figure 1. Serverless data archiving and retrieval

Once the archival data is catalogued, Amazon Athena can be used for serverless data query operations using standard SQL.

  1. Amazon API Gateway receives the data retrieval requests and eases integration with other systems via REST, HTTPS, or WebSocket.
  2. AWS Lambda reads parametrization data/templates from Amazon S3 in order to construct the SQL queries. Alternatively, query templates can be stored as key-value entries in a NoSQL store, such as Amazon DynamoDB.
  3. Lambda functions trigger Athena with the constructed SQL query.
  4. Athena uses the AWS Glue Data Catalog to retrieve table metadata for the Amazon S3 (archival) data and to return the SQL query results.
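
As a rough sketch of step 2, a stored template could be filled with request parameters before the query is handed to Athena. The template text, table and column names, and the `build_query` helper below are hypothetical, and the parameter check is a deliberately strict assumption rather than a description of the solution:

```python
import re
from string import Template

# Hypothetical template, as it might be stored in Amazon S3 or Amazon DynamoDB.
QUERY_TEMPLATES = {
    "policy_by_customer": Template(
        "SELECT * FROM policies WHERE customer_id = '$customer_id' LIMIT $limit"
    ),
}

# Only allow simple identifiers and numbers as parameter values.
SAFE_VALUE = re.compile(r"^[A-Za-z0-9_-]+$")


def build_query(template_name, **params):
    """Fill a stored SQL template after validating every parameter value."""
    for key, value in params.items():
        if not SAFE_VALUE.match(str(value)):
            raise ValueError(f"unsafe value for {key!r}: {value!r}")
    return QUERY_TEMPLATES[template_name].substitute(**params)


sql = build_query("policy_by_customer", customer_id="C-1001", limit=10)
```

Because the query text is assembled from request input, rejecting anything outside a narrow character set before substitution is one simple way to keep the templates from being abused.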

How we built serverless data archiving

An early build-or-buy assessment compared vendor products with a custom-built solution using Amazon S3, AWS Glue, and a user frontend for data retrieval and visualization.

The total cost of ownership over a 10-year period for one insurance core system (Policy Admin System) was $0.25M to build and run the custom solution on AWS compared with >$1.1M for third-party alternatives. The implementation cost advantage of the custom-built solution was due to development efficiencies using AWS services. The lower run cost resulted from a decreased frequency of archival usage and paying only for what you use.
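
A quick arithmetic check of these figures, using only the two numbers quoted above (the third-party cost is a lower bound, so the ratio shown is an upper bound):

```python
custom_tco = 0.25  # $M over 10 years, custom solution on AWS
vendor_tco = 1.1   # $M over 10 years, lower bound for third-party alternatives

ratio = custom_tco / vendor_tco    # share of the third-party cost
savings = vendor_tco - custom_tco  # minimum saving in $M

print(f"custom / third-party: {ratio:.2f}")
print(f"minimum saving: ${savings:.2f}M")
```

A ratio of about 0.23 or less is roughly in line with the one-fifth figure given in the introduction.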

The data archiving solution was implemented with AWS services (Figure 2):

  1. Amazon S3 is used to persist archival data in Parquet format (optimized for analytics and compressed to reduce storage space) that is loaded from the legacy insurance core system. The archival data source was AS400/DB2 and moved with Informatica Cloud to Amazon S3.
  2. AWS Glue crawlers infer the database schema from objects in Amazon S3 and create tables in AWS Glue for the decommissioned application data.
  3. Lambda functions (Python) remove data records based on retention policies configured for each domain, such as customers, policies, claims, and receipts. A daily job (Control-M) initiates the retention process.

Figure 2. Exemplary implementation of serverless data archiving and retrieval for insurance core system
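
The retention step can be illustrated with a small Python sketch. The retention periods, the `archived_on` field, and the helper names are assumptions made for the example; the post does not state the actual policies:

```python
from datetime import date

# Hypothetical retention periods in years per domain.
RETENTION_YEARS = {"customers": 10, "policies": 10, "claims": 7, "receipts": 5}


def is_expired(domain, archived_on, today):
    """Return True when a record has outlived the retention period of its domain."""
    years = RETENTION_YEARS[domain]
    try:
        cutoff = archived_on.replace(year=archived_on.year + years)
    except ValueError:  # February 29 mapped onto a non-leap year
        cutoff = archived_on.replace(year=archived_on.year + years, day=28)
    return today >= cutoff


def records_to_remove(records, today):
    """Select the records that the daily retention job would delete."""
    return [r for r in records if is_expired(r["domain"], r["archived_on"], today)]


# Illustrative data only.
records = [
    {"id": 1, "domain": "claims", "archived_on": date(2015, 3, 1)},
    {"id": 2, "domain": "claims", "archived_on": date(2020, 3, 1)},
    {"id": 3, "domain": "receipts", "archived_on": date(2018, 6, 30)},
]
expired = records_to_remove(records, today=date(2023, 7, 17))
```

Keeping the periods in a single mapping per domain means a policy change is a configuration change rather than a code change.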

Retrieval operations are formulated and executed via Python functions in Lambda. The following AWS resources implement the retrieval logic:

  1. Athena is used to run SQL queries over the AWS Glue tables for the decommissioned application.
  2. Lambda functions (Python) build and execute queries for data retrieval. The functions render HTML snippets using the Jinja templating engine and Athena query results, returning the selected template filled with the requested archive data. Using Jinja as the templating engine improved the speed of delivery and reduced the heavy lifting of frontend and backend changes when modeling retrieval operations by ~30% due to the decoupling between application layers. As a result, engineers only need to build an Athena query with the linked Jinja template.
  3. Amazon S3 stores templating configuration and queries (JSON files) used for query parametrization.
  4. Amazon API Gateway serves as a single point of entry for API calls.
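
To illustrate the idea of filling a template with query results, here is a minimal sketch that uses Python's standard-library `string.Template` as a stand-in for Jinja, so that it runs without extra dependencies. The column names and the markup are hypothetical:

```python
from html import escape
from string import Template

# Stand-in templates; the solution described above uses Jinja templates instead.
ROW = Template("<tr><td>$policy_id</td><td>$holder</td></tr>")
TABLE = Template("<table><tr><th>Policy</th><th>Holder</th></tr>$rows</table>")


def render_snippet(rows):
    """Render query result rows (dicts) as an HTML table, escaping every value."""
    rendered = "".join(
        ROW.substitute({key: escape(str(value)) for key, value in row.items()})
        for row in rows
    )
    return TABLE.substitute(rows=rendered)


html_snippet = render_snippet([{"policy_id": "P-1", "holder": "A & B <Ltd>"}])
```

The point of the decoupling is visible even at this scale: the query result is plain data, and the presentation lives entirely in the template.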

The user frontend for data retrieval and visualization is implemented as a web application using the React JavaScript library (with static content on Amazon S3), with Amazon CloudFront used for web content delivery.

The archiving solution enabled 80 use cases with 60 queries and reduced storage from three terabytes on source to only 35 gigabytes on Amazon S3. The success of the implementation depended on the following key factors:

  • Appropriate sponsorship from business across all areas (claims, actuarial, compliance, etc.)
  • Definition of SLAs for responding to courts, regulators, etc.
  • Minimum viable and mandatory approach
  • Prototype visualizations early on (fail fast)

Conclusion

Traditionally, FSI companies relied on vendor products for data archiving. In this post, we explored how to build a scalable solution on Amazon S3 and discussed key implementation considerations. We have demonstrated that AWS services enable FSI companies to build a serverless archiving solution while reaching and keeping regulatory compliance at a lower cost.

Build an Amazon Redshift data warehouse using an Amazon DynamoDB single-table design

Post Syndicated from Altaf Hussain original https://aws.amazon.com/blogs/big-data/build-an-amazon-redshift-data-warehouse-using-an-amazon-dynamodb-single-table-design/

Amazon DynamoDB is a fully managed NoSQL service that delivers single-digit millisecond performance at any scale. It’s used by thousands of customers for mission-critical workloads. Typical use cases for DynamoDB are an ecommerce application handling a high volume of transactions, or a gaming application that needs to maintain scorecards for players and games. In traditional databases, we would model such applications using a normalized data model (entity-relation diagram). This approach comes with a heavy computational cost in terms of processing and distributing the data across multiple tables while ensuring the system is ACID-compliant at all times, which can negatively impact performance and scalability. If these entities are frequently queried together, it makes sense to store them in a single table in DynamoDB. This is the concept of single-table design. Storing different types of data in a single table allows you to retrieve multiple, heterogeneous item types using a single request. Such requests are relatively straightforward, and usually take the following form:

SELECT * FROM TABLE WHERE Some_Attribute = 'some_value'

In this format, Some_Attribute is a partition key or part of an index.

Nonetheless, many of the same customers using DynamoDB would also like to be able to perform aggregations and ad hoc queries against their data to measure important KPIs that are pertinent to their business. Suppose we have a successful ecommerce application handling a high volume of sales transactions in DynamoDB. A typical ask for this data may be to identify sales trends as well as sales growth on a yearly, monthly, or even daily basis. These types of queries require complex aggregations over a large number of records. A key pillar of AWS’s modern data strategy is the use of purpose-built data stores for specific use cases to achieve performance, cost, and scale. Deriving business insights by identifying year-on-year sales growth is an example of an online analytical processing (OLAP) query. These types of queries are suited for a data warehouse.
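
To make the shape of such a query concrete, the following sketch computes year-on-year sales growth in plain Python. The order records are invented for the example; in a data warehouse the same result would come from an aggregation over the full table:

```python
from collections import defaultdict

# Hypothetical order records: (order date as an ISO string, amount).
orders = [
    ("2021-02-10", 100.0),
    ("2021-11-05", 150.0),
    ("2022-01-20", 200.0),
    ("2022-08-14", 175.0),
]


def yearly_sales(orders):
    """Sum order amounts per calendar year."""
    totals = defaultdict(float)
    for order_date, amount in orders:
        totals[int(order_date[:4])] += amount
    return dict(totals)


def year_on_year_growth(totals):
    """Return {year: fractional growth versus the previous year}."""
    growth = {}
    for year in sorted(totals):
        previous = totals.get(year - 1)
        if previous:
            growth[year] = (totals[year] - previous) / previous
    return growth


totals = yearly_sales(orders)
growth = year_on_year_growth(totals)
```

Note that every order has to be read to produce a single number per year, which is the opposite of the key-based lookups that DynamoDB is optimized for.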

The goal of a data warehouse is to enable businesses to analyze their data fast; this is important because it means they are able to gain valuable insights in a timely manner. Amazon Redshift is a fully managed, scalable cloud data warehouse. Building a performant data warehouse is non-trivial because the data needs to be highly curated to serve as a reliable and accurate version of the truth.

In this post, we walk through the process of exporting data from a DynamoDB table to Amazon Redshift. We discuss data model design for both NoSQL databases and SQL data warehouses. We begin with a single-table design as an initial state and build a scalable batch extract, load, and transform (ELT) pipeline to restructure the data into a dimensional model for OLAP workloads.

DynamoDB table example

We use an example of a successful ecommerce store allowing registered users to order products from their website. A simple ERD (entity-relation diagram) for this application will have four distinct entities: customers, addresses, orders, and products. For customers, we have information such as their unique user name and email address; for the address entity, we have one or more customer addresses. Orders contain information regarding the order placed, and the products entity provides information about the products placed in an order. As we can see from the following diagram, a customer can place one or more orders, and an order must contain one or more products.

We could store each entity in a separate table in DynamoDB. However, there is no way to retrieve customer details alongside all the orders placed by the customer without making multiple requests to the customer and order tables. This is inefficient from both a cost and performance perspective. A key goal for any efficient application is to retrieve all the required information in a single query request. This ensures fast, consistent performance. So how can we remodel our data to avoid making multiple requests? One option is to use single-table design. Taking advantage of the schema-less nature of DynamoDB, we can store different types of records in a single table in order to handle different access patterns in a single request. We can go further still and store different types of values in the same attribute and use it as a global secondary index (GSI). This is called index overloading.

A typical access pattern we may want to handle in our single table design is to get customer details and all orders placed by the customer.

To accommodate this access pattern, our single-table design looks like the following example.

By restricting the number of addresses associated with a customer, we can store address details as a complex attribute (rather than a separate item) without exceeding the 400 KB item size limit of DynamoDB.

We can add a global secondary index (GSIpk and GSIsk) to capture another access pattern: get order details and all product items placed in an order. We use the following table.

We have used generic attribute names, PK and SK, for our partition key and sort key columns. This is because they hold data from different entities. Furthermore, the values in these columns are prefixed by generic terms such as CUST# and ORD# to help us identify the type of data we have and ensure that the value in PK is unique across all records in the table.
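
The two access patterns can be mimicked with an in-memory list of items. This is a sketch, not the DynamoDB API: the item layout, the attribute values, and the `query` helper are assumptions chosen to show how prefixed keys and an overloaded index let one request return several item types:

```python
# Hypothetical items in a single table; values are illustrative only.
TABLE = [
    {"PK": "CUST#alice", "SK": "CUST#alice", "type": "customer",
     "email": "alice@example.com"},
    {"PK": "CUST#alice", "SK": "ORD#1001", "type": "order",
     "GSIpk": "ORD#1001", "GSIsk": "ORD#1001"},
    {"PK": "CUST#alice", "SK": "ORD#1002", "type": "order",
     "GSIpk": "ORD#1002", "GSIsk": "ORD#1002"},
    {"PK": "PROD#7#ORD#1001", "SK": "PROD#7", "type": "product",
     "GSIpk": "ORD#1001", "GSIsk": "PROD#7"},
]


def query(items, key_attr, key_value, sort_attr=None, sort_prefix=""):
    """Mimic a key-condition query: exact partition key match, optional sort key prefix."""
    return [
        item for item in items
        if item.get(key_attr) == key_value
        and (sort_attr is None or item.get(sort_attr, "").startswith(sort_prefix))
    ]


# Access pattern 1: customer details and all orders placed by the customer.
customer_and_orders = query(TABLE, "PK", "CUST#alice")

# Access pattern 2 (via the index attributes): order details and its product items.
order_with_products = query(TABLE, "GSIpk", "ORD#1001")

# Narrowing by sort key prefix returns only the orders.
orders_only = query(TABLE, "PK", "CUST#alice", "SK", "ORD#")
```

Each call corresponds to one request, and the prefixes make it possible to tell the returned item types apart without a separate type lookup.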

A well-designed single table will not only reduce the number of requests for an access pattern, but will service many different access patterns. The challenge comes when we need to ask more complex questions of our data, for example, what was the year-on-year quarterly sales growth by product broken down by country?

The case for a data warehouse

A data warehouse is ideally suited to answer OLAP queries. Built on highly curated structured data, it provides the flexibility and speed to run aggregations across an entire dataset to derive insights.

To house our data, we need to define a data model. An optimal design choice is to use a dimensional model. A dimensional model consists of fact tables and dimension tables. Fact tables store the numeric information about business measures and foreign keys to the dimension tables. Dimension tables store descriptive information about the business facts to help understand and analyze the data better. From a business perspective, a dimensional model with its use of facts and dimensions can present complex business processes in a simple-to-understand manner.

Building a dimensional model

A dimensional model optimizes read performance through efficient joins and filters. Amazon Redshift automatically chooses the best distribution style and sort key based on workload patterns. We build a dimensional model from the single DynamoDB table based on the following star schema.

We have separated each item type into individual tables. We have a single fact table (Orders) containing the business measures price and numberofitems, and foreign keys to the dimension tables. By storing the price of each product in the fact table, we can track price fluctuations in the fact table without continually updating the product dimension. (In a similar vein, the DynamoDB attribute amount is a simple derived measure in our star schema: amount is the summation of product prices per orderid).
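
As a small illustration of the derived measure, the following sketch recomputes amount from fact rows. The rows are invented, and it assumes one fact row per product in an order with price holding the value to be summed, as described above:

```python
from collections import defaultdict

# Hypothetical rows of the Orders fact table: one row per product in an order.
fact_orders = [
    {"orderid": "1001", "productid": "7", "price": 20.0, "numberofitems": 2},
    {"orderid": "1001", "productid": "9", "price": 5.5, "numberofitems": 1},
    {"orderid": "1002", "productid": "7", "price": 12.0, "numberofitems": 1},
]


def amount_per_order(fact_rows):
    """Derive amount as the sum of product prices per orderid."""
    totals = defaultdict(float)
    for row in fact_rows:
        totals[row["orderid"]] += row["price"]
    return dict(totals)


amounts = amount_per_order(fact_orders)
```

Because the measure can always be recomputed from the fact table, it does not need to be stored as a separate column in the star schema.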

By splitting the descriptive content of our single DynamoDB table into multiple Amazon Redshift dimension tables, we can remove redundancy by only holding in each dimension the information pertinent to it. This allows us the flexibility to query the data under different contexts; for example, we may want to know the frequency of customer orders by city or product sales by date. The ability to freely join dimensions and facts when analyzing the data is one of the key benefits of dimensional modeling. It’s also good practice to have a Date dimension to allow us to perform time-based analysis by aggregating the fact by year, month, quarter, and so forth.

This dimensional model will be built in Amazon Redshift. When setting out to build a data warehouse, it’s a common pattern to have a data lake as the source of the data warehouse. The data lake in this context serves a number of important functions:

  • It acts as a central source for multiple applications, not just exclusively for data warehousing purposes. For example, the same dataset could be used to build machine learning (ML) models to identify trends and predict sales.
  • It can store data as is, be it unstructured, semi-structured, or structured. This allows you to explore and analyze the data without committing upfront to what the structure of the data should be.
  • It can be used to offload historical or less-frequently-accessed data, allowing you to manage your compute and storage costs more effectively. In our analytic use case, if we are analyzing quarterly growth rates, we may only need a couple of years’ worth of data; the rest can be unloaded into the data lake.

When querying a data lake, we need to consider user access patterns in order to reduce costs and optimize query performance. This is achieved by partitioning the data. The choice of partition keys will depend on how you query the data. For example, if you query the data by customer or country, then they are good candidates for partition keys; if you query by date, then a date hierarchy can be used to partition the data.

After the data is partitioned, we want to ensure it’s held in the right format for optimal query performance. The recommended choice is to use a columnar format such as Parquet or ORC. Such formats are compressed and store data column-wise, allowing for fast retrieval times, and are parallelizable, allowing for fast load times when moving the data into Amazon Redshift. In our use case, it makes sense to store the data in a data lake with minimal transformation and formatting to enable easy querying and exploration of the dataset. We partition the data by item type (Customer, Order, Product, and so on), and because we want to easily query each entity in order to move the data into our data warehouse, we transform the data into the Parquet format.
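
To make the partitioning idea concrete, here is a minimal Python sketch that groups items by their item type under Hive-style partition prefixes (type=customer/, type=order/, and so on). The bucket name, the helper names, and the assumption that each item carries a type attribute are illustrative only; in the actual solution, AWS Glue writes the partitions for us.

```python
from collections import defaultdict


def partition_prefix(base: str, item_type: str) -> str:
    """Return a Hive-style partition prefix such as <base>/type=customer/."""
    return f"{base.rstrip('/')}/type={item_type.lower()}/"


def group_by_partition(base: str, items: list) -> dict:
    """Group items by the partition prefix they would be written under."""
    groups = defaultdict(list)
    for item in items:
        groups[partition_prefix(base, item["type"])].append(item)
    return dict(groups)


if __name__ == "__main__":
    # "example-datalake" is a hypothetical bucket name.
    sample = [
        {"type": "Customer", "username": "jdoe"},
        {"type": "Order", "orderid": "o-1"},
        {"type": "Order", "orderid": "o-2"},
    ]
    for prefix, rows in group_by_partition("s3://example-datalake/sales", sample).items():
        print(prefix, len(rows))
```

A query that filters on type only has to read the objects under the matching prefix, which is what reduces both scan cost and query time.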

Solution overview

The following diagram illustrates the data flow to export data from a DynamoDB table to a data warehouse.

We present a batch ELT solution using AWS Glue for exporting data stored in DynamoDB to an Amazon Simple Storage Service (Amazon S3) data lake and then a data warehouse built in Amazon Redshift. AWS Glue is a fully managed extract, transform, and load (ETL) service that allows you to organize, cleanse, validate, and format data for storage in a data warehouse or data lake.

The solution workflow has the following steps:

  1. Move any existing files from the raw and data lake buckets into corresponding archive buckets to ensure any fresh export from DynamoDB to Amazon S3 isn’t duplicating data.
  2. Begin a new DynamoDB export to the S3 raw layer.
  3. From the raw files, create a data lake partitioned by item type.
  4. Load the data from the data lake to landing tables in Amazon Redshift.
  5. After the data is loaded, we take advantage of the distributed compute capability of Amazon Redshift to transform the data into our dimensional model and populate the data warehouse.

We orchestrate the pipeline using an AWS Step Functions workflow and schedule a daily batch run using Amazon EventBridge.

For simpler DynamoDB table structures, you may consider skipping some of these steps by either loading data directly from DynamoDB to Amazon Redshift, or using the Amazon Redshift auto-copy feature or COPY command to load data from Amazon S3.

Prerequisites

You must have an AWS account with a user who has programmatic access. For setup instructions, refer to AWS security credentials.

Use the AWS CloudFormation template cf_template_ddb-dwh-blog.yaml to launch the following resources:

  • A DynamoDB table with a global secondary index (GSI) and point-in-time recovery enabled.
  • An Amazon Redshift cluster (we use two nodes of RA3.4xlarge).
  • Three AWS Glue database catalogs: raw, datalake, and redshift.
  • Five S3 buckets: two for the raw and data lake files; two for their respective archives, and one for the Amazon Athena query results.
  • Two AWS Identity and Access Management (IAM) roles: An AWS Glue role and a Step Functions role with the requisite permissions and access to resources.
  • A JDBC connection to Amazon Redshift.
  • An AWS Lambda function to retrieve the s3-prefix-list-id for your Region. This is required to allow traffic from a VPC to access an AWS service through a gateway VPC endpoint.

Download the following files to perform the ELT:

  • The Python script to load sample data into our DynamoDB table: load_dynamodb.py.
  • The AWS Glue Python Spark script to archive the raw and data lake files: archive_job.py.
  • The AWS Glue Spark scripts to extract and load the data from DynamoDB to Amazon Redshift: GlueSparkJobs.zip.
  • The DDL and DML SQL scripts to create the tables and load the data into the data warehouse in Amazon Redshift: SQL Scripts.zip.

Launch the CloudFormation template

AWS CloudFormation allows you to model, provision, and scale your AWS resources by treating infrastructure as code. We use the downloaded CloudFormation template to create a stack (with new resources).

  1. On the AWS CloudFormation console, create a new stack and select Template is ready.
  2. Upload the template file (cf_template_ddb-dwh-blog.yaml) and choose Next.
  3. Enter a name for your stack.
  4. For MasterUserPassword, enter a password.
  5. Optionally, replace the default names for the Amazon Redshift database, DynamoDB table, and MasterUsername (in case these names are already in use).
  6. Review the details and acknowledge that AWS CloudFormation may create IAM resources on your behalf.
  7. Choose Create stack.

Load sample data into a DynamoDB table

To load your sample data into DynamoDB, complete the following steps:

  1. Create an AWS Cloud9 environment with default settings.
  2. Upload the load_dynamodb.py script. From the AWS Cloud9 terminal, use the pip install command to install the following packages:
    1. boto3
    2. faker
    3. faker_commerce
    4. numpy
  3. In the Python script, replace all placeholders (capital letters) with the appropriate values and run the following command in the terminal:
python load_dynamodb.py

This command loads the sample data into our single DynamoDB table.

Extract data from DynamoDB

To extract the data from DynamoDB to our S3 data lake, we use the new AWS Glue DynamoDB export connector. Unlike the old connector, the new version uses a snapshot of the DynamoDB table and doesn’t consume read capacity units of your source DynamoDB table. For large DynamoDB tables exceeding 100 GB, the read performance of the new AWS Glue DynamoDB export connector is not only consistent but also significantly faster than the previous version.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance. PITR takes continuous backups of the source table (so be mindful of cost) and ensures that each time the connector invokes an export, the data is fresh. The time it takes to complete an export depends on the size of your table and how uniformly the data is distributed therein. This can range from a few minutes for small tables (up to 10 GiB) to a few hours for larger tables (up to a few terabytes). This is not a concern for our use case because data lakes and data warehouses are typically used to aggregate data at scale and generate daily, weekly, or monthly reports. It’s also worth noting that each export is a full refresh of the data, so in order to build a scalable automated data pipeline, we need to archive the existing files before beginning a fresh export from DynamoDB.

Complete the following steps:

  1. Create an AWS Glue job using the Spark script editor.
  2. Upload the archive_job.py file from GlueSparkJobs.zip.

This job archives the data files into timestamped folders. We run the job concurrently to archive the raw files and the data lake files.

  1. In the Job details section, give the job a name and choose the AWS Glue IAM role created by our CloudFormation template.
  2. Keep all defaults the same and ensure maximum concurrency is set to 2 (under Advanced properties).

Archiving the files provides a backup for disaster recovery. Because we can assume the archived files will be accessed infrequently, we keep them in the S3 Standard-IA storage class (STANDARD_IA), which saves up to 40% on costs while still providing rapid access to the files when needed.
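
The following Python sketch shows one way the archive step could derive a timestamped destination key and the parameters for an S3 copy into the Standard-IA storage class. It is not the content of archive_job.py: the helper names, the archive/ prefix, the timestamp layout, and the bucket names are assumptions made for illustration. The parameter names (Bucket, Key, CopySource, StorageClass) are those accepted by the S3 CopyObject API.

```python
from datetime import datetime, timezone


def archive_key(source_key: str, run_time: datetime) -> str:
    """Place a file under a folder named after the archive run timestamp."""
    stamp = run_time.strftime("%Y-%m-%dT%H-%M-%S")
    return f"archive/{stamp}/{source_key.lstrip('/')}"


def copy_request(src_bucket: str, dst_bucket: str, key: str, run_time: datetime) -> dict:
    """Build keyword arguments suitable for an S3 copy_object call."""
    return {
        "Bucket": dst_bucket,
        "Key": archive_key(key, run_time),
        "CopySource": {"Bucket": src_bucket, "Key": key},
        "StorageClass": "STANDARD_IA",  # infrequent-access tier for archives
    }


if __name__ == "__main__":
    # Bucket names are hypothetical placeholders.
    now = datetime.now(timezone.utc)
    print(copy_request("example-raw", "example-raw-archive", "type=order/part-0.parquet", now))
```

Because every run writes under its own timestamped folder, repeated runs never overwrite an earlier archive.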

This job typically runs before each export of data from DynamoDB. After the datasets have been archived, we’re ready to (re)-export the data from our DynamoDB table.

We can use AWS Glue Studio to visually create the jobs needed to extract the data from DynamoDB and load into our Amazon Redshift data warehouse. We demonstrate how to do this by creating an AWS Glue job (called ddb_export_raw_job) using AWS Glue Studio.

  1. In AWS Glue Studio, create a job and select Visual with a blank canvas.
  2. Choose Amazon DynamoDB as the data source.

  1. Choose our DynamoDB table to export from.
  2. Leave all other options as is and finish setting up the source connection.

We then choose Amazon S3 as our target. In the target properties, we can transform the output to a suitable format, apply compression, and specify the S3 location to store our raw data.

  1. Set the following options:
    1. For Format, choose Parquet.
    2. For Compression type, choose Snappy.
    3. For S3 Target Location, enter the path for RawBucket (located on the Outputs tab of the CloudFormation stack).
    4. For Database, choose the value for GlueRawDatabase (from the CloudFormation stack output).
    5. For Table name, enter an appropriate name.

  1. Because our target data warehouse requires data to be in a flat structure, verify that the configuration option dynamodb.unnestDDBJson is set to True on the Script tab.

  1. On the Job details tab, choose the AWS Glue IAM role generated by the CloudFormation template.
  2. Save and run the job.

Depending on the data volumes being exported, this job may take a few minutes to complete.
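
For reference, the source settings chosen in the visual editor correspond to a set of connection options in the generated script. The sketch below builds such a dictionary in plain Python so that it can be inspected without an AWS Glue environment. The helper name and the example ARN, bucket, and prefix are hypothetical; the option keys are the ones documented for the AWS Glue DynamoDB export connector, but check the Script tab of your own job for the exact values it generated.

```python
def ddb_export_options(table_arn: str, bucket: str, prefix: str, unnest: bool = True) -> dict:
    """Build connection options for the AWS Glue DynamoDB export connector."""
    return {
        "dynamodb.export": "ddb",           # use the export (snapshot) connector
        "dynamodb.tableArn": table_arn,     # source table
        "dynamodb.unnestDDBJson": unnest,   # flatten the DynamoDB JSON structure
        "dynamodb.s3.bucket": bucket,       # where the export files are staged
        "dynamodb.s3.prefix": prefix,
    }


if __name__ == "__main__":
    options = ddb_export_options(
        "arn:aws:dynamodb:us-east-1:111122223333:table/example-table",  # placeholder ARN
        "example-raw-bucket",
        "export/",
    )
    # Inside a Glue job, a dictionary like this would be passed as
    # connection_options to glue_context.create_dynamic_frame.from_options
    # with connection_type="dynamodb".
    print(options)
```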

Because we’ll be adding the table to our AWS Glue Data Catalog, we can explore the output using Athena after the job is complete. Athena is a serverless interactive query service that makes it simple to analyze data directly in Amazon S3 using standard SQL.

  1. In the Athena query editor, choose the raw database.

We can see that the attributes of the Address structure have been unnested and added as additional columns to the table.

  1. After we export the data into the raw bucket, create another job (called raw_to_datalake_job) using AWS Glue Studio (select Visual with a blank canvas) to load the data lake partitioned by item type (customer, order, and product).
  2. Set the source as the AWS Glue Data Catalog raw database and table.

  1. In the ApplyMapping transformation, drop the Address struct because we have already unnested these attributes into our flattened raw table.

  1. Set the target as our S3 data lake.

  1. Choose the AWS Glue IAM role in the job details, then save and run the job.

Now that we have our data lake, we’re ready to build our data warehouse.

Build the dimensional model in Amazon Redshift

The CloudFormation template launches a two-node RA3.4xlarge Amazon Redshift cluster. To build the dimensional model, complete the following steps:

  1. In Amazon Redshift Query Editor V2, connect to your database (default: salesdwh) within the cluster using the database user name and password authentication (MasterUserName and MasterUserPassword from the CloudFormation template).
  2. You may be asked to configure your account if this is your first time using Query Editor V2.
  3. Download the SQL scripts SQL Scripts.zip to create the following schemas and tables (run the scripts in numbered sequence).

In the landing schema:

  • address
  • customer
  • order
  • product

In the staging schema:

  • staging.address
  • staging.address_maxkey
  • staging.addresskey
  • staging.customer
  • staging.customer_maxkey
  • staging.customerkey
  • staging.date
  • staging.date_maxkey
  • staging.datekey
  • staging.order
  • staging.order_maxkey
  • staging.orderkey
  • staging.product
  • staging.product_maxkey
  • staging.productkey

In the dwh schema:

  • dwh.address
  • dwh.customer
  • dwh.order
  • dwh.product

We load the data from our data lake to the landing schema as is.

  1. Use the JDBC connector to Amazon Redshift to build an AWS Glue crawler to add the landing schema to our Data Catalog under the ddb_redshift database.

  1. Create an AWS Glue crawler with the JDBC data source.

  1. Select the JDBC connection you created and choose Next.

  1. Choose the IAM role created by the CloudFormation template and choose Next.

  1. Review your settings before creating the crawler.

The crawler adds the four landing tables in our AWS Glue database ddb_redshift.

  1. In AWS Glue Studio, create four AWS Glue jobs to load the landing tables (these scripts are available to download, and you can use the Spark script editor to upload these scripts individually to create the jobs):
    1. land_order_job
    2. land_product_job
    3. land_customer_job
    4. land_address_job

Each job has the structure as shown in the following screenshot.

  1. Filter the S3 source on the partition column type:
    1. For product, filter on type='product'.
    2. For order, filter on type='order'.
    3. For customer and address, filter on type='customer'.

  1. Set the target for the data flow as the corresponding table in the landing schema in Amazon Redshift.
  2. Use the built-in ApplyMapping transformation in our data pipeline to drop columns and, where necessary, convert the data types to match the target columns.

For more information about built-in transforms available in AWS Glue, refer to AWS Glue PySpark transforms reference.

The mappings for our four jobs are as follows:

  • land_order_job:
    mappings=[
    ("pk", "string", "pk", "string"),
    ("orderid", "string", "orderid", "string"),
    ("numberofitems", "string", "numberofitems", "int"),
    ("orderdate", "string", "orderdate", "timestamp"),
    ]

  • land_product_job:
    mappings=[
    ("orderid", "string", "orderid", "string"),
    ("category", "string", "category", "string"),
    ("price", "string", "price", "decimal"),
    ("productname", "string", "productname", "string"),
    ("productid", "string", "productid", "string"),
    ("color", "string", "color", "string"),
    ]

  • land_address_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]

  • land_customer_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]
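
To show what these mapping tuples do, the following pure-Python sketch emulates the effect of an ApplyMapping step on a single row: columns not listed are dropped, and listed columns are cast to the target type. This is an emulation for illustration, not the AWS Glue API; the apply_mapping function, the CASTS table, and the sample row are assumptions.

```python
from datetime import datetime
from decimal import Decimal

# Target type name -> Python conversion used by this emulation.
CASTS = {
    "string": str,
    "int": int,
    "decimal": Decimal,
    "timestamp": datetime.fromisoformat,
}


def apply_mapping(row: dict, mappings: list) -> dict:
    """Keep only mapped columns, renaming and casting them as specified."""
    out = {}
    for source, _source_type, target, target_type in mappings:
        if row.get(source) is not None:
            out[target] = CASTS[target_type](row[source])
    return out


# Same tuples as land_order_job above.
ORDER_MAPPINGS = [
    ("pk", "string", "pk", "string"),
    ("orderid", "string", "orderid", "string"),
    ("numberofitems", "string", "numberofitems", "int"),
    ("orderdate", "string", "orderdate", "timestamp"),
]

if __name__ == "__main__":
    raw = {"pk": "user#jdoe", "orderid": "o-1", "numberofitems": "3",
           "orderdate": "2022-04-01 10:00:00", "color": "red"}
    print(apply_mapping(raw, ORDER_MAPPINGS))
```

In this emulation, the color attribute is dropped because it does not appear in the order mappings, and numberofitems arrives in the landing table as an integer rather than a string.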

  1. Choose the AWS Glue IAM role, and under Advanced properties, verify the JDBC connector to Amazon Redshift as a connection.
  2. Save and run each job to load the landing tables in Amazon Redshift.

Populate the data warehouse

From the landing schema, we move the data to the staging layer and apply the necessary transformations. Our dimensional model has a single fact table, the orders table, which is the largest table and as such needs a distribution key. The choice of key depends on how the data is queried and the size of the dimension tables being joined to. If you’re unsure of your query patterns, you can leave the distribution keys and sort keys for your tables unspecified. Amazon Redshift then automatically assigns appropriate distribution and sort keys based on your queries. This has the advantage that if and when query patterns change over time, Amazon Redshift can automatically update the keys to reflect the change in usage.

In the staging schema, we keep track of existing records based on their business key (the unique identifier for the record). We create key tables to generate a numeric identity column for each table based on the business key. These key tables allow us to implement an incremental transformation of the data into our dimensional model.

CREATE TABLE IF NOT EXISTS staging.productkey ( 
    productkey integer identity(1,1), 
    productid character varying(16383), 
    CONSTRAINT products_pkey PRIMARY KEY(productkey));   

When loading the data, we need to keep track of the latest surrogate key value to ensure that new records are assigned the correct increment. We do this using maxkey tables (pre-populated with zero):

CREATE TABLE IF NOT EXISTS staging.product_maxkey ( 
    productmaxkey integer);

INSERT INTO staging.product_maxkey
select 0;    
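
The interplay between the key table and the maxkey table can be modeled in a few lines of Python. This is a simplified, in-memory model of the idea rather than of the Amazon Redshift identity column itself (identity values in Amazon Redshift are unique but not guaranteed to be consecutive); the function name and the sample product IDs are assumptions.

```python
def assign_surrogate_keys(key_table: dict, max_key: int, business_keys: list) -> int:
    """Give each unseen business key the next surrogate key.

    key_table maps business key -> surrogate key and is updated in place.
    Returns the new maximum surrogate key.
    """
    for business_key in business_keys:
        if business_key not in key_table:  # existing records keep their key
            max_key += 1
            key_table[business_key] = max_key
    return max_key


if __name__ == "__main__":
    product_keys = {}
    max_key = 0  # mirrors the maxkey table pre-populated with zero
    max_key = assign_surrogate_keys(product_keys, max_key, ["p-100", "p-200"])
    max_key = assign_surrogate_keys(product_keys, max_key, ["p-200", "p-300"])
    print(product_keys, max_key)
```

Running the second load only adds a key for p-300, which is what makes the transformation incremental.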

We use staging tables to store our incremental load, the structure of which will mirror our final target model in the dwh schema:

---staging tables to load data from data lake 
   
CREATE TABLE IF NOT EXISTS staging.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey));

---dwh tables to load data from staging schema
     
CREATE TABLE IF NOT EXISTS dwh.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey)); 

Incremental processing in the data warehouse

We load the target data warehouse using stored procedures to perform upserts (deletes and inserts performed in a single transaction):

CREATE OR REPLACE PROCEDURE staging.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

TRUNCATE TABLE staging.order;

--insert new records to get new ids
insert into staging.orderkey
(
orderid
)
select
c.orderid
from landing.order c
LEFT JOIN staging.orderkey i
ON c.orderid=i.orderid
where i.orderid IS NULL;

--update the max key
update staging.order_maxkey
set ordermaxkey = (select max(orderkey) from staging.orderkey);


insert into staging.order
(
orderkey,
customerkey,
productkey,
addresskey,
datekey,
numberofitems,
price
)
select
xid.orderkey,
cid.customerkey,
pid.productkey,
aid.addresskey,
d.datekey,
o.numberofitems,
p.price
from
landing.order o
join staging.orderkey xid on o.orderid=xid.orderid
join landing.customer c on substring(o.pk,6,length(o.pk))=c.username   ---order table needs username
join staging.customerkey cid on cid.username=c.username
join landing.address a on a.username=c.username
join staging.addresskey aid on aid.pk=a.buildingnumber::varchar+'||'+a.postcode  ---maybe change pk to addressid
join staging.datekey d on d.orderdate=o.orderdate
join landing.product p on p.orderid=o.orderid
join staging.productkey pid on pid.productid=p.productid;

COMMIT;

END;
$$;

CREATE OR REPLACE PROCEDURE dwh.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

---delete old records 
delete from dwh.order
using staging.order as stage
where dwh.order.orderkey=stage.orderkey;

--insert new and modified
insert into dwh.order
(
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey  
)
select
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey
from staging.order;

COMMIT;
END;
$$
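
The delete-then-insert pattern used by dwh.load_order can be tried locally with SQLite from the Python standard library. This sketch is a stand-in for Amazon Redshift, not a replacement for the stored procedure: the table has been reduced to two columns, and the dwh_order table name and upsert_orders helper are assumptions for the example.

```python
import sqlite3


def upsert_orders(conn: sqlite3.Connection, staged: list) -> None:
    """Replace existing rows and add new ones in a single transaction."""
    with conn:  # commits on success, rolls back if either statement fails
        conn.executemany(
            "DELETE FROM dwh_order WHERE orderkey = ?",
            [(orderkey,) for orderkey, _price in staged],
        )
        conn.executemany(
            "INSERT INTO dwh_order (orderkey, price) VALUES (?, ?)",
            staged,
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dwh_order (orderkey INTEGER PRIMARY KEY, price REAL)")

upsert_orders(conn, [(1, 10.0), (2, 20.0)])   # initial load
upsert_orders(conn, [(2, 25.0), (3, 30.0)])   # order 2 modified, order 3 new

rows = conn.execute("SELECT orderkey, price FROM dwh_order ORDER BY orderkey").fetchall()
print(rows)
```

Because both statements run inside one transaction, readers never observe the intermediate state in which the old rows are deleted but the new ones are not yet inserted.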

Use Step Functions to orchestrate the data pipeline

So far, we have stepped through each component in our workflow. We now need to stitch them together to build an automated, idempotent data pipeline. A good orchestration tool must manage failures, retries, parallelization, service integrations, and observability, so developers can focus solely on the business logic. Ideally, the workflow we build is also serverless so there is no operational overhead. Step Functions is an ideal choice to automate our data pipeline. It allows us to integrate the ELT components we have built on AWS Glue and Amazon Redshift and conduct some steps in parallel to optimize performance.

  1. On the Step Functions console, create a new state machine.
  2. Select Write your workflow in code.

  1. Enter the stepfunction_workflow.json code into the definition, replacing all placeholders with the appropriate values:
    1. [REDSHIFT-CLUSTER-IDENTIFIER] – Use the value for ClusterName (from the Outputs tab in the CloudFormation stack).
    2. [REDSHIFT-DATABASE] – Use salesdwh (the default database name in the CloudFormation template, unless you changed it).

We use the Step Functions IAM role from the CloudFormation template.

This JSON code generates the following pipeline.

Starting from the top, the workflow contains the following steps:

  1. We archive any existing raw and data lake files.
  2. We add two AWS Glue StartJobRun tasks that run sequentially: first to export the data from DynamoDB to our raw bucket, then from the raw bucket to our data lake.
  3. After that, we parallelize the landing of data from Amazon S3 to Amazon Redshift.
  4. We transform and load the data into our data warehouse using the Amazon Redshift Data API. Because this is asynchronous, we need to check the status of the runs before moving down the pipeline.
  5. After we move the data load from landing to staging, we truncate the landing tables.
  6. We load the dimensions of our target data warehouse (dwh) first, and finally we load our single fact table with its foreign key dependency on the preceding dimension tables.
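
As an illustration of steps 2 and 3 only, the following Python sketch assembles a small Amazon States Language definition with two sequential AWS Glue tasks followed by a Parallel state for the landing jobs. It is not the stepfunction_workflow.json file: the state names and the glue_task helper are assumptions, and the archive, Amazon Redshift Data API, and truncate steps are omitted. The job names come from the earlier sections, and the resource ARN is the Step Functions service integration that waits for a Glue job run to finish.

```python
import json


def glue_task(job_name: str, next_state: str = None) -> dict:
    """Build a Task state that starts a Glue job and waits for it to finish."""
    state = {
        "Type": "Task",
        "Resource": "arn:aws:states:::glue:startJobRun.sync",
        "Parameters": {"JobName": job_name},
    }
    if next_state:
        state["Next"] = next_state
    else:
        state["End"] = True
    return state


LANDING_JOBS = ["land_order_job", "land_product_job", "land_customer_job", "land_address_job"]

definition = {
    "Comment": "Illustrative subset of the pipeline",
    "StartAt": "ExportRaw",
    "States": {
        "ExportRaw": glue_task("ddb_export_raw_job", "RawToDataLake"),
        "RawToDataLake": glue_task("raw_to_datalake_job", "LandInRedshift"),
        "LandInRedshift": {
            "Type": "Parallel",
            # One branch per landing job so that all four run concurrently.
            "Branches": [
                {"StartAt": job, "States": {job: glue_task(job)}}
                for job in LANDING_JOBS
            ],
            "End": True,
        },
    },
}

if __name__ == "__main__":
    print(json.dumps(definition, indent=2))
```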

The following figure illustrates a successful run.

After we set up the workflow, we can use EventBridge to schedule a daily midnight run, where the target is a Step Functions StartExecution API calling our state machine. Under the workflow permissions, choose Create a new role for this schedule and optionally rename it.
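
EventBridge schedules accept cron expressions with six fields (minutes, hours, day of month, month, day of week, year). The small helper below, whose name is an assumption for this example, builds the expression for a daily run; note that rule-based schedules are evaluated in UTC unless you configure a time zone on the schedule.

```python
def daily_cron(hour: int = 0, minute: int = 0) -> str:
    """Return an EventBridge cron expression that fires once a day."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    # Day-of-month is "*" and day-of-week is "?", as EventBridge requires
    # that one of the two fields be a question mark.
    return f"cron({minute} {hour} * * ? *)"


if __name__ == "__main__":
    print(daily_cron())  # daily midnight run
```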

Query the data warehouse

We can verify the data has been successfully loaded into Amazon Redshift with a query.

After we have the data loaded into Amazon Redshift, we’re ready to answer the query asked at the start of this post: what is the year-on-year quarterly sales growth by product and country? The query looks like the following code (depending on your dataset, you may need to select alternative years and quarters):

with sales2021q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2021q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2021 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  ),
sales2022q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2022q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2022 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  )

select a.country,a.category, ((revenue2022q2 - revenue2021q2)/revenue2021q2)*100 as quarteronquartergrowth
from sales2022q2 a
join sales2021q2 b on a.country=b.country and a.category=b.category
order by a.country,a.category

We can visualize the results in Amazon Redshift Query Editor V2 by toggling the chart option and setting Type as Pie, Values as quarteronquartergrowth, and Labels as category.
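
The growth calculation in the final SELECT can be checked by hand with a few lines of Python. The revenue figures below are made-up sample values, not results from the dataset, and the function name is an assumption. As in the SQL inner join, only country and category pairs present in both quarters produce a result; unlike the SQL, this sketch also skips pairs whose prior-year revenue is zero to avoid a division by zero.

```python
def yoy_growth(current: dict, previous: dict) -> dict:
    """Percentage growth per (country, category) present in both periods."""
    growth = {}
    for key, prev_revenue in previous.items():
        if key in current and prev_revenue != 0:
            growth[key] = (current[key] - prev_revenue) / prev_revenue * 100
    return growth


# Hypothetical revenue per (country, category).
revenue_2021q2 = {("UK", "Books"): 200.0, ("UK", "Toys"): 100.0}
revenue_2022q2 = {("UK", "Books"): 250.0, ("US", "Toys"): 80.0}

if __name__ == "__main__":
    print(yoy_growth(revenue_2022q2, revenue_2021q2))
```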

Cost considerations

We give a brief outline of the indicative costs associated with the key services covered in our solution based on us-east-1 Region pricing using the AWS Pricing Calculator:

  • DynamoDB – With on-demand settings for 1.5 million items (average size of 355 bytes) and associated write and read capacity plus PITR storage, the cost of DynamoDB is approximately $2 per month.
  • AWS Glue DynamoDB export connector – This connector utilizes the DynamoDB export to Amazon S3 feature. This has no hourly cost—you only pay for the gigabytes of data exported to Amazon S3 ($0.11 per GiB).
  • Amazon S3 – You pay for storing objects in your S3 buckets. The rate you’re charged depends on your objects’ size, how long you stored the objects during the month, and the storage class. In our solution, we used S3 Standard for our data lake and S3 Standard – Infrequent Access for archive. Standard-IA storage is $0.0125 per GB/month; Standard storage is $0.023 per GB/month.
  • AWS Glue Jobs – With AWS Glue, you only pay for the time your ETL job takes to run. There are no resources to manage, no upfront costs, and you are not charged for startup or shutdown time. AWS charges you an hourly rate based on the number of Data Processing Units (DPUs) used to run your ETL job. A single DPU provides 4 vCPU and 16 GB of memory. Every one of our nine Spark jobs uses 10 DPUs and has an average runtime of 3 minutes. This gives an approximate cost of $0.29 per job.
  • Amazon Redshift – We provisioned two RA3.4xlarge nodes for our Amazon Redshift cluster. If run on-demand, each node costs $3.26 per hour. If utilized 24/7, our monthly cost would be approximately $4,759.60. You should evaluate your workload to determine what cost savings can be achieved by using Amazon Redshift Serverless or using Amazon Redshift provisioned reserved instances.
  • Step Functions – You are charged based on the number of state transitions required to run your application. Step Functions counts a state transition as each time a step of your workflow is run. You’re charged for the total number of state transitions across all your state machines, including retries. The Step Functions free tier includes 4,000 free state transitions per month. Thereafter, it’s $0.025 per 1,000 state transitions.
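
The Amazon Redshift and Step Functions figures above can be reproduced with simple arithmetic. The sketch below assumes a 730-hour month, which is the convention that yields the $4,759.60 figure quoted above; the function names and the example transition count are assumptions, and the rates are the ones listed in this section.

```python
HOURS_PER_MONTH = 730  # assumed month length; reproduces the figure above


def redshift_monthly_cost(nodes: int, hourly_rate: float, hours: int = HOURS_PER_MONTH) -> float:
    """On-demand cost of a provisioned cluster running around the clock."""
    return round(nodes * hourly_rate * hours, 2)


def step_functions_monthly_cost(transitions: int, free_tier: int = 4000,
                                price_per_1000: float = 0.025) -> float:
    """Cost of state transitions beyond the monthly free tier."""
    billable = max(0, transitions - free_tier)
    return round(billable / 1000 * price_per_1000, 2)


if __name__ == "__main__":
    print(redshift_monthly_cost(2, 3.26))        # two RA3.4xlarge nodes
    print(step_functions_monthly_cost(10_000))   # hypothetical monthly volume
```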

Clean up

Remember to delete any resources created through the CloudFormation stack. You first need to manually empty and delete the S3 buckets. Then you can delete the CloudFormation stack using the AWS CloudFormation console or AWS Command Line Interface (AWS CLI). For instructions, refer to Clean up your “hello, world!” application and related resources.

Summary

In this post, we demonstrated how you can export data from DynamoDB to Amazon S3 and Amazon Redshift to perform advanced analytics. We built an automated data pipeline that you can use to perform a batch ELT process that can be scheduled to run daily, weekly, or monthly and can scale to handle very large workloads.

Please leave your feedback or comments in the comments section.


About the Author

Altaf Hussain is an Analytics Specialist Solutions Architect at AWS. He helps customers around the globe design and optimize their big data and data warehousing solutions.


Appendix

To extract the data from DynamoDB and load it into our Amazon Redshift database, we can use the Spark script editor and upload the files from GlueSparkJobs.zip to create each individual job necessary to perform the extract and load. If you choose to do this, remember to update, where appropriate, the account ID and Region placeholders in the scripts. Also, on the Job details tab under Advanced properties, add the Amazon Redshift connection.

Join a streaming data source with CDC data for real-time serverless data analytics using AWS Glue, AWS DMS, and Amazon DynamoDB

Post Syndicated from Manish Kola original https://aws.amazon.com/blogs/big-data/join-streaming-source-cdc-glue/

Customers have been using data warehousing solutions to perform their traditional analytics tasks. Recently, data lakes have gained a lot of traction as the foundation for analytical solutions, because they come with benefits such as scalability, fault tolerance, and support for structured, semi-structured, and unstructured datasets.

Data lakes are not transactional by default; however, there are multiple open-source frameworks that enhance data lakes with ACID properties, providing a best-of-both-worlds solution between transactional and non-transactional storage mechanisms.

Traditional batch ingestion and processing pipelines that involve operations such as data cleaning and joining with reference data are straightforward to create and cost-efficient to maintain. However, there is a challenge to ingest datasets, such as Internet of Things (IoT) and clickstreams, at a fast rate with near-real-time delivery SLAs. You will also want to apply incremental updates with change data capture (CDC) from the source system to the destination. To make data-driven decisions in a timely manner, you need to account for missed records and backpressure, and maintain event ordering and integrity, especially if the reference data also changes rapidly.

In this post, we aim to address these challenges. We provide a step-by-step guide to join streaming data to a reference table changing in real time using AWS Glue, Amazon DynamoDB, and AWS Database Migration Service (AWS DMS). We also demonstrate how to ingest streaming data to a transactional data lake using Apache Hudi to achieve incremental updates with ACID transactions.

Solution overview

For our example use case, streaming data is coming through Amazon Kinesis Data Streams, and reference data is managed in MySQL. The reference data is continuously replicated from MySQL to DynamoDB through AWS DMS. The requirement here is to enrich the real-time stream data by joining it with the reference data in near-real time, and to make it queryable from a query engine such as Amazon Athena while keeping consistency. In this use case, the reference data in MySQL can be updated whenever requirements change, and subsequent queries need to return results that reflect those updates.

This solution addresses the issue of users wanting to join streams with changing reference datasets when the size of the reference dataset is small. The reference data is maintained in DynamoDB tables, and the streaming job loads the full table into memory for each micro-batch, joining a high-throughput stream to a small reference dataset.
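
Conceptually, each micro-batch performs a lookup join against an in-memory copy of the reference table. The following plain-Python sketch shows that idea without Spark; the enrich_batch function and the country field on the stream events are assumptions, while the code, countryname, and combinedname columns match the reference table created later in this post.

```python
def enrich_batch(events: list, lookup_rows: list) -> list:
    """Join one micro-batch of events to the reference data by country code."""
    # Rebuilt for every micro-batch so that reference updates are picked up.
    lookup = {row["code"]: row for row in lookup_rows}
    enriched = []
    for event in events:
        ref = lookup.get(event["country"])
        if ref is None:
            continue  # inner-join semantics: drop events with no match
        enriched.append({
            **event,
            "countryname": ref["countryname"],
            "combinedname": ref["combinedname"],
        })
    return enriched


if __name__ == "__main__":
    reference = [
        {"code": "IN", "countryname": "India", "combinedname": "IN-India"},
        {"code": "US", "countryname": "USA", "combinedname": "US-USA"},
    ]
    batch = [{"id": 1, "country": "IN"}, {"id": 2, "country": "FR"}]
    print(enrich_batch(batch, reference))
```

Reloading the full reference table per micro-batch is only practical because the reference dataset is small; a large reference table would call for a different join strategy.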

The following diagram illustrates the solution architecture.


Prerequisites

For this walkthrough, you should have the following prerequisites:

Create IAM roles and S3 bucket

In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and two AWS Identity and Access Management (IAM) roles: one for the AWS Glue job, and one for AWS DMS. We do this using an AWS CloudFormation template. Complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For Stack name, enter a name for your stack.
  5. For DynamoDBTableName, enter tgt_country_lookup_table. This is the name of your new DynamoDB table.
  6. For S3BucketNamePrefix, enter the prefix of your new S3 bucket.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Stack creation can take about 1 minute.

Create a Kinesis data stream

In this section, you create a Kinesis data stream:

  1. On the Kinesis console, choose Data streams in the navigation pane.
  2. Choose Create data stream.
  3. For Data stream name, enter your stream name.
  4. Leave the remaining settings as default and choose Create data stream.

A Kinesis data stream is created with on-demand mode.

Create and configure an Aurora MySQL cluster

In this section, you create and configure an Aurora MySQL cluster as the source database. First, configure your source Aurora MySQL database cluster to enable CDC through AWS DMS to DynamoDB.

Create a parameter group

Complete the following steps to create a new parameter group:

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Choose Create parameter group.
  3. For Parameter group family, select aurora-mysql5.7.
  4. For Type, choose DB Cluster Parameter Group.
  5. For Group name, enter my-mysql-dynamodb-cdc.
  6. For Description, enter Parameter group for demo Aurora MySQL database.
  7. Choose Create.
  8. Select my-mysql-dynamodb-cdc, and choose Edit under Parameter group actions.
  9. Edit the parameter group as follows:

     Name                 Value
     binlog_row_image     full
     binlog_format        ROW
     binlog_checksum      NONE
     log_slave_updates    1

  10. Choose Save changes.


Create the Aurora MySQL cluster

Complete the following steps to create the Aurora MySQL cluster:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose Create database.
  3. For Choose a database creation method, choose Standard create.
  4. Under Engine options, for Engine type, choose Aurora (MySQL Compatible).
  5. For Engine version, choose Aurora (MySQL 5.7) 2.11.2.
  6. For Templates, choose Production.
  7. Under Settings, for DB cluster identifier, enter a name for your database.
  8. For Master username, enter your primary user name.
  9. For Master password and Confirm master password, enter your primary password.
  10. Under Instance configuration, for DB instance class, choose Burstable classes (includes t classes) and choose db.t3.small.
  11. Under Availability & durability, for Multi-AZ deployment, choose Don’t create an Aurora Replica.
  12. Under Connectivity, for Compute resource, choose Don’t connect to an EC2 compute resource.
  13. For Network type, choose IPv4.
  14. For Virtual private cloud (VPC), choose your VPC.
  15. For DB subnet group, choose your public subnet.
  16. For Public access, choose Yes.
  17. For VPC security group (firewall), choose the security group for your public subnet.
  18. Under Database authentication, for Database authentication options, choose Password authentication.
  19. Under Additional configuration, for DB cluster parameter group, choose the cluster parameter group you created earlier.
  20. Choose Create database.

Grant permissions to the source database

Next, grant the required permissions on the source Aurora MySQL database. Connect to the DB cluster using the MySQL client and run queries to complete the following tasks:

  • Create a demo database and table and run queries on the data
  • Grant permission for a user used by the AWS DMS endpoint

Complete the following steps:

  1. Log in to the EC2 instance that you’re using to connect to your DB cluster.
  2. Enter the following command at the command prompt to connect to the primary DB instance of your DB cluster:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p
  3. Run the following SQL command to create a database:
> CREATE DATABASE mydev;
  4. Run the following SQL command to create a table:
> use mydev; 
> CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);
  5. Run the following SQL command to populate the table with data:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');
  6. Run the following SQL command to create a user for the AWS DMS endpoint and grant permissions for CDC tasks (replace the placeholder with your preferred password):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Create and configure AWS DMS resources to load data into the DynamoDB reference table

In this section, you create and configure AWS DMS to replicate data into the DynamoDB reference table.

Create an AWS DMS replication instance

First, create an AWS DMS replication instance by completing the following steps:

  1. On the AWS DMS console, choose Replication instances in the navigation pane.
  2. Choose Create replication instance.
  3. Under Settings, for Name, enter a name for your instance.
  4. Under Instance configuration, for High Availability, choose Dev or test workload (Single-AZ).
  5. Under Connectivity and security, for VPC security groups, choose default.
  6. Choose Create replication instance.

Create Amazon VPC endpoints

Optionally, you can create Amazon VPC endpoints for DynamoDB when you need to connect to your DynamoDB table from the AWS DMS instance in a private network. Also make sure that you enable Publicly accessible when you need to connect to a database outside of your VPC.

Create an AWS DMS source endpoint

Create an AWS DMS source endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Source endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Source engine, choose Amazon Aurora MySQL.
  6. For Access to endpoint database, choose Provide access information manually.
  7. For Server Name, enter the endpoint name of your Aurora writer instance (for example, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. For Port, enter 3306.
  9. For User name, enter a user name for your AWS DMS task.
  10. For Password, enter a password.
  11. Choose Create endpoint.

Create an AWS DMS target endpoint

Create an AWS DMS target endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Target endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Target engine, choose Amazon DynamoDB.
  6. For Service access role ARN, enter the IAM role for your AWS DMS task.
  7. Choose Create endpoint.

Create AWS DMS migration tasks

Create AWS DMS database migration tasks by completing the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. Under Task configuration, for Task identifier, enter a name for your task.
  4. For Replication instance, choose your replication instance.
  5. For Source database endpoint, choose your source endpoint.
  6. For Target database endpoint, choose your target endpoint.
  7. For Migration type, choose Migrate existing data and replicate ongoing changes.
  8. Under Task settings, for Target table preparation mode, choose Do nothing.
  9. For Stop task after full load completes, choose Don’t stop.
  10. For LOB column settings, choose Limited LOB mode.
  11. For Task logs, enable Turn on CloudWatch logs and Turn on batch-optimized apply.
  12. Under Table mappings, choose JSON Editor and enter the following rules.

Here you can add values to the column. With the following rules, the AWS DMS CDC task first creates a new DynamoDB table with the name specified in target-table-name. It then replicates all the records, mapping the columns in the source table to attributes in the DynamoDB table.

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "target-table-name": "tgt_country_lookup_table",
            "mapping-parameters": {
                "partition-key-name": "code",
                "sort-key-name": "countryname",
                "exclude-columns": [
                    "code",
                    "countryname"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "code",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${code}"
                    },
                    {
                        "target-attribute-name": "countryname",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${countryname}"
                    }
                ],
                "apply-during-cdc": true
            }
        }
    ]
}

DMS table mapping

  13. Choose Create task.

Now the AWS DMS replication task has been started.

  14. Wait for the Status to show as Load complete.

DMS task

  15. On the DynamoDB console, choose Tables in the navigation pane.
  16. Select the DynamoDB reference table, and choose Explore table items to review the replicated records.

DynamoDB reference table initial
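
To make the object-mapping rule in the table mappings easier to reason about, the following sketch models what map-record-to-record does to a single source row. It is a simplified illustration under stated assumptions, not how AWS DMS is implemented. The function name map_record is hypothetical, and only the exclude-columns and attribute-mappings settings from the rule are modeled.

```python
import re

# The relevant parts of the object-mapping rule shown earlier.
MAPPING_PARAMETERS = {
    "partition-key-name": "code",
    "sort-key-name": "countryname",
    "exclude-columns": ["code", "countryname"],
    "attribute-mappings": [
        {"target-attribute-name": "code", "value": "${code}"},
        {"target-attribute-name": "countryname", "value": "${countryname}"},
    ],
}

# Matches ${column} placeholders in an attribute mapping value.
_PLACEHOLDER = re.compile(r"\$\{(\w+)\}")


def map_record(row, params):
    """Illustrative model of map-record-to-record for one source row."""
    excluded = set(params["exclude-columns"])
    # Columns that are not excluded carry over under their own names.
    item = {col: val for col, val in row.items() if col not in excluded}
    # Each attribute mapping fills its placeholders from the source row.
    for mapping in params["attribute-mappings"]:
        item[mapping["target-attribute-name"]] = _PLACEHOLDER.sub(
            lambda match: str(row[match.group(1)]), mapping["value"]
        )
    return item


row = {"code": "IN", "countryname": "India", "combinedname": "IN-India"}
item = map_record(row, MAPPING_PARAMETERS)
print(item)
```

In this model, code and countryname are excluded from the automatic column copy and then reintroduced by the attribute mappings as the partition key and sort key, while combinedname is carried over unchanged.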

Create an AWS Glue Data Catalog table and an AWS Glue streaming ETL job

In this section, you create an AWS Glue Data Catalog table and an AWS Glue streaming extract, transform, and load (ETL) job.

Create a Data Catalog table

Create an AWS Glue Data Catalog table for the source Kinesis data stream with the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose Add database.
  3. For Name, enter my_kinesis_db.
  4. Choose Create database.
  5. Choose Tables under Databases, then choose Add table.
  6. For Name, enter my_stream_src_table.
  7. For Database, choose my_kinesis_db.
  8. For Select the type of source, choose Kinesis.
  9. For Kinesis data stream is located in, choose my account.
  10. For Kinesis stream name, enter a name for your data stream.
  11. For Classification, select JSON.
  12. Choose Next.
  13. Choose Edit schema as JSON, enter the following JSON, then choose Save.
[
  {
    "Name": "uuid",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "country",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "itemtype",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "saleschannel",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderpriority",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "region",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "shipdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitssold",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitprice",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalrevenue",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalprofit",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "impressiontime",
    "Type": "string",
    "Comment": ""
  }
]

Glue Catalog table schema

  14. Choose Next, then choose Create.
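
Because every column in the schema has the same shape, the JSON pasted into the schema editor can also be generated instead of typed by hand. The following sketch assumes the column names listed above and the string type used throughout. The build_schema helper is a hypothetical name introduced for illustration.

```python
import json

# Column names from the schema shown above, in the same order.
COLUMNS = [
    "uuid", "country", "itemtype", "saleschannel", "orderpriority",
    "orderdate", "region", "shipdate", "unitssold", "unitprice",
    "unitcost", "totalrevenue", "totalcost", "totalprofit", "impressiontime",
]


def build_schema(columns, column_type="string"):
    """Return the list of column definitions in the editor's JSON shape."""
    return [{"Name": name, "Type": column_type, "Comment": ""} for name in columns]


schema_json = json.dumps(build_schema(COLUMNS), indent=2)
print(schema_json)
```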

Create an AWS Glue streaming ETL job

Next, you create an AWS Glue streaming job. AWS Glue 3.0 and later supports Apache Hudi natively, so we use this native integration to ingest into a Hudi table. Complete the following steps to create the AWS Glue streaming job:

  1. On the AWS Glue Studio console, choose Spark script editor and choose Create.
  2. Under Job details tab, for Name, enter a name for your job.
  3. For IAM Role, choose the IAM role for your AWS Glue job.
  4. For Type, select Spark Streaming.
  5. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
  6. For Requested number of workers, enter 3.
  7. Under Advanced properties, for Job parameters, choose Add new parameter.
  8. For Key, enter --conf.
  9. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Choose Add new parameter.
  11. For Key, enter --datalake-formats.
  12. For Value, enter hudi.
  13. For Script path, enter s3://<S3BucketName>/scripts/.
  14. For Temporary path, enter s3://<S3BucketName>/temporary/.
  15. Optionally, for Spark UI logs path, enter s3://<S3BucketName>/sparkHistoryLogs/.

Glue job parameter

  16. On the Script tab, enter the following script into the AWS Glue Studio editor and choose Create.

The near-real-time streaming job enriches data by joining a Kinesis data stream with a DynamoDB table that contains frequently updated reference data. The enriched dataset is loaded into the target Hudi table in the data lake. Replace <S3BucketName> with your bucket that you created via AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv,["JOB_NAME"])

# Initialize spark session and Glue context
sc = SparkContext() 
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# job parameters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" 
kin_src_table_name = "my_stream_src_table" 
hudi_write_operation = "upsert" 
hudi_record_key = "uuid" 
hudi_precomb_key = "orderdate" 
checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" 
s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db"

# hudi options 
additional_options={
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.recordkey.field": hudi_record_key,
    "hoodie.datasource.hive_sync.database": hudi_database,
    "hoodie.table.name": hudi_table,
    "hoodie.consistency.check.enabled": "true",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "false",
    "hoodie.datasource.write.precombine.field": hudi_precomb_key,
    "hoodie.bulkinsert.shuffle.parallelism": "4",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.write.operation": hudi_write_operation,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
}

# Scan the reference data table in DynamoDB with the boto3 API and load it into a Spark DataFrame.
def readDynamoDb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(dydb_lookup_table)
    response = table.scan()
    items = response["Items"]
    jsondata = sc.parallelize(items)
    lookupDf = glueContext.read.json(jsondata)
    return lookupDf


# Load the Amazon Kinesis data stream from the AWS Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog(
    database=kin_src_database_name,
    table_name=kin_src_table_name,
    transformation_ctx="source_df",
    additional_options={"startingPosition": "TRIM_HORIZON"},
)

# As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:

        # Refresh the DynamoDB table to pull the latest snapshot for each microbatch
        country_lookup_df = readDynamoDb() 
                
        final_frame = data_frame.join(
            country_lookup_df, 
            data_frame["country"] == country_lookup_df["countryname"], 
            'left'
        ).drop(
            "countryname",
            "country",
            "unitprice", 
            "unitcost",
            "totalrevenue",
            "totalcost",
            "totalprofit"
        )

        # Script generated for node my-lab-hudi-connector
        final_frame.write.format("hudi") \
            .options(**additional_options) \
            .mode("append") \
            .save(s3_output_folder)
        
try:
    glueContext.forEachBatch(
        frame=source_df,
        batch_function=processBatch,
        options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path},
    )
except Exception as e:
    print(f"Error is @@@ ....{e}")
  17. Choose Run to start the streaming job.

The following screenshot shows examples of the DataFrames data_frame, country_lookup_df, and final_frame.

Glue job log output initial

The AWS Glue job successfully joined records coming from the Kinesis data stream and the reference table in DynamoDB, and then ingested the joined records into Amazon S3 in Hudi format.
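
The readDynamoDb function in the script calls table.scan() once. A single Scan call returns at most 1 MB of data, which is plenty for the four-row reference table used here. For a larger reference table, the scan would need to follow LastEvaluatedKey until it is no longer returned. The following is a minimal sketch of that loop. The scan_all_items name is hypothetical, and the table argument is assumed to behave like a boto3 DynamoDB Table resource.

```python
def scan_all_items(table):
    """Collect every item from a table by following LastEvaluatedKey.

    `table` is assumed to expose scan(**kwargs) like a boto3 Table resource.
    """
    items = []
    scan_kwargs = {}
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response.get("Items", []))
        last_key = response.get("LastEvaluatedKey")
        if last_key is None:
            # No more pages: the scan has covered the whole table.
            return items
        # Resume the scan where the previous page stopped.
        scan_kwargs["ExclusiveStartKey"] = last_key
```

Inside readDynamoDb, items = scan_all_items(table) could stand in for the single scan() call. Because the job rescans the table in every microbatch, this approach suits small reference tables best.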

Create and run a Python script to generate sample data and load it into the Kinesis data stream

In this section, you create and run a Python script to generate sample data and load it into the source Kinesis data stream. Complete the following steps:

  1. Log in to AWS Cloud9, your EC2 instance, or any other computing host that puts records in your data stream.
  2. Create a Python file called generate-data-for-kds.py. After you add the script in the next step, run it with the following command:
$ python3 generate-data-for-kds.py
  3. Open the Python file and enter the following script:
import json
import random
import boto3
import time

STREAM_NAME = "<mystreamname>"

def get_data():
    return {
        "uuid": random.randrange(0, 1000001, 1),
        "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ),
        "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "orderpriority": random.choice(["H", "L", "M", "C"]),
        "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12",
                                      "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", 
                                      "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", 
                                      "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ),
        "region": random.choice( ["Asia", "Europe", "Americas", "Middle Eastern", "Africa"] ),
        "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", 
                                    "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14",
                                      "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", 
                                      "2/25/17", "3/10/17", "4/1/17", ] ),
        "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", 
                                     "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", 
                                     "8072", "65", "7864", "9778", ] ),
        "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                     "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                     "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                    "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                    "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", 
                                        "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", 
                                        "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", 
                                        "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", 
                                        "5255275.28", "463966.1", ] ),
        "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", 
                                     "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06",
                                       "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", 
                                       "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", 
                                       "3951974.56", "310842.62", ] ),
        "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", 
                                       "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", 
                                       "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7",
                                         "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ),
        "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", 
                                          "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", 
                                          "2022-15-24T02:27:41Z", ] ),
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )
        time.sleep(2)

if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))

This script puts a record into the Kinesis data stream every 2 seconds.
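
Note that the script sends every record with the same PartitionKey value, "partitionkey". Kinesis Data Streams maps a partition key to a shard by taking the MD5 hash of the key, so a constant key routes all records to a single shard. That is fine for a demo at one record every 2 seconds. The following sketch illustrates the effect. It assumes the hash key space is split into equally sized shard ranges, which is a simplification, and the shard_index helper is hypothetical.

```python
import hashlib

HASH_KEY_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def shard_index(partition_key, shard_count):
    """Model which shard a key lands on, assuming equally sized hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hashed = int.from_bytes(digest, "big")
    return hashed * shard_count // HASH_KEY_SPACE


# A constant key always resolves to the same shard.
constant_shards = {shard_index("partitionkey", 4) for _ in range(1000)}

# A per-record key, such as the uuid field, spreads records across shards.
varied_shards = {shard_index(str(uuid), 4) for uuid in range(1000)}

print(len(constant_shards), len(varied_shards))
```

For higher volumes, passing a per-record value such as str(data["uuid"]) as the PartitionKey would be one way to spread writes across shards.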

Simulate updating the reference table in the Aurora MySQL cluster

Now all the resources and configurations are ready. For this example, we want to add a 3-letter country code to the reference table. Let’s update records in the Aurora MySQL table to simulate changes. Complete the following steps:

  1. Make sure that the AWS Glue streaming job is already running.
  2. Connect to the primary DB instance again, as described earlier.
  3. Enter your SQL commands to update records:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

The reference table in the Aurora MySQL source database is now updated, and AWS DMS automatically replicates the changes to the reference table in DynamoDB.

DynamoDB reference table updated

The following log output shows records in data_frame, country_lookup_df, and final_frame. In country_lookup_df and final_frame, the combinedname column now has values formatted as <2-letter-country-code>-<3-letter-country-code>-<country-name>. This shows that changes to the reference table are picked up without restarting the AWS Glue streaming job: the job keeps joining incoming records from the Kinesis data stream with the reference table even while the reference table is changing.
Glue job log output updated
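
The job picks up the change because processBatch rereads the DynamoDB table for every microbatch before joining. The following plain-Python sketch models that left join and the column drop for a single batch. It is an illustration of the logic only, without Spark, and the process_batch function and the sample records are hypothetical.

```python
# Columns removed after the join, as in the Glue script.
DROPPED = {
    "countryname", "country", "unitprice", "unitcost",
    "totalrevenue", "totalcost", "totalprofit",
}


def process_batch(records, lookup_rows):
    """Left-join stream records to lookup rows on country == countryname."""
    by_name = {row["countryname"]: row for row in lookup_rows}
    output = []
    for record in records:
        merged = {**record, **by_name.get(record["country"], {})}
        # Unmatched records keep empty lookup columns, as in a left join.
        for column in ("code", "combinedname"):
            merged.setdefault(column, None)
        output.append({k: v for k, v in merged.items() if k not in DROPPED})
    return output


batch = [{"uuid": 1, "country": "India", "unitssold": "70"}]
lookup_before = [{"code": "IN", "countryname": "India", "combinedname": "IN-India"}]
lookup_after = [{"code": "IN", "countryname": "India", "combinedname": "IN-IND-India"}]

# The same stream record joins to whichever snapshot is current for its batch.
first = process_batch(batch, lookup_before)
second = process_batch(batch, lookup_after)
print(first[0]["combinedname"], second[0]["combinedname"])
```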

Query the Hudi table using Athena

Let’s query the Hudi table using Athena to see the records in the destination table. Complete the following steps:

  1. Make sure that the script and the AWS Glue streaming job are still running:
    1. The Python script (generate-data-for-kds.py) is still running.
    2. The generated data is being sent to the data stream.
    3. The AWS Glue streaming job is still running.
  2. On the Athena console, run the following SQL in the query editor:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;

The following query result shows records that were processed before the reference table was changed. Values in the combinedname column follow the pattern <2-letter-country-code>-<country-name>.

Athena query result initial

The following query result shows records that were processed after the reference table was changed. Values in the combinedname column follow the pattern <2-letter-country-code>-<3-letter-country-code>-<country-name>.

Athena query result updated

This confirms that the changed reference data is reflected in the target Hudi table, which is built by joining records from the Kinesis data stream with the reference data in DynamoDB.
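
When reading the query results, keep in mind that the job writes with the upsert operation, using uuid as the record key and orderdate as the precombine field. When two incoming records share a key, Hudi keeps the one with the largest precombine value. The following sketch models only that deduplication step for one batch. It is not Hudi's full merge logic, and the deduplicate function and sample records are hypothetical.

```python
def deduplicate(batch, record_key="uuid", precombine_key="orderdate"):
    """Keep one record per key: the one with the largest precombine value."""
    latest = {}
    for record in batch:
        key = record[record_key]
        current = latest.get(key)
        if current is None or record[precombine_key] >= current[precombine_key]:
            latest[key] = record
    return list(latest.values())


batch = [
    {"uuid": 7, "orderdate": "1/4/10", "unitssold": "70"},
    {"uuid": 7, "orderdate": "2/1/12", "unitssold": "65"},
    {"uuid": 9, "orderdate": "3/1/12", "unitssold": "8217"},
]
result = deduplicate(batch)
print(result)
```

Because orderdate is a string in this dataset, the comparison in this sketch is lexicographic rather than chronological ("10/13/12" sorts before "2/1/12"). A sortable timestamp such as impressiontime may be a better ordering field if the order of updates matters.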

Clean up

As the final step, clean up the resources:

  1. Delete the Kinesis data stream.
  2. Delete the AWS DMS migration task, endpoint, and replication instance.
  3. Stop and delete the AWS Glue streaming job.
  4. Delete the AWS Cloud9 environment.
  5. Delete the CloudFormation stack.

Conclusion

Building and maintaining a transactional data lake that involves real-time data ingestion and processing has multiple variable components and decisions to be made, such as what ingestion service to use, how to store your reference data, and what transactional data lake framework to use. In this post, we provided the implementation details of such a pipeline, using AWS native components as the building blocks and Apache Hudi as the open-source framework for a transactional data lake.

We believe that this solution can be a starting point for organizations looking to implement a new data lake with such requirements. Additionally, the different components are fully pluggable and can be mixed and matched to existing data lakes to target new requirements or migrate existing ones, addressing their pain points.


About the authors

Manish Kola is a Data Lab Solutions Architect at AWS, where he works closely with customers across various industries to architect cloud-native solutions for their data analytics and AI needs. He partners with customers on their AWS journey to solve their business problems and build scalable prototypes. Before joining AWS, Manish’s experience includes helping customers implement data warehouse, BI, data integration, and data lake projects.

Santosh Kotagiri is a Solutions Architect at AWS with experience in data analytics and cloud solutions leading to tangible business results. His expertise lies in designing and implementing scalable data analytics solutions for clients across industries, with a focus on cloud-native and open-source services. He is passionate about leveraging technology to drive business growth and solve complex problems.

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Introducing the Enhanced Document API for DynamoDB in the AWS SDK for Java 2.x

Post Syndicated from John Viegas original https://aws.amazon.com/blogs/devops/introducing-the-enhanced-document-api-for-dynamodb-in-the-aws-sdk-for-java-2-x/

We are excited to announce that the AWS SDK for Java 2.x now offers the Enhanced Document API for DynamoDB, providing an enhanced way of working with Amazon DynamoDB items.
This post covers using the Enhanced Document API for DynamoDB with the DynamoDB Enhanced Client. By using the Enhanced Document API, you can create an EnhancedDocument instance to represent an item with no fixed schema, and then use the DynamoDB Enhanced Client to read and write to DynamoDB.
Furthermore, unlike the Document APIs of aws-sdk-java 1.x, which provided arguments and return types that were not type-safe, the EnhancedDocument provides strongly-typed APIs for working with documents. This interface simplifies the development process and ensures that the data is correctly typed.

Prerequisites:

Before getting started, ensure you are using an up-to-date version of the AWS Java SDK dependency with all the latest released bug-fixes and features. For Enhanced Document API support, you must use version 2.20.33 or later. See our “Set up an Apache Maven project” guide for details on how to manage the AWS Java SDK dependency in your project.

Add the dynamodb-enhanced dependency to your pom.xml:

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
<version>2.20.33</version>
</dependency>

Quick walk-through for using the Enhanced Document API to interact with DynamoDB

Step 1 : Create a DynamoDB Enhanced Client

Create an instance of the DynamoDbEnhancedClient class, which provides a high-level interface for Amazon DynamoDB that simplifies working with DynamoDB tables.

DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
                                               .dynamoDbClient(DynamoDbClient.create())
                                               .build();

Step 2 : Create a DynamoDbTable resource object with Document table schema

To execute commands against a DynamoDB table using the Enhanced Document API, you must associate the table with your Document table schema to create a DynamoDbTable resource object. The Document table schema builder requires the primary index key and attribute converter providers. Use AttributeConverterProvider.defaultProvider() to convert document attributes of default types. An optional secondary index key can be added to the builder.


DynamoDbTable<EnhancedDocument> documentTable = enhancedClient.table("my_table",
                                              TableSchema.documentSchemaBuilder()
                                                         .addIndexPartitionKey(TableMetadata.primaryIndexName(),"hashKey", AttributeValueType.S)
                                                         .addIndexSortKey(TableMetadata.primaryIndexName(), "sortKey", AttributeValueType.N)
                                                         .attributeConverterProviders(AttributeConverterProvider.defaultProvider())
                                                         .build());
                                                         
// call documentTable.createTable() if "my_table" does not exist in DynamoDB

Step 3 : Write a DynamoDB item using an EnhancedDocument

The EnhancedDocument class has static factory methods along with a builder method to add attributes to a document. The following snippet demonstrates the type safety provided by EnhancedDocument when you construct a document item.

EnhancedDocument simpleDoc = EnhancedDocument.builder()
 .attributeConverterProviders(defaultProvider())
 .putString("hashKey", "sampleHash")
 .putNull("nullKey")
 .putNumber("sortKey", 1.0)
 .putBytes("byte", SdkBytes.fromUtf8String("a"))
 .putBoolean("booleanKey", true)
 .build();
 
documentTable.putItem(simpleDoc);

Step 4 : Read a DynamoDB item as an EnhancedDocument

Attributes of the documents retrieved from a DynamoDB table can be accessed with getter methods:

EnhancedDocument docGetItem = documentTable.getItem(r -> r.key(k -> k.partitionValue("sampleHash").sortValue(1)));

docGetItem.getString("hashKey");
docGetItem.isNull("nullKey");
docGetItem.getNumber("sortKey").floatValue();
docGetItem.getBytes("byte");
docGetItem.getBoolean("booleanKey");

AttributeConverterProviders for accessing document attributes as custom objects

You can provide a custom AttributeConverterProvider instance to an EnhancedDocument to convert document attributes to a specific object type.
These providers can be set on either DocumentTableSchema or EnhancedDocument to read or write attributes as custom objects.

TableSchema.documentSchemaBuilder()
           .attributeConverterProviders(CustomClassConverterProvider.create(), defaultProvider())
           .build();
    
// Insert a custom class instance into an EnhancedDocument as attribute 'customMapOfAttribute'.
EnhancedDocument customAttributeDocument =
EnhancedDocument.builder().put("customMapOfAttribute", customClassInstance, CustomClass.class).build();

// Retrieve attribute 'customMapOfAttribute' as CustomClass object.
CustomClass customClassObject = customAttributeDocument.get("customMapOfAttribute", CustomClass.class);

Convert Documents to JSON and vice-versa

The Enhanced Document API allows you to convert a JSON string to an EnhancedDocument and vice-versa.

// Enhanced document created from JSON string using defaultConverterProviders.
EnhancedDocument documentFromJson = EnhancedDocument.fromJson("{\"key\": \"Value\"}");
                                              
// Converting an EnhancedDocument to JSON string "{\"key\": \"Value\"}"                                                 
String jsonFromDocument = documentFromJson.toJson();

Define a Custom Attribute Converter Provider

Custom attribute converter providers are implementations of AttributeConverterProvider that provide converters for custom classes.
Below is an example of a CustomClassForDocumentAPI, which has a single field stringAttribute of type String, and its corresponding AttributeConverterProvider implementation.

public class CustomClassForDocumentAPI {
    private final String stringAttribute;

    public CustomClassForDocumentAPI(Builder builder) {
        this.stringAttribute = builder.stringAttribute;
    }
    public static Builder builder() {
        return new Builder();
    }
    public String stringAttribute() {
        return stringAttribute;
    }
    public static final class Builder {
        private String stringAttribute;
        private Builder() {
        }
        public Builder stringAttribute(String stringAttribute) {
            this.stringAttribute = stringAttribute;
            return this;
        }
        public CustomClassForDocumentAPI build() {
            return new CustomClassForDocumentAPI(this);
        }
    }
}

import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.utils.ImmutableMap;

public class CustomAttributeForDocumentConverterProvider implements AttributeConverterProvider {
    private final Map<EnhancedType<?>, AttributeConverter<?>> converterCache = ImmutableMap.of(
        EnhancedType.of(CustomClassForDocumentAPI.class), new CustomClassForDocumentAttributeConverter());
        // Different types of converters can be added to this map.

    public static CustomAttributeForDocumentConverterProvider create() {
        return new CustomAttributeForDocumentConverterProvider();
    }

    @Override
    public <T> AttributeConverter<T> converterFor(EnhancedType<T> enhancedType) {
        return (AttributeConverter<T>) converterCache.get(enhancedType);
    }
}

A custom attribute converter is an implementation of AttributeConverter that converts a custom class to and from a map of attribute values, as shown below.

import java.util.LinkedHashMap;
import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.EnhancedAttributeValue;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.StringAttributeConverter;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class CustomClassForDocumentAttributeConverter implements AttributeConverter<CustomClassForDocumentAPI> {
    public static CustomClassForDocumentAttributeConverter create() {
        return new CustomClassForDocumentAttributeConverter();
    }
    @Override
    public AttributeValue transformFrom(CustomClassForDocumentAPI input) {
        Map<String, AttributeValue> attributeValueMap = new LinkedHashMap<>();
        // Use the accessor that CustomClassForDocumentAPI actually defines.
        if (input.stringAttribute() != null) {
            attributeValueMap.put("stringAttribute", AttributeValue.fromS(input.stringAttribute()));
        }
        return EnhancedAttributeValue.fromMap(attributeValueMap).toAttributeValue();
    }

    @Override
    public CustomClassForDocumentAPI transformTo(AttributeValue input) {
        Map<String, AttributeValue> customAttr = input.m();
        CustomClassForDocumentAPI.Builder builder = CustomClassForDocumentAPI.builder();
        if (customAttr.get("stringAttribute") != null) {
            builder.stringAttribute(StringAttributeConverter.create().transformTo(customAttr.get("stringAttribute")));
        }
        return builder.build();
    }
    @Override
    public EnhancedType<CustomClassForDocumentAPI> type() {
        return EnhancedType.of(CustomClassForDocumentAPI.class);
    }
    @Override
    public AttributeValueType attributeValueType() {
        return AttributeValueType.M;
    }
}

Attribute Converter Provider for EnhancedDocument Builder

When working outside of a DynamoDB table context, make sure to set the attribute converter providers explicitly on the EnhancedDocument builder. When used within a DynamoDB table context, the table schema’s converter provider will be used automatically for the EnhancedDocument.
The code snippet below shows how to set an AttributeConverterProvider using the EnhancedDocument builder method.

// Enhanced document created from JSON string using custom AttributeConverterProvider.
EnhancedDocument documentFromJson = EnhancedDocument.builder()
                                                    .attributeConverterProviders(CustomAttributeForDocumentConverterProvider.create())
                                                    .json("{\"key\": \"Values\"}")
                                                    .build();

CustomClassForDocumentAPI customClass = documentFromJson.get("key", CustomClassForDocumentAPI.class);

Conclusion

In this blog post, we showed you how to set up and begin using the Enhanced Document API, both with the DynamoDB Enhanced Client and standalone through the EnhancedDocument class. The enhanced client is open-source and resides in the same repository as the AWS SDK for Java 2.0.
We hope you’ll find this new feature useful. You can always share your feedback on our GitHub issues page.

How Novo Nordisk built distributed data governance and control at scale

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-distributed-data-governance-and-control-at-scale/

This is a guest post co-written with Jonatan Selsing and Moses Arthur from Novo Nordisk.

This is the second post of a three-part series detailing how Novo Nordisk, a large pharmaceutical enterprise, partnered with AWS Professional Services to build a scalable and secure data and analytics platform. The first post of this series describes the overall architecture and how Novo Nordisk built a decentralized data mesh architecture, including Amazon Athena as the data query engine. The third post will show how end-users can consume data from their tool of choice, without compromising data governance. This will include how to configure Okta, AWS Lake Formation, and a business intelligence tool to enable SAML-based federated use of Athena for an enterprise BI activity.

When building a scalable data architecture on AWS, giving autonomy and ownership to the data domains is crucial for the success of the platform. By providing the right mix of freedom and control to the people with the business domain knowledge, your business can maximize value from the data as quickly and effectively as possible. At the same time, data is a strategic asset that needs to be protected with the highest degree of rigor. How can organizations strike the right balance between freedom and control?

In this post, you will learn how to build decentralized governance with Lake Formation and AWS Identity and Access Management (IAM) using attribute-based access control (ABAC). We discuss some of the patterns we use, including Amazon Cognito identity pool federation using ABAC in permission policies, and Okta-based SAML federation with ABAC enforcement on role trust policies.

Solution overview

In the first post of this series, we explained how Novo Nordisk and AWS Professional Services built a modern data architecture based on data mesh tenets. This architecture enables data governance on distributed data domains, using an end-to-end solution to create data products and providing federated data access control. This post dives into three elements of the solution:

  • How IAM roles and Lake Formation are used to manage data access across data domains
  • How data access control is enforced at scale, using a group membership mapping with an ABAC pattern
  • How the system maintains state across the different layers, so that the ecosystem of trust is configured appropriately

From the end-user perspective, the objective of the mechanisms described in this post is to enable simplified data access from the different analytics services adopted by Novo Nordisk, such as those provided by software as a service (SaaS) vendors like Databricks, or self-hosted ones such as JupyterHub. At the same time, the platform must guarantee that any change in a dataset is immediately reflected at the service user interface. The following figure illustrates at a high level the expected behavior.

High-level data platform expected behavior

Following the layer nomenclature established in the first post, the services are created and managed in the consumption layer. The domain accounts are created and managed in the data management layer. Because changes can occur from both layers, continuous communication in both directions is required. The state information is kept in the virtualization layer along with the communication protocols. Additionally, at sign-in time, the services need information about data resources required to provide data access abstraction.

Managing data access

The data access control in this architecture is designed around the core principle that all access is encapsulated in isolated IAM role sessions. The layer pattern that we described in the first post ensures that the creation and curation of the IAM role policies involved can be delegated to the different data management ecosystems. Each integrated data management platform can use its own data access mechanisms, with the single requirement that the data is accessed via specific IAM roles.

To illustrate the potential mechanisms that can be used by data management solutions, we show two examples of data access permission mechanisms used by two different data management solutions. Both systems utilize the same trust policies as described in the following sections, but have a completely different permission space.

Example 1: Identity-based ABAC policies

The first mechanism we discuss is an ABAC role that provides access to a home-like data storage area, where users can share within their departments and with the wider organization in a structure that mimics the organizational structure. Here, we don’t utilize the group names, but instead forward user attributes from the corporate Active Directory directly into the permission policy through claim overrides. We do this by having the corporate Active Directory as the identity provider (IdP) for the Amazon Cognito user pool and mapping the relevant IdP attributes to user pool attributes. Then, in the Amazon Cognito identity pool, we map the user pool attributes to session tags to use them for access control. Custom overrides can be included in the claim mapping through the use of a pre token generation Lambda trigger. This way, claims from Active Directory can be mapped to Amazon Cognito user pool attributes and then ultimately used in the Amazon Cognito identity pool to control IAM role permissions. The following is an example of an IAM policy with session tags:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "",
                        "public/",
                        "public/*",
                        "home/",
                        "home/${aws:PrincipalTag/initials}/*",
                        "home/${aws:PrincipalTag/department}/*"
                    ]
                }
            },
            "Action": "s3:ListBucket",
            "Resource": [
                "arn:aws:s3:::your-home-bucket"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:PutObject*",
                "s3:DeleteObject*"
            ],
            "Resource": [
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": "s3:GetObject*",
            "Resource": [
                "arn:aws:s3:::your-home-bucket/public/",
                "arn:aws:s3:::your-home-bucket/public/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This role is then embedded in the analytics layer (together with the data domain roles) and assumed on behalf of the user. This enables users to mix and match between data domains—as well as utilizing private and public data paths that aren’t necessarily tied to any data domain. For more examples of how ABAC can be used with permission policies, refer to How to scale your authorization needs by using attribute-based access control with S3.
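To make the policy variables above more concrete, the following is a minimal Java sketch of how a resource pattern such as home/${aws:PrincipalTag/initials}/* could resolve against a session's tags. It is only an illustration of the idea, not the IAM policy engine; the class name PrincipalTagPolicySketch and its methods are hypothetical.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: mimics how ${aws:PrincipalTag/<key>} variables are filled in
// from session tags and how '*' wildcards match. This is not the IAM policy engine.
final class PrincipalTagPolicySketch {
    private static final Pattern TAG_VARIABLE =
            Pattern.compile("\\$\\{aws:PrincipalTag/([^}]+)\\}");

    // Replace each ${aws:PrincipalTag/<key>} with the matching session tag value.
    // Returns null when a referenced tag is missing, so the pattern cannot match.
    static String resolve(String pattern, Map<String, String> sessionTags) {
        Matcher matcher = TAG_VARIABLE.matcher(pattern);
        StringBuffer resolved = new StringBuffer();
        while (matcher.find()) {
            String value = sessionTags.get(matcher.group(1));
            if (value == null) {
                return null;
            }
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    // Match a resolved pattern in which '*' stands for any sequence of characters.
    static boolean matches(String resolvedPattern, String resource) {
        StringBuilder regex = new StringBuilder();
        for (char c : resolvedPattern.toCharArray()) {
            regex.append(c == '*' ? ".*" : Pattern.quote(String.valueOf(c)));
        }
        return Pattern.matches(regex.toString(), resource);
    }

    // True when the resource falls under the pattern for this session's tags.
    static boolean allows(String pattern, Map<String, String> sessionTags, String resource) {
        String resolved = resolve(pattern, sessionTags);
        return resolved != null && matches(resolved, resource);
    }
}
```

With a session tagged initials=abcd, the pattern arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}/* covers objects under home/abcd/ but not under another user's prefix, which is how one policy document can serve every user.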

Example 2: Lake Formation name-based access controls

In the data management solution named Novo Nordisk Enterprise Datahub (NNEDH), which we introduced in the first post, we use Lake Formation to enable standardized data access. The NNEDH datasets are registered in the Lake Formation Data Catalog as databases and tables, and permissions are granted using the named resource method. The following screenshot shows an example of these permissions.

Lake Formation named resource method for permissions management

In this approach, data access governance is delegated to Lake Formation. Every data domain in NNEDH has isolated permissions synthesized by NNEDH as the central governance management layer. This is a similar pattern to what is adopted for other domain-oriented data management solutions. Refer to Use an event-driven architecture to build a data mesh on AWS for an example of tag-based access control in Lake Formation.

These patterns don’t exclude implementations of peer-to-peer type data sharing mechanisms, such as those that can be achieved using AWS Resource Access Manager (AWS RAM), where a single IAM role session can have permissions that span across accounts.

Delegating role access to the consumption layer

The following figure illustrates the data access workflow from an external service.

Data access workflow from external service

The workflow steps are as follows:

  1. A user authenticates on an IdP used by the analytics tool that they are trying to access. A wide range of analytics tools are supported by the Novo Nordisk platform, such as Databricks and JupyterHub, and the IdP can be either SAML or OIDC type depending on the capabilities of the third-party tool. In this example, an Okta SAML application is used to sign into a third-party analytics tool, and an IAM SAML IdP is configured in the data domain AWS account to federate with the external IdP. The third post of this series describes how to set up an Okta SAML application for IAM role federation on Athena.
  2. The SAML assertion obtained during the sign-in process is used to request temporary security credentials of an IAM role through the AssumeRole operation. In this example, the SAML assertion is used on the AssumeRoleWithSAML operation. For OpenID Connect-compatible IdPs, the operation AssumeRoleWithWebIdentity must be used with the JWT. The SAML attributes in the assertion or the claims in the token can be generated at sign-in time, to ensure that the group memberships are forwarded, for the ABAC policy pattern described in the following sections.
  3. The analytics tool, such as Databricks or JupyterHub, abstracts the usage of the IAM role session credentials in the tool itself, and data can be accessed directly according to the permissions of the IAM role assumed. This pattern is similar in nature to IAM passthrough as implemented by Databricks, but in Novo Nordisk it’s extended across all analytics services. In this example, the analytics tool accesses the data lake on Amazon Simple Storage Service (Amazon S3) through Athena queries.

As the data mesh pattern expands across domains covering more downstream services, we need a mechanism to keep IdPs and IAM role trusts continuously updated. We come back to this part later in the post, but first we explain how role access is managed at scale.

Attribute-based trust policies

In previous sections, we emphasized that this architecture relies on IAM roles for data access control. Each data management platform can implement its own data access control method using IAM roles, such as identity-based policies or Lake Formation access control. For data consumption, it’s crucial that these IAM roles are only assumable by users that are part of Active Directory groups with the appropriate entitlements to use the role. To implement this at scale, the IAM role’s trust policy uses ABAC.

When a user authenticates on the external IdP of the consumption layer, we add to the access token a claim derived from their Active Directory groups. This claim is propagated by the AssumeRole operation into the trust policy of the IAM role, where it is compared with the expected Active Directory group. Only users that belong to the expected groups can assume the role. This mechanism is illustrated in the following figure.

Architecture of the integration with the identity provider

Translating group membership to attributes

To enforce the group membership entitlement at the role assumption level, we need a way to compare the required group membership with the group memberships that a user comes with in their IAM role session. To achieve this, we use a form of ABAC, where we have a way to represent the sum of context-relevant group memberships in a single attribute. A single IAM role session tag value is limited to 256 characters. The corresponding limit for SAML assertions is 100,000 characters, so for systems where a very large number of either roles or group-type mappings are required, SAML can support a wider range of configurations.

In our case, we opted for a compression algorithm that takes a group name and compresses it to a 4-character string hash. This means that, together with a group-separation character, we can fit 51 groups in a single attribute. This drops to approximately 20 groups for OIDC-type role assumption due to the PackedPolicySize limit, but is higher for a SAML-based flow. This has proven sufficient for our case. There is a risk that two different groups could hash to the same character combination; however, we have checked that there are no collisions among the existing groups. To mitigate this risk going forward, we have introduced guardrails in multiple places. First, before adding new group entitlements in the virtualization layer, we check whether there’s a hash collision with any existing group. When an attempt is made to add a group whose hash duplicates an existing one, our service team is notified and we can react accordingly. As stated earlier, the probability of clashes is low, so the flexibility this provides outweighs the overhead of managing clashes (we have not had any yet). We additionally enforce this at SAML assertion creation time, to ensure that there are no duplicated groups in the user’s group list; in cases of duplication, we remove both entirely. This means malicious actors can at most limit the access of other users, but not gain unauthorized access.
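The post does not name the compression algorithm, so the following Java sketch only illustrates the general idea under stated assumptions: the hash (the first 3 bytes of SHA-256, base64url-encoded to 4 characters), the ":" separator, and the class name GroupHashSketch are all hypothetical choices. It shows the capacity arithmetic and the "remove both on collision" guardrail.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class GroupHashSketch {
    static final int MAX_TAG_VALUE_LENGTH = 256; // limit for a single session tag value
    static final int HASH_LENGTH = 4;
    static final String SEPARATOR = ":";         // assumed separator, outside the hash alphabet

    // Assumed compression: first 3 bytes of SHA-256, base64url-encoded (exactly 4 characters).
    static String hash(String groupName) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(groupName.getBytes(StandardCharsets.UTF_8));
            return Base64.getUrlEncoder().withoutPadding().encodeToString(Arrays.copyOf(digest, 3));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    // Number of hashes that fit in one tag value: (256 + 1) / (4 + 1) = 51.
    static int maxGroups() {
        return (MAX_TAG_VALUE_LENGTH + SEPARATOR.length()) / (HASH_LENGTH + SEPARATOR.length());
    }

    // Join the hashes of a user's groups into one attribute value.
    // If two different groups share a hash, both are dropped entirely.
    static String toAttribute(List<String> groupNames, Function<String, String> hasher) {
        Map<String, String> groupByHash = new LinkedHashMap<>();
        Set<String> collided = new HashSet<>();
        for (String group : groupNames) {
            String h = hasher.apply(group);
            String previous = groupByHash.putIfAbsent(h, group);
            if (previous != null && !previous.equals(group)) {
                collided.add(h);
            }
        }
        groupByHash.keySet().removeAll(collided);
        String value = String.join(SEPARATOR, groupByHash.keySet());
        if (value.length() > MAX_TAG_VALUE_LENGTH) {
            throw new IllegalArgumentException("Too many groups for a single session tag value");
        }
        return value;
    }
}
```

Passing the hasher as a parameter keeps the collision handling testable with a deliberately weak hash; in normal use it would be called as toAttribute(groups, GroupHashSketch::hash).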

Enforcing audit functionality across sessions

As mentioned in the first post, on top of governance, there are strict requirements around auditability of data accesses. This means that for all data access requests, it must be possible to trace the specific user across services and retain this information. We achieve this by setting (and enforcing) a source identity for all role sessions and make sure to propagate enterprise identity to this attribute. We use a combination of Okta inline hooks and SAML session tags to achieve this. This means that the AWS CloudTrail logs for an IAM role session have the following information:

{
    "eventName": "AssumeRoleWithSAML",
    "requestParameters": {
        "SAMLAssertionID": "id1111111111111111111111111",
        "roleSessionName": "[email protected]",
        "principalTags": {
            "nn-initials": "user",
            "department": "NNDepartment",
            "GroupHash": "xxxx",
            "email": "[email protected]",
            "cost-center": "9999"
        },
        "sourceIdentity": "[email protected]",
        "roleArn": "arn:aws:iam::111111111111:role/your-assumed-role",
        "principalArn": "arn:aws:iam::111111111111:saml-provider/your-saml-provider",
        ...
    },
    ...
}

On the IAM role level, we can enforce the required attribute configuration with a trust policy. The following example is for a SAML-based app; we support the same patterns through OpenID Connect IdPs. We now go through the elements of this IAM role trust policy:

{
    "Version": "2008-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": {
            "Federated": [SAML_IdP_ARN]
        },
        "Action": [
            "sts:AssumeRoleWithSAML",
            "sts:TagSession",
            "sts:SetSourceIdentity"
        ],
        "Condition": {
            "StringEquals": {
                "SAML:aud": "https://signin.aws.amazon.com/saml"
            },
            "StringLike": {
                "sts:SourceIdentity": "*@novonordisk.com",
                "aws:RequestTag/GroupHash": ["*xxxx*"]
            },
            "Null": {
                "sts:SourceIdentity": "false"
            }
        }
    }
}

The policy contains the following details:

  • The Principal statement should point to the list of apps that are served through the consumption layer. These can be Azure app registrations, Okta apps, or Amazon Cognito app clients. This means that SAML assertions (in the case of SAML-based flows) minted from these applications can be used to run the operation AssumeRoleWithSAML if the remaining elements are also satisfied.
  • The Action statement includes the required permissions for the AssumeRole call to succeed, including adding the contextual information to the role session.
  • In the first condition, the audience of the assertion needs to be targeting AWS.
  • In the second condition, there are two StringLike requirements:
    • A requirement on the source identity as the naming convention to follow at Novo Nordisk (users must come with enterprise identity, following our audit requirements).
    • The aws:RequestTag/GroupHash needs to contain xxxx, which represents the hashed group name described in the previous section.
  • Lastly, we enforce that sessions can’t be started without setting the source identity.

This policy enforces that all calls come from recognized services, are auditable, have the right target, and are made by users with the right group memberships.
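As a rough illustration of how these conditions combine, the following Java sketch evaluates the audience, source identity, and group hash checks together; every condition must hold for the role to be assumable. It is a simplified stand-in for the IAM evaluation logic, and the class and method names are hypothetical.

```java
import java.util.regex.Pattern;

// Simplified stand-in for the trust policy conditions; not the IAM evaluation engine.
final class TrustPolicySketch {
    static final String EXPECTED_AUDIENCE = "https://signin.aws.amazon.com/saml";
    static final String SOURCE_IDENTITY_PATTERN = "*@novonordisk.com";

    // StringLike-style match: '*' matches any sequence; a missing value never matches.
    static boolean like(String pattern, String value) {
        if (value == null) {
            return false;
        }
        StringBuilder regex = new StringBuilder();
        for (char c : pattern.toCharArray()) {
            regex.append(c == '*' ? ".*" : Pattern.quote(String.valueOf(c)));
        }
        return Pattern.matches(regex.toString(), value);
    }

    // All conditions are combined with AND, as in a policy Condition block.
    static boolean canAssume(String audience, String sourceIdentity,
                             String groupHashTag, String requiredGroupHash) {
        return EXPECTED_AUDIENCE.equals(audience)
                && like(SOURCE_IDENTITY_PATTERN, sourceIdentity)
                && like("*" + requiredGroupHash + "*", groupHashTag);
    }
}
```

A session whose GroupHash tag lists several hashes passes as long as the required one appears anywhere in the value, which is why the policy wraps the hash in wildcards.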

Building a central overview of governance and trust

In this section, we discuss how Novo Nordisk keeps track of the relevant group-role relations and maps these at sign-in time.

Entitlements

In Novo Nordisk, all accesses are based on Active Directory group memberships. There is no user-based access. Because this pattern is so central, we have extended this access philosophy into our data accesses. As mentioned earlier, at sign-in time, the hooks need to be able to know which roles to assume for a given user, given this user’s group membership. We have modeled this data in Amazon DynamoDB, where just-in-time provisioning ensures that only the required user group memberships are available. By building our application around the use of groups, and by having the group propagation done by the application code, we avoid having to make a more general Active Directory integration, which would, for a company the size of Novo Nordisk, severely impact the application, simply due to the volume of users and groups.

The DynamoDB entitlement table contains all relevant information for all roles and services, including role ARNs and IdP ARNs. This means that when users log in to their analytics services, the sign-in hook can construct the required information for the Roles SAML attribute.

When new data domains are added to the data management layer, the data management layer needs to communicate both the role information and the group name that gives access to the role.
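The lookup performed by the sign-in hook can be sketched as follows. The post does not describe the table schema, so the Entitlement fields and the in-memory list standing in for DynamoDB are assumptions; the only external fact relied on is that each value of the SAML Role attribute is a role ARN and an IdP ARN separated by a comma.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

final class EntitlementSketch {
    // One row of a hypothetical entitlement table: the group that unlocks a role,
    // plus the role ARN and the IdP ARN that the role trusts.
    static final class Entitlement {
        final String groupName;
        final String roleArn;
        final String idpArn;

        Entitlement(String groupName, String roleArn, String idpArn) {
            this.groupName = groupName;
            this.roleArn = roleArn;
            this.idpArn = idpArn;
        }
    }

    // Build the values for the Role SAML attribute: one "<role ARN>,<IdP ARN>" pair
    // for each role the user's groups entitle them to through the given IdP.
    static List<String> roleAttributeValues(List<Entitlement> table,
                                            Set<String> userGroups,
                                            String idpArn) {
        List<String> values = new ArrayList<>();
        for (Entitlement entitlement : table) {
            if (userGroups.contains(entitlement.groupName) && entitlement.idpArn.equals(idpArn)) {
                values.add(entitlement.roleArn + "," + entitlement.idpArn);
            }
        }
        return values;
    }
}
```

In a real deployment the list would be replaced by a query against the entitlement table, keyed in whatever way suits the access pattern.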

Single sign-on hub for analytics services

When scaling this permission model and data management pattern to a large enterprise such as Novo Nordisk, we ended up creating a large number of IAM roles distributed across different accounts. Then, a solution is required to map and provide access for end-users to the required IAM role. To simplify user access to multiple data sources and analytics tools, Novo Nordisk developed a single sign-on hub for analytics services. From the end-user perspective, this is a web interface that glues together different offerings in a unified system, making it a one-stop tool for data and analytics needs. When signing in to each of the analytical offerings, the authenticated sessions are forwarded, so users never have to reauthenticate.

Common for all the services supported in the consumption layer is that we can run a piece of application code at sign-in time, allowing sign-in time permissions to be calculated. The hooks that achieve this functionality can, for instance, be run by Okta inline hooks. This means that each of the target analytics services can have custom code to translate relevant contextual information or provide other types of automations for the role forwarding.

The sign-in flow is demonstrated in the following figure.

Sign-in flow

The workflow steps are as follows:

  1. A user accesses an analytical service such as Databricks in the Novo Nordisk analytics hub.
  2. The service uses Okta as the SAML-based IdP.
  3. Okta invokes an AWS Lambda-based SAML assertion inline hook.
  4. The hook uses the entitlement database, converting application-relevant group memberships into role entitlements.
  5. Relevant contextual information is returned from the entitlement database.
  6. The Lambda-based hook adds new SAML attributes to the SAML assertion, including the hashed group memberships and other contextual information such as source identity.
  7. A modified SAML assertion is used to sign users in to the analytical service.
  8. The user can now use the analytical tool with active IAM role sessions.

Synchronizing role trust

The preceding section gives an overview of how federation works in this solution. Now we can go through how we ensure that all participating AWS environments and accounts are in sync with the latest configuration.

From the end-user perspective, the synchronization mechanism must ensure that every analytics service instantiated can access the data domains assigned to the groups that the user belongs to. Also, changes in data domains—such as granting data access to an Active Directory group—must take effect immediately in every analytics service.

Two event-based mechanisms are used to keep all the layers synchronized, as detailed in this section.

Synchronize data access control on the data management layer with changes to services in the consumption layer

As described in the previous section, the IAM roles used for data access are created and managed by the data management layer. These IAM roles have a trust policy providing federated access to the external IdPs used by the analytics tools of the consumption layer. This implies that for every new analytical service created with a different IdP, the IAM roles used for data access on data domains must be updated to trust this new IdP.

Using NNEDH as an example of a data management solution, the synchronization mechanism is demonstrated in the following figure.

Synchronization mechanism in a data management solution

Taking as an example a scenario where a new analytics service is created, the steps in this workflow are as follows:

  1. A user with access to the administration console of the consumption layer instantiates a new analytics service, such as JupyterHub.
  2. A job running on AWS Fargate creates the resources needed for this new analytics service, such as an Amazon Elastic Compute Cloud (Amazon EC2) instance for JupyterHub, and the IdP required, such as a new SAML IdP.
  3. When the IdP is created in the previous step, an event is added in an Amazon Simple Notification Service (Amazon SNS) topic with its details, such as name and SAML metadata.
  4. In the NNEDH control plane, a Lambda job is triggered by new events on this SNS topic. This job creates the IAM IdP, if needed, and updates the trust policy of the required IAM roles in all the AWS accounts used as data domains, adding the trust on the IdP used by the new analytics service.

In this architecture, all the update steps are event-triggered and scalable. This means that users of new analytics services can access their datasets almost instantaneously when they are created. In the same way, when a service is removed, the federation to the IdP is automatically removed if not used by other services.
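The bookkeeping behind this behavior can be sketched in Java as follows: an IdP is added to the role trust when the first service using it appears, and removed only when the last such service disappears. This is a minimal in-memory sketch with hypothetical names; the actual solution reacts to SNS events in a Lambda function and updates IAM trust policies, which is not shown here.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

final class TrustSyncSketch {
    // IdP ARN -> names of the analytics services that currently use it.
    private final Map<String, Set<String>> servicesByIdp = new HashMap<>();

    // Handle a "service created" event.
    // Returns true when the IdP is new and must be added to the role trust policies.
    boolean onServiceCreated(String serviceName, String idpArn) {
        Set<String> services = servicesByIdp.computeIfAbsent(idpArn, key -> new HashSet<>());
        boolean newIdp = services.isEmpty();
        services.add(serviceName);
        return newIdp;
    }

    // Handle a "service removed" event.
    // Returns true when no other service uses the IdP, so its trust can be removed.
    boolean onServiceRemoved(String serviceName, String idpArn) {
        Set<String> services = servicesByIdp.get(idpArn);
        if (services == null || !services.remove(serviceName)) {
            return false;
        }
        if (services.isEmpty()) {
            servicesByIdp.remove(idpArn);
            return true;
        }
        return false;
    }

    // IdP ARNs that should currently appear as Federated principals, in sorted order.
    Set<String> trustedIdps() {
        return new TreeSet<>(servicesByIdp.keySet());
    }
}
```

The boolean return values indicate when a trust policy update is actually needed, so repeated events for an already trusted IdP do not trigger redundant updates.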

Propagate changes on data domains to analytics services

Changes to data domains, such as the creation of a new S3 bucket used as a dataset, or adding or removing data access to a group, must be reflected immediately on analytics services of the consumption layer. To accomplish this, a mechanism is used to synchronize the entitlement database with the relevant changes made in NNEDH. This flow is demonstrated in the following figure.

Changes propagation flow

Taking as an example a scenario where access to a specific dataset is granted to a new group, the steps in this workflow are as follows:

  1. Using the NNEDH admin console, a data owner approves a dataset sharing request that grants access on a dataset to an Active Directory group.
  2. In the AWS account of the related data domain, the dataset components such as the S3 bucket and Lake Formation are updated to provide data access to the new group. The cross-account data sharing in Lake Formation uses AWS RAM.
  3. An event is added in an SNS topic with the current details about this dataset, such as the location of the S3 bucket and the groups that currently have access to it.
  4. In the virtualization layer, the updated information from the data management layer is used to update the entitlement database in DynamoDB.

These steps make sure that changes on data domains are automatically and immediately reflected on the entitlement database, which is used to provide data access to all the analytics services of the consumption layer.

Limitations

Many of these patterns rely on the analytical tool to support a clever use of IAM roles. When this is not the case, the platform teams themselves need to develop custom functionality at the host level to ensure that role accesses are correctly controlled. This, for example, includes writing custom authenticators for JupyterHub.

Conclusion

This post shows an approach to building a scalable and secure data and analytics platform. It showcases some of the mechanisms used at Novo Nordisk and how to strike the right balance between freedom and control. The architecture laid out in the first post in this series enables layer independence, and exposes some extremely useful primitives for data access and governance. We make heavy use of contextual attributes to modulate role permissions at the session level, which provides just-in-time permissions. These permissions are propagated at scale, across data domains. The upside is that a lot of the complexity related to managing data access permission can be delegated to the relevant business groups, while enabling the end-user consumers of data to think as little as possible about data accesses and focus on providing value for the business use cases. In the case of Novo Nordisk, they can provide better outcomes for patients and accelerate innovation.

The next post in this series describes how end-users can consume data from their analytics tool of choice, aligned with the data access controls detailed in this post.


About the Authors

Jonatan Selsing is a former research scientist with a PhD in astrophysics who has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while giving full benefit of the advantages of cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Alessandro Fior is a Sr. Data Architect at AWS Professional Services. He is passionate about designing and building modern and scalable data platforms that accelerate companies to extract value from their data.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk, building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Anwar Rizal is a Senior Machine Learning consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Kumari Ramar is an Agile certified and PMP certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Post Syndicated from Vinay Kumar Khambhampati original https://aws.amazon.com/blogs/big-data/accelerate-hiveql-with-oozie-to-spark-sql-migration-on-amazon-emr/

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed pretty well for a long time. But with advancements in infrastructure such as cloud computing and multicore machines with large RAM, Apache Spark started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling the storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.

We migrate these Oozie workflows with Hive actions by extracting the HQL files, and dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone. To convert the HQL to Spark SQL, you’ll need to sort through existing HQLs, replace the parameters, and change the syntax for a bunch of files.

Instead, we can use automation to speed up the process of migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes an existing Hive job as input and generates the configuration artifacts needed for running SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, co-ordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, SQL paths, and their static parameters, which are input for the solution in the next step.

The process consists of two steps:

  1. The Python parser script takes input of the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.
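
The two steps above can be sketched as follows. This is a minimal illustration rather than the parser from the repository: it uses the standard library instead of defusedxml, the inline workflow is a made-up sample, and the namespace versions (0.4) are assumed to match the versions passed on the command line later in this post. The function name extract_hive_steps is hypothetical.

```python
import xml.etree.ElementTree as ET
from typing import Dict, List

# Assumed schema versions (0.4) for the workflow and the Hive action.
NS = {
    "wf": "uri:oozie:workflow:0.4",
    "hive": "uri:oozie:hive-action:0.4",
}

# Made-up sample workflow with a single Hive action.
SAMPLE_WORKFLOW = """
<workflow-app name="sample_oozie_job_name" xmlns="uri:oozie:workflow:0.4">
  <start to="step1"/>
  <action name="step1">
    <hive xmlns="uri:oozie:hive-action:0.4">
      <script>step1.q</script>
      <param>DATE=${DATE}</param>
      <param>HOUR=${HOUR}</param>
    </hive>
    <ok to="end"/>
    <error to="fail"/>
  </action>
  <kill name="fail"><message>failed</message></kill>
  <end name="end"/>
</workflow-app>
"""


def extract_hive_steps(workflow_xml: str) -> List[Dict]:
    """Return one metadata dict per Hive action found in the workflow."""
    root = ET.fromstring(workflow_xml)
    steps = []
    for action in root.findall("wf:action", NS):
        hive = action.find("hive:hive", NS)
        if hive is None:
            continue  # not a Hive action, so it is skipped
        params = {}
        for param in hive.findall("hive:param", NS):
            # Each <param> holds KEY=value; split on the first '=' only.
            key, _, value = (param.text or "").partition("=")
            params[key.strip()] = value.strip()
        steps.append({
            "step_name": action.get("name"),
            "script": hive.findtext("hive:script", default="", namespaces=NS),
            "sql_parameters": params,
        })
    return steps
```

Each returned dict could then be written out as the per-step JSON file. For untrusted XML input, a hardened parser such as defusedxml (listed in the prerequisites) is the safer choice.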

The following diagram outlines these steps and shows sample output.

Prerequisites

You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml==0.7.1

Setup

Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment
  3. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate
  4. Git clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql
  5. Install dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}
('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')

Limitations

This method has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.
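
To illustrate the second limitation, here is a rough sketch of how one HQL script might be split into per-statement files named by sequence ID. The function name split_statements and the step of 5 between IDs are assumptions inferred from the 5.sql and 10.sql names in the sample metadata later in this post. The split on semicolons is deliberately naive and would break on semicolons inside string literals or comments, which a real SQL parser such as sqlparse handles.

```python
from typing import Dict


def split_statements(hql: str, step: int = 5) -> Dict[str, str]:
    """Map '<sequence>.sql' file names to individual SQL statements.

    Naive split on ';': semicolons inside string literals or comments
    are not handled.
    """
    statements = [s.strip() for s in hql.split(";") if s.strip()]
    return {
        f"{(i + 1) * step}.sql": stmt + ";"
        for i, stmt in enumerate(statements)
    }
```

Because the file names carry only the sequence ID, renaming them to something descriptive remains a manual task.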

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to spark-submit. This solution uses DynamoDB for logging the run details of SQLs for support and maintenance.

Architecture overview

The following diagram illustrates the solution architecture.

Prerequisites

You need the following prerequisites:

  • Version:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1

Setup

Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attributes Type Comments
id String partition_key
step_id String sort_key
step_name String Step description
sql_base_path String Base path
sql_info List List of SQLs in ETL pipeline
. sql_path SQL file name
. sql_active_flag y/n
. sql_load_order Order of SQL
. sql_parameters Parameters in SQL and values
spark_config Map Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attributes Type Comments
job_run_id String partition_key
id String sort_key (UUID)
end_time String End time
error_description String Error in case of failure
expire Number Time to Live
sql_seq Number SQL sequence number
start_time String Start time
Status String Status of job
step_id String ID of the job step that the SQL belongs to

The log table can grow quickly if there are too many jobs or if they are running frequently. We can archive them to Amazon S3 if they are no longer used or use the Time to Live feature of DynamoDB to clean up old records.
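
DynamoDB Time to Live expects a Number attribute holding a Unix epoch timestamp in seconds, and it has to be enabled on the table for that attribute. As a small sketch of how the expire value for a log item might be computed (the helper name ttl_epoch and the retention period are assumptions, not part of the solution code):

```python
import time
from typing import Optional

SECONDS_PER_DAY = 24 * 60 * 60


def ttl_epoch(retention_days: int, now: Optional[float] = None) -> int:
    """Return the epoch-seconds timestamp at which a log item should expire."""
    base = time.time() if now is None else now
    return int(base) + retention_days * SECONDS_PER_DAY
```

For example, a log item written with expire set to ttl_epoch(30) becomes eligible for deletion about 30 days later.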

  3. Set the bucket name variable with the first command. If you don’t have an existing bucket that can be reused, run the second command to create an S3 bucket to store the Spark SQL code, which will be run by Amazon EMR:
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1
  4. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name
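
Because the bucket name appears twice inside the policy document, it is easy to update the variable and forget the JSON. One option, sketched here with a hypothetical helper name, is to generate the same policy from the bucket name:

```python
import json


def secure_transport_policy(bucket_name: str) -> str:
    """Build a bucket policy that denies any request not sent over TLS."""
    bucket_arn = f"arn:aws:s3:::{bucket_name}"
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Deny",
                "Principal": {"AWS": "*"},
                "Action": "s3:*",
                # The policy covers both the bucket and every object in it.
                "Resource": [bucket_arn, f"{bucket_arn}/*"],
                "Condition": {"Bool": {"aws:SecureTransport": "false"}},
            }
        ],
    }
    return json.dumps(policy)
```

The returned string could be passed to the --policy option in place of the hand-edited JSON.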

  5. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git
  6. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip
  7. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  8. Upload sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  9. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "name": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "id": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}
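
The AWS CLI put-item command expects items in DynamoDB attribute-value JSON, where every value is wrapped in a type descriptor such as S, N, L, or M, so the plain JSON above cannot be passed as is. The following sketch shows one way the conversion could be done. The function names are hypothetical, and only the types that occur in the step metadata (plus booleans and null) are covered.

```python
import json
from typing import Any, Dict


def to_dynamodb(value: Any) -> Dict[str, Any]:
    """Convert a plain Python value to DynamoDB attribute-value JSON."""
    if isinstance(value, bool):  # check bool first: bool is a subclass of int
        return {"BOOL": value}
    if isinstance(value, str):
        return {"S": value}
    if isinstance(value, (int, float)):
        return {"N": str(value)}  # numbers are sent as strings
    if isinstance(value, list):
        return {"L": [to_dynamodb(v) for v in value]}
    if isinstance(value, dict):
        return {"M": {k: to_dynamodb(v) for k, v in value.items()}}
    if value is None:
        return {"NULL": True}
    raise TypeError(f"Unsupported type: {type(value).__name__}")


def step_to_item(step: Dict[str, Any]) -> str:
    """Serialize a step metadata dict as the --item argument for put-item."""
    return json.dumps({k: to_dynamodb(v) for k, v in step.items()})
```

The output of step_to_item could be saved to a file and passed to aws dynamodb put-item --table-name dw-etl-metadata with the --item option.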

  10. In the Athena query editor, create the database base:
create database base;
  11. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

    2. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  12. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file by modifying the S3 path to the bucket you created.
    2. Run the ./sample_data/ddl/us_current.q file by modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.

  13. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  14. Add the following steps to the EMR cluster:
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,--code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the spark step.

Step type Spark Application
Name Any Name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script Argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB Log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB Metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.
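
To make the sql_parameters format and the role of the metadata concrete, here is a minimal sketch of how a driver might parse the parameter string, pick the active SQL files in load order, and substitute values into a query. This is not the code of etl_driver.py: the function names are hypothetical, and the ${NAME} placeholder style in the SQL text is an assumption based on the Hive-style parameters shown earlier.

```python
from string import Template
from typing import Any, Dict, List


def parse_sql_parameters(raw: str) -> Dict[str, str]:
    """Parse 'KEY=value::KEY=value' into a dict, splitting on the first '='."""
    params = {}
    for pair in raw.split("::"):
        if not pair:
            continue
        key, sep, value = pair.partition("=")
        if not sep:
            raise ValueError(f"Expected KEY=value, got {pair!r}")
        params[key] = value
    return params


def plan_run(sql_info: List[Dict[str, Any]]) -> List[str]:
    """Return the paths of active SQL files in ascending load order."""
    active = [
        s for s in sql_info
        if str(s.get("sql_active_flag", "")).upper() == "Y"
    ]
    ordered = sorted(active, key=lambda s: int(s["sql_load_order"]))
    return [s["sql_path"] for s in ordered]


def render_sql(sql_text: str, params: Dict[str, str]) -> str:
    """Substitute ${NAME} placeholders; unknown placeholders are left as-is."""
    return Template(sql_text).safe_substitute(params)
```

In a Spark session, each rendered statement could then be passed to spark.sql in the order returned by plan_run.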

Validation

After the EMR step completes, you should have data in the S3 bucket for the table base.states_daily. We can validate the data by querying the table base.states_daily in Athena.

Congratulations, you have converted an Oozie Hive job to Spark and run it on Amazon EMR successfully.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for any new pipeline and offers less maintenance of code
  • Onboarding any new pipeline only needs the metadata set up—the DynamoDB entries and SQL to be placed in the S3 path
  • Any common modifications or enhancements can be done at the solution level, which will be reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark-submit command

Limitations

This method has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL should be a Spark action like insert, create, drop, and so on

In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges for the resources:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1
  2. Delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api delete-bucket --bucket $s3_bucket_name --region us-east-1

  3. Terminate the EMR cluster if it wasn’t a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster id created above>> 

Conclusion

In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmarking and testing. Developing a new pipeline is also quick: you only need to create SQL logic, test it using Spark (shell or notebook), add metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.


About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.

Optimizing fleet utilization with Amazon Location Service and HERE Technologies

Post Syndicated from Mahesh Geeniga original https://aws.amazon.com/blogs/architecture/optimizing-fleet-utilization-with-amazon-location-service-and-here-technologies/

The fleet management market is expected to grow at a compound annual growth rate (CAGR) of 15.5 percent, from USD 25.5 billion in 2022 to USD 52.4 billion in 2027. Optimizing how your organization uses its vehicle fleet is important for logistics and service providers such as last mile, middle mile, and field services.

In this post, we demonstrate how to build and run a solution for many-to-many vehicle routing using HERE Tour Planning and Amazon Location Service. HERE Technologies is a data provider for Amazon Location Service and provides it with map rendering, geocoding, search, and routing. HERE Tour Planning expands on functionality such as geocoding, basic routing, and matrix routing to consider parameters such as time windows, job requirements or priorities, vehicle capabilities, range, and traffic information. It also supports immediate re-planning when conditions change.

The architecture described in this post can help you optimize your fleets for delivering shipments, such as perishable items on pallets, from a central distribution center to multiple retail locations. The architecture uses the AWS Cloud Development Kit (AWS CDK) to help you provision and version control your infrastructure. The architecture also uses Event Driven Architecture (EDA) based on AWS Lambda and Amazon DynamoDB. It uses Amazon Simple Storage Service (Amazon S3) and DynamoDB to store the artifacts generated and integrates with Amazon Location Service to plot the routes visually on a map for each delivery driver.

Solution overview

This post will help you:

  • Configure the many-to-many vehicle routing architecture using HERE Tour Planning and Amazon Location Service
  • Submit a HERE Tour Planning problem
  • Generate an optimized solution file
  • Run a React app that:
    • Generates a list of routes for each vehicle in the fleet
    • Allows drivers to select and view routes in detail

The following diagram outlines how the architecture works.

Figure 1. Many-to-many vehicle routing architecture using HERE Tour Planning and Amazon Location Service

Let’s explore the steps in the diagram.

  1. The fleet operator uploads the tour requirements file to an Amazon S3 bucket.
  2. The upload invokes a Lambda function to process the new Tour Request. If the HERE API key is present, it calls the HERE Tour Planning API.
  3. The HERE Tour Planning API calculates the solution to the routing problem.
  4. The driver uses the React app to select a vehicle, which requests a route.
  5. The invoked Lambda function uses Amazon Location Service to calculate the route and render it in the React app.
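
Step 2 can be sketched as a Lambda handler that reads the S3 event notification and decides whether to call the planning API. This is an illustrative Python sketch, not the function from the sample repository. The environment variable name HERE_API_KEY and the action labels are assumptions, and the actual HTTP call to HERE Tour Planning is left out.

```python
import os
from typing import Any, Dict, List
from urllib.parse import unquote_plus


def uploaded_objects(event: Dict[str, Any]) -> List[Dict[str, str]]:
    """Extract bucket and key for each record in an S3 event notification."""
    objects = []
    for record in event.get("Records", []):
        s3 = record["s3"]
        objects.append({
            "bucket": s3["bucket"]["name"],
            # Object keys arrive URL-encoded in S3 event notifications.
            "key": unquote_plus(s3["object"]["key"]),
        })
    return objects


def handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Decide, per uploaded problem file, whether to call the planning API."""
    api_key = os.environ.get("HERE_API_KEY")  # hypothetical variable name
    results = []
    for obj in uploaded_objects(event):
        action = "call_tour_planning" if api_key else "skip_api_call"
        results.append({**obj, "action": action})
    return {"processed": results}
```

Keeping the event parsing in its own function makes it straightforward to test without deploying anything.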

Prerequisites

This walkthrough requires the following installations and resources:

  • Have an AWS account
  • Install the AWS Amplify CLI (command-line interface)
  • Install the AWS CDK CLI
  • Install the AWS CLI
    • Configure and authenticate the AWS CLI to interact with your AWS account.
  • Have your preferred integrated development environment (IDE), such as Visual Studio Code
  • Have GitHub repository access
    • git clone https://github.com/aws-samples/aws-here-optimize-fleet-utilization
  • Have a HERE API Key (optional)
    • This is needed to invoke the HERE Tour Planning API for generating solutions to new routing problems.
    • The GitHub sample repository includes a problem and pre-solved solution file, so you don’t need to acquire a HERE API key to learn about these offerings.
    • To acquire an API key, create a free account on the HERE Platform, then follow the instructions in the HERE Tour Planning documentation to create your API key.
    • There can be additional charges based on API key use. For more details, see the HERE Tour Planning section within HERE service rates.

Walkthrough

Provision the infrastructure

  1. The architecture uses NPM node modules. Run the following commands to install the dependencies:
    • # AWS Lambda function dependencies
      cd lib/lambda/calculate-route
      npm install
    • # Sample Frontend application dependencies
      cd frontend/here-driver-app
      npm install
  2. If you have never used AWS CDK in your AWS account, you must first bootstrap the solution, which creates Amazon S3 buckets and metadata to support AWS CDK operations. Note: Architectural components described in this article are covered under AWS Free Tier and HERE Free monthly usage, but additional charges can occur based on usage beyond the Free Tier limits. We recommend following the Cleanup instructions after completing the walkthrough.
      • From the root of the repository, run the following command to generate the needed infrastructure:
        • cdk bootstrap
      • Output similar to the following indicates that you successfully bootstrapped the AWS account with what AWS CDK requires.

        Figure 2. Successful bootstrap of AWS account with AWS CDK requirements

  3. Deploy the infrastructure for this solution by running the following command. This provisions much of the infrastructure required for this solution such as DynamoDB, Lambda functions, and Amazon S3 buckets:
    • cdk deploy
  4. Next, Amplify provisions the remaining resources to complete the architecture. Navigate to the folder for the frontend by running the following command:
    • cd frontend/here-driver-app
  5. Run the following series of Amplify commands to create the remaining resources: Amazon API Gateway, Amazon Cognito, and Amazon Location Service. For additional details, see Clone sample Amplify project.
  6. To accept the defaults, run the following command:
    • amplify init
  7. To push the infrastructure out to the AWS account, run the following command:
    • amplify push
  8. To publish the environment, run the following command:
    • amplify publish

Create problem and generate architecture files

The next step is to create the HERE Tour Planning problem file and submit it to the HERE Tour Planning API to solve. Note: You can either sign up for the HERE Developer program to receive an API key to test this solution live or use the problem and pre-solved solution provided in the /data folder of the repository.

  1. Open the Amazon S3 bucket that the previous step created.
  2. Upload a problem file (in JSON format) to the bucket.
  3. An Amazon S3 event notification invokes a Lambda function that performs a synchronous call to the HERE Tour Planning API and generates a vehicle routing problem solution file in JSON format.
  4. The Lambda function saves the solution file to the Amazon S3 bucket and additional details about the solution to a DynamoDB table.
  5. The delivery drivers can use the example React app to view the list of vehicles and the routes.

Figure 3. Creating a problem file and submitting it to HERE Tour Planning API

Frontend

The next step is to run the React Frontend app to see the results. Access to the app and the API Gateway is secured with Amazon Cognito.

  1. To run the web application, run the following command:
    • npm start
  2. A local web server runs on http://localhost:3000.
  3. To use the system, the user must authenticate. Amazon Cognito allows users to sign in or create a new account.
  4. After you authenticate, the Home screen displays a list of available vehicles and routes.

    Figure 4. Available vehicles and routes

Choose a vehicle to see the detail of the route. Each red marker is a stop.

Figure 5. Vehicle route details

Cleanup

To avoid incurring future charges, delete all resources created.

  1. Run the following commands to delete the application in AWS Amplify. As an alternative, you can use the Amplify console to delete Amplify resources:
    • cd frontend/here-driver-app
      amplify pull
      amplify delete
  2. With AWS CDK, run the following command to delete the AWS CloudFormation stack that was used to provision the resources. Note: You can leave the AWS CLI, Amplify CLI, and CDK CLI installed on your computer for future development:
    • cdk destroy

Conclusion

In this post, using the use case of shipment delivery from a central distribution center, we’ve demonstrated how you can build your own serverless solution for optimizing middle and last mile operations. The solution uses the multi-vehicle and multi-stop optimization services provided by HERE Tour Planning, and uses Amazon Location Service to visualize the generated routes for each delivery driver. For additional details about HERE’s offerings on AWS Marketplace, see AWS Marketplace: HERE Technologies.

Serverless ICYMI Q1 2023

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/serverless-icymi-q1-2023/

Welcome to the 21st edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!

In case you missed our last ICYMI, check out what happened last quarter here.

Artificial intelligence (AI) technologies, ChatGPT, and DALL-E are creating significant interest in the industry at the moment. Find out how to integrate serverless services with ChatGPT and DALL-E to generate unique bedtime stories for children.

Example notification of a story hosted with Next.js and App Runner

Serverless Land is a website maintained by the Serverless Developer Advocate team to help you build serverless applications and includes workshops, code examples, blogs, and videos. There is now enhanced search functionality so you can search across resources, patterns, and video content.

ServerlessLand search

AWS Lambda

AWS Lambda has improved how concurrency works with Amazon SQS. You can now control the maximum number of concurrent Lambda functions invoked.

The launch blog post explains the scaling behavior of Lambda using this architectural pattern, challenges this feature helps address, and a demo of maximum concurrency in action.

Maximum concurrency is set to 10 for the SQS queue.

AWS Lambda Powertools is an open-source library to help you discover and incorporate serverless best practices more easily. Lambda Powertools for .NET is now generally available and currently focused on three observability features: distributed tracing (Tracer), structured logging (Logger), and asynchronous business and application metrics (Metrics). Powertools is also available for Python, Java, and TypeScript/Node.js programming languages.

Lambda announced a new feature, runtime management controls, which provide more visibility and control over when Lambda applies runtime updates to your functions. The runtime controls are optional capabilities for advanced customers that require more control over their runtime changes. You can now specify a runtime management configuration for each function with one of three settings: Automatic (default), Function update, or Manual.

There are three new Amazon CloudWatch metrics for asynchronous Lambda function invocations: AsyncEventsReceived, AsyncEventAge, and AsyncEventsDropped. You can track the asynchronous invocation requests sent to Lambda functions to monitor any delays in processing and take corrective actions if required. The launch blog post explains the new metrics and how to use them to troubleshoot issues.

Lambda now supports Amazon DocumentDB change streams as an event source. You can use Lambda functions to process new documents, track updates to existing documents, or log deleted documents. You can use any programming language that is supported by Lambda to write your functions.

There is a helpful blog post suggesting best practices for developing portable Lambda functions that allow you to port your code to containers if you later choose to.

AWS Step Functions

AWS Step Functions has expanded its AWS SDK integrations with support for 35 additional AWS services including Amazon EMR Serverless, AWS Clean Rooms, AWS IoT FleetWise, AWS IoT RoboRunner and 31 other AWS services. In addition, Step Functions also added support for 1000+ new API actions from new and existing AWS services such as Amazon DynamoDB and Amazon Athena. For the full list of added services, visit AWS SDK service integrations.

Amazon EventBridge

Amazon EventBridge has launched the AWS Controllers for Kubernetes (ACK) for EventBridge and Pipes. This allows you to manage EventBridge resources, such as event buses, rules, and pipes, using the Kubernetes API and resource model (custom resource definitions).

EventBridge event buses now also support enhanced integration with Service Quotas. Your quota increase requests for limits such as PutEvents transactions-per-second, number of rules, and invocations per second among others will be processed within one business day or faster, enabling you to respond quickly to changes in usage.

AWS SAM

The AWS Serverless Application Model (SAM) Command Line Interface (CLI) has added the sam list command. You can now show resources defined in your application, including the endpoints, methods, and stack outputs required to test your deployed application.

AWS SAM has a preview of sam build support for building and packaging serverless applications developed in Rust. You can use cargo-lambda in the AWS SAM CLI build workflow and AWS SAM Accelerate to iterate on your code changes rapidly in the cloud.

You can now use AWS SAM connectors as a source resource parameter. Previously, you could only define AWS SAM connectors as an AWS::Serverless::Connector resource. Now you can add the resource attribute on a connector’s source resource, which makes templates more readable and easier to update over time.

AWS SAM connectors now also support multiple destinations to simplify your permissions. You can now use a single connector between a single source resource and multiple destination resources.

In October 2022, AWS released OpenID Connect (OIDC) support for AWS SAM Pipelines. This improves your security posture by creating integrations that use short-lived credentials from your CI/CD provider. There is a new blog post on how to implement it.

Find out how best to build serverless Java applications with the AWS SAM CLI.

AWS App Runner

AWS App Runner now supports retrieving secrets and configuration data stored in AWS Secrets Manager and AWS Systems Manager (SSM) Parameter Store in an App Runner service as runtime environment variables.

App Runner also now supports incoming requests based on the HTTP 1.0 protocol, and has added service-level concurrency, CPU, and memory utilization metrics.

Amazon S3

Amazon S3 now automatically applies default encryption to all new objects added to S3, at no additional cost and with no impact on performance.

You can now use an S3 Object Lambda Access Point alias as an origin for your Amazon CloudFront distribution to tailor or customize data to end users. For example, you can resize an image depending on the device that an end user is visiting from.

S3 has introduced Mountpoint for S3, a high-performance, open-source file client that translates local file system API calls to S3 object API calls like GET and LIST.

S3 Multi-Region Access Points now support datasets that are replicated across multiple AWS accounts. They provide a single global endpoint for your multi-region applications, and dynamically route S3 requests based on policies that you define. This helps you to more easily implement multi-Region resilience, latency-based routing, and active-passive failover, even when data is stored in multiple accounts.

Amazon Kinesis

Amazon Kinesis Data Firehose now supports streaming data delivery to Elastic. This is an easier way to ingest streaming data to Elastic and consume the Elastic Stack (ELK Stack) solutions for enterprise search, observability, and security without having to manage applications or write code.

Amazon DynamoDB

Amazon DynamoDB now supports table deletion protection to protect your tables from accidental deletion when performing regular table management operations. You can set the deletion protection property for each table, which is set to disabled by default.

Amazon SNS

Amazon SNS now supports AWS X-Ray active tracing to visualize, analyze, and debug application performance. You can now view traces that flow through Amazon SNS topics to destination services, such as Amazon Simple Queue Service, Lambda, and Kinesis Data Firehose, in addition to traversing the application topology in Amazon CloudWatch ServiceLens.

SNS also now supports setting content-type request headers for HTTPS notifications so applications can receive their notifications in a more predictable format. Topic subscribers can create a DeliveryPolicy that specifies the content-type value that SNS assigns to their HTTPS notifications, such as application/json, application/xml, or text/plain.

EDA Visuals collection added to Serverless Land

The Serverless Developer Advocate team has extended Serverless Land and introduced EDA visuals. These are small, bite-sized visuals to help you understand concepts and patterns in event-driven architectures. Find out about batch processing vs. event streaming, commands vs. events, message queues vs. event brokers, and point-to-point messaging. Discover bounded contexts, migrations, idempotency, claims, enrichment and more!

EDA Visuals

Serverless Repos Collection on Serverless Land

There is also a new section on Serverless Land containing helpful code repositories. You can search for code repos to use for examples, learning or building serverless applications. You can also filter by use-case, runtime, and level.

Serverless Repos Collection

Serverless Blog Posts

January

Jan 12 – Introducing maximum concurrency of AWS Lambda functions when using Amazon SQS as an event source

Jan 20 – Processing geospatial IoT data with AWS IoT Core and the Amazon Location Service

Jan 23 – AWS Lambda: Resilience under-the-hood

Jan 24 – Introducing AWS Lambda runtime management controls

Jan 24 – Best practices for working with the Apache Velocity Template Language in Amazon API Gateway

February

Feb 6 – Previewing environments using containerized AWS Lambda functions

Feb 7 – Building ad-hoc consumers for event-driven architectures

Feb 9 – Implementing architectural patterns with Amazon EventBridge Pipes

Feb 9 – Securing CI/CD pipelines with AWS SAM Pipelines and OIDC

Feb 9 – Introducing new asynchronous invocation metrics for AWS Lambda

Feb 14 – Migrating to token-based authentication for iOS applications with Amazon SNS

Feb 15 – Implementing reactive progress tracking for AWS Step Functions

Feb 23 – Developing portable AWS Lambda functions

Feb 23 – Uploading large objects to Amazon S3 using multipart upload and transfer acceleration

Feb 28 – Introducing AWS Lambda Powertools for .NET

March

Mar 9 – Server-side rendering micro-frontends – UI composer and service discovery

Mar 9 – Building serverless Java applications with the AWS SAM CLI

Mar 10 – Managing sessions of anonymous users in WebSocket API-based applications

Mar 14 – Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Videos

Serverless Office Hours – Tues 10AM PT

Weekly office hours live stream. In each session we talk about a specific topic or technology related to serverless and open it up to helping you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

January

Jan 10 – Building .NET 7 high performance Lambda functions

Jan 17 – Amazon Managed Workflows for Apache Airflow at Scale

Jan 24 – Using Terraform with AWS SAM

Jan 31 – Preparing your serverless architectures for the big day

February

Feb 07 – Visually design and build serverless applications

Feb 14 – Multi-tenant serverless SaaS

Feb 21 – Refactoring to Serverless

Feb 28 – EDA visually explained

March

Mar 07 – Lambda cookbook with Python

Mar 14 – Succeeding with serverless

Mar 21 – Lambda Powertools .NET

Mar 28 – Server-side rendering micro-frontends

FooBar Serverless YouTube channel

Marcia Villalba frequently publishes new videos on her popular serverless YouTube channel. You can view all of Marcia’s videos at https://www.youtube.com/c/FooBar_codes.

January

Jan 12 – Serverless Badge – A new certification to validate your Serverless Knowledge

Jan 19 – Step functions Distributed map – Run 10k parallel serverless executions!

Jan 26 – Step Functions Intrinsic Functions – Do simple data processing directly from the state machines!

February

Feb 02 – Unlock the Power of EventBridge Pipes: Integrate Across Platforms with Ease!

Feb 09 – Amazon EventBridge Pipes: Enrichment and filter of events Demo with AWS SAM

Feb 16 – AWS App Runner – Deploy your apps from GitHub to Cloud in Record Time

Feb 23 – AWS App Runner – Demo hosting a Node.js app in the cloud directly from GitHub (AWS CDK)

March

Mar 02 – What is Amazon DynamoDB? What are the most important concepts? What are the indexes?

Mar 09 – Choreography vs Orchestration: Which is Best for Your Distributed Application?

Mar 16 – DynamoDB Single Table Design: Simplify Your Code and Boost Performance with Table Design Strategies

Mar 23 – 8 Reasons You Should Choose DynamoDB for Your Next Project and How to Get Started

Sessions with SAM & Friends

AWS SAM & Friends

Eric Johnson is exploring how developers are building serverless applications. We spend time talking about AWS SAM as well as others like AWS CDK, Terraform, Wing, and AMPT.

Feb 16 – What’s new with AWS SAM

Feb 23 – AWS SAM with AWS CDK

Mar 02 – AWS SAM and Terraform

Mar 10 – Live from ServerlessDays ANZ

Mar 16 – All about AMPT

Mar 23 – All about Wing

Mar 30 – SAM Accelerate deep dive

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Genomics workflows, Part 5: automated benchmarking

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/genomics-workflows-part-5-automated-benchmarking/

Launching and running genomics workflows can take hours and involves large pools of compute instances that process data at a petabyte scale. Benchmarking helps you evaluate workflow performance and discover faster and cheaper ways of running them.

In practice, performance evaluations happen irregularly because of the associated heavy lifting. In this blog post, we discuss how life-science research teams can automate evaluations.

Business Benefits

An automated benchmarking solution provides:

  • more accurate enterprise resource planning by performing historical analytics,
  • lower cost to the business by comparing performance on different resource types, and
  • cost transparency to the business by quantifying periodical chargeback.

We’ve used automated benchmarking to compare processing times on different services such as Amazon Elastic Compute Cloud (Amazon EC2), AWS Batch, AWS ParallelCluster, Amazon Elastic Kubernetes Service (Amazon EKS), and on-premises HPC clusters. Scientists, financiers, technical leaders, and other stakeholders can build reports and dashboards to compare consumption data by consumer, workflow type, and time period.

Design pattern

Our automated benchmarking solution measures performance on two dimensions:

  • Timing: measures the duration of a workflow launch on a specific dataset
  • Pricing: measures the associated cost

This solution can be extended to other performance metrics such as iterations per second or process/thread distribution across compute nodes.

Our requirements include the following:

  • Consistent measurement of timing based on workflow status (such as preparing, waiting, ready, running, failed, complete)
  • Extensible pricing models based on unit prices (the Amazon EC2 Spot price at a specific period of time compared to Amazon EC2 On-Demand pricing)
  • Scalable, cost-efficient, and flexible data store enabling historical benchmarking and estimations
  • Minimal infrastructure management overhead

We choose a serverless design pattern using AWS Step Functions orchestration, AWS Lambda for our application code, and Amazon DynamoDB to track workflow launch IDs and states (as described in Part 3). We assume that the genomics workflows run on AWS Batch with genomics data on Amazon FSx for Lustre (Part 1). AWS Step Functions allows us to break down processing into smaller steps and avoid monolithic application code. Our evaluation process runs in four steps:

  1. Monitor for completed workflow launches in the DynamoDB stream using an Amazon EventBridge pipe with a Step Functions workflow as target. This event-driven approach is more efficient than periodic polling and avoids custom code for parsing status and cost values in all records of the DynamoDB stream.
  2. Collect a list of all compute resources associated with the workflow launch. Design a Lambda function that queries the AWS Batch API (see Part 1) to describe compute environment parameters like the Amazon EC2 instance IDs and their details, such as processing times, instance family/size, and allocation strategy (for example, Spot Instances, Reserved Instances, On-Demand Instances).
  3. Calculate the cost of all consumed resources. We achieve this with another Lambda function, which calculates the total price based on unit prices from the AWS Price List Query API.
  4. Our state machine updates the total price in the DynamoDB table without the need for additional application code.

Figure 1 visualizes these steps.

Figure 1. Automated benchmarking of genomics workflows

Implementation considerations

AWS Step Functions orchestrates our benchmarking workflow reliably and makes our application code easy to maintain. Figure 2 summarizes the state machine transitions that we’ll describe.

Figure 2. AWS Step Functions state machine for automated benchmarking

Gather consumption details

Configure the DynamoDB stream view type to New image so that the entire item is passed through as it appears after it was changed. We set up an Amazon EventBridge pipe with event filtering and the DynamoDB stream as a source. Our event filter matches records that have a status of COMPLETE but no cost entry, which avoids an infinite loop: once our state machine has updated the DynamoDB item with the workflow price, the resulting record in the DynamoDB stream no longer passes the filter.

The syntax of our event filter is as follows:

{
  "dynamodb": {
    "NewImage": {
      "status": {
        "S": ["COMPLETE"]
      },
      "totalCost": {
        "S": [{
          "exists": false
        }]
      }
    }
  }
}
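
To reason about the filter before deploying it, the two conditions can be mirrored in plain Python. The following is a minimal sketch rather than part of the solution itself: the function name passes_filter and the sample records are hypothetical, and EventBridge evaluates the real pattern on the service side.

```python
from typing import Any, Dict


def passes_filter(record: Dict[str, Any]) -> bool:
    """Return True if a stream record meets both filter conditions.

    Mirrors the pattern above: status equals COMPLETE and the new image
    carries no totalCost string attribute.
    """
    new_image = record.get("dynamodb", {}).get("NewImage", {})
    status = new_image.get("status", {}).get("S")
    has_cost = "S" in new_image.get("totalCost", {})
    return status == "COMPLETE" and not has_cost


# Hypothetical records: a freshly completed launch, the same item after
# the state machine has written the price, and a launch still running.
completed = {"dynamodb": {"NewImage": {"status": {"S": "COMPLETE"}}}}
priced = {"dynamodb": {"NewImage": {"status": {"S": "COMPLETE"},
                                    "totalCost": {"S": "12.34"}}}}
running = {"dynamodb": {"NewImage": {"status": {"S": "RUNNING"}}}}

print(passes_filter(completed))  # True
print(passes_filter(priced))     # False
print(passes_filter(running))    # False
```

The second record is the one that prevents the loop: the price update produces a new stream record, but it no longer matches.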

We use an input transformer to simplify follow-on parsing by removing unnecessary metadata from the event.

The consumed resources included in the stream record are the auto-scaling group ID for AWS Batch and the Amazon FSx for Lustre volume ID. We use the DescribeJobs API (describe_jobs in Boto3) to determine which compute resources were used. If the response is a list of EC2 instances, we then look up consumption information including start and end times using the ListJobs API (list_jobs in Boto3) for each compute node. We use describe_volumes with filters on the identified EC2 instances to obtain the size and type of Amazon Elastic Block Store (Amazon EBS) volumes.

Calculate prices

Another Lambda function obtains the associated unit prices of all consumed resources using the GetProducts request of the AWS Price List Query API (get_products in Boto3) and then parses the pricePerUnit value. For Spot Instances, we use describe_spot_price_history of the EC2 client in Boto3 and specify the time range and instance types for which we want to receive prices.

Calculate the price of workflow launches based on the following factors:

  • Number and size of EC2 instances in auto-scaling node groups
  • Size of EBS volumes and Amazon FSx for Lustre
  • Processing duration

Our Python-based Lambda function calculates the total, rounds it, and delivers the price breakdown in the following format:

total_cost: str, instance_cost: str, volume_cost: str, filesystem_cost: str
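
As an illustration of how such a breakdown could be assembled, here is a minimal sketch. It assumes that instance prices are hourly, that storage prices are per GB-month, and that a month is converted at 730 hours. The function name price_breakdown and all unit prices below are hypothetical and not taken from the AWS Price List.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import Dict, List, Tuple

HOURS_PER_MONTH = Decimal("730")  # assumption: converts GB-month prices to hourly
CENT = Decimal("0.01")


def price_breakdown(
    instances: List[Tuple[str, str]],
    volumes: List[Tuple[str, str]],
    filesystem: Tuple[str, str],
    hours: str,
) -> Dict[str, str]:
    """Sum instance, EBS, and FSx for Lustre costs for one workflow launch.

    instances:  (hourly unit price, instance count) pairs
    volumes:    (price per GB-month, size in GB) pairs
    filesystem: (price per GB-month, size in GB)
    hours:      processing duration in hours
    """
    duration = Decimal(hours)
    instance_cost = sum(
        (Decimal(p) * Decimal(n) * duration for p, n in instances), Decimal("0")
    )
    volume_cost = sum(
        (Decimal(p) * Decimal(gb) * duration / HOURS_PER_MONTH for p, gb in volumes),
        Decimal("0"),
    )
    fs_price, fs_gb = filesystem
    filesystem_cost = Decimal(fs_price) * Decimal(fs_gb) * duration / HOURS_PER_MONTH
    total = instance_cost + volume_cost + filesystem_cost

    def rounded(value: Decimal) -> str:
        # Round to whole cents and return a string, matching the format above
        return str(value.quantize(CENT, rounding=ROUND_HALF_UP))

    return {
        "total_cost": rounded(total),
        "instance_cost": rounded(instance_cost),
        "volume_cost": rounded(volume_cost),
        "filesystem_cost": rounded(filesystem_cost),
    }


# Hypothetical launch: four instances for 73 hours, one 100 GB volume,
# and a 1,200 GB file system.
breakdown = price_breakdown(
    instances=[("0.10", "4")],
    volumes=[("0.08", "100")],
    filesystem=("0.14", "1200"),
    hours="73",
)
print(breakdown)
```

Decimal is used instead of float so that rounding to cents is deterministic, and the values are returned as strings because the filter shown earlier expects totalCost to be a string attribute.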

Lastly, we put the price breakdown to the DynamoDB table using UpdateItem directly from the Amazon States Language.
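
A Task state for this direct integration might look like the following sketch, written as a Python dictionary and serialized to Amazon States Language JSON. The table name WorkflowLaunches, the key attribute launchId, and the input paths are assumptions for illustration only. The attribute totalCost matches the event filter shown earlier.

```python
import json

# Hypothetical names: table "WorkflowLaunches", key attribute "launchId",
# and the JSONPath inputs are illustrative assumptions.
update_price_state = {
    "Type": "Task",
    # Step Functions service integration for DynamoDB UpdateItem
    "Resource": "arn:aws:states:::dynamodb:updateItem",
    "Parameters": {
        "TableName": "WorkflowLaunches",
        "Key": {"launchId": {"S.$": "$.launch_id"}},
        "UpdateExpression": (
            "SET totalCost = :total, instanceCost = :inst, "
            "volumeCost = :vol, filesystemCost = :fs"
        ),
        "ExpressionAttributeValues": {
            ":total": {"S.$": "$.total_cost"},
            ":inst": {"S.$": "$.instance_cost"},
            ":vol": {"S.$": "$.volume_cost"},
            ":fs": {"S.$": "$.filesystem_cost"},
        },
    },
    "End": True,
}

print(json.dumps(update_price_state, indent=2))
```

Because the update is expressed in the state machine definition, no Lambda function is needed for this last step.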

Note that AWS credits and enterprise discounts might not be reflected in the responses of the AWS Price List Query API unless applied to the particular AWS account. This is often considered best practice in light of least-privilege considerations.

In the past, we’ve also used AWS Cost Explorer instead of the AWS Price List API. Because AWS Cost Explorer data is updated at least once every 24 hours, the cost of a recently completed launch might not be available right away. In that case, you can denote the pending price status in the DynamoDB table item and use the Wait state to delay the calculation process.

The presented solution can be extended to other compute services such as Amazon Elastic Kubernetes Service (Amazon EKS). For Amazon EKS, events are enriched with the cluster ID from the DynamoDB table and the price calculation should also include control plane costs.

Conclusion

Life-science research teams use benchmarking to compare workflow performance and inform their architectural decisions. Such evaluations are effort-intensive and therefore done irregularly.

In this blog post, we showed how life-science research teams can automate benchmarking for their scientific workflows. The insights teams gain from automated benchmarking indicate continuous optimization opportunities, such as by adjusting compute node configuration. The evaluation data is also available on demand for other purposes including chargeback.

Stay tuned for our next post in which we show how to use historical benchmarking data for price estimations of future workflow launches.

Unit Testing AWS Lambda with Python and Mock AWS Services

Post Syndicated from Kevin Hakanson original https://aws.amazon.com/blogs/devops/unit-testing-aws-lambda-with-python-and-mock-aws-services/

When building serverless event-driven applications using AWS Lambda, it is best practice to validate individual components. Unit testing can quickly identify and isolate issues in AWS Lambda function code. This blog demonstrates unit testing techniques for Python-based AWS Lambda functions and their interactions with AWS services.

The full code for this blog is available in the GitHub project as a demonstrative example.

Example use case

Let’s consider unit testing a serverless application which provides an API endpoint to generate a document.  When the API endpoint is called with a customer identifier and document type, the Lambda function retrieves the customer’s name from DynamoDB, then retrieves the document text from DynamoDB for the given document type, finally generating and writing the resulting document to S3.

Figure 1. Example application architecture

  1. Amazon API Gateway provides an endpoint to request the generation of a document for a given customer.  A document type and customer identifier are provided in this API call.
  2. The endpoint invokes an AWS Lambda function that generates a document using the customer identifier and the document type provided.
  3. An Amazon DynamoDB table stores the contents of the documents and the user’s name, which are retrieved by the Lambda function.
  4. The resulting text document is stored to Amazon S3.

Our testing goal is to determine if an isolated “unit” of code works as intended. In this blog, we will be writing tests to provide confidence that the logic written in the above AWS Lambda function behaves as we expect. We will mock the service integrations to Amazon DynamoDB and S3 to isolate and focus our tests on the Lambda function code, and not on the behavior of the AWS Services.

Define the AWS Service resources in the Lambda function

Before writing our first unit test, let’s look at the Lambda function that contains the behavior we wish to test.  The full code for the Lambda function is available in the GitHub repository as src/sample_lambda/app.py.

As part of our Best practices for working with AWS Lambda functions, we recommend initializing AWS service resource connections outside of the handler function and in the global scope. Additionally, we can retrieve any relevant environment variables in the global scope so that subsequent invocations of the Lambda function do not repeatedly need to retrieve them. For organization, we can put the resource and variables in a dictionary:

from os import environ
from boto3 import resource
_LAMBDA_DYNAMODB_RESOURCE = { "resource" : resource('dynamodb'),
                              "table_name" : environ.get("DYNAMODB_TABLE_NAME","NONE") }

However, globally scoped code and global variables are challenging to test in Python, as global statements are executed on import, and outside of the controlled test flow.  To facilitate testing, we define classes for supporting AWS resource connections that we can override (patch) during testing.  These classes will accept a dictionary containing the boto3 resource and relevant environment variables.

For example, we create a DynamoDB resource class with a parameter “lambda_dynamodb_resource” that accepts a dictionary containing a boto3 resource connected to DynamoDB and the table name:

class LambdaDynamoDBClass:
    def __init__(self, lambda_dynamodb_resource):
        self.resource = lambda_dynamodb_resource["resource"]
        self.table_name = lambda_dynamodb_resource["table_name"]
        self.table = self.resource.Table(self.table_name)
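
The handler in the next section also uses a LambdaS3Class. By analogy with the DynamoDB class, it might look like the sketch below. The "bucket_name" key and the bucket attribute follow the test setup shown later in this post; the FakeS3Resource stand-in is hypothetical and only exists so the snippet runs without boto3.

```python
class LambdaS3Class:
    """Wraps an S3 resource and bucket name so tests can swap in a mock."""

    def __init__(self, lambda_s3_resource):
        self.resource = lambda_s3_resource["resource"]
        self.bucket_name = lambda_s3_resource["bucket_name"]
        self.bucket = self.resource.Bucket(self.bucket_name)


class FakeS3Resource:
    """Hypothetical stand-in for boto3's S3 resource, for illustration only."""

    def Bucket(self, name):
        return f"bucket:{name}"


s3_class = LambdaS3Class({"resource": FakeS3Resource(),
                          "bucket_name": "demo-bucket"})
print(s3_class.bucket)  # bucket:demo-bucket
```

Because the class only depends on the dictionary it receives, a test can pass in a mocked resource without touching the global scope.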

Build the Lambda Handler

The Lambda function handler is the method in the AWS Lambda function code that processes events. When the function is invoked, Lambda runs the handler method. When the handler exits or returns a response, it becomes available to process another event.

To facilitate unit testing of the handler function, move as much of the logic as possible to other functions that are then called by the Lambda handler entry point. Also, pass the AWS resource global variables to these subsequent function calls. This approach enables us to mock and intercept all resources and calls during testing.

In our example, the handler references the global variables and instantiates the resource classes to set up the connections to specific AWS resources. (We will be able to override and mock these connections during unit testing.)

Then the handler calls the create_letter_in_s3 function to perform the steps of creating the document, passing the resource classes. This downstream function avoids directly referencing the global context or any AWS resource connections.

def lambda_handler(event: APIGatewayProxyEvent, context: LambdaContext) -> Dict[str, Any]:

    global _LAMBDA_DYNAMODB_RESOURCE
    global _LAMBDA_S3_RESOURCE

    dynamodb_resource_class = LambdaDynamoDBClass(_LAMBDA_DYNAMODB_RESOURCE)
    s3_resource_class = LambdaS3Class(_LAMBDA_S3_RESOURCE)

    return create_letter_in_s3(
            dynamo_db = dynamodb_resource_class,
            s3 = s3_resource_class,
            doc_type = event["pathParameters"]["docType"],
            cust_id = event["pathParameters"]["customerId"])
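
The create_letter_in_s3 function itself is not shown in this post. The following is a minimal sketch of what it could look like, not the code from the GitHub repository. The C# and D# key prefixes, the S3 key layout, the letter format, and the 404 response are inferred from the unit tests described later in this post. FakeTable and FakeBucket are hypothetical in-memory stand-ins so the snippet runs without AWS access.

```python
from types import SimpleNamespace
from typing import Any, Dict


def create_letter_in_s3(dynamo_db, s3, doc_type: str, cust_id: str) -> Dict[str, Any]:
    """Look up the customer and document text, then write the letter to S3."""
    try:
        # get_item omits "Item" when the key is missing, which raises KeyError
        customer_name = dynamo_db.table.get_item(Key={"PK": f"C#{cust_id}"})["Item"]["data"]
        doc_text = dynamo_db.table.get_item(Key={"PK": f"D#{doc_type}"})["Item"]["data"]
    except KeyError as missing:
        return {"statusCode": 404, "body": f"Not Found: {missing}"}

    s3_file_key = f"{cust_id}/{doc_type}.txt"
    s3.bucket.put_object(
        Key=s3_file_key,
        Body=f"Dear {customer_name};\n{doc_text}".encode("utf-8"),
    )
    return {"statusCode": 200, "body": s3_file_key}


class FakeTable:
    """Hypothetical in-memory table that mimics get_item."""

    def __init__(self, items):
        self.items = items

    def get_item(self, Key):
        item = self.items.get(Key["PK"])
        return {"Item": item} if item else {}


class FakeBucket:
    """Hypothetical in-memory bucket that mimics put_object."""

    def __init__(self):
        self.stored = {}

    def put_object(self, Key, Body):
        self.stored[Key] = Body


dynamo_db = SimpleNamespace(table=FakeTable({
    "C#UnitTestCust": {"PK": "C#UnitTestCust", "data": "Unit Test Customer"},
    "D#UnitTestDoc": {"PK": "D#UnitTestDoc", "data": "Unit Test Doc Corpi"},
}))
s3 = SimpleNamespace(bucket=FakeBucket())

found = create_letter_in_s3(dynamo_db, s3, "UnitTestDoc", "UnitTestCust")
missing = create_letter_in_s3(dynamo_db, s3, "UnitTestDoc", "NoSuchCust")
print(found["statusCode"], missing["statusCode"])  # 200 404
```

Note that the function receives both resource classes as arguments, so nothing inside it depends on the global scope.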

Unit testing with mock AWS services

Our Lambda function code has now been written and is ready to be tested, so let’s take a look at the unit test code! The full code for the unit test is available in the GitHub repository as tests/unit/src/test_sample_lambda.py.

In production, our Lambda function code will directly access the AWS resources we defined in our function handler; however, in our unit tests we want to isolate our code and replace the AWS resources with simulations. This isolation prevents the unit tests from accidentally accessing actual cloud resources.

Moto is a Python library for mocking AWS services that we will use to simulate AWS resources in our tests. Moto supports many AWS resources, and it allows you to test your code with little or no modification by emulating the functionality of these services.

Moto uses decorators to intercept and simulate responses to and from AWS resources. By adding a decorator for a given AWS service, subsequent calls from the module to that service will be redirected to the mock.

@moto.mock_dynamodb
@moto.mock_s3

Configure Test Setup and Tear-down

The mocked AWS resources will be used during the unit test suite.  Using the setUp() method allows you to define and configure the mocked global AWS Resources before the tests are run.

We define the test class and a setUp() method and initialize the mock AWS resource.  This includes configuring the resource to prepare it for testing, such as defining a mock DynamoDB table or creating a mock S3 Bucket.

class TestSampleLambda(TestCase):
    def setUp(self) -> None:
        # Create a mock DynamoDB table with a single string partition key
        dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
        dynamodb.create_table(
            TableName = self.test_ddb_table_name,
            KeySchema = [{"AttributeName": "PK", "KeyType": "HASH"}],
            AttributeDefinitions = [{"AttributeName": "PK",
                                     "AttributeType": "S"}],
            BillingMode = 'PAY_PER_REQUEST'
        )

        # Create a mock S3 bucket
        s3_client = boto3.client('s3', region_name="us-east-1")
        s3_client.create_bucket(Bucket = self.test_s3_bucket_name)

After creating the mocked resources, the setUp() method creates resource class objects that reference those mocked resources, which will be used during testing.

        # Bundle each mocked resource with its name, as the Lambda code expects
        mocked_dynamodb_resource = { "resource" : resource('dynamodb'),
                                     "table_name" : self.test_ddb_table_name  }
        mocked_s3_resource = { "resource" : resource('s3'),
                               "bucket_name" : self.test_s3_bucket_name }
        self.mocked_dynamodb_class = LambdaDynamoDBClass(mocked_dynamodb_resource)
        self.mocked_s3_class = LambdaS3Class(mocked_s3_resource)

Test #1: Verify the code writes the document to S3

Our first test will validate our Lambda function writes the customer letter to an S3 bucket in the correct manner.  We will follow the standard test format of arrange, act, assert when writing this unit test.

Arrange the data we need in the DynamoDB table:

def test_create_letter_in_s3(self) -> None:
    
    self.mocked_dynamodb_class.table.put_item(Item={"PK":"D#UnitTestDoc",
                                                        "data":"Unit Test Doc Corpi"})
    self.mocked_dynamodb_class.table.put_item(Item={"PK":"C#UnitTestCust",
                                                        "data":"Unit Test Customer"})

Act by calling the create_letter_in_s3 function.  During these act calls, the test passes the AWS resources as created in the setUp().

    test_return_value = create_letter_in_s3(
                        dynamo_db = self.mocked_dynamodb_class,
                        s3=self.mocked_s3_class,
                        doc_type = "UnitTestDoc",
                        cust_id = "UnitTestCust"
                        )

Assert by reading the data written to the mock S3 bucket, and testing conformity to what we are expecting:

    bucket_key = "UnitTestCust/UnitTestDoc.txt"
    body = self.mocked_s3_class.bucket.Object(bucket_key).get()['Body'].read()

    self.assertEqual(test_return_value["statusCode"], 200)
    self.assertIn("UnitTestCust/UnitTestDoc.txt", test_return_value["body"])
    self.assertEqual(body.decode('ascii'),"Dear Unit Test Customer;\nUnit Test Doc Corpi")

Tests #2 and #3: Data not found error conditions

We can also test error conditions and handling, such as keys not found in the database.  For example, if a customer identifier is submitted, but does not exist in the database lookup, does the logic handle this and return a “Not Found” code of 404?

To test this in test #2, we add data to the mocked DynamoDB table, but then submit a customer identifier that is not in the database.

This test, and a similar test #3 for “Document Types not found”, are implemented in the example test code on GitHub.

Test #4: Validate the handler interface

As the application logic resides in independently tested functions, the Lambda handler function provides only interface validation and function call orchestration.  Therefore, the test for the handler validates that the event is parsed correctly, any functions are invoked as expected, and the return value is passed back.

To emulate the global resource variables and other functions, patch both the global resource classes and logic functions.

    @patch("src.sample_lambda.app.LambdaDynamoDBClass")
    @patch("src.sample_lambda.app.LambdaS3Class")
    @patch("src.sample_lambda.app.create_letter_in_s3")
    def test_lambda_handler_valid_event_returns_200(self,
                            patch_create_letter_in_s3 : MagicMock,
                            patch_lambda_s3_class : MagicMock,
                            patch_lambda_dynamodb_class : MagicMock
                            ):

Arrange for the test by setting return values for the patched objects.

        patch_lambda_dynamodb_class.return_value = self.mocked_dynamodb_class
        patch_lambda_s3_class.return_value = self.mocked_s3_class

        return_value_200 = {"statusCode" : 200, "body":"OK"}
        patch_create_letter_in_s3.return_value = return_value_200

We need to provide event data when invoking the Lambda handler.  A good practice is to save test events as separate JSON files, rather than placing them inline as code. In the example project, test events are located in the folder “tests/events/”. During test execution, the event object is created from the JSON file using the utility function named load_sample_event_from_file.

test_event = self.load_sample_event_from_file("sampleEvent1")
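
The helper itself is not shown in this post. A minimal sketch of such a loader follows, written as a module-level function rather than a test class method. The events_dir parameter and its default are assumptions based on the folder named above, and the version in the GitHub repository may do more, such as validating the event.

```python
import json
from pathlib import Path
from typing import Any, Dict


def load_sample_event_from_file(test_event_name: str,
                                events_dir: str = "tests/events") -> Dict[str, Any]:
    """Load <events_dir>/<test_event_name>.json and return it as a dict."""
    event_path = Path(events_dir) / f"{test_event_name}.json"
    with event_path.open("r", encoding="utf-8") as file_handle:
        return json.load(file_handle)
```

Keeping events in files makes it easy to reuse the same payload across tests and to capture real events from a deployed API for later replay.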

Act by calling the lambda_handler function.

test_return_value = lambda_handler(event=test_event, context=None)

Assert by ensuring the create_letter_in_s3 function is called with the expected parameters based on the event, and a create_letter_in_s3 function return value is passed back to the caller.  In our example, this value is simply passed with no alterations.

        patch_create_letter_in_s3.assert_called_once_with(
                                        dynamo_db=self.mocked_dynamodb_class,
                                        s3=self.mocked_s3_class,
                                        doc_type=test_event["pathParameters"]["docType"],
                                        cust_id=test_event["pathParameters"]["customerId"])

        self.assertEqual(test_return_value, return_value_200)

Tear Down

The tearDown() method is called immediately after the test method has been run and the result is recorded.  In our example tearDown() method, we clean up any data or state created so the next test won’t be impacted.
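
As a rough illustration of how setUp() and tearDown() bracket each test, here is a minimal sketch using only the standard library. The class name and the use of plain `MagicMock` stand-ins are assumptions made for this example; the example project's own fixtures may be built differently.

```python
import unittest
from unittest.mock import MagicMock


class TestSampleLambda(unittest.TestCase):
    """Illustrative only: names and attributes are assumptions."""

    def setUp(self) -> None:
        # Create fresh stand-ins for the resource classes before every test.
        self.mocked_dynamodb_class = MagicMock(name="LambdaDynamoDBClass")
        self.mocked_s3_class = MagicMock(name="LambdaS3Class")

    def tearDown(self) -> None:
        # Drop references so recorded calls and state cannot leak into the next test.
        self.mocked_dynamodb_class = None
        self.mocked_s3_class = None

    def test_mocks_start_clean(self) -> None:
        # Each test sees mocks that have not been called yet.
        self.mocked_dynamodb_class.assert_not_called()
        self.mocked_s3_class.assert_not_called()
```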

Running the unit tests

Tests written with the unittest framework can be run using the Python pytest utility. To ensure network isolation and verify that the unit tests are not accidentally connecting to AWS resources, the pytest-socket project provides the ability to disable network communication during a test.

pytest -v --disable-socket -s tests/unit/src/

The pytest command results in a PASSED or FAILED status for each test. A PASSED status verifies that your unit tests, as written, did not encounter errors or issues.

Conclusion

Unit testing is a software development process in which different parts of an application, called units, are individually and independently tested. Tests validate the quality of the code and confirm that it functions as expected. Other developers can gain familiarity with your code base by consulting the tests. Unit tests reduce future refactoring time, help engineers get up to speed on your code base more quickly, and provide confidence in the expected behaviour.

We’ve seen in this blog how to unit test AWS Lambda functions and mock AWS Services to isolate and test individual logic within our code.

AWS Lambda Powertools for Python has been used in the project to validate handler events. Powertools provides a suite of utilities for AWS Lambda functions to ease adopting best practices such as tracing, structured logging, custom metrics, idempotency, batching, and more.

Learn more about AWS Lambda testing in our prescriptive test guidance, and find additional test examples on GitHub.  For more serverless learning resources, visit Serverless Land.

About the authors:

Tom Romano

Tom Romano is a Solutions Architect for AWS World Wide Public Sector from Tampa, FL, and assists GovTech and EdTech customers as they create new solutions that are cloud-native, event driven, and serverless. He is an enthusiastic Python programmer for both application development and data analytics. In his free time, Tom flies remote control model airplanes and enjoys vacationing with his family around Florida and the Caribbean.

Kevin Hakanson

Kevin Hakanson is a Sr. Solutions Architect for AWS World Wide Public Sector based in Minnesota. He works with EdTech and GovTech customers to ideate, design, validate, and launch products using cloud-native technologies and modern development practices. When not staring at a computer screen, he is probably staring at another screen, either watching TV or playing video games with his family.

Realtime monitoring of microservices and cloud-native applications with IBM Instana SaaS on AWS

Post Syndicated from Eduardo Monich Fronza original https://aws.amazon.com/blogs/architecture/realtime-monitoring-of-microservices-and-cloud-native-applications-with-ibm-instana-saas-on-aws/

Customers are adopting microservices architecture to build innovative and scalable applications on Amazon Web Services (AWS). These microservices applications are deployed across multiple AWS services, and customers are looking for comprehensive observability solutions that can help them effectively monitor and manage the performance of their applications in real-time.

IBM Instana is a fully automated application performance management (APM) solution, available to customers as a fully managed software as a service (SaaS) solution on AWS. It is specifically designed to help customers address the challenges of monitoring microservices and cloud-native applications in real-time. It uses artificial intelligence and machine learning to provide detailed insights into the health and behavior of applications, allowing developers and IT teams to gain real-time insights into their microservices applications, optimize performance, and quickly identify and troubleshoot issues.

This post explains the capabilities of IBM Instana to automatically collect observability metrics, traces, and events from microservices deployed on AWS cloud, as well as on-premises, to provide full visibility into the performance of individual components and applications as a whole.

IBM Instana solution overview

IBM Instana is designed to be highly scalable and adaptable to changing microservices applications environments. Its architecture (Figure 1) consists of several components that work together to provide comprehensive monitoring for microservices and cloud-native applications.

Instana’s main building blocks are host agents and agent sensors that are deployed in a customer’s AWS account and responsible for collecting, aggregating, and sending detailed monitoring information of applications and AWS services to the Instana SaaS backend.

The Instana SaaS backend services provide several key components, including data collectors, storage services, analytics engines, and user interfaces. Together they allow customers to process and analyze data in real time, generate actionable insights, and maintain a comprehensive view of application and infrastructure performance, so they can quickly identify and resolve issues and improve their overall operations.

Figure 1. IBM Instana architecture on AWS

Monitoring data

Instana monitors and observes microservices and cloud-native applications by collecting beacons, traces, and one-second metrics:

  • Beacons are small monitoring payloads that are transmitted by a JavaScript agent to the Instana servers, modeling specific events occurring within the lifecycle of a page view of a website; for example, page loading, resource retrieval, and HTTP requests.
  • Traces are detailed records of the requests and transactions that flow through a microservice architecture. They record the sequence of events that occur when a request is processed, including the services that are involved, the duration of each service, and any errors or exceptions that occur. Instana automatically correlates traces across services to provide a complete view of an entire transaction. This allows for easy identification and diagnosis of performance issues.
  • Metrics are numerical values that represent the performance and resource utilization of a microservice or infrastructure component. Metrics are collected by Instana Agents and sent to the Instana backend at regular intervals. Instana Agents collect hundreds of different metrics, including (but not limited to) CPU usage, memory usage, network traffic, and disk I/O.

This information is captured by Instana agents and sensors, which also collect application configurations and events, plus discover application building blocks, including clusters, containers, and services.

IBM Instana agents and sensors

The Instana host agent is a lightweight software component that collects and aggregates data from various sensors before sending the data to the Instana backend. It can be deployed to AWS services, including Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Kubernetes Service (Amazon EKS), AWS Fargate, AWS Lambda, or Red Hat OpenShift Service on AWS (ROSA). A single host agent, one per host, is used to collect data from monitored systems.

Once Instana agents are running, they automatically detect applications and services, such as containers running on Amazon EKS, and processes like Nginx, NodeJS, Spring Boot, Postgres, Elasticsearch, or Cassandra. For each component detected, different Instana sensors are automatically downloaded, installed, and configured to monitor the environment.

Instana sensors are small programs that are designed to attach and monitor one specific technology and pass their data to the agent. They are automatically managed, updated, loaded, and unloaded by the host agent.

These sensors can monitor several different AWS services like Lambda, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), Amazon Aurora, Amazon Simple Queue Service, and Amazon Managed Streaming for Apache Kafka. They collect data—like request and error rates, latency, CPU utilization—via AWS APIs and Amazon CloudWatch.

Instana also provides sensors to collect data from applications running on AWS, like IBM MQ, IBM Db2, or Red Hat OpenShift Container Platform. Review IBM’s full list of supported technologies and AWS services.

Instana also provides tracers, which are used with runtimes like Java, .NET, NodeJS, and others. They instrument code execution to capture logs and request-level traces, and send them back to the Instana agent.

With the use of sensors, the host agent collects configuration data and monitors the applications it has detected. The host agent also handles communications with the Instana SaaS backend services. It collects and aggregates logs, traces, and metrics (such as response times, error rates, and resource utilization) and sends them every second to the Instana SaaS backend in real time, using secure and efficient communication protocols.

IBM Instana SaaS

The Instana SaaS backend is the heart of the Instana APM solution and responsible for processing, storing, and analyzing the monitoring data collected from the Instana agents and sensors installed in the customer’s infrastructure.

It consists of several components and services that work together to provide real-time monitoring and analysis of microservices applications, including:

  • Data collectors: Receive and process data from the Instana agents and sensors, and store it in the Instana backend for further analysis.
  • Analytics engine: Analyzes the data collected by the agents and sensors to provide insights into the performance and health of the microservices applications.
  • User interface: Web-based interface that customers use to view and analyze their monitoring data.
  • Alerting engine: Generates alerts when thresholds or anomalies are detected in the monitoring data.
  • Data storage: Time-series database that stores the monitoring data collected by the agents and sensors. Allows customers to query and analyze the data in real-time.
  • Integrations: Integrates with various third-party tools, such as Slack, PagerDuty, and ServiceNow, providing seamless alerting and incident management.

IBM Instana backend: making sense of the situation in real time

The Instana SaaS platform automatically ingests data from agents and continuously updates a dependency map (Figure 2). This map presents every dependency in context, giving users an easy way to understand the interrelationships between application components and services.

This understanding enables users to identify the upstream and downstream impacts of any issue, ensuring that they stay informed about any potential impacts.

Figure 2. An example of an IBM Instana dependency map

Instana traces every request end-to-end without sampling. The traces are analyzed in real-time, providing metrics that make any performance problems immediately visible. In the event of an incident, Instana can illustrate how a single issue can generate a ripple effect and impact a number of directly and indirectly connected services. Using the relationship information from the Dynamic Graph, Instana’s automatic root-cause analysis can precisely aggregate the individual issues into a single incident.

Figure 3. Applications monitoring with IBM Instana

Developers, IT operations, or site reliability engineers (SREs) can access the Instana backend application monitoring interface (Figure 3) or end-user monitoring (EUM) interface (Figure 4) to view monitoring data of their workloads. These workloads can be websites, mobile applications, AWS services, and infrastructure. From this UI, these personas can access service dashboards that show key performance indicators (KPIs), like response time and error rate.

Figure 4. End-user monitoring with IBM Instana

The following steps show how EUM can be set up and used for a JavaScript application deployed to Amazon S3:

  • Developers inject Instana JavaScript code (Figure 5) into the static website (HTML).
  • When a user visits the website, the JavaScript agent sends beacons to the Instana backend.
  • Dashboards show specific events of the website lifecycle, including page loading, JS errors, and HTTP requests.
  • Teams access the Instana UI to check performance metrics. They can configure Smart Alerts with custom alerting policies based on specific metrics and KPIs.
  • Smart Alerts can send alerts via various channels, such as email, Slack, or IBM Watson AIOps Webhook.
  • In case of an incident, teams can use Instana to retrieve various performance metrics for root-cause analysis.
  • Developers can resolve the issues and apply the patch.

Figure 5. IBM Instana EUM JavaScript agent

Instana also offers Smart Alerts (Figure 6) to provide a more intuitive process of managing alerts. With Smart Alerts, customers can automatically generate alerting configurations using relevant KPIs and automatic threshold detection for use cases like website slowness or website errors.

Figure 6. IBM Instana Smart Alerts

Conclusion

In this post, we discussed how IBM Instana provides the tools to help you implement a real-time observability and monitoring solution. It allows you to gain insight into your microservices and cloud-native applications, including visibility into AWS services, containers, on-premises infrastructure, and other technologies. Instana helps you quickly identify and resolve issues before they impact end users, so that your applications keep performing optimally.

Whether you are an IT administrator, developer, or business owner, IBM Instana on AWS gives you a deeper understanding of your applications and helps you make data-driven decisions to improve overall performance.

Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Post Syndicated from David Boyne original https://aws.amazon.com/blogs/compute/implementing-an-event-driven-serverless-story-generation-application-with-chatgpt-and-dall-e/

This post demonstrates how to integrate AWS serverless services with the artificial intelligence (AI) technologies ChatGPT and DALL-E. This full stack event-driven application showcases a method of generating unique bedtime stories for children by using predetermined characters and scenes as a prompt for ChatGPT.

Every night at bedtime, the serverless scheduler triggers the application, initiating an event-driven workflow to create and store new unique AI-generated stories with AI-generated images and supporting audio.

These datasets are used to showcase the story on a custom website built with Next.js hosted with AWS App Runner. After the story is created, a notification is sent to the user containing a URL to view and read the story to the children.

Example notification of a story hosted with Next.js and App Runner

By integrating AWS services with AI technologies, you can now create new and innovative ideas that were previously unimaginable.

The application described in this blog post demonstrates point-to-point messaging with Amazon EventBridge Pipes, publish/subscribe patterns with Amazon EventBridge, and reacting to change data capture events with DynamoDB Streams.

Understanding the architecture

The following image shows the serverless architecture used to generate stories:

Architecture diagram for Serverless bed time story generation with ChatGPT and DALL-E

A new children’s story is generated every day at a configured time using Amazon EventBridge Scheduler (Step 1). EventBridge Scheduler is a service capable of scaling to millions of schedules, with over 200 targets and over 6,000 API calls. This example application uses EventBridge Scheduler to trigger an AWS Lambda function every night at the same time (7:15pm). This function starts the generation of the story.
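
To make the schedule concrete, the following sketch builds the parameters for the EventBridge Scheduler `CreateSchedule` API with a cron expression for 7:15pm every day. The schedule name, time zone, and input payload are assumptions made for illustration, and the example application may define its schedule differently (for instance, through infrastructure as code).

```python
import json


def build_story_schedule(lambda_arn: str, role_arn: str) -> dict:
    """Return keyword arguments for the EventBridge Scheduler CreateSchedule API."""
    return {
        "Name": "generate-bedtime-story",  # hypothetical schedule name
        # cron(minutes hours day-of-month month day-of-week year): 19:15 daily
        "ScheduleExpression": "cron(15 19 * * ? *)",
        "ScheduleExpressionTimezone": "Europe/London",  # assumed time zone
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {
            "Arn": lambda_arn,    # Lambda function to invoke
            "RoleArn": role_arn,  # role that Scheduler assumes to invoke it
            "Input": json.dumps({"source": "scheduler"}),  # hypothetical payload
        },
    }


def create_story_schedule(scheduler_client, lambda_arn: str, role_arn: str) -> None:
    """Create the schedule with a client such as boto3.client("scheduler")."""
    scheduler_client.create_schedule(**build_story_schedule(lambda_arn, role_arn))
```

Passing the client in as a parameter keeps the function easy to unit test with a mock, in the same spirit as the testing approach described earlier in this archive.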

EventBridge scheduler triggers Lambda function every day at 7:15pm (bed time)

The “Characters” and “Scenes” Amazon DynamoDB tables contain the characters involved in the story and the scenes, one of which is randomly selected when the story is created. As a result, ChatGPT receives a unique prompt each time. An example prompt may look like this:

```
Write a title and a rhyming story on 2 main characters called Parker and Jackson. The story needs to be set within the scene haunted woods and be at least 200 words long
```
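
A prompt of this shape could be assembled along the following lines. This is a minimal sketch: the function names are hypothetical, the character and scene lists stand in for items read from the DynamoDB tables, and picking two characters at random is an assumption based on the example prompt above.

```python
import random


def build_story_prompt(characters: list[str], scene: str, min_words: int = 200) -> str:
    """Format the prompt text from the chosen characters and scene."""
    names = " and ".join(characters)
    return (
        f"Write a title and a rhyming story on {len(characters)} main characters "
        f"called {names}. The story needs to be set within the scene {scene} "
        f"and be at least {min_words} words long"
    )


def pick_story_inputs(all_characters: list[str], all_scenes: list[str], rng=random):
    """Randomly choose two distinct characters and one scene."""
    characters = rng.sample(all_characters, k=2)
    scene = rng.choice(all_scenes)
    return characters, scene
```

Separating selection from formatting keeps the formatting step deterministic, so it can be tested without controlling the random number generator.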

After the story is created, it is then saved in the “Stories” DynamoDB table (Step 2).

Scheduler triggering Lambda function to generate the story and store story into DynamoDB

Once the story is created, a change data capture event is emitted using DynamoDB Streams (Step 3). This event flows through point-to-point messaging with EventBridge Pipes and directly into EventBridge. Input transforms are then used to convert the DynamoDB Stream event into a custom EventBridge event, which downstream consumers can comprehend. This pattern is beneficial because it separates the event contract from the DynamoDB event schema: downstream consumers do not have to conform to that schema structure, and the mapping keeps them decoupled from implementation details.
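
In the application this mapping is configured declaratively on the pipe, but the idea can be illustrated with a small Python function. The sketch below assumes the story item has `id` and `title` string attributes and uses a made-up event source name; it shows the shape of the transformation, not the actual input transformer configuration.

```python
def to_story_created_event(stream_record: dict) -> dict:
    """Map a DynamoDB Streams record to a custom, schema-independent event."""
    # DynamoDB Streams wraps every attribute in a type descriptor, e.g. {"S": "value"}.
    new_image = stream_record["dynamodb"]["NewImage"]
    return {
        "Source": "stories.app",       # hypothetical event source
        "DetailType": "StoryCreated",
        "Detail": {
            # Consumers see plain values, not DynamoDB attribute types.
            "id": new_image["id"]["S"],          # assumed attribute name
            "title": new_image["title"]["S"],    # assumed attribute name
        },
    }
```

Downstream consumers then depend only on the `StoryCreated` contract, so the table schema can change without breaking them as long as the mapping is updated.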

EventBridge Pipes connecting DynamoDB streams directly into EventBridge.

When the StoryCreated event is published to EventBridge, three targets are triggered (Step 4): one generates the AI images, one creates the audio for the story, and one notifies the end user of the completed story through Amazon SNS and email subscriptions. This fan-out pattern enables these tasks to run asynchronously and in parallel, allowing for faster processing times.

EventBridge pub/sub pattern used to start async processing of notifications, audio, and images.

An SNS topic is triggered by the `StoryCreated` event to send an email to the end user using email subscriptions (Step 5). The email contains a URL with the id of the story that has been created. Clicking on the URL takes the user to the frontend application that is hosted with App Runner.
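
For illustration, publishing such a notification from code could look like the sketch below. In the application, the SNS topic is a direct EventBridge target, so no code may be involved at all; the function name, the `/stories/<id>` route, and the message wording are assumptions.

```python
def notify_story_ready(sns_client, topic_arn: str, base_url: str, story_id: str) -> str:
    """Publish a message containing the story URL and return that URL."""
    # Hypothetical route on the frontend; the real path may differ.
    story_url = f"{base_url.rstrip('/')}/stories/{story_id}"
    sns_client.publish(
        TopicArn=topic_arn,
        Subject="Your bedtime story is ready",
        Message=f"Tonight's story is ready to read: {story_url}",
    )
    return story_url
```

Here `sns_client` would be something like `boto3.client("sns")`, and every confirmed email subscription on the topic receives the message.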

Using SNS to notify the user of a new story

Example email sent to the user

Amazon Polly is used to generate the audio files for the story (Step 6). The `StoryCreated` event triggers a Lambda function, which passes the story description to Amazon Polly. Amazon Polly then creates an audio file of the story, which is stored in Amazon S3. A presigned URL is generated and saved in DynamoDB against the created story. This allows the frontend application and browser to retrieve the audio file when the user views the page. The presigned URL has a validity of two days, after which the audio can no longer be accessed or listened to.
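
The core of such a function might look like the following sketch, which uses the boto3 Polly and S3 client operations `synthesize_speech`, `put_object`, and `generate_presigned_url`. The voice, object key layout, and function name are assumptions, and saving the returned URL to DynamoDB is left out for brevity.

```python
TWO_DAYS_IN_SECONDS = 2 * 24 * 60 * 60


def create_story_audio(polly_client, s3_client, bucket: str,
                       story_id: str, story_text: str) -> str:
    """Synthesize the story as MP3, store it in S3, and return a presigned URL."""
    speech = polly_client.synthesize_speech(
        Text=story_text,
        OutputFormat="mp3",
        VoiceId="Joanna",  # assumed voice
    )
    key = f"audio/{story_id}.mp3"  # hypothetical key layout
    s3_client.put_object(
        Bucket=bucket,
        Key=key,
        Body=speech["AudioStream"].read(),
        ContentType="audio/mpeg",
    )
    # The URL grants temporary read access to this one object.
    return s3_client.generate_presigned_url(
        "get_object",
        Params={"Bucket": bucket, "Key": key},
        ExpiresIn=TWO_DAYS_IN_SECONDS,
    )
```

The clients are passed in (for example, `boto3.client("polly")` and `boto3.client("s3")`) so the function can be exercised with mocks and without network access.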

Lambda function to generate audio using Amazon Polly, store in S3 and update story with presigned URL

The `StoryCreated` event also triggers another Lambda function, which uses the OpenAI API to generate an AI image using DALL-E based on the generated story (Step 7). Once the image is generated, the image is downloaded and stored in Amazon S3. Similar to the audio file, the system generates a presigned URL for the image and saves it in DynamoDB against the story. The presigned URL is only valid for two days, after which it becomes inaccessible for download or viewing.

Lambda function to generate images, store in S3 and update story with presigned URL.

In the event of a failure in audio or image generation, the frontend application still loads the story, but does not display the missing image or audio at that moment. This ensures that the frontend can continue working and provide value. If you want more control and prefer to trigger the user’s notification event only once all parallel tasks are complete, consider the aggregator messaging pattern.
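
The aggregator idea can be sketched in a few lines: track which tasks have finished for each story and signal once, when the last one completes. This in-memory class is purely illustrative, with assumed task names; a serverless implementation would need to persist this state, for example in a DynamoDB item.

```python
class StoryAggregator:
    """Tracks completed tasks per story and reports when a story is complete."""

    def __init__(self, required_tasks=("image", "audio")):
        self.required_tasks = frozenset(required_tasks)
        self._completed: dict[str, set[str]] = {}

    def record_completion(self, story_id: str, task: str) -> bool:
        """Record a finished task; return True only when it completes the story."""
        if task not in self.required_tasks:
            raise ValueError(f"Unknown task: {task}")
        done = self._completed.setdefault(story_id, set())
        if task in done:
            return False  # duplicate delivery, nothing new to report
        done.add(task)
        return done == self.required_tasks
```

A caller would send the notification only when `record_completion` returns `True`, which happens at most once per story even if an event is delivered twice.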

Hosting the frontend Next.js application with AWS App Runner

The frontend application uses Next.js to render server-side rendered (SSR) pages that read the stories from the DynamoDB table. The application is containerized and hosted with AWS App Runner.

Next.js application hosted with App Runner, with permissions into DynamoDB table.

AWS App Runner enables you to deploy containerized web applications and APIs securely, without needing any prior knowledge of containers or infrastructure. With App Runner, developers can concentrate on their application, while the service handles container startup, running, scaling, and load balancing. After deployment, App Runner provides a secure URL for clients to begin making HTTP requests against.

With App Runner, you have two primary options for deploying your container: source code connections or source images. With a source code connection, App Runner pulls your code directly from your source repository and builds it, and with automatic deployment configured, it redeploys the application when changes are made. Alternatively, source images provide App Runner with the image’s location in an image registry, and this image is deployed by App Runner.

In this example application, the AWS CDK deploys the application using the DockerImageAsset construct with the App Runner construct. During deployment, the frontend image is built and uploaded to Amazon Elastic Container Registry (Amazon ECR), and App Runner deploys it. Downstream consumers can access the application using the secure URL provided by App Runner. In this example, the URL is included in the SNS notification that is sent to the user when the story is ready to be viewed.

Giving the frontend container permission to DynamoDB table

To grant the Next.js application permission to obtain stories from the Stories DynamoDB table, App Runner instance roles are configured. These roles are optional and can provide the necessary permissions for the container to access AWS services required by the compute service.

If you want to learn more about AWS App Runner, you can explore the free workshop.

Design choices and assumptions

The DynamoDB Time to Live (TTL) feature is ideal for the short-lived nature of daily generated stories. DynamoDB handles the deletion of stories after two days, based on the TTL attribute set on each story. Once a story is deleted, it becomes inaccessible through the generated story URLs.
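
DynamoDB TTL expects a Number attribute holding the expiry time as a Unix epoch timestamp in seconds. A minimal sketch of building a story item with a two-day expiry follows; the attribute name `ttl` and the item fields are assumptions, and the attribute must match whichever name is enabled as the TTL attribute on the table.

```python
import time

SECONDS_PER_DAY = 24 * 60 * 60


def story_item_with_ttl(story_id: str, title: str, now=None, days: int = 2) -> dict:
    """Build a story item whose 'ttl' attribute expires `days` days from `now`."""
    created_at = int(time.time()) if now is None else int(now)
    return {
        "id": story_id,
        "title": title,
        # Epoch seconds after which DynamoDB may delete the item.
        "ttl": created_at + days * SECONDS_PER_DAY,
    }
```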

Using Amazon S3 presigned URLs is a method to grant temporary access to a file in S3. This application creates presigned URLs for the audio file and generated images that last for 2 days, after which the URLs for the S3 items become invalid.

Input transforms are used between DynamoDB Streams and EventBridge events to decouple the schemas and events consumed by downstream targets. Consuming the events as they are is known as the “conformist” pattern, and it couples downstream EventBridge consumers to the implementation details of DynamoDB Streams. Transforming the events instead allows the application to remain decoupled from those details and stay flexible.

Conclusion

The adoption of artificial intelligence (AI) technology has significantly increased in various industries. ChatGPT, a large language model that can understand and generate human-like responses in natural language, and DALL-E, an image generation system that can create realistic images based on textual descriptions, are examples of such technology. These systems have demonstrated the potential for AI to provide innovative solutions and transform the way we interact with technology.

This blog post explores ways in which you can use AWS serverless services with ChatGPT and DALL-E to create a story generation application fronted by a Next.js application hosted with App Runner. EventBridge Scheduler triggers the story creation process, DynamoDB Streams and EventBridge Pipes react to change data capture events, and Amazon EventBridge fans out compute tasks to process notifications, images, and audio files.

You can find the documentation and the source code for this application in GitHub.

For more serverless learning resources, visit Serverless Land.