Tag Archives: Amazon DynamoDB

AWS Week in Review – Updates on Amazon FSx for NetApp ONTAP, AWS Lambda, eksctl, Karpenter, and More – July 17, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-week-in-review-updates-on-amazon-fsx-for-netapp-ontap-aws-lambda-eksctl-karpetner-and-more-july-17-2023/

The Data Centered: Eastern Oregon is a five-part mini-documentary series looking at the real-life impact of the more than $15 billion investment AWS has made in the local community, and how the company supports jobs, generates economic growth, provides skills training and education, and unlocks opportunities for local business suppliers.

Last week, I watched a new episode introducing the Data Center Technician training program offered by AWS to train people with little or no previous technical experience in the skills they need to work in data centers and other information technology (IT) roles. This video reminded me of my first days of cabling and transporting servers in data centers. Remember, there are still people behind cloud computing.

Last Week’s Launches
Here are some launches that got my attention:

Amazon FSx for NetApp ONTAP Updates – Jeff Barr introduced Amazon FSx for NetApp ONTAP support for SnapLock, an ONTAP feature that gives you the power to create volumes that provide write once read many (WORM) functionality for regulatory compliance and ransomware protection. In addition, FSx for NetApp ONTAP now supports IPSec encryption of data in transit and two additional monitoring and troubleshooting capabilities that you can use to monitor file system events and diagnose network connectivity.

AWS Lambda detects and stops recursive loops in Lambda functions – In certain scenarios, due to resource misconfiguration or code defects, a processed event might be sent back to the same service or resource that invoked the Lambda function. This can cause an unintended recursive loop, resulting in unexpected usage and costs for customers. With this launch, Lambda will stop recursive invocations between Amazon SQS, Lambda, and Amazon SNS after 16 recursive calls. For more information, refer to our documentation or the launch blog post.


Amazon CloudFront support for 3072-bit RSA certificates – You can now associate your 3072-bit RSA certificates with CloudFront distributions to enhance communication security between clients and CloudFront edge locations. To get started, associate a 3072-bit RSA certificate with your CloudFront distribution using the console or APIs. There are no additional fees associated with this feature. For more information, please refer to the CloudFront Developer Guide.

Running GitHub Actions with AWS CodeBuild – Two weeks ago, AWS CodeBuild started to support GitHub Actions. You can now define GitHub Actions steps directly in the BuildSpec and run them alongside CodeBuild commands. Last week, the AWS DevOps Blog published a blog post about using the Liquibase GitHub Action to deploy changes to an Amazon Aurora database in a private subnet. You can learn how to integrate AWS CodeBuild with the nearly 20,000 GitHub Actions developed by the open source community.

CodeBuild configuration showing the GitHub repository URL

Amazon DynamoDB local version 2.0 – You can develop and test applications by running Amazon DynamoDB local in your local development environment without incurring any additional costs. The new 2.0 version allows Java developers to use DynamoDB local to work with Spring Boot 3 and frameworks such as Spring Framework 6 and Micronaut Framework 4 to build modernized, simplified, and lightweight cloud-native applications.
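As a quick illustration of local development against DynamoDB local, here is a minimal sketch using the AWS SDK for Python (Boto3). It assumes DynamoDB local is already running on the default port 8000; the table and item names are placeholders:

import boto3

# Point the SDK at the local endpoint; DynamoDB local does not validate credentials
dynamodb = boto3.resource(
    "dynamodb",
    endpoint_url="http://localhost:8000",
    region_name="us-east-1",
    aws_access_key_id="dummy",
    aws_secret_access_key="dummy",
)

# Create a simple table, write an item, and read it back
table = dynamodb.create_table(
    TableName="DevTable",
    KeySchema=[{"AttributeName": "pk", "KeyType": "HASH"}],
    AttributeDefinitions=[{"AttributeName": "pk", "AttributeType": "S"}],
    BillingMode="PAY_PER_REQUEST",
)
table.wait_until_exists()
table.put_item(Item={"pk": "test#1", "payload": "hello"})
print(table.get_item(Key={"pk": "test#1"})["Item"])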

For a full list of AWS announcements, be sure to keep an eye on the What’s New at AWS page.

Open Source Updates
Last week, we introduced new open source projects and significant roadmap contributions to the Jupyter community.

New joint maintainership between Weaveworks and AWS for eksctl – The eksctl open source project has moved from the Weaveworks GitHub organization to a new top-level GitHub organization, eksctl-io, that will be jointly maintained by Weaveworks and AWS moving forward. The eksctl project can now be found on GitHub.

Karpenter now supports Windows containers – Karpenter is an open source flexible, high-performance Kubernetes node provisioning and management solution that you can use to quickly scale Amazon EKS clusters. With the launch of version 0.29.0, Karpenter extends the automated node provisioning support to Windows containers running on EKS. Read this blog post for a step-by-step guide on how to get started with Karpenter for Windows node groups.

Updates in Amazon Aurora and Amazon OpenSearch Service – Following the announcement of updates to the PostgreSQL database in May by the open source community, we’ve updated Amazon Aurora PostgreSQL-Compatible Edition to support PostgreSQL 15.3, 14.8, 13.11, 12.15, and 11.20. These releases contain product improvements and bug fixes made by the PostgreSQL community, along with Aurora-specific improvements. You can also run OpenSearch version 2.7 in Amazon OpenSearch Service. With OpenSearch 2.7 (also released in May), we’ve made several improvements to observability, security analytics, index management, and geospatial capabilities in OpenSearch Service.

To learn about weekly updates for open source at AWS, check out the latest AWS open source newsletter by Ricardo.

Upcoming AWS Events
Check your calendars and sign up for these AWS events:

AWS Storage Day on August 9 – Join a one-day virtual event that will help you to better understand AWS storage services and make the most of your data. Register today.

AWS Global Summits – Sign up for the AWS Summit closest to your city: Hong Kong (July 20), New York City (July 26), Taiwan (August 2-3), São Paulo (August 3), and Mexico City (August 30).

AWS Community Days – Join a community-led conference run by AWS user group leaders in your region: Malaysia (July 22), Philippines (July 29-30), Colombia (August 12), and West Africa (August 19).

AWS re:Invent 2023 – Join us to hear the latest from AWS, learn from experts, and connect with the global cloud community. Registration is now open.

You can browse all upcoming AWS-led in-person and virtual events, and developer-focused events such as AWS DevDay.

Take the AWS Blog Customer Survey
We’re focused on improving our content to provide a better customer experience, and we need your feedback to do so. Take our survey to share insights regarding your experience on the AWS Blog.

This survey is hosted by an external company. AWS handles your information as described in the AWS Privacy Notice. AWS will own the data gathered via this survey and will not share the information collected with survey respondents.

That’s all for this week. Check back next Monday for another Week in Review!

Channy

This post is part of our Week in Review series. Check back each week for a quick roundup of interesting news and announcements from AWS!

IBM Consulting creates innovative AWS solutions in French Hackathon

Post Syndicated from Diego Colombatto original https://aws.amazon.com/blogs/architecture/ibm-consulting-creates-innovative-aws-solutions-in-french-hackathon/

In March 2023, IBM Consulting delivered an Innovation Hackathon in France, aimed at designing and building new innovative solutions for real customer use cases using the AWS Cloud.

In this post, we briefly explore six of the solutions considered and demonstrate the AWS architectures created and implemented during the Hackathon.

Hackathon solutions

Solution 1: Optimize digital channels monitoring and management for Marketing

Monitoring the impact of a marketing campaign, such as customer and competitor reactions on digital media channels, can require a lot of effort. Digital campaign managers need this data to evaluate customer segment penetration and overall campaign effectiveness. Information can be collected via digital-channel API integrations or on the digital channel user interface (UI): digital-channel API integrations require frequent maintenance, while UI data collection can be labor-intensive.

On the AWS Cloud, IBM designed an augmented digital campaign manager solution, to assist digital campaign managers with digital-channel monitoring and management. This solution monitors social media APIs and, when APIs change, automatically updates the API integration, ensuring accurate information collection (Figure 1).


Figure 1. Optimize digital channels monitoring and management for Marketing

  1. Amazon Simple Storage Service (Amazon S3) and AWS Lambda are used to garner new digital estates, such as new social media APIs, and assess data quality.
  2. Amazon Kinesis Data Streams is used to decouple data ingestion from data query and storage.
  3. Lambda retrieves the required information from Amazon DynamoDB, like the most relevant brands; natural language processing (NLP) is applied to retrieved data, like URL, bio, about, verification status.
  4. Amazon S3 and Amazon CloudFront are used to present a dashboard where end-users can check, enrich, and validate collected data.
  5. When graph API calls detect an error/change, Lambda checks API documentation to update/correct the API call.
  6. A new Lambda function is generated with the updated API call.

Solution 2: 4th party logistics consulting service for a greener supply chain

Logistics companies have a wealth of trip data, both first- and third-party, and can leverage this data to provide new customer services, such as trip booking options optimized for carbon footprint, duration, or cost.

IBM designed an AWS solution (Figure 2) enabling the customer to book goods transport by selecting from different route options, combining transport modes, and specifying departure location, arrival location, cargo weight, and carbon emissions. Proposed options include the greenest, fastest, and cheapest routes. Additionally, the user can provide financial and time constraints.


Figure 2. Optimized transport booking architecture

  1. User connects to web-app UI, hosted on Amazon S3.
  2. Amazon API Gateway receives user requests from web app; requests are forwarded to Lambda.
  3. Lambda calculates the best trip options based on the user’s prerequisites, such as carbon emissions.
  4. Lambda estimates carbon emissions; estimates are combined with trip options at Step 3.
  5. Amazon Neptune graph database is used to efficiently store and query trip data.
  6. Different Lambda instances are used to ingest data from on-premises data sources and send customer bookings through the customer ordering system.

Solution 3: Purchase order as a service

In the context of vendor-managed inventory and vendor-managed replenishment, inventory and logistics companies want to check on warehouse stock levels to identify the best available options for goods transport. Their objective is to optimize the availability of warehouse stock for order fulfillment; therefore, when a purchase order (PO) is received, required goods are identified as available in the correct warehouse, enabling swift delivery with minimal lead time and costs.

IBM designed an AWS PO as a service solution (Figure 3), using warehouse data to forecast future customer POs. Based on this forecast, the solution plans and optimizes warehouse goods availability and, hence, the logistics required for PO fulfillment.


Figure 3. Purchase order as a service AWS solution

  1. AWS Amplify provides a web/mobile UI where users can set constraints (such as minimum/maximum warehouse capacity) and check warehouse status and POs in progress. Additionally, the UI proposes possible optimized POs, which are automatically generated by the solution. If the user accepts one of these solution-generated POs, they benefit from optimized delivery time, costs, and carbon footprint.
  2. Lambda receives Amazon Forecast inferences and reads/writes PO information on Amazon DynamoDB.
  3. Forecast provides inferences regarding the most probable future POs. Forecast uses POs, warehouse data, and goods delivery data to automatically train a machine learning (ML) model that is used to generate forecast inferences.
  4. Amazon DynamoDB stores PO and warehouse information.
  5. Lambda pushes PO, warehouse, and goods delivery data from Amazon DynamoDB into Amazon S3. This data is used to retrain the Forecast ML model, ensuring high-quality forecast inferences.

Solution 4: Optimize environmental impact associated with engineers’ interventions for customer fiber connections

Telco companies that provide end-users’ internet connections need engineers to execute field tasks, like deploying, activating, and repairing subscribers’ lines. In this scenario, it’s important to identify the most efficient itineraries for the engineers.

IBM designed an AWS solution that automatically generates engineers’ itineraries that consider criteria such as mileage, carbon-emission generation, and electric-/thermal-vehicle availability.

The solution (Figure 4) provides:

  • Customer management teams with a mobile dashboard showing carbon-emissions estimates for all engineers’ journeys, both in-progress and planned
  • Engineers with a mobile application including an optimized itinerary and trip updates based on real-time traffic and unexpected events

Figure 4. AWS telco solution for greener customer service

  1. Management team and engineers connect to web/mobile application, respectively. Amazon Cognito provides authentication and authorization, Amazon S3 stores application static content, and API Gateway receives and forwards API requests.
  2. AWS Step Functions implements different workflows. Application logic is implemented in Lambda, which connects to DynamoDB to get trip data (current route and driver location); Amazon Location Service provides itineraries, and Amazon SageMaker ML model implements itinerary optimization engine.
  3. Independently of online users, trip data are periodically sent to API Gateway and stored in Amazon S3.
  4. SageMaker notebook periodically uses Amazon S3 data to re-train the trip optimization ML model with updated data.

Solution 5: Improve the effectiveness of customer SAP level 1 support by reducing response times for common information requests

Companies using SAP usually provide first-level support to their internal SAP users. SAP users engage the support (usually via ticketing system) to ask for help when facing SAP issues or to request additional information. A high number of information requests requires significant effort to retrieve and provide the available information on resources like SAP notes/documentation or similar support requests.

IBM designed an AWS solution (Figure 5), based on support request information, that can automatically provide a short list of most probable solutions with a confidence score.


Figure 5. SAP customer support solution

  1. Lambda receives ticket information, such as ticket number, business service, and description.
  2. Lambda processes the ticket data, and Amazon Translate translates the text into the country’s native language and English.
  3. SageMaker ML model receives the question and provides the inference.
  4. If the inference has a high confidence score, Lambda provides it immediately as output.
  5. If the inference has a low confidence score, Amazon Kendra receives the question, automatically searches through indexed company information, and provides the best available answer. Lambda then returns the answer as output (a sketch of this flow follows the list).
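The following minimal sketch shows how steps 2 through 5 could be wired together in a single Lambda function. This is not IBM’s actual implementation; the event payload, SageMaker endpoint name, response format, Kendra index ID, and confidence threshold are assumptions for illustration.

import json
import boto3

translate = boto3.client("translate")
runtime = boto3.client("sagemaker-runtime")
kendra = boto3.client("kendra")

CONFIDENCE_THRESHOLD = 0.8            # assumed cut-off
ENDPOINT_NAME = "sap-support-model"   # hypothetical SageMaker endpoint
KENDRA_INDEX_ID = "REPLACE_WITH_INDEX_ID"

def handler(event, context):
    # Step 2: translate the ticket description into English
    question = translate.translate_text(
        Text=event["description"],
        SourceLanguageCode="auto",
        TargetLanguageCode="en",
    )["TranslatedText"]

    # Step 3: ask the ML model for an answer and a confidence score
    response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType="application/json",
        Body=json.dumps({"question": question}),
    )
    prediction = json.loads(response["Body"].read())

    # Step 4: high confidence, return the model answer directly
    if prediction.get("score", 0) >= CONFIDENCE_THRESHOLD:
        return {"answer": prediction["answer"], "source": "model"}

    # Step 5: low confidence, fall back to an Amazon Kendra search
    result = kendra.query(IndexId=KENDRA_INDEX_ID, QueryText=question)
    items = result.get("ResultItems", [])
    best = items[0]["DocumentExcerpt"]["Text"] if items else "No answer found"
    return {"answer": best, "source": "kendra"}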

Solution 6: Improve contact center customer experience providing faster and more accurate customer support

Insured customers often interact with insurance companies using contact centers, requesting information and services regarding their insurance policies.

IBM designed an AWS solution that improves the end-customer experience and contact center agent efficiency by providing automated summarization of customer-agent calls and chats. This enables:

  • Agents to quickly recall the customer’s needs in subsequent interactions
  • Contact center supervisors to quickly understand the objective of each case (intervening if necessary)
  • Insured customers to quickly get the information they need, without repeating information already provided

Figure 6. Improving contact center customer experience

Summarization capability is provided by generative AI, leveraging large language models (LLM) on SageMaker.

  1. A pretrained LLM from Hugging Face is stored on Amazon S3.
  2. The LLM is fine-tuned and trained using Amazon SageMaker.
  3. The LLM is made available as a SageMaker API endpoint, ready to provide inferences.
  4. The insured user contacts customer support; the user request goes through voice/chatbot and then reaches Amazon Connect.
  5. Lambda queries the LLM endpoint (see the sketch after this list); the inference is sent to an Amazon Connect instance, where it is enriched with knowledge-based search using Amazon Connect Wisdom.
  6. If the user-agent conversation was a voice interaction (like a phone call), the call recording is transcribed using Amazon Transcribe, and Lambda is then called for summarization.
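As an illustration of step 5, the following minimal sketch queries a SageMaker LLM endpoint from Lambda. The endpoint name, request payload, and response format are assumptions and will depend on the model container you deploy.

import json
import boto3

runtime = boto3.client("sagemaker-runtime")
ENDPOINT_NAME = "contact-center-summarizer"  # hypothetical SageMaker endpoint

def summarize(transcript: str) -> str:
    # Send the call/chat transcript to the LLM endpoint and return the summary text
    response = runtime.invoke_endpoint(
        EndpointName=ENDPOINT_NAME,
        ContentType="application/json",
        Body=json.dumps({"inputs": f"Summarize the following conversation:\n{transcript}"}),
    )
    result = json.loads(response["Body"].read())
    # Hugging Face text-generation containers commonly return a list of generated texts;
    # adjust this to the actual response schema of your model
    return result[0]["generated_text"] if isinstance(result, list) else result.get("summary", "")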

Conclusion

In this blog post, we have explored how IBM Consulting delivered an Innovation Hackathon in France. During the Hackathon, IBM worked backward from real customer use cases, designing and building innovative solutions using AWS services.

Reduce archive cost with serverless data archiving

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/reduce-archive-cost-with-serverless-data-archiving/

For regulatory reasons, decommissioning core business systems in financial services and insurance (FSI) markets requires data to remain accessible years after the application is retired. Traditionally, FSI companies either outsourced data archiving to third-party service providers, which maintained application replicas, or purchased vendor software to query and visualize archival data.

In this blog post, we present a more cost-efficient option with serverless data archiving on Amazon Web Services (AWS). In our experience, you can build your own cloud-native solution on Amazon Simple Storage Service (Amazon S3) at one-fifth of the price of third-party alternatives. If you are retiring legacy core business systems, consider serverless data archiving for cost-savings while keeping regulatory compliance.

Serverless data archiving and retrieval

Modern archiving solutions follow the principles of modern applications:

  • Serverless-first development, to reduce management overhead.
  • Cloud-native, to leverage native capabilities of AWS services, such as backup or disaster recovery, to avoid custom build.
  • Consumption-based pricing, since data archival is consumed irregularly.
  • Speed of delivery, as both implementation and archive operations need to be performed quickly to fulfill regulatory compliance.
  • Flexible data retention policies can be enforced in an automated manner.

AWS Storage and Analytics services offer the necessary building blocks for a modern serverless archiving and retrieval solution.

Data archiving can be implemented on top of Amazon S3 and AWS Glue.

  1. Amazon S3 storage tiers enable different data retention policies and retrieval service level agreements (SLAs). You can migrate data to Amazon S3 using AWS Database Migration Service; otherwise, consider another data transfer service, such as AWS DataSync or AWS Snowball.
  2. AWS Glue crawlers automatically infer both database and table schemas from your data in Amazon S3 and store the associated metadata in the AWS Glue Data Catalog.
  3. Amazon CloudWatch monitors the execution of AWS Glue crawlers and notifies of failures.

Figure 1 provides an overview of the solution.


Figure 1. Serverless data archiving and retrieval

Once the archival data is catalogued, Amazon Athena can be used for serverless data query operations using standard SQL.

  1. Amazon API Gateway receives the data retrieval requests and eases integration with other systems via REST, HTTPS, or WebSocket.
  2. AWS Lambda reads parametrization data/templates from Amazon S3 in order to construct the SQL queries. Alternatively, query templates can be stored as key-value entries in a NoSQL store, such as Amazon DynamoDB.
  3. Lambda functions trigger Athena with the constructed SQL query (see the sketch after this list).
  4. Athena uses the AWS Glue Data Catalog to retrieve table metadata for the Amazon S3 (archival) data and to return the SQL query results.
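A minimal sketch of steps 2 and 3, building and running an Athena query from a Lambda function, could look like the following. The database name, results bucket, and polling approach are assumptions for illustration, not the exact implementation.

import time
import boto3

athena = boto3.client("athena")
RESULTS_LOCATION = "s3://my-athena-results-bucket/archive-queries/"  # placeholder

def run_archive_query(sql: str) -> list:
    # Start the query against the cataloged archive data
    execution = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "archive_db"},          # placeholder database
        ResultConfiguration={"OutputLocation": RESULTS_LOCATION},
    )
    query_id = execution["QueryExecutionId"]

    # Poll until the query finishes (keep Lambda timeouts in mind for long-running queries)
    while True:
        state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(1)

    if state != "SUCCEEDED":
        raise RuntimeError(f"Athena query {query_id} ended in state {state}")
    return athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]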

How we built serverless data archiving

An early build-or-buy assessment compared vendor products with a custom-built solution using Amazon S3, AWS Glue, and a user frontend for data retrieval and visualization.

The total cost of ownership over a 10-year period for one insurance core system (Policy Admin System) was $0.25M to build and run the custom solution on AWS compared with >$1.1M for third-party alternatives. The implementation cost advantage of the custom-built solution was due to development efficiencies using AWS services. The lower run cost resulted from a decreased frequency of archival usage and paying only for what you use.

The data archiving solution was implemented with AWS services (Figure 2):

  1. Amazon S3 is used to persist archival data in Parquet format (optimized for analytics and compressed to reduce storage space) that is loaded from the legacy insurance core system. The archival data source was AS/400 (DB2), and the data was moved to Amazon S3 with Informatica Cloud.
  2. AWS Glue crawlers infer the database schema from objects in Amazon S3 and create tables in AWS Glue for the decommissioned application data.
  3. Lambda functions (Python) remove data records based on retention policies configured for each domain, such as customers, policies, claims, and receipts. A daily job (Control-M) initiates the retention process.

Figure 2. Exemplary implementation of serverless data archiving and retrieval for insurance core system

Retrieval operations are formulated and executed via Python functions in Lambda. The following AWS resources implement the retrieval logic:

  1. Athena is used to run SQL queries over the AWS Glue tables for the decommissioned application.
  2. Lambda functions (Python) build and execute queries for data retrieval. The functions render HTML snippets using the Jinja templating engine and Athena query results, returning the selected template filled with the requested archive data. Using Jinja as the templating engine decoupled the application layers, which improved the speed of delivery and reduced the heavy lifting of frontend and backend changes when modeling retrieval operations by ~30%. As a result, engineers only need to build an Athena query with the linked Jinja template (see the sketch after this list).
  3. Amazon S3 stores templating configuration and queries (JSON files) used for query parametrization.
  4. Amazon API Gateway serves as single point of entry for API calls.
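The following minimal sketch illustrates the Jinja-based rendering described in step 2; the template and row structure are illustrative placeholders, not the project’s actual templates.

from jinja2 import Template

# Hypothetical HTML snippet template; in the real solution templates are stored in Amazon S3
TEMPLATE = Template("""
<table>
  <tr><th>Policy</th><th>Status</th></tr>
  {% for row in rows %}
  <tr><td>{{ row.policy_id }}</td><td>{{ row.status }}</td></tr>
  {% endfor %}
</table>
""")

def render_snippet(rows):
    # rows: list of dict-like records produced from the Athena query results
    return TEMPLATE.render(rows=rows)

print(render_snippet([{"policy_id": "P-1001", "status": "closed"}]))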

The user frontend for data retrieval and visualization is implemented as a web application using the React JavaScript library (with static content on Amazon S3), with Amazon CloudFront used for web content delivery.

The archiving solution enabled 80 use cases with 60 queries and reduced storage from three terabytes on source to only 35 gigabytes on Amazon S3. The success of the implementation depended on the following key factors:

  • Appropriate sponsorship from business across all areas (claims, actuarial, compliance, etc.)
  • Definition of SLAs for responding to courts, regulators, etc.
  • Minimum viable and mandatory approach
  • Prototype visualizations early on (fail fast)

Conclusion

Traditionally, FSI companies relied on vendor products for data archiving. In this post, we explored how to build a scalable solution on Amazon S3 and discussed key implementation considerations. We have demonstrated that AWS services enable FSI companies to build a serverless archiving solution while reaching and keeping regulatory compliance at a lower cost.


Build an Amazon Redshift data warehouse using an Amazon DynamoDB single-table design

Post Syndicated from Altaf Hussain original https://aws.amazon.com/blogs/big-data/build-an-amazon-redshift-data-warehouse-using-an-amazon-dynamodb-single-table-design/

Amazon DynamoDB is a fully managed NoSQL service that delivers single-digit millisecond performance at any scale. It’s used by thousands of customers for mission-critical workloads. Typical use cases for DynamoDB are an ecommerce application handling a high volume of transactions, or a gaming application that needs to maintain scorecards for players and games. In traditional databases, we would model such applications using a normalized data model (entity-relation diagram). This approach comes with a heavy computational cost in terms of processing and distributing the data across multiple tables while ensuring the system is ACID-compliant at all times, which can negatively impact performance and scalability. If these entities are frequently queried together, it makes sense to store them in a single table in DynamoDB. This is the concept of single-table design. Storing different types of data in a single table allows you to retrieve multiple, heterogeneous item types using a single request. Such requests are relatively straightforward, and usually take the following form:

SELECT * FROM TABLE WHERE Some_Attribute = 'some_value'

In this format, some_attribute is a partition key or part of an index.
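With the AWS SDK for Python (Boto3), the equivalent request could look like the following minimal sketch, assuming Some_Attribute is the table’s partition key; the table name is a placeholder.

import boto3
from boto3.dynamodb.conditions import Key

table = boto3.resource("dynamodb").Table("MyTable")  # placeholder table name

# Equivalent to: SELECT * FROM TABLE WHERE Some_Attribute = 'some_value',
# where Some_Attribute is the table's partition key
response = table.query(KeyConditionExpression=Key("Some_Attribute").eq("some_value"))
items = response["Items"]

DynamoDB also supports running the PartiQL statement above directly through the ExecuteStatement API.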

Nonetheless, many of the same customers using DynamoDB would also like to be able to perform aggregations and ad hoc queries against their data to measure important KPIs that are pertinent to their business. Suppose we have a successful ecommerce application handling a high volume of sales transactions in DynamoDB. A typical ask for this data may be to identify sales trends as well as sales growth on a yearly, monthly, or even daily basis. These types of queries require complex aggregations over a large number of records. A key pillar of AWS’s modern data strategy is the use of purpose-built data stores for specific use cases to achieve performance, cost, and scale. Deriving business insights by identifying year-on-year sales growth is an example of an online analytical processing (OLAP) query. These types of queries are suited for a data warehouse.

The goal of a data warehouse is to enable businesses to analyze their data fast; this is important because it means they are able to gain valuable insights in a timely manner. Amazon Redshift is a fully managed, scalable cloud data warehouse. Building a performant data warehouse is non-trivial because the data needs to be highly curated to serve as a reliable and accurate version of the truth.

In this post, we walk through the process of exporting data from a DynamoDB table to Amazon Redshift. We discuss data model design for both NoSQL databases and SQL data warehouses. We begin with a single-table design as an initial state and build a scalable batch extract, load, and transform (ELT) pipeline to restructure the data into a dimensional model for OLAP workloads.

DynamoDB table example

We use an example of a successful ecommerce store allowing registered users to order products from their website. A simple ERD (entity-relation diagram) for this application will have four distinct entities: customers, addresses, orders, and products. For customers, we have information such as their unique user name and email address; for the address entity, we have one or more customer addresses. Orders contain information regarding the order placed, and the products entity provides information about the products placed in an order. As we can see from the following diagram, a customer can place one or more orders, and an order must contain one or more products.

We could store each entity in a separate table in DynamoDB. However, there is no way to retrieve customer details alongside all the orders placed by the customer without making multiple requests to the customer and order tables. This is inefficient from both a cost and performance perspective. A key goal for any efficient application is to retrieve all the required information in a single query request. This ensures fast, consistent performance. So how can we remodel our data to avoid making multiple requests? One option is to use single-table design. Taking advantage of the schema-less nature of DynamoDB, we can store different types of records in a single table in order to handle different access patterns in a single request. We can go further still and store different types of values in the same attribute and use it as a global secondary index (GSI). This is called index overloading.

A typical access pattern we may want to handle in our single table design is to get customer details and all orders placed by the customer.

To accommodate this access pattern, our single-table design looks like the following example.

By restricting the number of addresses associated with a customer, we can store address details as a complex attribute (rather than a separate item) without exceeding the 400 KB item size limit of DynamoDB.

We can add a global secondary index (GSIpk and GSIsk) to capture another access pattern: get order details and all product items placed in an order. We use the following table.

We have used generic attribute names, PK and SK, for our partition key and sort key columns. This is because they hold data from different entities. Furthermore, the values in these columns are prefixed by generic terms such as CUST# and ORD# to help us identify the type of data we have and ensure that the value in PK is unique across all records in the table.
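To make the design concrete, the following minimal sketch shows what a few items in the single table could look like when written with Boto3. The exact key layout and the type attribute are illustrative assumptions, not the precise schema shown in the table images; the table name is a placeholder.

import boto3

table = boto3.resource("dynamodb").Table("ecommerce-single-table")  # placeholder name

# Customer item: addresses stored as a complex attribute on the customer record
table.put_item(Item={
    "PK": "CUST#homer_s", "SK": "CUST#homer_s", "type": "customer",
    "username": "homer_s", "email": "homer@example.com", "fullname": "Homer S",
    "address": {"buildingnumber": "742", "postcode": "SPR1"},
})

# Order item: shares the customer's partition key so one Query returns customer + orders;
# the overloaded GSI keys capture the "order -> products" access pattern
table.put_item(Item={
    "PK": "CUST#homer_s", "SK": "ORD#1001", "type": "order",
    "orderid": "1001", "orderdate": "2022-06-01", "numberofitems": "2",
    "GSIpk": "ORD#1001", "GSIsk": "ORD#1001",
})

# Product item placed in that order, reachable via the GSI on ORD#1001
table.put_item(Item={
    "PK": "PROD#555", "SK": "PROD#555", "type": "product",
    "productid": "555", "productname": "Widget", "category": "toys",
    "color": "red", "price": "9.99",
    "GSIpk": "ORD#1001", "GSIsk": "PROD#555",
})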

A well-designed single table will not only reduce the number of requests for an access pattern, but will service many different access patterns. The challenge comes when we need to ask more complex questions of our data, for example, what was the year-on-year quarterly sales growth by product broken down by country?

The case for a data warehouse

A data warehouse is ideally suited to answer OLAP queries. Built on highly curated structured data, it provides the flexibility and speed to run aggregations across an entire dataset to derive insights.

To house our data, we need to define a data model. An optimal design choice is to use a dimensional model. A dimension model consists of fact tables and dimension tables. Fact tables store the numeric information about business measures and foreign keys to the dimension tables. Dimension tables store descriptive information about the business facts to help understand and analyze the data better. From a business perspective, a dimension model with its use of facts and dimensions can present complex business processes in a simple-to-understand manner.

Building a dimensional model

A dimensional model optimizes read performance through efficient joins and filters. Amazon Redshift automatically chooses the best distribution style and sort key based on workload patterns. We build a dimensional model from the single DynamoDB table based on the following star schema.

We have separated each item type into individual tables. We have a single fact table (Orders) containing the business measures price and numberofitems, and foreign keys to the dimension tables. By storing the price of each product in the fact table, we can track price fluctuations in the fact table without continually updating the product dimension. (In a similar vein, the DynamoDB attribute amount is a simple derived measure in our star schema: amount is the summation of product prices per orderid).

By splitting the descriptive content of our single DynamoDB table into multiple Amazon Redshift dimension tables, we can remove redundancy by only holding in each dimension the information pertinent to it. This allows us the flexibility to query the data under different contexts; for example, we may want to know the frequency of customer orders by city or product sales by date. The ability to freely join dimensions and facts when analyzing the data is one of the key benefits of dimensional modeling. It’s also good practice to have a Date dimension to allow us to perform time-based analysis by aggregating the fact by year, month, quarter, and so forth.

This dimensional model will be built in Amazon Redshift. When setting out to build a data warehouse, it’s a common pattern to have a data lake as the source of the data warehouse. The data lake in this context serves a number of important functions:

  • It acts as a central source for multiple applications, not just exclusively for data warehousing purposes. For example, the same dataset could be used to build machine learning (ML) models to identify trends and predict sales.
  • It can store data as is, be it unstructured, semi-structured, or structured. This allows you to explore and analyze the data without committing upfront to what the structure of the data should be.
  • It can be used to offload historical or less-frequently-accessed data, allowing you to manage your compute and storage costs more effectively. In our analytic use case, if we are analyzing quarterly growth rates, we may only need a couple of years’ worth of data; the rest can be unloaded into the data lake.

When querying a data lake, we need to consider user access patterns in order to reduce costs and optimize query performance. This is achieved by partitioning the data. The choice of partition keys will depend on how you query the data. For example, if you query the data by customer or country, then they are good candidates for partition keys; if you query by date, then a date hierarchy can be used to partition the data.

After the data is partitioned, we want to ensure it’s held in the right format for optimal query performance. The recommended choice is to use a columnar format such as Parquet or ORC. Such formats are compressed and store data column-wise, allowing for fast retrieval times, and are parallelizable, allowing for fast load times when moving the data into Amazon Redshift. In our use case, it makes sense to store the data in a data lake with minimal transformation and formatting to enable easy querying and exploration of the dataset. We partition the data by item type (Customer, Order, Product, and so on), and because we want to easily query each entity in order to move the data into our data warehouse, we transform the data into the Parquet format.
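As an example of this step, a minimal PySpark sketch (with placeholder S3 paths) that writes the raw export to the data lake partitioned by item type in Parquet could look like the following.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("raw-to-datalake").getOrCreate()

# Read the flattened raw export and write it back partitioned by the item type column
raw = spark.read.parquet("s3://my-raw-bucket/dynamodb-export/")      # placeholder path
(raw.write
    .mode("overwrite")
    .partitionBy("type")                                             # customer / order / product
    .parquet("s3://my-datalake-bucket/ecommerce/"))                  # placeholder path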

Solution overview

The following diagram illustrates the data flow to export data from a DynamoDB table to a data warehouse.

We present a batch ELT solution using AWS Glue for exporting data stored in DynamoDB to an Amazon Simple Storage Service (Amazon S3) data lake and then a data warehouse built in Amazon Redshift. AWS Glue is a fully managed extract, transform, and load (ETL) service that allows you to organize, cleanse, validate, and format data for storage in a data warehouse or data lake.

The solution workflow has the following steps:

  1. Move any existing files from the raw and data lake buckets into corresponding archive buckets to ensure any fresh export from DynamoDB to Amazon S3 isn’t duplicating data.
  2. Begin a new DynamoDB export to the S3 raw layer.
  3. From the raw files, create a data lake partitioned by item type.
  4. Load the data from the data lake to landing tables in Amazon Redshift.
  5. After the data is loaded, we take advantage of the distributed compute capability of Amazon Redshift to transform the data into our dimensional model and populate the data warehouse.

We orchestrate the pipeline using an AWS Step Functions workflow and schedule a daily batch run using Amazon EventBridge.

For simpler DynamoDB table structures, you may consider skipping some of these steps by either loading data directly from DynamoDB to Amazon Redshift or using the Amazon Redshift auto-copy or COPY command to load data from Amazon S3.
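For example, a minimal sketch of loading one landing table straight from Parquet files in Amazon S3 using the COPY command via the Amazon Redshift Data API might look like the following; the cluster identifier, IAM role, database user, and S3 path are placeholders.

import boto3

redshift_data = boto3.client("redshift-data")

copy_sql = """
    COPY landing.product
    FROM 's3://my-datalake-bucket/ecommerce/type=product/'
    IAM_ROLE 'arn:aws:iam::123456789012:role/my-redshift-role'
    FORMAT AS PARQUET;
"""

# Run the COPY statement asynchronously against the cluster
redshift_data.execute_statement(
    ClusterIdentifier="my-redshift-cluster",   # placeholder
    Database="salesdwh",
    DbUser="awsuser",                          # placeholder admin user
    Sql=copy_sql,
)

In this post we use AWS Glue jobs instead, because they also handle the column mapping and type conversion described later.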

Prerequisites

You must have an AWS account with a user who has programmatic access. For setup instructions, refer to AWS security credentials.

Use the AWS CloudFormation template cf_template_ddb-dwh-blog.yaml to launch the following resources:

  • A DynamoDB table with a GSI and point-in-time recovery enabled.
  • An Amazon Redshift cluster (we use two nodes of RA3.4xlarge).
  • Three AWS Glue database catalogs: raw, datalake, and redshift.
  • Five S3 buckets: two for the raw and data lake files; two for their respective archives, and one for the Amazon Athena query results.
  • Two AWS Identity and Access Management (IAM) roles: An AWS Glue role and a Step Functions role with the requisite permissions and access to resources.
  • A JDBC connection to Amazon Redshift.
  • An AWS Lambda function to retrieve the s3-prefix-list-id for your Region. This is required to allow traffic from a VPC to access an AWS service through a gateway VPC endpoint.
  • Download the following files to perform the ELT:
    • The Python script to load sample data into our DynamoDB table: load_dynamodb.py.
    • The AWS Glue Python Spark script to archive the raw and data lake files: archive_job.py.
    • The AWS Glue Spark scripts to extract and load the data from DynamoDB to Amazon Redshift: GlueSparkJobs.zip.
    • The DDL and DML SQL scripts to create the tables and load the data into the data warehouse in Amazon Redshift: SQL Scripts.zip.

Launch the CloudFormation template

AWS CloudFormation allows you to model, provision, and scale your AWS resources by treating infrastructure as code. We use the downloaded CloudFormation template to create a stack (with new resources).

  1. On the AWS CloudFormation console, create a new stack and select Template is ready.
  2. Upload the stack and choose Next.

  1. Enter a name for your stack.
  2. For MasterUserPassword, enter a password.
  3. Optionally, replace the default names for the Amazon Redshift database, DynamoDB table, and MasterUsername (in case these names are already in use).
  4. Review the details and acknowledge that AWS CloudFormation may create IAM resources on your behalf.
  5. Choose Create stack.

Load sample data into a DynamoDB table

To load your sample data into DynamoDB, complete the following steps:

  1. Create an AWS Cloud9 environment with default settings.
  2. Upload the load DynamoDB Python script. From the AWS Cloud9 terminal, use the pip install command to install the following packages:
    1. boto3
    2. faker
    3. faker_commerce
    4. numpy
  3. In the Python script, replace all placeholders (capital letters) with the appropriate values and run the following command in the terminal:
python load_dynamodb.py

This command loads the sample data into our single DynamoDB table.

Extract data from DynamoDB

To extract the data from DynamoDB to our S3 data lake, we use the new AWS Glue DynamoDB export connector. Unlike the old connector, the new version uses a snapshot of the DynamoDB table and doesn’t consume read capacity units of your source DynamoDB table. For large DynamoDB tables exceeding 100 GB, the read performance of the new AWS Glue DynamoDB export connector is not only consistent but also significantly faster than the previous version.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance. This will take continuous backups of the source table (so be mindful of cost) and ensures that each time the connector invokes an export, the data is fresh. The time it takes to complete an export depends on the size of your table and how uniformly the data is distributed therein. This can range from a few minutes for small tables (up to 10 GiB) to a few hours for larger tables (up to a few terabytes). This is not a concern for our use case because data lakes and data warehouses are typically used to aggregate data at scale and generate daily, weekly, or monthly reports. It’s also worth noting that each export is a full refresh of the data, so in order to build a scalable automated data pipeline, we need to archive the existing files before beginning a fresh export from DynamoDB.
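The CloudFormation template in this post already enables PITR on the table it creates. If you are working with an existing table, a minimal Boto3 sketch for enabling it (the table name is a placeholder) looks like this:

import boto3

dynamodb = boto3.client("dynamodb")

# Turn on point-in-time recovery so the Glue export connector can take consistent exports
dynamodb.update_continuous_backups(
    TableName="ecommerce-single-table",  # placeholder
    PointInTimeRecoverySpecification={"PointInTimeRecoveryEnabled": True},
)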

Complete the following steps:

  1. Create an AWS Glue job using the Spark script editor.
  2. Upload the archive_job.py file from GlueSparkJobs.zip.

This job archives the data files into timestamped folders. We run the job concurrently to archive the raw files and the data lake files.

  1. In Job details section, give the job a name and choose the AWS Glue IAM role created by our CloudFormation template.
  2. Keep all defaults the same and ensure maximum concurrency is set to 2 (under Advanced properties).

Archiving the files provides a backup option in the event of disaster recovery. As such, we can assume that the files will not be accessed frequently and can be kept in Standard_IA storage class so as to save up to 40% on costs while providing rapid access to the files when needed.

This job typically runs before each export of data from DynamoDB. After the datasets have been archived, we’re ready to (re)-export the data from our DynamoDB table.

We can use AWS Glue Studio to visually create the jobs needed to extract the data from DynamoDB and load into our Amazon Redshift data warehouse. We demonstrate how to do this by creating an AWS Glue job (called ddb_export_raw_job) using AWS Glue Studio.

  1. In AWS Glue Studio, create a job and select Visual with a blank canvas.
  2. Choose Amazon DynamoDB as the data source.

  1. Choose our DynamoDB table to export from.
  2. Leave all other options as is and finish setting up the source connection.

We then choose Amazon S3 as our target. In the target properties, we can transform the output to a suitable format, apply compression, and specify the S3 location to store our raw data.

  1. Set the following options:
    1. For Format, choose Parquet.
    2. For Compression type, choose Snappy.
    3. For S3 Target Location, enter the path for RawBucket (located on the Outputs tab of the CloudFormation stack).
    4. For Database, choose the value for GlueRawDatabase (from the CloudFormation stack output).
    5. For Table name, enter an appropriate name.

  1. Because our target data warehouse requires data to be in a flat structure, verify that the configuration option dynamodb.unnestDDBJson is set to True on the Script tab.

  1. On the Job details tab, choose the AWS Glue IAM role generated by the CloudFormation template.
  2. Save and run the job.

Depending on the data volumes being exported, this job may take a few minutes to complete.
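For reference, the source connection that AWS Glue Studio generates in the script is based on the AWS Glue DynamoDB export connector and typically looks similar to the following minimal sketch; the table ARN and bucket name are placeholders.

from awsglue.context import GlueContext
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

source = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",                     # use the export (snapshot) connector
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:123456789012:table/ecommerce-single-table",
        "dynamodb.unnestDDBJson": True,               # flatten DynamoDB JSON into columns
        "dynamodb.s3.bucket": "my-raw-bucket",        # intermediate export location
        "dynamodb.s3.prefix": "ddb-export/",
    },
)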

Because we’ll be adding the table to our AWS Glue Data Catalog, we can explore the output using Athena after the job is complete. Athena is a serverless interactive query service that makes it simple to analyze data directly in Amazon S3 using standard SQL.

  1. In the Athena query editor, choose the raw database.

We can see that the attributes of the Address structure have been unnested and added as additional columns to the table.

  1. After we export the data into the raw bucket, create another job (called raw_to_datalake_job) using AWS Glue Studio (select Visual with a blank canvas) to load the data lake partitioned by item type (customer, order, and product).
  2. Set the source as the AWS Glue Data Catalog raw database and table.

  1. In the ApplyMapping transformation, drop the Address struct because we have already unnested these attributes into our flattened raw table.

  1. Set the target as our S3 data lake.

  1. Choose the AWS Glue IAM role in the job details, then save and run the job.

Now that we have our data lake, we’re ready to build our data warehouse.

Build the dimensional model in Amazon Redshift

The CloudFormation template launches a two-node RA3.4xlarge Amazon Redshift cluster. To build the dimensional model, complete the following steps:

  1. In Amazon Redshift Query Editor V2, connect to your database (default: salesdwh) within the cluster using the database user name and password authentication (MasterUserName and MasterUserPassword from the CloudFormation template).
  2. You may be asked to configure your account if this is your first time using Query Editor V2.
  3. Download the SQL scripts SQL Scripts.zip to create the following schemas and tables (run the scripts in numbered sequence).

In the landing schema:

  • address
  • customer
  • order
  • product

In the staging schema:

  • staging.address
  • staging.address_maxkey
  • staging.addresskey
  • staging.customer
  • staging.customer_maxkey
  • staging.customerkey
  • staging.date
  • staging.date_maxkey
  • staging.datekey
  • staging.order
  • staging.order_maxkey
  • staging.orderkey
  • staging.product
  • staging.product_maxkey
  • staging.productkey

In the dwh schema:

  • dwh.address
  • dwh.customer
  • dwh.order
  • dwh.product

We load the data from our data lake to the landing schema as is.

  1. Use the JDBC connector to Amazon Redshift to build an AWS Glue crawler to add the landing schema to our Data Catalog under the ddb_redshift database.

  1. Create an AWS Glue crawler with the JDBC data source.

  1. Select the JDBC connection you created and choose Next.

  1. Choose the IAM role created by the CloudFormation template and choose Next.

  1. Review your settings before creating the crawler.

The crawler adds the four landing tables in our AWS Glue database ddb_redshift.

  1. In AWS Glue Studio, create four AWS Glue jobs to load the landing tables (these scripts are available to download, and you can use the Spark script editor to upload these scripts individually to create the jobs):
    1. land_order_job
    2. land_product_job
    3. land_customer_job
    4. land_address_job

Each job has the structure as shown in the following screenshot.

  1. Filter the S3 source on the partition column type:
    1. For product, filter on type='product'.
    2. For order, filter on type='order'.
    3. For customer and address, filter on type='customer'.

  1. Set the target for the data flow as the corresponding table in the landing schema in Amazon Redshift.
  2. Use the built-in ApplyMapping transformation in our data pipeline to drop columns and, where necessary, convert the data types to match the target columns.

For more information about built-in transforms available in AWS Glue, refer to AWS Glue PySpark transforms reference.

The mappings for our four jobs are as follows:

  • land_order_job:
    mappings=[
    ("pk", "string", "pk", "string"),
    ("orderid", "string", "orderid", "string"),
    ("numberofitems", "string", "numberofitems", "int"),
    ("orderdate", "string", "orderdate", "timestamp"),
    ]

  • land_product_job:
    mappings=[
    ("orderid", "string", "orderid", "string"),
    ("category", "string", "category", "string"),
    ("price", "string", "price", "decimal"),
    ("productname", "string", "productname", "string"),
    ("productid", "string", "productid", "string"),
    ("color", "string", "color", "string"),
    ]

  • land_address_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]

  • land_customer_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]

  1. Choose the AWS Glue IAM role, and under Advanced properties, verify the JDBC connector to Amazon Redshift as a connection.
  2. Save and run each job to load the landing tables in Amazon Redshift.

Populate the data warehouse

From the landing schema, we move the data to the staging layer and apply the necessary transformations. Our dimensional model has a single fact table, the orders table, which is the largest table and as such needs a distribution key. The choice of key depends on how the data is queried and the size of the dimension tables being joined to. If you’re unsure of your query patterns, you can leave the distribution keys and sort keys for your tables unspecified. Amazon Redshift automatically assigns the correct distribution and sort keys based on your queries. This has the advantage that if and when query patterns change over time, Amazon Redshift can automatically update the keys to reflect the change in usage.

In the staging schema, we keep track of existing records based on their business key (the unique identifier for the record). We create key tables to generate a numeric identity column for each table based on the business key. These key tables allow us to implement an incremental transformation of the data into our dimensional model.

CREATE TABLE IF NOT EXISTS staging.productkey ( 
    productkey integer identity(1,1), 
    productid character varying(16383), 
    CONSTRAINT products_pkey PRIMARY KEY(productkey));   

When loading the data, we need to keep track of the latest surrogate key value to ensure that new records are assigned the correct increment. We do this using maxkey tables (pre-populated with zero):

CREATE TABLE IF NOT EXISTS staging.product_maxkey ( 
    productmaxkey integer);

INSERT INTO staging.product_maxkey
select 0;    

We use staging tables to store our incremental load, the structure of which will mirror our final target model in the dwh schema:

---staging tables to load data from data lake 
   
CREATE TABLE IF NOT EXISTS staging.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey));
---dwh tables to load data from staging schema
     
CREATE TABLE IF NOT EXISTS dwh.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey)); 

Incremental processing in the data warehouse

We load the target data warehouse using stored procedures to perform upserts (deletes and inserts performed in a single transaction):

CREATE OR REPLACE PROCEDURE staging.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

TRUNCATE TABLE staging.order;

--insert new records to get new ids
insert into staging.orderkey
(
orderid
)
select
c.orderid
from landing.order c
LEFT JOIN staging.orderkey i
ON c.orderid=i.orderid
where i.orderid IS NULL;

--update the max key
update staging.order_maxkey
set ordermaxkey = (select max(orderkey) from staging.orderkey);


insert into staging.order
(
orderkey,
customerkey,
productkey,
addresskey,
datekey,
numberofitems,
price
)
select
xid.orderkey,
cid.customerkey,
pid.productkey,
aid.addresskey,
d.datekey,
o.numberofitems,
p.price
from
landing.order o
join staging.orderkey xid on o.orderid=xid.orderid
join landing.customer c on substring(o.pk,6,length(o.pk))=c.username   ---order table needs username
join staging.customerkey cid on cid.username=c.username
join landing.address a on a.username=c.username
join staging.addresskey aid on aid.pk=a.buildingnumber::varchar||'||'||a.postcode  ---maybe change pk to addressid
join staging.datekey d on d.orderdate=o.orderdate
join landing.product p on p.orderid=o.orderid
join staging.productkey pid on pid.productid=p.productid;

COMMIT;

END;
$$;
CREATE OR REPLACE PROCEDURE dwh.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

---delete old records 
delete from dwh.order
using staging.order as stage
where dwh.order.orderkey=stage.orderkey;

--insert new and modified
insert into dwh.order
(
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey  
)
select
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey
from staging.order;

COMMIT;
END;
$$

Use Step Functions to orchestrate the data pipeline

So far, we have stepped through each component in our workflow. We now need to stitch them together to build an automated, idempotent data pipeline. A good orchestration tool must manage failures, retries, parallelization, service integrations, and observability, so developers can focus solely on the business logic. Ideally, the workflow we build is also serverless so there is no operational overhead. Step Functions is an ideal choice to automate our data pipeline. It allows us to integrate the ELT components we have built on AWS Glue and Amazon Redshift and conduct some steps in parallel to optimize performance.

  1. On the Step Functions console, create a new state machine.
  2. Select Write your workflow in code.

  1. Enter the stepfunction_workflow.json code into the definition, replacing all placeholders with the appropriate values:
    1. [REDSHIFT-CLUSTER-IDENTIFIER] – Use the value for ClusterName (from the Outputs tab in the CloudFormation stack).
    2. [REDSHIFT-DATABASE] – Use the value for salesdwh (unless changed, this is the default database in the CloudFormation template).

We use the Step Functions IAM role from the CloudFormation template.

This JSON code generates the following pipeline.

Starting from the top, the workflow contains the following steps:

  1. We archive any existing raw and data lake files.
  2. We add two AWS Glue StartJobRun tasks that run sequentially: first to export the data from DynamoDB to our raw bucket, then from the raw bucket to our data lake.
  3. After that, we parallelize the landing of data from Amazon S3 to Amazon Redshift.
  4. We transform and load the data into our data warehouse using the Amazon Redshift Data API. Because this is asynchronous, we need to check the status of the runs before moving down the pipeline.
  5. After we move the data load from landing to staging, we truncate the landing tables.
  6. We load the dimensions of our target data warehouse (dwh) first, and finally we load our single fact table with its foreign key dependency on the preceding dimension tables.

The following figure illustrates a successful run.

After we set up the workflow, we can use EventBridge to schedule a daily midnight run, where the target is a Step Functions StartExecution API calling our state machine. Under the workflow permissions, choose Create a new role for this schedule and optionally rename it.
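If you prefer to script the schedule, a minimal Boto3 sketch could look like the following; the ARNs and names are placeholders, and the role must allow states:StartExecution.

import boto3

events = boto3.client("events")

# Run the pipeline every day at 00:00 UTC
events.put_rule(
    Name="daily-dwh-load",
    ScheduleExpression="cron(0 0 * * ? *)",
    State="ENABLED",
)

# Target the Step Functions state machine with a role that can start executions
events.put_targets(
    Rule="daily-dwh-load",
    Targets=[{
        "Id": "dwh-pipeline",
        "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:dwh-pipeline",    # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/eventbridge-stepfunctions-role",  # placeholder
    }],
)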

Query the data warehouse

We can verify the data has been successfully loaded into Amazon Redshift with a query.

After we have the data loaded into Amazon Redshift, we’re ready to answer the query asked at the start of this post: what is the year-on-year quarterly sales growth by product and country? The query looks like the following code (depending on your dataset, you may need to select alternative years and quarters):

with sales2021q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2021q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2021 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  ),
sales2022q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2022q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2022 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  )

select a.country,a.category, ((revenue2022q2 - revenue2021q2)/revenue2021q2)*100 as quarteronquartergrowth
from sales2022q2 a
join sales2021q2 b on a.country=b.country and a.category=b.category
order by a.country,a.category

We can visualize the results in Amazon Redshift Query Editor V2 by toggling the chart option and setting Type as Pie, Values as quarteronquartergrowth, and Labels as category.

Cost considerations

We give a brief outline of the indicative costs associated with the key services covered in our solution based on us-east-1 Region pricing using the AWS Pricing Calculator:

  • DynamoDB – With on-demand settings for 1.5 million items (average size of 355 bytes) and associated write and read capacity plus PITR storage, the cost of DynamoDB is approximately $2 per month.
  • AWS Glue DynamoDB export connector – This connector utilizes the DynamoDB export to Amazon S3 feature. This has no hourly cost—you only pay for the gigabytes of data exported to Amazon S3 ($0.11 per GiB).
  • Amazon S3 – You pay for storing objects in your S3 buckets. The rate you’re charged depends on your objects’ size, how long you stored the objects during the month, and the storage class. In our solution, we used S3 Standard for our data lake and S3 Standard – Infrequent Access for archive. Standard-IA storage is $0.0125 per GB/month; Standard storage is $0.023 per GB/month.
  • AWS Glue Jobs – With AWS Glue, you only pay for the time your ETL job takes to run. There are no resources to manage, no upfront costs, and you are not charged for startup or shutdown time. AWS charges you an hourly rate based on the number of Data Processing Units (DPUs) used to run your ETL job. A single DPU provides 4 vCPU and 16 GB of memory. Every one of our nine Spark jobs uses 10 DPUs and has an average runtime of 3 minutes. This gives an approximate cost of $0.29 per job.
  • Amazon Redshift – We provisioned two RA3.4xlarge nodes for our Amazon Redshift cluster. If run on-demand, each node costs $3.26 per hour. If utilized 24/7, our monthly cost would be approximately $4,759.60. You should evaluate your workload to determine what cost savings can be achieved by using Amazon Redshift Serverless or using Amazon Redshift provisioned reserved instances.
  • Step Functions – You are charged based on the number of state transitions required to run your application. Step Functions counts a state transition as each time a step of your workflow is run. You’re charged for the total number of state transitions across all your state machines, including retries. The Step Functions free tier includes 4,000 free state transitions per month. Thereafter, it’s $0.025 per 1,000 state transitions.

Clean up

Remember to delete any resources created through the CloudFormation stack. You first need to manually empty and delete the S3 buckets. Then you can delete the CloudFormation stack using the AWS CloudFormation console or AWS Command Line Interface (AWS CLI). For instructions, refer to Clean up your “hello, world!” application and related resources.
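
If you prefer to script the cleanup, the following boto3 sketch empties a bucket (including any object versions) before deleting the stack; the bucket and stack names are placeholders.

import boto3

BUCKET_NAME = "your-data-lake-bucket"      # placeholder
STACK_NAME = "your-cloudformation-stack"   # placeholder

# Empty the bucket first; CloudFormation cannot delete non-empty buckets
bucket = boto3.resource("s3").Bucket(BUCKET_NAME)
bucket.objects.all().delete()
bucket.object_versions.all().delete()  # needed if versioning is enabled

# Delete the stack and wait for completion
cfn = boto3.client("cloudformation")
cfn.delete_stack(StackName=STACK_NAME)
cfn.get_waiter("stack_delete_complete").wait(StackName=STACK_NAME)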

Summary

In this post, we demonstrated how you can export data from DynamoDB to Amazon S3 and Amazon Redshift to perform advanced analytics. We built an automated data pipeline that you can use to perform a batch ELT process that can be scheduled to run daily, weekly, or monthly and can scale to handle very large workloads.

Please leave your feedback or comments in the comments section.


About the Author

Altaf Hussain is an Analytics Specialist Solutions Architect at AWS. He helps customers around the globe design and optimize their big data and data warehousing solutions.


Appendix

To extract the data from DynamoDB and load it into our Amazon Redshift database, we can use the Spark script editor and upload the files from GlueSparkJobs.zip to create each individual job necessary to perform the extract and load. If you choose to do this, remember to update, where appropriate, the account ID and Region placeholders in the scripts. Also, on the Job details tab under Advanced properties, add the Amazon Redshift connection.

Join a streaming data source with CDC data for real-time serverless data analytics using AWS Glue, AWS DMS, and Amazon DynamoDB

Post Syndicated from Manish Kola original https://aws.amazon.com/blogs/big-data/join-streaming-source-cdc-glue/

Customers have been using data warehousing solutions to perform their traditional analytics tasks. Recently, data lakes have gained lot of traction to become the foundation for analytical solutions, because they come with benefits such as scalability, fault tolerance, and support for structured, semi-structured, and unstructured datasets.

Data lakes are not transactional by default; however, there are multiple open-source frameworks that enhance data lakes with ACID properties, providing a best of both worlds solution between transactional and non-transactional storage mechanisms.

Traditional batch ingestion and processing pipelines that involve operations such as data cleaning and joining with reference data are straightforward to create and cost-efficient to maintain. However, there is a challenge to ingest datasets, such as Internet of Things (IoT) and clickstreams, at a fast rate with near-real-time delivery SLAs. You will also want to apply incremental updates with change data capture (CDC) from the source system to the destination. To make data-driven decisions in a timely manner, you need to account for missed records and backpressure, and maintain event ordering and integrity, especially if the reference data also changes rapidly.

In this post, we aim to address these challenges. We provide a step-by-step guide to join streaming data to a reference table changing in real time using AWS Glue, Amazon DynamoDB, and AWS Database Migration Service (AWS DMS). We also demonstrate how to ingest streaming data to a transactional data lake using Apache Hudi to achieve incremental updates with ACID transactions.

Solution overview

For our example use case, streaming data is coming through Amazon Kinesis Data Streams, and reference data is managed in MySQL. The reference data is continuously replicated from MySQL to DynamoDB through AWS DMS. The requirement here is to enrich the real-time stream data by joining with the reference data in near-real time, and to make it queryable from a query engine such as Amazon Athena while keeping consistency. In this use case, reference data in MySQL can be updated when the requirement is changed, and then queries need to return results by reflecting updates in the reference data.

This solution addresses the issue of users wanting to join streams with changing reference datasets when the size of the reference dataset is small. The reference data is maintained in DynamoDB tables, and the streaming job loads the full table into memory for each micro-batch, joining a high-throughput stream to a small reference dataset.

The following diagram illustrates the solution architecture.

Architecture

Prerequisites

For this walkthrough, you should have the following prerequisites:

Create IAM roles and S3 bucket

In this section, you create an Amazon Simple Storage Service (Amazon S3) bucket and two AWS Identity and Access Management (IAM) roles: one for the AWS Glue job, and one for AWS DMS. We do this using an AWS CloudFormation template. Complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. For Stack name, enter a name for your stack.
  5. For DynamoDBTableName, enter tgt_country_lookup_table. This is the name of your new DynamoDB table.
  6. For S3BucketNamePrefix, enter the prefix of your new S3 bucket.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Stack creation can take about 1 minute.
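
If you prefer to script this step, a boto3 sketch might look like the following; the stack name and template URL are placeholders for the template linked above, and the parameter keys match the console fields.

import boto3

cfn = boto3.client("cloudformation")

cfn.create_stack(
    StackName="glue-dms-dynamodb-demo",  # any stack name you like
    TemplateURL="https://example-bucket.s3.amazonaws.com/template.yaml",  # placeholder
    Parameters=[
        {"ParameterKey": "DynamoDBTableName", "ParameterValue": "tgt_country_lookup_table"},
        {"ParameterKey": "S3BucketNamePrefix", "ParameterValue": "my-prefix"},
    ],
    Capabilities=["CAPABILITY_NAMED_IAM"],
)
cfn.get_waiter("stack_create_complete").wait(StackName="glue-dms-dynamodb-demo")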

Create a Kinesis data stream

In this section, you create a Kinesis data stream:

  1. On the Kinesis console, choose Data streams in the navigation pane.
  2. Choose Create data stream.
  3. For Data stream name, enter your stream name.
  4. Leave the remaining settings as default and choose Create data stream.

A Kinesis data stream is created with on-demand mode.
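
Alternatively, the stream can be created programmatically. The following boto3 sketch uses a placeholder stream name and on-demand capacity mode.

import boto3

kinesis = boto3.client("kinesis")

# On-demand capacity mode, so no shard count is required
kinesis.create_stream(
    StreamName="my-sales-stream",  # placeholder; match the name used later in the walkthrough
    StreamModeDetails={"StreamMode": "ON_DEMAND"},
)
kinesis.get_waiter("stream_exists").wait(StreamName="my-sales-stream")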

Create and configure an Aurora MySQL cluster

In this section, you create and configure an Aurora MySQL cluster as the source database. First, configure your source Aurora MySQL database cluster to enable CDC through AWS DMS to DynamoDB.

Create a parameter group

Complete the following steps to create a new parameter group:

  1. On the Amazon RDS console, choose Parameter groups in the navigation pane.
  2. Choose Create parameter group.
  3. For Parameter group family, select aurora-mysql5.7.
  4. For Type, choose DB Cluster Parameter Group.
  5. For Group name, enter my-mysql-dynamodb-cdc.
  6. For Description, enter Parameter group for demo Aurora MySQL database.
  7. Choose Create.
  8. Select my-mysql-dynamodb-cdc, and choose Edit under Parameter group actions.
  9. Edit the parameter group as follows:
Name                Value
binlog_row_image    full
binlog_format       ROW
binlog_checksum     NONE
log_slave_updates   1
  10. Choose Save changes.

RDS parameter group
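
If you prefer to automate this setup, the following boto3 sketch creates the same cluster parameter group and applies the binlog settings listed above.

import boto3

rds = boto3.client("rds")

rds.create_db_cluster_parameter_group(
    DBClusterParameterGroupName="my-mysql-dynamodb-cdc",
    DBParameterGroupFamily="aurora-mysql5.7",
    Description="Parameter group for demo Aurora MySQL database",
)

# Binlog settings required for AWS DMS change data capture
rds.modify_db_cluster_parameter_group(
    DBClusterParameterGroupName="my-mysql-dynamodb-cdc",
    Parameters=[
        {"ParameterName": "binlog_row_image", "ParameterValue": "full", "ApplyMethod": "pending-reboot"},
        {"ParameterName": "binlog_format", "ParameterValue": "ROW", "ApplyMethod": "pending-reboot"},
        {"ParameterName": "binlog_checksum", "ParameterValue": "NONE", "ApplyMethod": "pending-reboot"},
        {"ParameterName": "log_slave_updates", "ParameterValue": "1", "ApplyMethod": "pending-reboot"},
    ],
)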

Create the Aurora MySQL cluster

Complete following steps to create the Aurora MySQL cluster:

  1. On the Amazon RDS console, choose Databases in the navigation pane.
  2. Choose Create database.
  3. For Choose a database creation method, choose Standard create.
  4. Under Engine options, for Engine type, choose Aurora (MySQL Compatible).
  5. For Engine version, choose Aurora (MySQL 5.7) 2.11.2.
  6. For Templates, choose Production.
  7. Under Settings, for DB cluster identifier, enter a name for your database.
  8. For Master username, enter your primary user name.
  9. For Master password and Confirm master password, enter your primary password.
  10. Under Instance configuration, for DB instance class, choose Burstable classes (includes t classes) and choose db.t3.small.
  11. Under Availability & durability, for Multi-AZ deployment, choose Don’t create an Aurora Replica.
  12. Under Connectivity, for Compute resource, choose Don’t connect to an EC2 compute resource.
  13. For Network type, choose IPv4.
  14. For Virtual private cloud (VPC), choose your VPC.
  15. For DB subnet group, choose your public subnet.
  16. For Public access, choose Yes.
  17. For VPC security group (firewall), choose the security group for your public subnet.
  18. Under Database authentication, for Database authentication options, choose Password authentication.
  19. Under Additional configuration, for DB cluster parameter group, choose the cluster parameter group you created earlier.
  20. Choose Create database.

Grant permissions to the source database

The next step is to grant the required permissions on the source Aurora MySQL database. Connect to the DB cluster using the MySQL utility and run queries to complete the following tasks:

  • Create a demo database and table and run queries on the data
  • Grant permission for a user used by the AWS DMS endpoint

Complete the following steps:

  1. Log in to the EC2 instance that you’re using to connect to your DB cluster.
  2. Enter the following command at the command prompt to connect to the primary DB instance of your DB cluster:
$ mysql -h mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com -P 3306 -u admin -p
  3. Run the following SQL command to create a database:
> CREATE DATABASE mydev;
  4. Run the following SQL command to create a table:
> use mydev; 
> CREATE TABLE country_lookup_table
(
code varchar(5),
countryname varchar(40) not null,
combinedname varchar(40) not null
);
  5. Run the following SQL command to populate the table with data:
> INSERT INTO country_lookup_table(code, countryname, combinedname) VALUES ('IN', 'India', 'IN-India'), ('US', 'USA', 'US-USA'), ('CA', 'Canada', 'CA-Canada'), ('CN', 'China', 'CN-China');
  6. Run the following SQL command to create a user for the AWS DMS endpoint and grant permissions for CDC tasks (replace the placeholder with your preferred password):
> CREATE USER repl IDENTIFIED BY '<your-password>';
> GRANT REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'repl'@'%';
> GRANT SELECT ON mydev.country_lookup_table TO 'repl'@'%';

Create and configure AWS DMS resources to load data into the DynamoDB reference table

In this section, you create and configure AWS DMS to replicate data into the DynamoDB reference table.

Create an AWS DMS replication instance

First, create an AWS DMS replication instance by completing the following steps:

  1. On the AWS DMS console, choose Replication instances in the navigation pane.
  2. Choose Create replication instance.
  3. Under Settings, for Name, enter a name for your instance.
  4. Under Instance configuration, for High Availability, choose Dev or test workload (Single-AZ).
  5. Under Connectivity and security, for VPC security groups, choose default.
  6. Choose Create replication instance.

Create Amazon VPC endpoints

Optionally, you can create Amazon VPC endpoints for DynamoDB when you need to connect to your DynamoDB table from the AWS DMS instance in a private network. Also make sure that you enable Publicly accessible when you need to connect to a database outside of your VPC.
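
For reference, a DynamoDB gateway endpoint can be created with a call like the following boto3 sketch; the VPC and route table IDs are placeholders.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

ec2.create_vpc_endpoint(
    VpcEndpointType="Gateway",
    VpcId="vpc-0123456789abcdef0",             # placeholder
    ServiceName="com.amazonaws.us-east-1.dynamodb",
    RouteTableIds=["rtb-0123456789abcdef0"],   # placeholder
)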

Create an AWS DMS source endpoint

Create an AWS DMS source endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Source endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Source engine, choose Amazon Aurora MySQL.
  6. For Access to endpoint database, choose Provide access information manually.
  7. For Server Name, enter the endpoint name of your Aurora writer instance (for example, mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com).
  8. For Port, enter 3306.
  9. For User name, enter a user name for your AWS DMS task.
  10. For Password, enter a password.
  11. Choose Create endpoint.

Create an AWS DMS target endpoint

Create an AWS DMS target endpoint by completing the following steps:

  1. On the AWS DMS console, choose Endpoints in the navigation pane.
  2. Choose Create endpoint.
  3. For Endpoint type, choose Target endpoint.
  4. Under Endpoint configuration, for Endpoint identifier, enter a name for your endpoint.
  5. For Target engine, choose Amazon DynamoDB.
  6. For Service access role ARN, enter the IAM role for your AWS DMS task.
  7. Choose Create endpoint.

Create AWS DMS migration tasks

Create AWS DMS database migration tasks by completing the following steps:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. Under Task configuration, for Task identifier, enter a name for your task.
  4. For Replication instance, choose your replication instance.
  5. For Source database endpoint, choose your source endpoint.
  6. For Target database endpoint, choose your target endpoint.
  7. For Migration type, choose Migrate existing data and replicate ongoing changes.
  8. Under Task settings, for Target table preparation mode, choose Do nothing.
  9. For Stop task after full load completes, choose Don’t stop.
  10. For LOB column settings, choose Limited LOB mode.
  11. For Task logs, enable Turn on CloudWatch logs and Turn on batch-optimized apply.
  12. Under Table mappings, choose JSON Editor and enter the following rules.

These table mapping rules control how source columns are mapped to DynamoDB attributes. With the following rules, the AWS DMS CDC task first creates a new DynamoDB table with the name specified in target-table-name. It then replicates all the records, mapping the columns in the DB table to the attributes in the DynamoDB table.

{
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "1",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "rule-action": "include"
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "2",
            "rule-action": "map-record-to-record",
            "object-locator": {
                "schema-name": "mydev",
                "table-name": "country_lookup_table"
            },
            "target-table-name": "tgt_country_lookup_table",
            "mapping-parameters": {
                "partition-key-name": "code",
                "sort-key-name": "countryname",
                "exclude-columns": [
                    "code",
                    "countryname"
                ],
                "attribute-mappings": [
                    {
                        "target-attribute-name": "code",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${code}"
                    },
                    {
                        "target-attribute-name": "countryname",
                        "attribute-type": "scalar",
                        "attribute-sub-type": "string",
                        "value": "${countryname}"
                    }
                ],
                "apply-during-cdc": true
            }
        }
    ]
}

DMS table mapping

  13. Choose Create task.

Now the AWS DMS replication task has been started.

  14. Wait for the Status to show as Load complete.

DMS task

  15. On the DynamoDB console, choose Tables in the navigation pane.
  16. Select the DynamoDB reference table, and choose Explore table items to review the replicated records.

DynamoDB reference table initial
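
As an alternative to the console, the following boto3 sketch scans the reference table to review the replicated items.

import boto3

table = boto3.resource("dynamodb").Table("tgt_country_lookup_table")

# The AWS DMS task created this table with partition key 'code' and sort key 'countryname'
for item in table.scan()["Items"]:
    print(item["code"], item["countryname"], item.get("combinedname"))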

Create an AWS Glue Data Catalog table and an AWS Glue streaming ETL job

In this section, you create an AWS Glue Data Catalog table and an AWS Glue streaming extract, transform, and load (ETL) job.

Create a Data Catalog table

Create an AWS Glue Data Catalog table for the source Kinesis data stream with the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose Add database.
  3. For Name, enter my_kinesis_db.
  4. Choose Create database.
  5. Choose Tables under Databases, then choose Add table.
  6. For Name, enter my_stream_src_table.
  7. For Database, choose my_kinesis_db.
  8. For Select the type of source, choose Kinesis.
  9. For Kinesis data stream is located in, choose my account.
  10. For Kinesis stream name, enter a name for your data stream.
  11. For Classification, select JSON.
  12. Choose Next.
  13. Choose Edit schema as JSON, enter the following JSON, then choose Save.
[
  {
    "Name": "uuid",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "country",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "itemtype",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "saleschannel",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderpriority",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "orderdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "region",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "shipdate",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitssold",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitprice",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "unitcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalrevenue",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalcost",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "totalprofit",
    "Type": "string",
    "Comment": ""
  },
  {
    "Name": "impressiontime",
    "Type": "string",
    "Comment": ""
  }
]

Glue Catalog table schema

  14. Choose Next, then choose Create.

Create an AWS Glue streaming ETL job

Next, you create an AWS Glue streaming job. AWS Glue 3.0 and later supports Apache Hudi natively, so we use this native integration to ingest into a Hudi table. Complete the following steps to create the AWS Glue streaming job:

  1. On the AWS Glue Studio console, choose Spark script editor and choose Create.
  2. Under Job details tab, for Name, enter a name for your job.
  3. For IAM Role, choose the IAM role for your AWS Glue job.
  4. For Type, select Spark Streaming.
  5. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
  6. For Requested number of workers, enter 3.
  7. Under Advanced properties, for Job parameters, choose Add new parameter.
  8. For Key, enter --conf.
  9. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.hive.convertMetastoreParquet=false.
  10. Choose Add new parameter.
  11. For Key, enter --datalake-formats.
  12. For Value, enter hudi.
  13. For Script path, enter s3://<S3BucketName>/scripts/.
  14. For Temporary path, enter s3://<S3BucketName>/temporary/.
  15. Optionally, for Spark UI logs path, enter s3://<S3BucketName>/sparkHistoryLogs/.

Glue job parameter

  16. On the Script tab, enter the following script into the AWS Glue Studio editor and choose Create.

The near-real-time streaming job enriches data by joining a Kinesis data stream with a DynamoDB table that contains frequently updated reference data. The enriched dataset is loaded into the target Hudi table in the data lake. Replace <S3BucketName> with your bucket that you created via AWS CloudFormation:

import sys, json
import boto3
from pyspark.sql import DataFrame, Row
from pyspark.context import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv,["JOB_NAME"])

# Initialize spark session and Glue context
sc = SparkContext() 
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# job parameters
dydb_lookup_table = "tgt_country_lookup_table"
kin_src_database_name = "my_kinesis_db" 
kin_src_table_name = "my_stream_src_table" 
hudi_write_operation = "upsert" 
hudi_record_key = "uuid" 
hudi_precomb_key = "orderdate" 
checkpoint_path = "s3://<S3BucketName>/streamlab/checkpoint/" 
s3_output_folder = "s3://<S3BucketName>/output/"
hudi_table = "hudi_table"
hudi_database = "my_kinesis_db"

# hudi options 
additional_options={
    "hoodie.datasource.hive_sync.use_jdbc": "false",
    "hoodie.datasource.write.recordkey.field": hudi_record_key,
    "hoodie.datasource.hive_sync.database": hudi_database,
    "hoodie.table.name": hudi_table,
    "hoodie.consistency.check.enabled": "true",
    "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
    "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.NonPartitionedExtractor",
    "hoodie.datasource.write.hive_style_partitioning": "false",
    "hoodie.datasource.write.precombine.field": hudi_precomb_key,
    "hoodie.bulkinsert.shuffle.parallelism": "4",
    "hoodie.datasource.hive_sync.enable": "true",
    "hoodie.datasource.write.operation": hudi_write_operation,
    "hoodie.datasource.write.storage.type": "COPY_ON_WRITE",
}

# Scan and load the reference data table from DynamoDB into a Spark DataFrame using the boto3 API.
def readDynamoDb():
    dynamodb = boto3.resource("dynamodb")
    table = dynamodb.Table(dydb_lookup_table)
    response = table.scan()
    items = response["Items"]
    jsondata = sc.parallelize(items)
    lookupDf = glueContext.read.json(jsondata)
    return lookupDf


# Load the Amazon Kinesis data stream from the AWS Glue Data Catalog.
source_df = glueContext.create_data_frame.from_catalog(
    database=kin_src_database_name,
    table_name=kin_src_table_name,
    transformation_ctx="source_df",
    additional_options={"startingPosition": "TRIM_HORIZON"},
)

# As part of batch processing, implement the transformation logic for joining streaming data frames with reference data frames.
def processBatch(data_frame, batchId):
    if data_frame.count() > 0:

        # Refresh the DynamoDB table to pull the latest snapshot for each micro-batch
        country_lookup_df = readDynamoDb() 
                
        final_frame = data_frame.join(
            country_lookup_df, 
            data_frame["country"] == country_lookup_df["countryname"], 
            'left'
        ).drop(
            "countryname",
            "country",
            "unitprice", 
            "unitcost",
            "totalrevenue",
            "totalcost",
            "totalprofit"
        )

        # Script generated for node my-lab-hudi-connector
        final_frame.write.format("hudi") \
            .options(**additional_options) \
            .mode("append") \
            .save(s3_output_folder)
        
try:
    glueContext.forEachBatch(
        frame=source_df,
        batch_function=processBatch,
        options={"windowSize": "60 seconds", "checkpointLocation": checkpoint_path},
    )
except Exception as e:
    print(f"Error is @@@ ....{e}")
  17. Choose Run to start the streaming job.

The following screenshot shows examples of the DataFrames data_frame, country_lookup_df, and final_frame.

Glue job log output initial

The AWS Glue job successfully joined records coming from the Kinesis data stream and the reference table in DynamoDB, and then ingested the joined records into Amazon S3 in Hudi format.

Create and run a Python script to generate sample data and load it into the Kinesis data stream

In this section, you create and run a Python script to generate sample data and load it into the source Kinesis data stream. Complete the following steps:

  1. Log in to AWS Cloud9, your EC2 instance, or any other computing host that can put records into your data stream.
  2. Create a Python file called generate-data-for-kds.py.
  3. Open the Python file and enter the following script:
import json
import random
import boto3
import time

STREAM_NAME = "<mystreamname>"

def get_data():
    return {
        "uuid": random.randrange(0, 1000001, 1),
        "country": random.choice( [ "United Arab Emirates", "China", "India", "United Kingdom", "United States of America", ] ),
        "itemtype": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "saleschannel": random.choice( [ "Snacks", "Cereals", "Cosmetics", "Fruits", "Clothes", "Babycare", "Household", ] ),
        "orderpriority": random.choice(["H", "L", "M", "C"]),
        "orderdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12",
                                      "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", 
                                      "7/15/14", "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", 
                                      "1/27/17", "2/25/17", "3/10/17", "4/1/17", ] ),
        "region": random.choice( ["Asia" "Europe", "Americas", "Middle Eastern", "Africa"] ),
        "shipdate": random.choice( [ "1/4/10", "2/28/10", "2/15/11", "11/8/11", "2/1/12", "2/18/12", "3/1/12", "9/24/12", 
                                    "10/13/12", "12/2/12", "12/29/12", "3/30/13", "7/29/13", "3/23/14", "6/14/14", "7/15/14",
                                      "10/19/14", "5/7/15", "10/11/15", "11/22/15", "8/23/16", "1/15/17", "1/27/17", 
                                      "2/25/17", "3/10/17", "4/1/17", ] ),
        "unitssold": random.choice( [ "8217", "3465", "8877", "2882", "70", "7044", "6307", "2384", "1327", "2572", "8794", 
                                     "4131", "5793", "9091", "4314", "9085", "5270", "5459", "1982", "8245", "4860", "4656", 
                                     "8072", "65", "7864", "9778", ] ),
        "unitprice": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                     "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                     "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "unitcost": random.choice( [ "97.44", "117.11", "364.69", "502.54", "263.33", "117.11", "35.84", "6.92", "35.84", 
                                    "6.92", "35.84", "56.67", "159.42", "502.54", "117.11", "56.67", "524.96", "502.54", 
                                    "56.67", "56.67", "159.42", "56.67", "35.84", "159.42", "502.54", "31.79", ] ),
        "totalrevenue": random.choice( [ "1253749.86", "712750.5", "3745117.53", "1925954.14", "30604", "1448950.8", 
                                        "689228.96", "22242.72", "145014.56", "23996.76", "961008.32", "337626.63", 
                                        "1478837.04", "6075242.57", "887389.8", "742517.05", "3431876.7", "3648085.93", 
                                        "161988.86", "673863.85", "1240660.8", "380534.88", "882108.16", "16593.2", 
                                        "5255275.28", "463966.1", ] ),
        "totalcost": random.choice( [ "800664.48", "405786.15", "3237353.13", "1448320.28", "18433.1", "824922.84", 
                                     "226042.88", "16497.28", "47559.68", "17798.24", "315176.96", "234103.77", "923520.06",
                                       "4568591.14", "505212.54", "514846.95", "2766539.2", "2743365.86", 
                                       "112319.94", "467244.15", "774781.2", "263855.52", "289300.48", "10362.3", 
                                       "3951974.56", "310842.62", ] ),
        "totalprofit": random.choice( [ "453085.38", "306964.35", "507764.4", "477633.86", "12170.9", "624027.96", 
                                       "463186.08", "5745.44", "97454.88", "6198.52", "645831.36", "103522.86", "555316.98", 
                                       "1506651.43", "382177.26", "227670.1", "665337.5", "904720.07", "49668.92", "206619.7",
                                         "465879.6", "116679.36", "592807.68", "6230.9", "1303300.72", "153123.48", ] ),
        "impressiontime": random.choice( [ "2022-10-24T02:27:41Z", "2022-10-24T02:27:41Z", "2022-11-24T02:27:41Z", 
                                          "2022-12-24T02:27:41Z", "2022-13-24T02:27:41Z", "2022-14-24T02:27:41Z", 
                                          "2022-15-24T02:27:41Z", ] ),
    }

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        kinesis_client.put_record(
            StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
        )
        time.sleep(2)

if __name__ == "__main__":
    generate(STREAM_NAME, boto3.client("kinesis"))

  4. Run the script:
$ python3 generate-data-for-kds.py

The script puts a record to the Kinesis data stream every 2 seconds.

Simulate updating the reference table in the Aurora MySQL cluster

Now all the resources and configurations are ready. For this example, we want to add a 3-digit country code to the reference table. Let’s update records in the Aurora MySQL table to simulate changes. Complete the following steps:

  1. Make sure that the AWS Glue streaming job is already running.
  2. Connect to the primary DB instance again, as described earlier.
  3. Enter your SQL commands to update records:
> UPDATE country_lookup_table SET combinedname='US-USA-US' WHERE code='US';
> UPDATE country_lookup_table SET combinedname='CA-CAN-Canada' WHERE code='CA';
> UPDATE country_lookup_table SET combinedname='CN-CHN-China' WHERE code='CN';
> UPDATE country_lookup_table SET combinedname='IN-IND-India' WHERE code='IN';

Now the reference table in the Aurora MySQL source database has been updated, and the changes are automatically replicated to the reference table in DynamoDB.

DynamoDB reference table updated

The following tables show records in data_frame, country_lookup_df, and final_frame. In country_lookup_df and final_frame, the combinedname column has values formatted as <2-digit-country-code>-<3-digit-country-code>-<country-name>, which shows that the changed records in the reference table are reflected in final_frame without restarting the AWS Glue streaming job. This means that the AWS Glue job successfully joins the incoming records from the Kinesis data stream with the reference table even while the reference table is changing.
Glue job log output updated

Query the Hudi table using Athena

Let’s query the Hudi table using Athena to see the records in the destination table. Complete the following steps:

  1. Make sure that the script and the AWS Glue streaming job are still running:
    1. The Python script (generate-data-for-kds.py) is still running.
    2. The generated data is being sent to the data stream.
    3. The AWS Glue streaming job is still running.
  2. On the Athena console, run the following SQL in the query editor:
select shipdate, unitssold, impressiontime, code,combinedname from <database>.<table>
where combinedname is not null
limit 10;
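
If you prefer to run the query programmatically, the following boto3 sketch submits the same SQL through the Athena API and prints the rows; the database, table, and results location reflect the names used earlier in this post and may need adjusting for your environment.

import time
import boto3

athena = boto3.client("athena")

QUERY = """
select shipdate, unitssold, impressiontime, code, combinedname
from my_kinesis_db.hudi_table
where combinedname is not null
limit 10;
"""

qid = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "my_kinesis_db"},
    ResultConfiguration={"OutputLocation": "s3://<S3BucketName>/athena-results/"},
)["QueryExecutionId"]

# Poll until the query leaves the QUEUED/RUNNING states, then print the rows
while athena.get_query_execution(QueryExecutionId=qid)["QueryExecution"]["Status"]["State"] in ("QUEUED", "RUNNING"):
    time.sleep(1)

for row in athena.get_query_results(QueryExecutionId=qid)["ResultSet"]["Rows"]:
    print([col.get("VarCharValue") for col in row["Data"]])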

The following query result shows the records that were processed before the reference table was changed. Values in the combinedname column follow the format <2-digit-country-code>-<country-name>.

Athena query result initial

The following query result shows the records that were processed after the reference table was changed. Values in the combinedname column follow the format <2-digit-country-code>-<3-digit-country-code>-<country-name>.

Athena query result updated

This confirms that the changed reference data is successfully reflected in the target Hudi table, which joins records from the Kinesis data stream with the reference data in DynamoDB.

Clean up

As the final step, clean up the resources:

  1. Delete the Kinesis data stream.
  2. Delete the AWS DMS migration task, endpoint, and replication instance.
  3. Stop and delete the AWS Glue streaming job.
  4. Delete the AWS Cloud9 environment.
  5. Delete the CloudFormation stack.

Conclusion

Building and maintaining a transactional data lake that involves real-time data ingestion and processing has multiple variable components and decisions to be made, such as what ingestion service to use, how to store your reference data, and what transactional data lake framework to use. In this post, we provided the implementation details of such a pipeline, using AWS native components as the building blocks and Apache Hudi as the open-source framework for a transactional data lake.

We believe that this solution can be a starting point for organizations looking to implement a new data lake with such requirements. Additionally, the different components are fully pluggable and can be mixed and matched to existing data lakes to target new requirements or migrate existing ones, addressing their pain points.


About the authors

Manish Kola is a Data Lab Solutions Architect at AWS, where he works closely with customers across various industries to architect cloud-native solutions for their data analytics and AI needs. He partners with customers on their AWS journey to solve their business problems and build scalable prototypes. Before joining AWS, Manish’s experience includes helping customers implement data warehouse, BI, data integration, and data lake projects.

Santosh Kotagiri is a Solutions Architect at AWS with experience in data analytics and cloud solutions leading to tangible business results. His expertise lies in designing and implementing scalable data analytics solutions for clients across industries, with a focus on cloud-native and open-source services. He is passionate about leveraging technology to drive business growth and solve complex problems.

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Introducing the Enhanced Document API for DynamoDB in the AWS SDK for Java 2.x

Post Syndicated from John Viegas original https://aws.amazon.com/blogs/devops/introducing-the-enhanced-document-api-for-dynamodb-in-the-aws-sdk-for-java-2-x/

We are excited to announce that the AWS SDK for Java 2.x now offers the Enhanced Document API for DynamoDB, providing an enhanced way of working with Amazon DynamoDB items.
This post covers using the Enhanced Document API for DynamoDB with the DynamoDB Enhanced Client. By using the Enhanced Document API, you can create an EnhancedDocument instance to represent an item with no fixed schema, and then use the DynamoDB Enhanced Client to read and write to DynamoDB.
Furthermore, unlike the Document APIs of aws-sdk-java 1.x, which provided arguments and return types that were not type-safe, the EnhancedDocument provides strongly-typed APIs for working with documents. This interface simplifies the development process and ensures that the data is correctly typed.

Prerequisites

Before getting started, ensure you are using an up-to-date version of the AWS Java SDK dependency with all the latest released bug-fixes and features. For Enhanced Document API support, you must use version 2.20.33 or later. See our “Set up an Apache Maven project” guide for details on how to manage the AWS Java SDK dependency in your project.

Add dependency for dynamodb-enhanced in pom.xml.

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>dynamodb-enhanced</artifactId>
<version>2.20.33</version>
</dependency>

Quick walk-through for using Enhanced Document API to interact with DDB

Step 1 : Create a DynamoDB Enhanced Client

Create an instance of the DynamoDbEnhancedClient class, which provides a high-level interface for Amazon DynamoDB that simplifies working with DynamoDB tables.

DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
                                               .dynamoDbClient(DynamoDbClient.create())
                                               .build();

Step 2 : Create a DynamoDbTable resource object with Document table schema

To execute commands against a DynamoDB table using the Enhanced Document API, you must associate the table with your Document table schema to create a DynamoDbTable resource object. The Document table schema builder requires the primary index key and attribute converter providers. Use AttributeConverterProvider.defaultProvider() to convert document attributes of default types. An optional secondary index key can be added to the builder.


DynamoDbTable<EnhancedDocument> documentTable = enhancedClient.table("my_table",
                                              TableSchema.documentSchemaBuilder()
                                                         .addIndexPartitionKey(TableMetadata.primaryIndexName(),"hashKey", AttributeValueType.S)
                                                         .addIndexSortKey(TableMetadata.primaryIndexName(), "sortKey", AttributeValueType.N)
                                                         .attributeConverterProviders(AttributeConverterProvider.defaultProvider())
                                                         .build());
                                                         
// call documentTable.createTable() if "my_table" does not exist in DynamoDB

Step 3 : Write a DynamoDB item using an EnhancedDocument

The EnhancedDocument class has static factory methods along with a builder method to add attributes to a document. The following snippet demonstrates the type safety provided by EnhancedDocument when you construct a document item.

EnhancedDocument simpleDoc = EnhancedDocument.builder()
 .attributeConverterProviders(defaultProvider())
 .putString("hashKey", "sampleHash")
 .putNull("nullKey")
 .putNumber("sortKey", 1.0)
 .putBytes("byte", SdkBytes.fromUtf8String("a"))
 .putBoolean("booleanKey", true)
 .build();
 
documentTable.putItem(simpleDoc);

Step 4 : Read a Dynamo DB item as an EnhancedDocument

Attributes of documents retrieved from a DynamoDB table can be accessed with getter methods:

EnhancedDocument docGetItem = documentTable.getItem(r -> r.key(k -> k.partitionValue("sampleHash").sortValue(1)));

docGetItem.getString("hashKey");
docGetItem.isNull("nullKey");
docGetItem.getNumber("sortKey").floatValue();
docGetItem.getBytes("byte");
docGetItem.getBoolean("booleanKey"); 

AttributeConverterProviders for accessing document attributes as custom objects

You can provide a custom AttributeConverterProvider instance to an EnhancedDocument to convert document attributes to a specific object type.
These providers can be set on either DocumentTableSchema or EnhancedDocument to read or write attributes as custom objects.

TableSchema.documentSchemaBuilder()
           .attributeConverterProviders(CustomClassConverterProvider.create(), defaultProvider())
           .build();
    
// Insert a custom class instance into an EnhancedDocument as attribute 'customMapOfAttribute'.
EnhancedDocument customAttributeDocument =
EnhancedDocument.builder().put("customMapOfAttribute", customClassInstance, CustomClass.class).build();

// Retrieve attribute 'customMapOfAttribute' as CustomClass object.
CustomClass customClassObject = customAttributeDocument.get("customMapOfAttribute", CustomClass.class);

Convert Documents to JSON and vice-versa

The Enhanced Document API allows you to convert a JSON string to an EnhancedDocument and vice-versa.

// Enhanced document created from JSON string using defaultConverterProviders.
EnhancedDocument documentFromJson = EnhancedDocument.fromJson("{\"key\": \"Value\"}");
                                              
// Converting an EnhancedDocument to JSON string "{\"key\": \"Value\"}"                                                 
String jsonFromDocument = documentFromJson.toJson();

Define a Custom Attribute Converter Provider

Custom attribute converter providers are implementations of AttributeConverterProvider that provide converters for custom classes.
Below is an example of a CustomClassForDocumentAPI class, which has a single field stringAttribute of type String, and its corresponding AttributeConverterProvider implementation.

public class CustomClassForDocumentAPI {
    private final String stringAttribute;

    public CustomClassForDocumentAPI(Builder builder) {
        this.stringAttribute = builder.stringAttribute;
    }
    public static Builder builder() {
        return new Builder();
    }
    public String stringAttribute() {
        return stringAttribute;
    }
    public static final class Builder {
        private String stringAttribute;
        private Builder() {
        }
        public Builder stringAttribute(String stringAttribute) {
            this.stringAttribute = stringAttribute;
            return this;
        }
        public CustomClassForDocumentAPI build() {
            return new CustomClassForDocumentAPI(this);
        }
    }
}
import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverterProvider;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.utils.ImmutableMap;

public class CustomAttributeForDocumentConverterProvider implements AttributeConverterProvider {
    private final Map<EnhancedType<?>, AttributeConverter<?>> converterCache = ImmutableMap.of(
        EnhancedType.of(CustomClassForDocumentAPI.class), new CustomClassForDocumentAttributeConverter());
        // Different types of converters can be added to this map.

    public static CustomAttributeForDocumentConverterProvider create() {
        return new CustomAttributeForDocumentConverterProvider();
    }

    @Override
    public <T> AttributeConverter<T> converterFor(EnhancedType<T> enhancedType) {
        return (AttributeConverter<T>) converterCache.get(enhancedType);
    }
}

A custom attribute converter is an implementation of AttributeConverter that converts a custom classes to and from a map of attribute values, as shown below.

import java.util.LinkedHashMap;
import java.util.Map;
import software.amazon.awssdk.enhanced.dynamodb.AttributeConverter;
import software.amazon.awssdk.enhanced.dynamodb.AttributeValueType;
import software.amazon.awssdk.enhanced.dynamodb.EnhancedType;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.EnhancedAttributeValue;
import software.amazon.awssdk.enhanced.dynamodb.internal.converter.attribute.StringAttributeConverter;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class CustomClassForDocumentAttributeConverter implements AttributeConverter<CustomClassForDocumentAPI> {
    public static CustomClassForDocumentAttributeConverter create() {
        return new CustomClassForDocumentAttributeConverter();
    }
    @Override
    public AttributeValue transformFrom(CustomClassForDocumentAPI input) {
        Map<String, AttributeValue> attributeValueMap = new LinkedHashMap<>();
        if(input.stringAttribute() != null){
            attributeValueMap.put("stringAttribute", AttributeValue.fromS(input.stringAttribute()));
        }
        return EnhancedAttributeValue.fromMap(attributeValueMap).toAttributeValue();
    }

    @Override
    public CustomClassForDocumentAPI transformTo(AttributeValue input) {
        Map<String, AttributeValue> customAttr = input.m();
        CustomClassForDocumentAPI.Builder builder = CustomClassForDocumentAPI.builder();
        if (customAttr.get("stringAttribute") != null) {
            builder.stringAttribute(StringAttributeConverter.create().transformTo(customAttr.get("stringAttribute")));
        }
        return builder.build();
    }
    @Override
    public EnhancedType<CustomClassForDocumentAPI> type() {
        return EnhancedType.of(CustomClassForDocumentAPI.class);
    }
    @Override
    public AttributeValueType attributeValueType() {
        return AttributeValueType.M;
    }
}

Attribute Converter Provider for EnhancedDocument Builder

When working outside of a DynamoDB table context, make sure to set the attribute converter providers explicitly on the EnhancedDocument builder. When used within a DynamoDB table context, the table schema’s converter provider will be used automatically for the EnhancedDocument.
The code snippet below shows how to set an AttributeConverterProvider using the EnhancedDocument builder method.

// Enhanced document created from JSON string using custom AttributeConverterProvider.
EnhancedDocument documentFromJson = EnhancedDocument.builder()
                                                    .attributeConverterProviders(CustomClassConverterProvider.create())
                                                    .json("{\"key\": \"Values\"}")
                                                    .build();
                                                    
CustomClassForDocumentAPI customClass = documentFromJson.get("key", CustomClassForDocumentAPI.class);

Conclusion

In this blog post we showed you how to set up and begin using the Enhanced Document API with the DynamoDB Enhanced Client and standalone with the EnhancedDocument class. The enhanced client is open-source and resides in the same repository as the AWS SDK for Java 2.0.
We hope you’ll find this new feature useful. You can always share your feedback on our GitHub issues page.

How Novo Nordisk built distributed data governance and control at scale

Post Syndicated from Jonatan Selsing original https://aws.amazon.com/blogs/big-data/how-novo-nordisk-built-distributed-data-governance-and-control-at-scale/

This is a guest post co-written with Jonatan Selsing and Moses Arthur from Novo Nordisk.

This is the second post of a three-part series detailing how Novo Nordisk, a large pharmaceutical enterprise, partnered with AWS Professional Services to build a scalable and secure data and analytics platform. The first post of this series describes the overall architecture and how Novo Nordisk built a decentralized data mesh architecture, including Amazon Athena as the data query engine. The third post will show how end-users can consume data from their tool of choice, without compromising data governance. This will include how to configure Okta, AWS Lake Formation, and a business intelligence tool to enable SAML-based federated use of Athena for an enterprise BI activity.

When building a scalable data architecture on AWS, giving autonomy and ownership to the data domains are crucial for the success of the platform. By providing the right mix of freedom and control to those people with the business domain knowledge, your business can maximize value from the data as quickly and effectively as possible. The challenge facing organizations, however, is how to provide the right balance between freedom and control. At the same time, data is a strategic asset that needs to be protected with the highest degree of rigor. How can organizations strike the right balance between freedom and control?

In this post, you will learn how to build decentralized governance with Lake Formation and AWS Identity and Access Management (IAM) using attribute-based access control (ABAC). We discuss some of the patterns we use, including Amazon Cognito identity pool federation using ABAC in permission policies, and Okta-based SAML federation with ABAC enforcement on role trust policies.

Solution overview

In the first post of this series, we explained how Novo Nordisk and AWS Professional Services built a modern data architecture based on data mesh tenets. This architecture enables data governance on distributed data domains, using an end-to-end solution to create data products and providing federated data access control. This post dives into three elements of the solution:

  • How IAM roles and Lake Formation are used to manage data access across data domains
  • How data access control is enforced at scale, using a group membership mapping with an ABAC pattern
  • How the system maintains state across the different layers, so that the ecosystem of trust is configured appropriately

From the end-user perspective, the objective of the mechanisms described in this post is to enable simplified data access from the different analytics services adopted by Novo Nordisk, such as those provided by software as a service (SaaS) vendors like Databricks, or self-hosted ones such as JupyterHub. At the same time, the platform must guarantee that any change in a dataset is immediately reflected at the service user interface. The following figure illustrates at a high level the expected behavior.

High-level data platform expected behavior

Following the layer nomenclature established in the first post, the services are created and managed in the consumption layer. The domain accounts are created and managed in the data management layer. Because changes can occur from both layers, continuous communication in both directions is required. The state information is kept in the virtualization layer along with the communication protocols. Additionally, at sign-in time, the services need information about data resources required to provide data access abstraction.

Managing data access

The data access control in this architecture is designed around the core principle that all access is encapsulated in isolated IAM role sessions. The layer pattern that we described in the first post ensures that the creation and curation of the IAM role policies involved can be delegated to the different data management ecosystems. Each data management platform integrated can use their own data access mechanisms, with the unique requirement that the data is accessed via specific IAM roles.

To illustrate the potential mechanisms that can be used by data management solutions, we show two examples of data access permission mechanisms used by two different data management solutions. Both systems utilize the same trust policies as described in the following sections, but have a completely different permission space.

Example 1: Identity-based ABAC policies

The first mechanism we discuss is an ABAC role that provides access to a home-like data storage area, where users can share within their departments and with the wider organization in a structure that mimics the organizational structure. Here, we don’t utilize the group names, but instead forward user attributes from the corporate Active Directory directly into the permission policy through claim overrides. We do this by having the corporate Active Directory as the identity provider (IdP) for the Amazon Cognito user pool and mapping the relevant IdP attributes to user pool attributes. Then, in the Amazon Cognito identity pool, we map the user pool attributes to session tags to use them for access control. Custom overrides can be included in the claim mapping, through the use of a pre token generation Lambda trigger. This way, claims from AD can be mapped to Amazon Cognito user pool attributes and then ultimately used in the Amazon Cognito identity pool to control IAM role permissions. The following is an example of an IAM policy with session tags:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Condition": {
                "StringLike": {
                    "s3:prefix": [
                        "",
                        "public/",
                        "public/*",
                        "home/",
                        "home/${aws:PrincipalTag/initials}/*",
                        "home/${aws:PrincipalTag/department}/*"
                    ]
                }
            },
            "Action": "s3:ListBucket",
            "Resource": [
                "arn:aws:s3:::your-home-bucket"
            ],
            "Effect": "Allow"
        },
        {
            "Action": [
                "s3:GetObject*",
                "s3:PutObject*",
                "s3:DeleteObject*"
            ],
            "Resource": [
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/initials}/*",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/home/${aws:PrincipalTag/department}/*",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}",
                "arn:aws:s3:::your-home-bucket/public/${aws:PrincipalTag/department}/*"
            ],
            "Effect": "Allow"
        },
        {
            "Action": "s3:GetObject*",
            "Resource": [
                "arn:aws:s3:::your-home-bucket/public/",
                "arn:aws:s3:::your-home-bucket/public/*"
            ],
            "Effect": "Allow"
        }
    ]
}

This role is then embedded in the analytics layer (together with the data domain roles) and assumed on behalf of the user. This enables users to mix and match between data domains—as well as utilizing private and public data paths that aren’t necessarily tied to any data domain. For more examples of how ABAC can be used with permission policies, refer to How to scale your authorization needs by using attribute-based access control with S3.

Example 2: Lake Formation name-based access controls

In the data management solution named Novo Nordisk Enterprise Datahub (NNEDH), which we introduced in the first post, we use Lake Formation to enable standardized data access. The NNEDH datasets are registered in the Lake Formation Data Catalog as databases and tables, and permissions are granted using the named resource method. The following screenshot shows an example of these permissions.

Lakeformation named resource method for permissions management

In this approach, data access governance is delegated to Lake Formation. Every data domain in NNEDH has isolated permissions synthesized by NNEDH as the central governance management layer. This is a similar pattern to what is adopted for other domain-oriented data management solutions. Refer to Use an event-driven architecture to build a data mesh on AWS for an example of tag-based access control in Lake Formation.

These patterns don’t exclude implementations of peer-to-peer type data sharing mechanisms, such as those that can be achieved using AWS Resource Access Manager (AWS RAM), where a single IAM role session can have permissions that span across accounts.

Delegating role access to the consumption layer

The following figure illustrates the data access workflow from an external service.

Data access workflow from external service

The workflow steps are as follows:

  1. A user authenticates on an IdP used by the analytics tool that they are trying to access. A wide range of analytics tools are supported by Novo Nordisk platform, such as Databricks and JupyterHub, and the IdP can be either SAML or OIDC type depending on the capabilities of the third-party tool. In this example, an Okta SAML application is used to sign into a third-party analytics tool, and an IAM SAML IdP is configured in the data domain AWS account to federate with the external IdP. The third post of this series describes how to set up an Okta SAML application for IAM role federation on Athena.
  2. The SAML assertion obtained during the sign-in process is used to request temporary security credentials of an IAM role through the AssumeRole operation. In this example, the SAML assertion is used on the AssumeRoleWithSAML operation. For OpenID Connect-compatible IdPs, the AssumeRoleWithWebIdentity operation must be used with the JWT. The SAML attributes in the assertion or the claims in the token can be generated at sign-in time, to ensure that the group memberships are forwarded, for the ABAC policy pattern described in the following sections. A simplified sketch of this credential exchange follows the list.
  3. The analytics tool, such as Databricks or JupyterHub, abstracts the usage of the IAM role session credentials in the tool itself, and data can be accessed directly according to the permissions of the IAM role assumed. This pattern is similar in nature to IAM passthrough as implemented by Databricks, but in Novo Nordisk it’s extended across all analytics services. In this example, the analytics tool accesses the data lake on Amazon Simple Storage Service (Amazon S3) through Athena queries.

As the data mesh pattern expands across domains covering more downstream services, we need a mechanism to keep IdPs and IAM role trusts continuously updated. We come back to this part later in the post, but first we explain how role access is managed at scale.

Attribute-based trust policies

In previous sections, we emphasized that this architecture relies on IAM roles for data access control. Each data management platform can implement its own data access control method using IAM roles, such as identity-based policies or Lake Formation access control. For data consumption, it’s crucial that these IAM roles are only assumable by users that are part of Active Directory groups with the appropriate entitlements to use the role. To implement this at scale, the IAM role’s trust policy uses ABAC.

When a user authenticates on the external IdP of the consumption layer, we add in the access token a claim derived from their Active Directory groups. This claim is propagated by the AssumeRole operation into the trust policy of the IAM role, where it is compared with the expected Active Directory group. Only users that belong to the expected groups can assume the role. This mechanism is illustrated in the following figure.

Architecture of the integration with the identity provider

Translating group membership to attributes

To enforce the group membership entitlement at the role assumption level, we need a way to compare the required group membership with the group memberships that a user comes with in their IAM role session. To achieve this, we use a form of ABAC, where we have a way to represent the sum of context-relevant group memberships in a single attribute. A single IAM role session tag value is limited to 256 characters. The corresponding limit for SAML assertions is 100,000 characters, so for systems where a very large number of either roles or group-type mappings are required, SAML can support a wider range of configurations.

In our case, we have opted for a compression algorithm that takes a group name and compresses it to a 4-character string hash. This means that, together with a group-separation character, we can fit 51 groups in a single attribute. This drops to approximately 20 groups for OIDC-type role assumption due to the PackedPolicySize, but is higher for a SAML-based flow. This has proven sufficient for our case. There is a risk that two different groups could hash to the same character combination; however, we have checked that there are no collisions in the existing groups. To mitigate this risk going forward, we have introduced guardrails in multiple places. First, before adding new group entitlements in the virtualization layer, we check if there is a hash collision with any existing group. When someone attempts to add a duplicate group, our service team is notified and we can react accordingly. But as stated earlier, there is a low probability of clashes, so the flexibility this provides outweighs the overhead associated with managing clashes (we have not had any yet). We also enforce this at SAML assertion creation time, to ensure that there are no duplicated groups in the user's group list; in cases of duplication, we remove both entirely. This means malicious actors can at most limit the access of other users, but not gain unauthorized access.
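The following is a minimal sketch of this compression idea, assuming (for illustration only) a SHA-256-based 4-character hash and a comma as the group-separation character; the exact algorithm and separator used internally are not specified here.

import hashlib

def group_hash(group_name):
    # Compress an Active Directory group name to a 4-character string hash
    return hashlib.sha256(group_name.encode("utf-8")).hexdigest()[:4]

def pack_groups(group_names, separator=","):
    # Pack the hashed groups into a single session tag value.
    # A tag value is limited to 256 characters, so a 4-character hash plus
    # a separator fits roughly 51 groups in one attribute.
    packed = separator.join(group_hash(name) for name in group_names)
    if len(packed) > 256:
        raise ValueError("Too many groups for a single session tag value")
    return packed

# Example with two hypothetical groups
print(pack_groups(["DATA-DOMAIN-A-READERS", "DATA-DOMAIN-B-READERS"]))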

Enforcing audit functionality across sessions

As mentioned in the first post, on top of governance, there are strict requirements around auditability of data accesses. This means that for all data access requests, it must be possible to trace the specific user across services and retain this information. We achieve this by setting (and enforcing) a source identity for all role sessions and making sure to propagate the enterprise identity to this attribute. We use a combination of Okta inline hooks and SAML session tags to achieve this. This means that the AWS CloudTrail logs for an IAM role session have the following information:

{
    "eventName": "AssumeRoleWithSAML",
    "requestParameters": {
        "SAMLAssertionlD": "id1111111111111111111111111",
        "roleSessionName": "[email protected]",
        "principalTags": {
            "nn-initials": "user",
            "department": "NNDepartment",
            "GroupHash": "xxxx",
            "email": "[email protected]",
            "cost-center": "9999"
        },
        "sourceIdentity": "[email protected]",
        "roleArn": "arn:aws:iam::111111111111:role/your-assumed-role",
        "principalArn": "arn:aws:iam,111111111111:saml-provider/your-saml-provider",
        ...
    },
    ...
}

On the IAM role level, we can enforce the required attribute configuration with a trust policy. The following example is for a SAML-based app; we support the same patterns through OpenID Connect IdPs. We now go through the elements of this IAM role trust policy:

{
    "Version": "2008-10-17",
    "Statement": {
        "Effect": "Allow",
        "Principal": {
            "Federated": [SAML_IdP_ARN]
        },
        "Action": [
            "sts:AssumeRoleWithSAML",
            "sts:TagSession",
            "sts:SetSourceIdentity"
        ],
        "Condition": {
            "StringEquals": {
                "SAML:aud": "https://signin.aws.amazon.com/saml"
            },
            "StringLike": {
                "sts:SourceIdentity": "*@novonordisk.com",
                "aws:RequestTag/GroupHash": ["*xxxx*"]
            },
            "StringNotLike": {
                "sts:SourceIdentity": "*"
            }
        }
    }
}

The policy contains the following details:

  • The Principal statement should point to the list of apps that are served through the consumption layer. These can be Azure app registrations, Okta apps, or Amazon Cognito app clients. This means that SAML assertions (in the case of SAML-based flows) minted from these applications can be used to run the AssumeRoleWithSAML operation if the remaining elements are also satisfied.
  • The Action statement includes the required permissions for the AssumeRole call to succeed, including adding the contextual information to the role session.
  • In the first condition, the audience of the assertion needs to be targeting AWS.
  • In the second condition, there are two StringLike requirements:
    • A requirement that the source identity follows the Novo Nordisk naming convention (users must come with an enterprise identity, following our audit requirements).
    • The aws:RequestTag/GroupHash needs to be xxxx, which represents the hashed group name mentioned in the earlier section.
  • Lastly, we enforce that sessions can’t be started without setting the source identity.

This policy enforces that all calls come from recognized applications, include auditability information, target AWS, and that the user has the required group memberships.

Building a central overview of governance and trust

In this section, we discuss how Novo Nordisk keeps track of the relevant group-role relations and maps these at sign-in time.

Entitlements

In Novo Nordisk, all accesses are based on Active Directory group memberships. There is no user-based access. Because this pattern is so central, we have extended this access philosophy into our data accesses. As mentioned earlier, at sign-in time, the hooks need to be able to know which roles to assume for a given user, given this user’s group membership. We have modeled this data in Amazon DynamoDB, where just-in-time provisioning ensures that only the required user group memberships are available. By building our application around the use of groups, and by having the group propagation done by the application code, we avoid having to make a more general Active Directory integration, which would, for a company the size of Novo Nordisk, severely impact the application, simply due to the volume of users and groups.

The DynamoDB entitlement table contains all relevant information for all roles and services, including role ARNs and IdP ARNs. This means that when users log in to their analytics services, the sign-in hook can construct the required information for the Roles SAML attribute.

When new data domains are added to the data management layer, the data management layer needs to communicate both the role information and the group name that gives access to the role.
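As a simplified sketch of that lookup, the hook could query the entitlement table per group and collect the role and IdP ARNs needed for the Roles SAML attribute. The table name, key, and attribute names below are hypothetical; the real NNEDH schema may differ.

import boto3

dynamodb = boto3.resource("dynamodb")
entitlements = dynamodb.Table("entitlements")  # hypothetical table name

def roles_for_groups(group_names):
    # Collect the "role_arn,idp_arn" pairs that the user's groups entitle them to.
    role_entries = []
    for group in group_names:
        response = entitlements.get_item(Key={"group_name": group})
        item = response.get("Item")
        if item:
            # The AWS Roles SAML attribute expects comma-separated role/IdP ARN pairs
            role_entries.append(f"{item['role_arn']},{item['idp_arn']}")
    return role_entries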

Single sign-on hub for analytics services

When scaling this permission model and data management pattern to a large enterprise such as Novo Nordisk, we ended up creating a large number of IAM roles distributed across different accounts. A solution is therefore required to map end-users to the IAM roles they need and provide access accordingly. To simplify user access to multiple data sources and analytics tools, Novo Nordisk developed a single sign-on hub for analytics services. From the end-user perspective, this is a web interface that glues together different offerings in a unified system, making it a one-stop tool for data and analytics needs. When signing in to each of the analytical offerings, the authenticated sessions are forwarded, so users never have to reauthenticate.

Common to all the services supported in the consumption layer is that we can run a piece of application code at sign-in time, allowing sign-in time permissions to be calculated. The hooks that achieve this functionality can, for instance, be run by Okta inline hooks. This means that each of the target analytics services can have custom code to translate relevant contextual information or provide other types of automations for the role forwarding.

The sign-in flow is demonstrated in the following figure.

Sign-in flow

The workflow steps are as follows:

  1. A user accesses an analytical service such as Databricks in the Novo Nordisk analytics hub.
  2. The service uses Okta as the SAML-based IdP.
  3. Okta invokes an AWS Lambda-based SAML assertion inline hook.
  4. The hook uses the entitlement database, converting application-relevant group memberships into role entitlements.
  5. Relevant contextual information is returned from the entitlement database.
  6. The Lambda-based hook adds new SAML attributes to the SAML assertion, including the hashed group memberships and other contextual information such as source identity.
  7. A modified SAML assertion is used to sign users in to the analytical service.
  8. The user can now use the analytical tool with active IAM role sessions.
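Putting steps 4 to 6 together, the hook ends up emitting a set of SAML attributes along the lines of the following sketch. The attribute names are the standard AWS SAML attributes for roles, source identity, and session tags; the wrapping of these attributes into the Okta inline hook response format is omitted here and should be taken from the Okta documentation.

def build_saml_attributes(user_email, packed_group_hashes, role_entries):
    # role_entries: "role_arn,idp_arn" pairs resolved from the entitlement database
    # packed_group_hashes: the compressed group attribute described earlier
    return {
        # IAM roles the user may assume (role ARN, IdP ARN pairs)
        "https://aws.amazon.com/SAML/Attributes/Role": role_entries,
        # Enterprise identity propagated as source identity for auditability
        "https://aws.amazon.com/SAML/Attributes/SourceIdentity": user_email,
        # Session tags evaluated by the ABAC trust and permission policies
        "https://aws.amazon.com/SAML/Attributes/PrincipalTag:GroupHash": packed_group_hashes,
        "https://aws.amazon.com/SAML/Attributes/PrincipalTag:email": user_email,
    }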

Synchronizing role trust

The preceding section gives an overview of how federation works in this solution. Now we can go through how we ensure that all participating AWS environments and accounts are in sync with the latest configuration.

From the end-user perspective, the synchronization mechanism must ensure that every analytics service instantiated can access the data domains assigned to the groups that the user belongs to. Also, changes in data domains—such as granting data access to an Active Directory group—must be effective immediately to every analytics service.

Two event-based mechanisms are used to maintain all the layers synchronized, as detailed in this section.

Synchronize data access control on the data management layer with changes to services in the consumption layer

As described in the previous section, the IAM roles used for data access are created and managed by the data management layer. These IAM roles have a trust policy providing federated access to the external IdPs used by the analytics tools of the consumption layer. This implies that for every new analytics service created with a different IdP, the IAM roles used for data access on data domains must be updated to trust this new IdP.

Using NNEDH as an example of a data management solution, the synchronization mechanism is demonstrated in the following figure.

Synchronization mechanism in a data management solution

Taking as an example a scenario where a new analytics service is created, the steps in this workflow are as follows:

  1. A user with access to the administration console of the consumption layer instantiates a new analytics service, such as JupyterHub.
  2. A job running on AWS Fargate creates the resources needed for this new analytics service, such as an Amazon Elastic Compute Cloud (Amazon EC2) instance for JupyterHub, and the IdP required, such as a new SAML IdP.
  3. When the IdP is created in the previous step, an event is added in an Amazon Simple Notification Service (Amazon SNS) topic with its details, such as name and SAML metadata.
  4. In the NNEDH control plane, a Lambda job is triggered by new events on this SNS topic. This job creates the IAM IdP, if needed, and updates the trust policy of the required IAM roles in all the AWS accounts used as data domains, adding the trust on the IdP used by the new analytics service.

In this architecture, all the update steps are event-triggered and scalable. This means that users of new analytics services can access their datasets almost instantaneously when they are created. In the same way, when a service is removed, the federation to the IdP is automatically removed if not used by other services.
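A minimal sketch of step 4 follows, assuming the SNS message carries the new IdP ARN and the affected data-domain role names; IdP creation, cross-account role assumption into the data domain accounts, and error handling are omitted.

import json
import boto3

iam = boto3.client("iam")

def handler(event, context):
    # Triggered by the SNS topic announcing a new analytics service IdP
    message = json.loads(event["Records"][0]["Sns"]["Message"])
    new_idp_arn = message["idp_arn"]            # assumed message field
    role_names = message["data_domain_roles"]   # assumed message field

    for role_name in role_names:
        # boto3 returns the trust policy as a parsed dict
        trust_policy = iam.get_role(RoleName=role_name)["Role"]["AssumeRolePolicyDocument"]
        statement = trust_policy["Statement"]
        if isinstance(statement, list):
            statement = statement[0]
        federated = statement["Principal"]["Federated"]
        if isinstance(federated, str):
            federated = [federated]
        if new_idp_arn not in federated:
            # Add the new IdP to the federated principals and save the policy
            statement["Principal"]["Federated"] = federated + [new_idp_arn]
            iam.update_assume_role_policy(
                RoleName=role_name,
                PolicyDocument=json.dumps(trust_policy),
            )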

Propagate changes on data domains to analytics services

Changes to data domains, such as the creation of a new S3 bucket used as a dataset, or adding or removing data access to a group, must be reflected immediately on analytics services of the consumption layer. To accomplish it, a mechanism is used to synchronize the entitlement database with the relevant changes made in NNEDH. This flow is demonstrated in the following figure.

Changes propagation flow

Taking as an example a scenario where access to a specific dataset is granted to a new group, the steps in this workflow are as follows:

  1. Using the NNEDH admin console, a data owner approves a dataset sharing request that grants access on a dataset to an Active Directory group.
  2. In the AWS account of the related data domain, the dataset components such as the S3 bucket and Lake Formation are updated to provide data access to the new group. The cross-account data sharing in Lake Formation uses AWS RAM.
  3. An event is added in an SNS topic with the current details about this dataset, such as the location of the S3 bucket and the groups that currently have access to it.
  4. In the virtualization layer, the updated information from the data management layer is used to update the entitlement database in DynamoDB.

These steps make sure that changes on data domains are automatically and immediately reflected on the entitlement database, which is used to provide data access to all the analytics services of the consumption layer.

Limitations

Many of these patterns rely on the analytical tool to support a clever use of IAM roles. When this is not the case, the platform teams themselves need to develop custom functionality at the host level to ensure that role accesses are correctly controlled. This, for example, includes writing custom authenticators for JupyterHub.

Conclusion

This post shows an approach to building a scalable and secure data and analytics platform. It showcases some of the mechanisms used at Novo Nordisk and how to strike the right balance between freedom and control. The architecture laid out in the first post in this series enables layer independence, and exposes some extremely useful primitives for data access and governance. We make heavy use of contextual attributes to modulate role permissions at the session level, which provide just-in-time permissions. These permissions are propagated at scale, across data domains. The upside is that a lot of the complexity related to managing data access permissions can be delegated to the relevant business groups, while enabling the end-user consumers of data to think as little as possible about data accesses and focus on providing value for the business use cases. In the case of Novo Nordisk, this means better outcomes for patients and accelerated innovation.

The next post in this series describes how end-users can consume data from their analytics tool of choice, aligned with the data access controls detailed in this post.


About the Authors

Jonatan Selsing is a former research scientist with a PhD in astrophysics who has turned to the cloud. He is currently the Lead Cloud Engineer at Novo Nordisk, where he enables data and analytics workloads at scale. With an emphasis on reducing the total cost of ownership of cloud-based workloads, while delivering the full advantages of the cloud, he designs, builds, and maintains solutions that enable research for future medicines.

Hassen Riahi is a Sr. Data Architect at AWS Professional Services. He holds a PhD in Mathematics & Computer Science on large-scale data management. He works with AWS customers on building data-driven solutions.

Alessandro Fior is a Sr. Data Architect at AWS Professional Services. He is passionate about designing and building modern and scalable data platforms that accelerate companies to extract value from their data.

Moses Arthur comes from a mathematics and computational research background and holds a PhD in Computational Intelligence specialized in Graph Mining. He is currently a Cloud Product Engineer at Novo Nordisk, building GxP-compliant enterprise data lakes and analytics platforms for Novo Nordisk global factories producing digitalized medical products.

Anwar Rizal is a Senior Machine Learning Consultant based in Paris. He works with AWS customers to develop data and AI solutions to sustainably grow their business.

Kumari Ramar is an Agile- and PMP-certified Senior Engagement Manager at AWS Professional Services. She delivers data and AI/ML solutions that speed up cross-system analytics and machine learning models, which enable enterprises to make data-driven decisions and drive new innovations.

Accelerate HiveQL with Oozie to Spark SQL migration on Amazon EMR

Post Syndicated from Vinay Kumar Khambhampati original https://aws.amazon.com/blogs/big-data/accelerate-hiveql-with-oozie-to-spark-sql-migration-on-amazon-emr/

Many customers run big data workloads such as extract, transform, and load (ETL) on Apache Hive to create a data warehouse on Hadoop. Apache Hive has performed well for a long time, but with advancements in infrastructure such as cloud computing and multicore machines with large amounts of RAM, Apache Spark has started to gain visibility by performing better than Apache Hive.

Customers now want to migrate their Apache Hive workloads to Apache Spark in the cloud to get the benefits of optimized runtime, cost reduction through transient clusters, better scalability by decoupling the storage and compute, and flexibility. However, migration from Apache Hive to Apache Spark needs a lot of manual effort to write migration scripts and maintain different Spark job configurations.

In this post, we walk you through a solution that automates the migration from HiveQL to Spark SQL. The solution was used to migrate Hive with Oozie workloads to Spark SQL and run them on Amazon EMR for a large gaming client. You can also use this solution to develop new jobs with Spark SQL and process them on Amazon EMR. This post assumes that you have a basic understanding of Apache Spark, Hive, and Amazon EMR.

Solution overview

In our example, we use Apache Oozie, which schedules Apache Hive jobs as actions to collect and process data on a daily basis.

We migrate these Oozie workflows with Hive actions by extracting the HQL files and their dynamic and static parameters, and converting them to be Spark compliant. Manual conversion is both time consuming and error prone: to convert the HQL to Spark SQL, you need to sort through the existing HQL files, replace the parameters, and change the syntax across many files.

Instead, we can use automation to speed up the process of migration and reduce heavy lifting tasks, costs, and risks.

We split the solution into two primary components: generating Spark job metadata and running the SQL on Amazon EMR. The first component (metadata setup) consumes existing Hive job configurations and generates metadata such as number of parameters, number of actions (steps), and file formats. The second component consumes the generated metadata from the first component and prepares the run order of Spark SQL within a Spark session. With this solution, we support basic orchestration and scheduling with the help of AWS services like Amazon DynamoDB and Amazon Simple Storage Service (Amazon S3). We can validate the solution by running queries in Amazon Athena.

In the following sections, we walk through these components and how to use these automations in detail.

Generate Spark SQL metadata

Our batch job consists of Hive steps scheduled to run sequentially. For each step, we run HQL scripts that extract, transform, and aggregate input data into one final Hive table, which stores data in HDFS. We use the following Oozie workflow parser script, which takes the input of an existing Hive job and generates the configuration artifacts needed for running the SQL using PySpark.

Oozie workflow XML parser

We create a Python script to automatically parse the Oozie jobs, including workflow.xml, coordinator.xml, job properties, and HQL files. This script can handle many Hive actions in a workflow by organizing the metadata at the step level into separate folders. Each step includes the list of SQLs, SQL paths, and their static parameters, which are input for the solution in the next step.
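A condensed sketch of the workflow.xml parsing is shown below; it collects the Hive actions with their script paths and static parameters. The actual parser in the repository also handles the coordinator definition, job properties, and HQL splitting.

import json
import defusedxml.ElementTree as ET

def local_name(tag):
    # Strip the XML namespace, for example "{uri:oozie:workflow:0.4}action" -> "action"
    return tag.split("}")[-1]

def parse_hive_actions(workflow_xml_path):
    tree = ET.parse(workflow_xml_path)
    steps = []
    for node in tree.iter():
        if local_name(node.tag) != "action":
            continue
        for child in node:
            if local_name(child.tag) != "hive":
                continue
            script, params = None, {}
            for element in child:
                if local_name(element.tag) == "script":
                    script = element.text
                elif local_name(element.tag) == "param" and element.text:
                    key, _, value = element.text.partition("=")
                    params[key] = value
            steps.append({"step_name": node.get("name"),
                          "script": script,
                          "parameters": params})
    return steps

if __name__ == "__main__":
    print(json.dumps(parse_hive_actions("workflow.xml"), indent=2))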

The process consists of two steps:

  1. The Python parser script takes input of the existing Oozie Hive job and its configuration files.
  2. The script generates a metadata JSON file for each step.

The following diagram outlines these steps and shows sample output.

Prerequisites

You need the following prerequisites:

  • Python 3.8
  • Python packages:
    • sqlparse==0.4.2
    • jproperties==2.1.1
    • defusedxml==0.7.1

Setup

Complete the following steps:

  1. Install Python 3.8.
  2. Create a virtual environment:
python3 -m venv /path/to/new/virtual/environment
  3. Activate the newly created virtual environment:
source /path/to/new/virtual/environment/bin/activate
  4. Clone the project:
git clone https://github.com/aws-samples/oozie-job-parser-extract-hive-sql
  5. Install dependent packages:
cd oozie-job-parser-extract-hive-sql
pip install -r requirements.txt

Sample command

We can use the following sample command:

python xml_parser.py --base-folder ./sample_jobs/ --job-name sample_oozie_job_name --job-version V3 --hive-action-version 0.4 --coordinator-action-version 0.4 --workflow-version 0.4 --properties-file-name job.coordinator.properties

The output is as follows:

{'nameNode': 'hdfs://@{{/cluster/${{cluster}}/namenode}}:54310', 'jobTracker': '@{{/cluster/${{cluster}}/jobtracker}}:54311', 'queueName': 'test_queue', 'appName': 'test_app', 'oozie.use.system.libpath': 'true', 'oozie.coord.application.path': '${nameNode}/user/${user.name}/apps/${appName}', 'oozie_app_path': '${oozie.coord.application.path}', 'start': '${{startDate}}', 'end': '${{endDate}}', 'initial_instance': '${{startDate}}', 'job_name': '${appName}', 'timeOut': '-1', 'concurrency': '3', 'execOrder': 'FIFO', 'throttle': '7', 'hiveMetaThrift': '@{{/cluster/${{cluster}}/hivemetastore}}', 'hiveMySQL': '@{{/cluster/${{cluster}}/hivemysql}}', 'zkQuorum': '@{{/cluster/${{cluster}}/zookeeper}}', 'flag': '_done', 'frequency': 'hourly', 'owner': 'who', 'SLA': '2:00', 'job_type': 'coordinator', 'sys_cat_id': '6', 'active': '1', 'data_file': 'hdfs://${nameNode}/hive/warehouse/test_schema/test_dataset', 'upstreamTriggerDir': '/input_trigger/upstream1'}
('./sample_jobs/development/sample_oozie_job_name/step1/step1.json', 'w')

('./sample_jobs/development/sample_oozie_job_name/step2/step2.json', 'w')

Limitations

This method has the following limitations:

  • The Python script parses only HiveQL actions from the Oozie workflow.xml.
  • The Python script generates one file for each SQL statement and uses the sequence ID for file names. It doesn’t name the SQL based on the functionality of the SQL.

Run Spark SQL on Amazon EMR

After we create the SQL metadata files, we use another automation script to run them with Spark SQL on Amazon EMR. This automation script supports custom UDFs by adding JAR files to the spark-submit command. This solution uses DynamoDB for logging the run details of the SQLs for support and maintenance.
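Conceptually, the driver reads the step metadata from DynamoDB, runs each active SQL in load order through a Spark session, and writes a log entry per SQL. The following is a heavily simplified sketch of that loop; the real etl_driver.py in the repository adds parameter substitution, Spark configuration handling, and error handling.

import time
import uuid
import boto3
from pyspark.sql import SparkSession

def run_step(step_id, metadata_table, log_table, sql_bucket, sql_prefix):
    dynamodb = boto3.resource("dynamodb")
    s3 = boto3.client("s3")
    metadata = dynamodb.Table(metadata_table)
    log = dynamodb.Table(log_table)

    # Key layout matching the sample metadata item shown later in this post
    item = metadata.get_item(Key={"id": "emr_config", "step_id": step_id})["Item"]
    spark = SparkSession.builder.appName(step_id).enableHiveSupport().getOrCreate()

    for sql_info in sorted(item["sql_info"], key=lambda s: int(s["sql_load_order"])):
        if sql_info["sql_active_flag"] != "Y":
            continue
        obj = s3.get_object(Bucket=sql_bucket, Key=sql_prefix + sql_info["sql_path"])
        statement = obj["Body"].read().decode("utf-8")
        start_time = str(time.time())
        spark.sql(statement)  # each SQL is expected to be a Spark action (insert, create, ...)
        log.put_item(Item={
            "job_run_id": f"{step_id}#{uuid.uuid4()}",  # one log item per SQL in this sketch
            "step_id": step_id,
            "sql_seq": int(sql_info["sql_load_order"]),
            "start_time": start_time,
            "end_time": str(time.time()),
            "Status": "SUCCESS",
        })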

Architecture overview

The following diagram illustrates the solution architecture.

Prerequisites

You need the following prerequisites:

  • Version:
    • Spark 3.X
    • Python 3.8
    • Amazon EMR 6.1

Setup

Complete the following steps:

  1. Install the AWS Command Line Interface (AWS CLI) on your workspace by following the instructions in Installing or updating the latest version of the AWS CLI. To configure AWS CLI interaction with AWS, refer to Quick setup.
  2. Create two tables in DynamoDB: one to store metadata about jobs and steps, and another to log job runs.
    • Use the following AWS CLI command to create the metadata table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-metadata --attribute-definitions '[ { "AttributeName": "id","AttributeType": "S" } , { "AttributeName": "step_id","AttributeType": "S" }]' --key-schema '[{"AttributeName": "id", "KeyType": "HASH"}, {"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-metadata is successfully created.

The metadata table has the following attributes.

Attributes Type Comments
id String partition_key
step_id String sort_key
step_name String Step description
sql_base_path String Base path
sql_info list List of SQLs in ETL pipeline
. sql_path SQL file name
. sql_active_flag y/n
. sql_load_order Order of SQL
. sql_parameters Parameters in SQL and values
spark_config Map Spark configs
    • Use the following AWS CLI command to create the log table in DynamoDB:
aws dynamodb create-table --region us-east-1 --table-name dw-etl-pipelinelog --attribute-definitions '[ { "AttributeName":"job_run_id", "AttributeType": "S" } , { "AttributeName":"step_id", "AttributeType": "S" } ]' --key-schema '[{"AttributeName": "job_run_id", "KeyType": "HASH"},{"AttributeName": "step_id", "KeyType": "RANGE"}]' --billing-mode PROVISIONED --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

You can check on the DynamoDB console that the table dw-etl-pipelinelog is successfully created.

The log table has the following attributes.

Attributes Type Comments
job_run_id String partition_key
id String UUID of the log entry
end_time String End time
error_description String Error in case of failure
expire Number Time to Live
sql_seq Number SQL sequence number
start_time String Start time
Status String Status of job
step_id String sort_key; the step the SQL belongs to

The log table can grow quickly if there are too many jobs or if they are running frequently. We can archive them to Amazon S3 if they are no longer used or use the Time to Live feature of DynamoDB to clean up old records.

  3. Run the first command to set the variable in case you have an existing bucket that can be reused. If not, run the second command to create an S3 bucket to store the Spark SQL code, which will be run by Amazon EMR.
export s3_bucket_name=unique-code-bucket-name # Change unique-code-bucket-name to a valid bucket name
aws s3api create-bucket --bucket $s3_bucket_name --region us-east-1
  4. Enable secure transfer on the bucket:
aws s3api put-bucket-policy --bucket $s3_bucket_name --policy '{"Version": "2012-10-17", "Statement": [{"Effect": "Deny", "Principal": {"AWS": "*"}, "Action": "s3:*", "Resource": ["arn:aws:s3:::unique-code-bucket-name", "arn:aws:s3:::unique-code-bucket-name/*"], "Condition": {"Bool": {"aws:SecureTransport": "false"} } } ] }' # Change unique-code-bucket-name to a valid bucket name

  5. Clone the project to your workspace:
git clone https://github.com/aws-samples/pyspark-sql-framework.git
  6. Create a ZIP file and upload it to the code bucket created earlier:
cd pyspark-sql-framework/code
zip code.zip -r *
aws s3 cp ./code.zip s3://$s3_bucket_name/framework/code.zip
  7. Upload the ETL driver code to the S3 bucket:
cd $OLDPWD/pyspark-sql-framework
aws s3 cp ./code/etl_driver.py s3://$s3_bucket_name/framework/

  8. Upload sample job SQLs to Amazon S3:
aws s3 cp ./sample_oozie_job_name/ s3://$s3_bucket_name/DW/sample_oozie_job_name/ --recursive

  9. Add a sample step (./sample_oozie_job_name/step1/step1.json) to DynamoDB (for more information, refer to Write data to a table using the console or AWS CLI):
{
  "name": "step1.q",
  "step_name": "step1",
  "sql_info": [
    {
      "sql_load_order": 5,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "5.sql"
    },
    {
      "sql_load_order": 10,
      "sql_parameters": {
        "DATE": "${coord:formatTime(coord:nominalTime(), 'yyyy-MM-dd')}",
        "HOUR": "${coord:formatTime(coord:nominalTime(), 'HH')}"
      },
      "sql_active_flag": "Y",
      "sql_path": "10.sql"
    }
  ],
  "id": "emr_config",
  "step_id": "sample_oozie_job_name#step1",
  "sql_base_path": "sample_oozie_job_name/step1/",
  "spark_config": {
    "spark.sql.parser.quotedRegexColumnNames": "true"
  }
}

  10. In the Athena query editor, create the database base:
create database base;
  11. Copy the sample data files from the repo to Amazon S3:
    1. Copy us_current.csv:
aws s3 cp ./sample_data/us_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_us_current/;

    2. Copy states_current.csv:
aws s3 cp ./sample_data/states_current.csv s3://$s3_bucket_name/covid-19-testing-data/base/source_states_current/;

  12. To create the source tables in the base database, run the DDLs present in the repo in the Athena query editor:
    1. Run the ./sample_data/ddl/states_current.q file by modifying the S3 path to the bucket you created.
    2. Run the ./sample_data/ddl/us_current.q file by modifying the S3 path to the bucket you created.

The ETL driver file implements the Spark driver logic. It can be invoked locally or on an EMR instance.

  13. Launch an EMR cluster.
    1. Make sure to select Use for Spark table metadata under AWS Glue Data Catalog settings.

  14. Add the following steps to the EMR cluster:
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Type=CUSTOM_JAR,Name="boto3",ActionOnFailure=CONTINUE,Jar=command-runner.jar,Args=[bash,-c,"sudo pip3 install boto3"]'
aws emr add-steps --cluster-id <<cluster id created above>> --steps 'Name="sample_oozie_job_name",Jar="command-runner.jar",Args=[spark-submit,--py-files,s3://unique-code-bucket-name-#####/framework/code.zip,s3://unique-code-bucket-name-#####/framework/etl_driver.py,--step_id,sample_oozie_job_name#step1,--job_run_id,sample_oozie_job_name#step1#2022-01-01-12-00-01,  --code_bucket=s3://unique-code-bucket-name-#####/DW,--metadata_table=dw-etl-metadata,--log_table_name=dw-etl-pipelinelog,--sql_parameters,DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#####]' # Change unique-code-bucket-name to a valid bucket name

The following table summarizes the parameters for the Spark step.

Step type Spark Application
Name Any Name
Deploy mode Client
Spark-submit options --py-files s3://unique-code-bucket-name-#####/framework/code.zip
Application location s3://unique-code-bucket-name-####/framework/etl_driver.py
Arguments --step_id sample_oozie_job_name#step1 --job_run_id sample_oozie_job_name#step1#2022-01-01-12-00-01 --code_bucket=s3://unique-code-bucket-name-#######/DW --metadata_table=dw-etl-metadata --log_table_name=dw-etl-pipelinelog --sql_parameters DATE=2022-02-02::HOUR=12::code_bucket=s3://unique-code-bucket-name-#######
Action on failure Continue

The following table summarizes the script arguments.

Script Argument Argument Description
deploy-mode Spark deploy mode. Client/Cluster.
name <jobname>#<stepname> Unique name for the Spark job. This can be used to identify the job on the Spark History UI.
py-files <s3 path for code>/code.zip S3 path for the code.
<s3 path for code>/etl_driver.py S3 path for the driver module. This is the entry point for the solution.
step_id <jobname>#<stepname> Unique name for the step. This refers to the step_id in the metadata entered in DynamoDB.
job_run_id <random UUID> Unique ID to identify the log entries in DynamoDB.
log_table_name <DynamoDB Log table name> DynamoDB table for storing the job run details.
code_bucket <s3 bucket> S3 path for the SQL files that are uploaded in the job setup.
metadata_table <DynamoDB Metadata table name> DynamoDB table for storing the job metadata.
sql_parameters DATE=2022-07-04::HOUR=00 Any additional or dynamic parameters expected by the SQL files.

Validation

After the EMR step completes, you should have data in the S3 bucket for the table base.states_daily. You can validate the data by querying the table base.states_daily in Athena.

Congratulations, you have converted an Oozie Hive job to Spark and run it on Amazon EMR successfully.

Solution highlights

This solution has the following benefits:

  • It avoids boilerplate code for new pipelines and reduces code maintenance
  • Onboarding a new pipeline only needs the metadata set up: the DynamoDB entries and the SQL files placed in the S3 path
  • Any common modifications or enhancements can be done at the solution level, which will be reflected across all jobs
  • DynamoDB metadata provides insight into all active jobs and their optimized runtime parameters
  • For each run, this solution persists the SQL start time, end time, and status in a log table to identify issues and analyze runtimes
  • It supports Spark SQL and UDF functionality. Custom UDFs can be provided externally to the spark-submit command

Limitations

This method has the following limitations:

  • The solution only supports SQL queries on Spark
  • Each SQL should be a Spark action like insert, create, drop, and so on

In this post, we explained the scenario of migrating an existing Oozie job. We can use the PySpark solution independently for any new development by creating DynamoDB entries and SQL files.

Clean up

Delete all the resources created as part of this solution to avoid ongoing charges for the resources:

  1. Delete the DynamoDB tables:
aws dynamodb delete-table --table-name dw-etl-metadata --region us-east-1
aws dynamodb delete-table --table-name dw-etl-pipelinelog --region us-east-1
  2. Delete the S3 bucket:
aws s3 rm s3://$s3_bucket_name --region us-east-1 --recursive
aws s3api delete-bucket --bucket $s3_bucket_name --region us-east-1

  3. Terminate the EMR cluster if it wasn't a transient cluster:
aws emr terminate-clusters --cluster-ids <<cluster id created above>> 

Conclusion

In this post, we presented two automated solutions: one for parsing Oozie workflows and HiveQL files to generate metadata, and a PySpark solution for running SQLs using generated metadata. We successfully implemented these solutions to migrate a Hive workload to EMR Spark for a major gaming customer and achieved about 60% effort reduction.

For a Hive with Oozie to Spark migration, these solutions help complete the code conversion quickly so you can focus on performance benchmark and testing. Developing a new pipeline is also quick—you only need to create SQL logic, test it using Spark (shell or notebook), add metadata to DynamoDB, and test via the PySpark SQL solution. Overall, you can use the solution in this post to accelerate Hive to Spark code migration.


About the authors

Vinay Kumar Khambhampati is a Lead Consultant with the AWS ProServe Team, helping customers with cloud adoption. He is passionate about big data and data analytics.

Sandeep Singh is a Lead Consultant at AWS ProServe, focused on analytics, data lake architecture, and implementation. He helps enterprise customers migrate and modernize their data lake and data warehouse using AWS services.

Amol Guldagad is a Data Analytics Consultant based in India. He has worked with customers in different industries like banking and financial services, healthcare, power and utilities, manufacturing, and retail, helping them solve complex challenges with large-scale data platforms. At AWS ProServe, he helps customers accelerate their journey to the cloud and innovate using AWS analytics services.

Optimizing fleet utilization with Amazon Location Service and HERE Technologies

Post Syndicated from Mahesh Geeniga original https://aws.amazon.com/blogs/architecture/optimizing-fleet-utilization-with-amazon-location-service-and-here-technologies/

The fleet management market is expected to grow at a Compound Annual Growth Rate (CAGR) of 15.5 percent, from USD 25.5 billion in 2022 to USD 52.4 billion in 2027. Optimizing how your organization uses its vehicle fleet is important for logistics and service providers such as last mile, middle mile, and field services.

In this post, we demonstrate how to build and run a solution for many-to-many vehicle routing using HERE Tour Planning and Amazon Location Service. HERE Technologies is a data provider for Amazon Location Service and provides it with map rendering, geocoding, search, and routing. HERE Tour Planning expands on functionality such as geocoding, basic routing, and matrix routing to consider parameters such as time windows, job requirements or priorities, vehicle capabilities, range, and traffic information. It also supports immediate re-planning when conditions change.

The architecture described in this post can help you optimize your fleets for delivering shipments, such as perishable items on pallets, from a central distribution center to multiple retail locations. The architecture uses the AWS Cloud Development Kit (AWS CDK) to help you provision and version control your infrastructure. The architecture also uses Event Driven Architecture (EDA) based on AWS Lambda and Amazon DynamoDB. It uses Amazon Simple Storage Service (Amazon S3) and DynamoDB to store the artifacts generated and integrates with Amazon Location Service to plot the routes visually on a map for each delivery driver.

Solution overview

This post will help you:

  • Configure the many-to-many vehicle routing architecture using HERE Tour Planning and Amazon Location Service
  • Submit a HERE Tour Planning problem
  • Generate an optimized solution file
  • Run a React app that:
    • Generates a list of routes for each vehicle in the fleet
    • Allows drivers to select and view routes in detail

The following diagram outlines how the architecture works.


Figure 1. Many-to-many vehicle routing architecture using HERE Tour Planning and Amazon Location Service

Let’s explore the steps in the diagram.

  1. The fleet operator uploads the tour requirements file to an Amazon S3 bucket.
  2. The upload invokes a Lambda function to process the new Tour Request. If the HERE API key is present, it calls the HERE Tour Planning API.
  3. The HERE Tour Planning API calculates the solution to the routing problem.
  4. The driver uses the React app to select a vehicle, which requests a route.
  5. The invoked Lambda function uses Amazon Location Service to calculate the route and render it in the React app.

Prerequisites

This walkthrough requires the following installations and resources:

  • Have an AWS account
  • Install the AWS Amplify CLI (command-line interface)
  • Install the AWS CDK CLI
  • Install the AWS CLI
    • Configure and authenticate the AWS CLI to interact with your AWS account.
  • Have your preferred integrated development environment (IDE), such as Visual Studio Code
  • Have GitHub repository access
    • git clone https://github.com/aws-samples/aws-here-optimize-fleet-utilization
  • Have a HERE API Key (optional)
    • This is needed to invoke the HERE Tour Planning API for generating solutions to new routing problems.
    • The GitHub sample repository includes a problem and pre-solved solution file, so you don’t need to acquire a HERE API key to learn about these offerings.
    • To acquire an API key, create a free account on the HERE Platform, then follow the instructions in the HERE Tour Planning documentation to create your API key.
    • There can be additional charges based on API key use. For more details, see the HERE Tour Planning section within HERE service rates.

Walkthrough

Provision the infrastructure

  1. The architecture uses NPM node modules. Run the following commands to install the dependencies:
    • # AWS Lambda function dependencies
      cd lib/lambda/calculate-route
      npm install
    • # Sample Frontend application dependencies
      cd frontend/here-driver-app
      npm install
  2. If you have never used AWS CDK in your AWS account, you must first bootstrap the solution, which creates Amazon S3 buckets and metadata to support AWS CDK operations. Note: Architectural components described in this article are covered under AWS Free Tier and HERE Free monthly usage, but additional charges can occur based on usage beyond the Free Tier limits. We recommend following the Cleanup instructions after completing the walkthrough.
      • From the root of the repository, run the following command to generate the needed infrastructure:
        • cdk bootstrap
      • Output similar to the following indicates that you successfully bootstrapped the AWS account with what AWS CDK requires.


        Figure 2. Successful bootstrap of AWS account with AWS CDK requirements

  3. Deploy the infrastructure for this solution by running the following command. This provisions much of the infrastructure required for this solution such as DynamoDB, Lambda functions, and Amazon S3 buckets:
    • cdk deploy
  4. Next, Amplify provisions the remaining resources to complete the architecture. Navigate to the folder for the frontend by running the following command:
    • cd frontend/here-driver-app
  5. Run the following series of Amplify commands to create the remaining resources: Amazon API Gateway, Amazon Cognito, and Amazon Location Service. For additional details, see Clone sample Amplify project.
  6. To accept the defaults, run the following command:
    • amplify init
  7. To push the infrastructure out to the AWS account, run the following command:
    • amplify push
  8. To publish the environment, run the following command:
    • amplify publish

Create problem and generate architecture files

The next step is to create the HERE Tour Planning problem file and submit it to the HERE Tour Planning API to solve. Note: You can either sign up for the HERE Developer program to receive an API key to test this solution live or use the problem and pre-solved solution provided in the /data folder of the repository.

  1. Open the Amazon S3 bucket that the previous step created.
  2. Upload a problem file (in JSON format) to the bucket.
  3. An Amazon S3 event notification invokes a Lambda function that performs a synchronous call to the HERE Tour Planning API and generates a vehicle routing problem solution file in JSON format.
  4. The Lambda function saves the solution file to the Amazon S3 bucket and additional details about the solution to a DynamoDB table.
  5. The delivery drivers can use the example React app to view the list of vehicles and the routes.

Figure 3. Creating a problem file and submitting it to HERE Tour Planning API
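The following is a rough sketch of that Lambda function, assuming a synchronous HERE Tour Planning endpoint, an API key passed through an environment variable, and a simple naming convention for the solution object; check the HERE Tour Planning documentation and the sample repository for the exact request format and the DynamoDB bookkeeping.

import json
import os
import urllib.request
import boto3

s3 = boto3.client("s3")

# Assumed endpoint; confirm against the HERE Tour Planning documentation
HERE_TOUR_PLANNING_URL = "https://tourplanning.hereapi.com/v3/problems"

def handler(event, context):
    # Invoked by the S3 event notification for a newly uploaded problem file
    record = event["Records"][0]
    bucket = record["s3"]["bucket"]["name"]
    key = record["s3"]["object"]["key"]
    problem = s3.get_object(Bucket=bucket, Key=key)["Body"].read()

    request = urllib.request.Request(
        f"{HERE_TOUR_PLANNING_URL}?apiKey={os.environ['HERE_API_KEY']}",
        data=problem,
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        solution = response.read()

    # Hypothetical naming convention: store the solution next to the problem file
    s3.put_object(Bucket=bucket, Key=key.replace("problem", "solution"), Body=solution)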

Frontend

The next step is to run the React Frontend app to see the results. Access to the app and the API Gateway is secured with Amazon Cognito.

  1. To run the web application, run the following command:
    • npm start
  2. A local web server runs on http://localhost:3000.
  3. To use the system, the user must authenticate. Amazon Cognito allows users to sign in or create a new account.
  4. After you authenticate, the Home screen displays a list of available vehicles and routes.


    Figure 4. Available vehicles and routes

Choose a vehicle to see the detail of the route. Each red marker is a stop.


Figure 5. Vehicle route details

Cleanup

To avoid incurring future charges, delete all resources created.

  1. Run the following commands to delete the application in AWS Amplify. As an alternative, you can use the Amplify console to delete Amplify resources:
    • cd frontend/here-driver-app
      amplify pull
      amplify delete
  2. With AWS CDK, run the following command to delete the AWS CloudFormation stack that was used to provision the resources. Note: You can leave the AWS CLI, Amplify CLI, and CDK CLI installed on your computer for future development:
    • cdk destroy

Conclusion

In this post, using the use case of shipment delivery from a central distribution center, we've demonstrated how you can build your own serverless solution for optimizing middle and last mile operations. The solution uses multi-vehicle and multi-stop optimization services provided by HERE Tour Planning and Amazon Location Service to visualize the generated routes for each delivery vehicle driver. For additional details about HERE's offerings on AWS Marketplace, see AWS Marketplace: HERE Technologies.

Serverless ICYMI Q1 2023

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/serverless-icymi-q1-2023/

Welcome to the 21st edition of the AWS Serverless ICYMI (in case you missed it) quarterly recap. Every quarter, we share all the most recent product launches, feature enhancements, blog posts, webinars, live streams, and other interesting things that you might have missed!


In case you missed our last ICYMI, check out what happened last quarter here.

Artificial intelligence (AI) technologies, ChatGPT, and DALL-E are creating significant interest in the industry at the moment. Find out how to integrate serverless services with ChatGPT and DALL-E to generate unique bedtime stories for children.


Example notification of a story hosted with Next.js and App Runner

Serverless Land is a website maintained by the Serverless Developer Advocate team to help you build serverless applications and includes workshops, code examples, blogs, and videos. There is now enhanced search functionality so you can search across resources, patterns, and video content.


ServerlessLand search

AWS Lambda

AWS Lambda has improved how concurrency works with Amazon SQS. You can now control the maximum number of concurrent invocations of a Lambda function when Amazon SQS is used as the event source.

The launch blog post explains the scaling behavior of Lambda using this architectural pattern, challenges this feature helps address, and a demo of maximum concurrency in action.


Maximum concurrency is set to 10 for the SQS queue.
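For example, maximum concurrency is a property of the SQS event source mapping. A minimal boto3 sketch (function name and queue ARN are placeholders):

import boto3

lambda_client = boto3.client("lambda")

# Placeholder function name and queue ARN
lambda_client.create_event_source_mapping(
    FunctionName="my-consumer-function",
    EventSourceArn="arn:aws:sqs:us-east-1:111111111111:my-queue",
    ScalingConfig={"MaximumConcurrency": 10},
)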

AWS Lambda Powertools is an open-source library to help you discover and incorporate serverless best practices more easily. Lambda Powertools for .NET is now generally available and currently focused on three observability features: distributed tracing (Tracer), structured logging (Logger), and asynchronous business and application metrics (Metrics). Powertools is also available for the Python, Java, and TypeScript/Node.js programming languages.


Lambda announced a new feature, runtime management controls, which provide more visibility and control over when Lambda applies runtime updates to your functions. The runtime controls are optional capabilities for advanced customers that require more control over their runtime changes. You can now specify a runtime management configuration for each function with three settings: Automatic (default), Function update, or Manual.

There are three new Amazon CloudWatch metrics for asynchronous Lambda function invocations: AsyncEventsReceived, AsyncEventAge, and AsyncEventsDropped. You can track the asynchronous invocation requests sent to Lambda functions to monitor any delays in processing and take corrective actions if required. The launch blog post explains the new metrics and how to use them to troubleshoot issues.

Lambda now supports Amazon DocumentDB change streams as an event source. You can use Lambda functions to process new documents, track updates to existing documents, or log deleted documents. You can use any programming language that is supported by Lambda to write your functions.

There is a helpful blog post suggesting best practices for developing portable Lambda functions that allow you to port your code to containers if you later choose to.

AWS Step Functions

AWS Step Functions has expanded its AWS SDK integrations with support for 35 additional AWS services including Amazon EMR Serverless, AWS Clean Rooms, AWS IoT FleetWise, AWS IoT RoboRunner and 31 other AWS services. In addition, Step Functions also added support for 1000+ new API actions from new and existing AWS services such as Amazon DynamoDB and Amazon Athena. For the full list of added services, visit AWS SDK service integrations.

Amazon EventBridge

Amazon EventBridge has launched the AWS Controllers for Kubernetes (ACK) for EventBridge and Pipes. This allows you to manage EventBridge resources, such as event buses, rules, and pipes, using the Kubernetes API and resource model (custom resource definitions).

EventBridge event buses now also support enhanced integration with Service Quotas. Your quota increase requests for limits such as PutEvents transactions-per-second, number of rules, and invocations per second among others will be processed within one business day or faster, enabling you to respond quickly to changes in usage.

AWS SAM

The AWS Serverless Application Model (SAM) Command Line Interface (CLI) has added the sam list command. You can now show resources defined in your application, including the endpoints, methods, and stack outputs required to test your deployed application.

AWS SAM has a preview of sam build support for building and packaging serverless applications developed in Rust. You can use cargo-lambda in the AWS SAM CLI build workflow and AWS SAM Accelerate to iterate on your code changes rapidly in the cloud.

You can now use AWS SAM connectors as a source resource parameter. Previously, you could only define AWS SAM connectors as an AWS::Serverless::Connector resource. Now you can add the resource attribute on a connector's source resource, which makes templates more readable and easier to update over time.

AWS SAM connectors now also support multiple destinations to simplify your permissions. You can now use a single connector between a single source resource and multiple destination resources.

In October 2022, AWS released OpenID Connect (OIDC) support for AWS SAM Pipelines. This improves your security posture by creating integrations that use short-lived credentials from your CI/CD provider. There is a new blog post on how to implement it.

Find out how best to build serverless Java applications with the AWS SAM CLI.

AWS App Runner

AWS App Runner now supports retrieving secrets and configuration data stored in AWS Secrets Manager and AWS Systems Manager (SSM) Parameter Store in an App Runner service as runtime environment variables.

App Runner also now supports incoming requests based on the HTTP 1.0 protocol, and has added service-level concurrency, CPU, and memory utilization metrics.

Amazon S3

Amazon S3 now automatically applies default encryption to all new objects added to S3, at no additional cost and with no impact on performance.

You can now use an S3 Object Lambda Access Point alias as an origin for your Amazon CloudFront distribution to tailor or customize data to end users. For example, you can resize an image depending on the device that an end user is visiting from.

S3 has introduced Mountpoint for S3, a high performance open source file client that translates local file system API calls to S3 object API calls like GET and LIST.

S3 Multi-Region Access Points now support datasets that are replicated across multiple AWS accounts. They provide a single global endpoint for your multi-region applications, and dynamically route S3 requests based on policies that you define. This helps you to more easily implement multi-Region resilience, latency-based routing, and active-passive failover, even when data is stored in multiple accounts.

Amazon Kinesis

Amazon Kinesis Data Firehose now supports streaming data delivery to Elastic. This is an easier way to ingest streaming data to Elastic and consume the Elastic Stack (ELK Stack) solutions for enterprise search, observability, and security without having to manage applications or write code.

Amazon DynamoDB

Amazon DynamoDB now supports table deletion protection to protect your tables from accidental deletion when performing regular table management operations. You can set the deletion protection property for each table, which is set to disabled by default.
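For example, you can turn deletion protection on for an existing table with a single API call (table name is a placeholder):

import boto3

dynamodb = boto3.client("dynamodb")

# Deletion protection is disabled by default; enable it per table
dynamodb.update_table(
    TableName="my-table",
    DeletionProtectionEnabled=True,
)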

Amazon SNS

Amazon SNS now supports AWS X-Ray active tracing to visualize, analyze, and debug application performance. You can now view traces that flow through Amazon SNS topics to destination services, such as Amazon Simple Queue Service, Lambda, and Kinesis Data Firehose, in addition to traversing the application topology in Amazon CloudWatch ServiceLens.

SNS also now supports setting content-type request headers for HTTPS notifications so applications can receive their notifications in a more predictable format. Topic subscribers can create a DeliveryPolicy that specifies the content-type value that SNS assigns to their HTTPS notifications, such as application/json, application/xml, or text/plain.

EDA Visuals collection added to Serverless Land

The Serverless Developer Advocate team has extended Serverless Land and introduced EDA visuals. These are small, bite-sized visuals to help you understand concepts and patterns in event-driven architectures. Find out about batch processing vs. event streaming, commands vs. events, message queues vs. event brokers, and point-to-point messaging. Discover bounded contexts, migrations, idempotency, claims, enrichment and more!


EDA Visuals


Serverless Repos Collection on Serverless Land

There is also a new section on Serverless Land containing helpful code repositories. You can search for code repos to use for examples, learning, or building serverless applications. You can also filter by use case, runtime, and level.


Serverless Repos Collection

Serverless Blog Posts

January

Jan 12 – Introducing maximum concurrency of AWS Lambda functions when using Amazon SQS as an event source

Jan 20 – Processing geospatial IoT data with AWS IoT Core and the Amazon Location Service

Jan 23 – AWS Lambda: Resilience under-the-hood

Jan 24 – Introducing AWS Lambda runtime management controls

Jan 24 – Best practices for working with the Apache Velocity Template Language in Amazon API Gateway

February

Feb 6 – Previewing environments using containerized AWS Lambda functions

Feb 7 – Building ad-hoc consumers for event-driven architectures

Feb 9 – Implementing architectural patterns with Amazon EventBridge Pipes

Feb 9 – Securing CI/CD pipelines with AWS SAM Pipelines and OIDC

Feb 9 – Introducing new asynchronous invocation metrics for AWS Lambda

Feb 14 – Migrating to token-based authentication for iOS applications with Amazon SNS

Feb 15 – Implementing reactive progress tracking for AWS Step Functions

Feb 23 – Developing portable AWS Lambda functions

Feb 23 – Uploading large objects to Amazon S3 using multipart upload and transfer acceleration

Feb 28 – Introducing AWS Lambda Powertools for .NET

March

Mar 9 – Server-side rendering micro-frontends – UI composer and service discovery

Mar 9 – Building serverless Java applications with the AWS SAM CLI

Mar 10 – Managing sessions of anonymous users in WebSocket API-based applications

Mar 14 – Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Videos

Serverless Office Hours – Tues 10AM PT

Weekly office hours live stream. In each session, we talk about a specific topic or technology related to serverless and then open it up to help you with your real serverless challenges and issues. Ask us anything you want about serverless technologies and applications.

January

Jan 10 – Building .NET 7 high performance Lambda functions

Jan 17 – Amazon Managed Workflows for Apache Airflow at Scale

Jan 24 – Using Terraform with AWS SAM

Jan 31 – Preparing your serverless architectures for the big day

February

Feb 07 – Visually design and build serverless applications

Feb 14 – Multi-tenant serverless SaaS

Feb 21 – Refactoring to Serverless

Feb 28 – EDA visually explained

March

Mar 07 – Lambda cookbook with Python

Mar 14 – Succeeding with serverless

Mar 21 – Lambda Powertools .NET

Mar 28 – Server-side rendering micro-frontends

FooBar Serverless YouTube channel

Marcia Villalba frequently publishes new videos on her popular serverless YouTube channel. You can view all of Marcia’s videos at https://www.youtube.com/c/FooBar_codes.

January

Jan 12 – Serverless Badge – A new certification to validate your Serverless Knowledge

Jan 19 – Step functions Distributed map – Run 10k parallel serverless executions!

Jan 26 – Step Functions Intrinsic Functions – Do simple data processing directly from the state machines!

February

Feb 02 – Unlock the Power of EventBridge Pipes: Integrate Across Platforms with Ease!

Feb 09 – Amazon EventBridge Pipes: Enrichment and filter of events Demo with AWS SAM

Feb 16 – AWS App Runner – Deploy your apps from GitHub to Cloud in Record Time

Feb 23 – AWS App Runner – Demo hosting a Node.js app in the cloud directly from GitHub (AWS CDK)

March

Mar 02 – What is Amazon DynamoDB? What are the most important concepts? What are the indexes?

Mar 09 – Choreography vs Orchestration: Which is Best for Your Distributed Application?

Mar 16 – DynamoDB Single Table Design: Simplify Your Code and Boost Performance with Table Design Strategies

Mar 23 – 8 Reasons You Should Choose DynamoDB for Your Next Project and How to Get Started

Sessions with SAM & Friends

AWS SAM & Friends

Eric Johnson is exploring how developers are building serverless applications. We spend time talking about AWS SAM as well as other tools like AWS CDK, Terraform, Wing, and AMPT.

Feb 16 – What’s new with AWS SAM

Feb 23 – AWS SAM with AWS CDK

Mar 02 – AWS SAM and Terraform

Mar 10 – Live from ServerlessDays ANZ

Mar 16 – All about AMPT

Mar 23 – All about Wing

Mar 30 – SAM Accelerate deep dive

Still looking for more?

The Serverless landing page has more information. The Lambda resources page contains case studies, webinars, whitepapers, customer stories, reference architectures, and even more Getting Started tutorials.

You can also follow the Serverless Developer Advocacy team on Twitter to see the latest news, follow conversations, and interact with the team.

Genomics workflows, Part 5: automated benchmarking

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/genomics-workflows-part-5-automated-benchmarking/

Launching and running genomics workflows can take hours and involves large pools of compute instances that process data at a petabyte scale. Benchmarking helps you evaluate workflow performance and discover faster and cheaper ways of running them.

In practice, performance evaluations happen irregularly because of the associated heavy lifting. In this blog post, we discuss how life-science research teams can automate evaluations.

Business Benefits

An automated benchmarking solution provides:

  • more accurate enterprise resource planning by performing historical analytics,
  • lower cost to the business by comparing performance on different resource types, and
  • cost transparency to the business by quantifying periodical chargeback.

We’ve used automated benchmarking to compare processing times on different services such as Amazon Elastic Compute Cloud (Amazon EC2), AWS Batch, AWS ParallelCluster, Amazon Elastic Kubernetes Service (Amazon EKS), and on-premises HPC clusters. Scientists, financiers, technical leaders, and other stakeholders can build reports and dashboards to compare consumption data by consumer, workflow type, and time period.

Design pattern

Our automated benchmarking solution measures performance on two dimensions:

  • Timing: measures the duration of a workflow launch on a specific dataset
  • Pricing: measures the associated cost

This solution can be extended to other performance metrics such as iterations per second or process/thread distribution across compute nodes.

Our requirements include the following:

  • Consistent measurement of timing based on workflow status (such as preparing, waiting, ready, running, failed, complete)
  • Extensible pricing models based on unit prices (the Amazon EC2 Spot price at a specific period of time compared to Amazon EC2 On-Demand pricing)
  • Scalable, cost-efficient, and flexible data store enabling historical benchmarking and estimations
  • Minimal infrastructure management overhead

We choose a serverless design pattern using AWS Step Functions orchestration, AWS Lambda for our application code, and Amazon DynamoDB to track workflow launch IDs and states (as described in Part 3). We assume that the genomics workflows run on AWS Batch with genomics data on Amazon FSx for Lustre (Part 1). AWS Step Functions allows us to break down processing into smaller steps and avoid monolithic application code. Our evaluation process runs in four steps:

  1. Monitor for completed workflow launches in the DynamoDB stream using an Amazon EventBridge pipe with a Step Functions workflow as target. This event-driven approach is more efficient than periodic polling and avoids custom code for parsing status and cost values in all records of the DynamoDB stream.
  2. Collect a list of all compute resources associated with the workflow launch. Design a Lambda function that queries the AWS Batch API (see Part 1) to describe compute environment parameters like the Amazon EC2 instance IDs and their details, such as processing times, instance family/size, and allocation strategy (for example, Spot Instances, Reserved Instances, On-Demand Instances).
  3. Calculate the cost of all consumed resources. We achieve this with another Lambda function, which calculates the total price based on unit prices from the AWS Price List Query API.
  4. Our state machine updates the total price in the DynamoDB table without the need for additional application code.

Figure 1 visualizes these steps.

Figure 1. Automated benchmarking of genomics workflows

Implementation considerations

AWS Step Functions orchestrates our benchmarking workflow reliably and makes our application code easy to maintain. Figure 2 summarizes the state machine transitions that we’ll describe.

Figure 2. AWS Step Functions state machine for automated benchmarking

Gather consumption details

Configure the DynamoDB stream view type to New image so that the entire item is passed through as it appears after the change. We set up an Amazon EventBridge pipe with event filtering and the DynamoDB stream as a source. Our event filter matches records with a status of COMPLETE but no cost entry, in order to avoid an infinite loop. Once our state machine has updated the DynamoDB item with the workflow price, the resulting record in the DynamoDB stream no longer passes our event filter.

The syntax of our event filter is as follows:

{
  "dynamodb": {
    "NewImage": {
      "status": {
        "S": ["COMPLETE"]
      },
      "totalCost": {
        "S": [{
          "exists": false
        }]
      }
    }
  }
}

We use an input transformer to simplify follow-on parsing by removing unnecessary metadata from the event.

The consumed resources included in the stream record are the auto-scaling group ID for AWS Batch and the Amazon FSx for Lustre volume ID. We use the DescribeJobs API (describe_jobs in Boto3) to determine which compute resources were used. If the response is a list of EC2 instances, we then look up consumption information including start and end times using the ListJobs API (list_jobs in Boto3) for each compute node. We use describe_volumes with filters on the identified EC2 instances to obtain the size and type of Amazon Elastic Block Store (Amazon EBS) volumes.
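
A minimal Boto3 sketch of this lookup, assuming the AWS Batch job IDs and EC2 instance IDs have already been resolved from the stream record and the compute environment (function and variable names are illustrative, not the exact implementation):

import boto3

batch = boto3.client("batch")
ec2 = boto3.client("ec2")

def collect_consumption(job_ids, instance_ids):
    # Timing per job: startedAt/stoppedAt are epoch milliseconds reported by AWS Batch
    timings = []
    for job in batch.describe_jobs(jobs=job_ids)["jobs"]:
        timings.append({
            "jobId": job["jobId"],
            "durationMs": job.get("stoppedAt", 0) - job.get("startedAt", 0),
        })

    # Instance family/size for each compute node
    reservations = ec2.describe_instances(InstanceIds=instance_ids)["Reservations"]
    instance_types = [i["InstanceType"] for r in reservations for i in r["Instances"]]

    # Size and type of the EBS volumes attached to those instances
    volumes = ec2.describe_volumes(
        Filters=[{"Name": "attachment.instance-id", "Values": instance_ids}]
    )["Volumes"]
    volume_info = [(v["VolumeType"], v["Size"]) for v in volumes]

    return timings, instance_types, volume_info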

Calculate prices

Another Lambda function obtains the associated unit prices of all consumed resources using the GetProducts request of AWS Price List Query API (get_products in Boto3) and then parsing the pricePerUnit value. For Spot Instances, we use describe_spot_price_history of the EC2 client in Boto3 and specify the time range and instance types for which we want to receive prices.
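
A sketch of both lookups, assuming On-Demand Linux pricing in a single Region; the filter values and function names are illustrative:

import json
import boto3

# The Price List Query API is served from selected Regions, such as us-east-1
pricing = boto3.client("pricing", region_name="us-east-1")
ec2 = boto3.client("ec2")

def get_on_demand_price(instance_type, location="US East (N. Virginia)"):
    # Look up the On-Demand hourly price of an EC2 instance type
    response = pricing.get_products(
        ServiceCode="AmazonEC2",
        Filters=[
            {"Type": "TERM_MATCH", "Field": "instanceType", "Value": instance_type},
            {"Type": "TERM_MATCH", "Field": "location", "Value": location},
            {"Type": "TERM_MATCH", "Field": "operatingSystem", "Value": "Linux"},
            {"Type": "TERM_MATCH", "Field": "tenancy", "Value": "Shared"},
            {"Type": "TERM_MATCH", "Field": "preInstalledSw", "Value": "NA"},
            {"Type": "TERM_MATCH", "Field": "capacitystatus", "Value": "Used"},
        ],
    )
    product = json.loads(response["PriceList"][0])
    on_demand_term = next(iter(product["terms"]["OnDemand"].values()))
    dimension = next(iter(on_demand_term["priceDimensions"].values()))
    return float(dimension["pricePerUnit"]["USD"])

def get_spot_prices(instance_types, start_time, end_time):
    # Spot price history for the workflow launch time range
    history = ec2.describe_spot_price_history(
        InstanceTypes=instance_types,
        ProductDescriptions=["Linux/UNIX"],
        StartTime=start_time,
        EndTime=end_time,
    )["SpotPriceHistory"]
    return [(p["InstanceType"], float(p["SpotPrice"])) for p in history]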

Calculate the price of workflow launches based on the following factors:

  • Number and size of EC2 instances in auto-scaling node groups
  • Size of EBS volumes and Amazon FSx for Lustre
  • Processing duration

Our Python-based Lambda function calculates the total, rounds it, and delivers the price breakdown in the following format:

total_cost: str, instance_cost: str, volume_cost: str, filesystem_cost: str
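
A minimal sketch of assembling that breakdown, assuming the three component costs have already been computed as floats:

def build_price_breakdown(instance_cost, volume_cost, filesystem_cost):
    # Round each component and return the breakdown as strings, matching the format above
    total_cost = instance_cost + volume_cost + filesystem_cost
    return {
        "total_cost": str(round(total_cost, 2)),
        "instance_cost": str(round(instance_cost, 2)),
        "volume_cost": str(round(volume_cost, 2)),
        "filesystem_cost": str(round(filesystem_cost, 2)),
    }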

Lastly, we put the price breakdown to the DynamoDB table using UpdateItem directly from the Amazon States Language.

Note that AWS credits and enterprise discounts might not be reflected in the responses of the AWS Price List Query API unless they are applied to the particular AWS account. Working with published list prices rather than account-specific billing data also keeps the function's permissions narrow, which is often considered best practice in light of least-privilege considerations.

In the past, we’ve also used AWS Cost Explorer instead of the AWS Price List API. AWS Cost Explorer data is updated at least once every 24 hours. With that approach, you can denote a pending price status in the DynamoDB table item and use a Wait state to delay the calculation process until the data is available.

The presented solution can be extended to other compute services such as Amazon Elastic Kubernetes Service (Amazon EKS). For Amazon EKS, events are enriched with the cluster ID from the DynamoDB table and the price calculation should also include control plane costs.

Conclusion

Life-science research teams use benchmarking to compare workflow performance and inform their architectural decisions. Such evaluations are effort-intensive and therefore done irregularly.

In this blog post, we showed how life-science research teams can automate benchmarking for their scientific workflows. The insights teams gain from automated benchmarking indicate continuous optimization opportunities, such as by adjusting compute node configuration. The evaluation data is also available on demand for other purposes including chargeback.

Stay tuned for our next post in which we show how to use historical benchmarking data for price estimations of future workflow launches.

Related information

Unit Testing AWS Lambda with Python and Mock AWS Services

Post Syndicated from Kevin Hakanson original https://aws.amazon.com/blogs/devops/unit-testing-aws-lambda-with-python-and-mock-aws-services/

When building serverless event-driven applications using AWS Lambda, it is best practice to validate individual components. Unit testing can quickly identify and isolate issues in AWS Lambda function code. This blog demonstrates unit testing techniques for Python-based AWS Lambda functions and their interactions with AWS services.

The full code for this blog is available in the GitHub project as a demonstrative example.

Example use case

Let’s consider unit testing a serverless application which provides an API endpoint to generate a document.  When the API endpoint is called with a customer identifier and document type, the Lambda function retrieves the customer’s name from DynamoDB, then retrieves the document text from DynamoDB for the given document type, finally generating and writing the resulting document to S3.

Figure 1. Example application architecture

  1. Amazon API Gateway provides an endpoint to request the generation of a document for a given customer.  A document type and customer identifier are provided in this API call.
  2. The endpoint invokes an AWS Lambda function that generates a document using the customer identifier and the document type provided.
  3. An Amazon DynamoDB table stores the contents of the documents and the user’s name, which are retrieved by the Lambda function.
  4. The resulting text document is stored to Amazon S3.

Our testing goal is to determine if an isolated “unit” of code works as intended. In this blog, we will be writing tests to provide confidence that the logic written in the above AWS Lambda function behaves as we expect. We will mock the service integrations to Amazon DynamoDB and S3 to isolate and focus our tests on the Lambda function code, and not on the behavior of the AWS Services.

Define the AWS Service resources in the Lambda function

Before writing our first unit test, let’s look at the Lambda function that contains the behavior we wish to test.  The full code for the Lambda function is available in the GitHub repository as src/sample_lambda/app.py.

As part of our Best practices for working with AWS Lambda functions, we recommend initializing AWS service resource connections outside of the handler function, in the global scope.  Additionally, we can retrieve any relevant environment variables in the global scope so that subsequent invocations of the Lambda function do not need to retrieve them repeatedly.  For organization, we can put the resource and variables in a dictionary:

from os import environ
from boto3 import resource
_LAMBDA_DYNAMODB_RESOURCE = { "resource" : resource('dynamodb'),
                              "table_name" : environ.get("DYNAMODB_TABLE_NAME","NONE") }

However, globally scoped code and global variables are challenging to test in Python, as global statements are executed on import, and outside of the controlled test flow.  To facilitate testing, we define classes for supporting AWS resource connections that we can override (patch) during testing.  These classes will accept a dictionary containing the boto3 resource and relevant environment variables.

For example, we create a DynamoDB resource class with a parameter lambda_dynamodb_resource that accepts a dictionary containing the boto3 resource connected to DynamoDB and the table name:

class LambdaDynamoDBClass:
    def __init__(self, lambda_dynamodb_resource):
        self.resource = lambda_dynamodb_resource["resource"]
        self.table_name = lambda_dynamodb_resource["table_name"]
        self.table = self.resource.Table(self.table_name)
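
The handler shown later also instantiates a LambdaS3Class, which is defined in the sample repository. A minimal sketch following the same pattern, assuming a bucket_name key in the dictionary (matching the setup code used in the tests below):

class LambdaS3Class:
    def __init__(self, lambda_s3_resource):
        self.resource = lambda_s3_resource["resource"]
        self.bucket_name = lambda_s3_resource["bucket_name"]
        self.bucket = self.resource.Bucket(self.bucket_name)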

Build the Lambda Handler

The Lambda function handler is the method in the AWS Lambda function code that processes events. When the function is invoked, Lambda runs the handler method. When the handler exits or returns a response, it becomes available to process another event.

To facilitate unit testing of the handler function, move as much of the logic as possible to other functions that are then called by the Lambda handler entry point.  Also, pass the AWS resource global variables to these subsequent function calls.  This approach enables us to mock and intercept all resources and calls during tests.

In our example, the handler references the global variables and instantiates the resource classes to set up the connections to specific AWS resources.  (We will be able to override and mock these connections during unit testing.)

Then the handler calls the create_letter_in_s3 function to perform the steps of creating the document, passing the resource classes.  This downstream function avoids referencing the global context or any AWS resource connections directly.

def lambda_handler(event: APIGatewayProxyEvent, context: LambdaContext) -> Dict[str, Any]:

    global _LAMBDA_DYNAMODB_RESOURCE
    global _LAMBDA_S3_RESOURCE

    dynamodb_resource_class = LambdaDynamoDBClass(_LAMBDA_DYNAMODB_RESOURCE)
    s3_resource_class = LambdaS3Class(_LAMBDA_S3_RESOURCE)

    return create_letter_in_s3(
            dynamo_db = dynamodb_resource_class,
            s3 = s3_resource_class,
            doc_type = event["pathParameters"]["docType"],
            cust_id = event["pathParameters"]["customerId"])

Unit testing with mock AWS services

Our Lambda function code has now been written and is ready to be tested, so let’s take a look at the unit test code!   The full code for the unit test is available in the GitHub repository as tests/unit/src/test_sample_lambda.py.

In production, our Lambda function code will directly access the AWS resources we defined in our function handler; however, in our unit tests we want to isolate our code and replace the AWS resources with simulations.  This allows unit tests to run in an isolated environment and prevents accidental access to actual cloud resources.

Moto is a Python library for mocking AWS services that we use to simulate AWS resources in our tests.  Moto supports many AWS resources, and it allows you to test your code with little or no modification by emulating the functionality of these services.

Moto uses decorators to intercept and simulate responses to and from AWS resources.  By adding a decorator for a given AWS service, subsequent calls from the module to that service will be re-directed to the mock.

@moto.mock_dynamodb
@moto.mock_s3
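
Applied to the test class, these decorators route every boto3 call made during the tests to Moto's in-memory emulation instead of real AWS endpoints. A small, self-contained illustration of the effect, using the same decorator style (not part of the sample project):

import boto3
import moto

@moto.mock_s3
def test_bucket_is_mocked():
    # This S3 client is intercepted by Moto; no real bucket is created
    s3 = boto3.client("s3", region_name="us-east-1")
    s3.create_bucket(Bucket="unit-test-bucket")
    assert s3.list_buckets()["Buckets"][0]["Name"] == "unit-test-bucket"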

Configure Test Setup and Tear-down

The mocked AWS resources will be used during the unit test suite.  Using the setUp() method allows you to define and configure the mocked global AWS Resources before the tests are run.

We define the test class and a setUp() method and initialize the mock AWS resource.  This includes configuring the resource to prepare it for testing, such as defining a mock DynamoDB table or creating a mock S3 Bucket.

class TestSampleLambda(TestCase):
    def setUp(self) -> None:
        dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
        dynamodb.create_table(
            TableName = self.test_ddb_table_name,
            KeySchema = [{"AttributeName": "PK", "KeyType": "HASH"}],
            AttributeDefinitions = [{"AttributeName": "PK", 
                                     "AttributeType": "S"}],
            BillingMode = 'PAY_PER_REQUEST')

        s3_client = boto3.client('s3', region_name="us-east-1")
        s3_client.create_bucket(Bucket = self.test_s3_bucket_name ) 

After creating the mocked resources, the setup function creates resource class objects referencing those mocked resources, which will be used during testing.

        mocked_dynamodb_resource = { "resource" : resource('dynamodb'),
                                     "table_name" : self.test_ddb_table_name }
        mocked_s3_resource = { "resource" : resource('s3'),
                               "bucket_name" : self.test_s3_bucket_name }
        self.mocked_dynamodb_class = LambdaDynamoDBClass(mocked_dynamodb_resource)
        self.mocked_s3_class = LambdaS3Class(mocked_s3_resource)

Test #1: Verify the code writes the document to S3

Our first test validates that our Lambda function writes the customer letter to an S3 bucket in the correct manner.  We follow the standard arrange, act, assert format when writing this unit test.

Arrange the data we need in the DynamoDB table:

def test_create_letter_in_s3(self) -> None:

    self.mocked_dynamodb_class.table.put_item(Item={"PK": "D#UnitTestDoc",
                                                    "data": "Unit Test Doc Corpi"})
    self.mocked_dynamodb_class.table.put_item(Item={"PK": "C#UnitTestCust",
                                                    "data": "Unit Test Customer"})

Act by calling the create_letter_in_s3 function.  During this step, the test passes the AWS resource classes created in setUp().

    test_return_value = create_letter_in_s3(
                        dynamo_db = self.mocked_dynamodb_class,
                        s3=self.mocked_s3_class,
                        doc_type = "UnitTestDoc",
                        cust_id = "UnitTestCust"
                        )

Assert by reading the data written to the mock S3 bucket, and testing conformity to what we are expecting:

    bucket_key = "UnitTestCust/UnitTestDoc.txt"
    body = self.mocked_s3_class.bucket.Object(bucket_key).get()['Body'].read()

    self.assertEqual(test_return_value["statusCode"], 200)
    self.assertIn("UnitTestCust/UnitTestDoc.txt", test_return_value["body"])
    self.assertEqual(body.decode('ascii'),"Dear Unit Test Customer;\nUnit Test Doc Corpi")

Tests #2 and #3: Data not found error conditions

We can also test error conditions and handling, such as keys not found in the database.  For example, if a customer identifier is submitted but does not exist in the database lookup, does the logic handle this and return a “Not Found” code of 404?

To test this in test #2, we add data to the mocked DynamoDB table, but then submit a customer identifier that is not in the database.

This test, and a similar test #3 for “Document Types not found”, are implemented in the example test code on GitHub.
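
As a reference, here is a minimal sketch of what test #2 could look like; the 404 status code and “Not Found” body text are assumptions based on the description above, and the authoritative version is in the repository:

def test_create_letter_in_s3_customer_not_found(self) -> None:
    # Arrange: only the document exists; the customer identifier does not
    self.mocked_dynamodb_class.table.put_item(Item={"PK": "D#UnitTestDoc",
                                                    "data": "Unit Test Doc Corpi"})

    # Act: request a letter for a customer that is not in the table
    test_return_value = create_letter_in_s3(
        dynamo_db=self.mocked_dynamodb_class,
        s3=self.mocked_s3_class,
        doc_type="UnitTestDoc",
        cust_id="MissingCustomer")

    # Assert: the function reports "Not Found"
    self.assertEqual(test_return_value["statusCode"], 404)
    self.assertIn("Not Found", test_return_value["body"])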

Test #4: Validate the handler interface

As the application logic resides in independently tested functions, the Lambda handler function provides only interface validation and function call orchestration.  Therefore, the test for the handler validates that the event is parsed correctly, any functions are invoked as expected, and the return value is passed back.

To emulate the global resource variables and other functions, patch both the global resource classes and logic functions.

    @patch("src.sample_lambda.app.LambdaDynamoDBClass")
    @patch("src.sample_lambda.app.LambdaS3Class")
    @patch("src.sample_lambda.app.create_letter_in_s3")
    def test_lambda_handler_valid_event_returns_200(self,
                            patch_create_letter_in_s3 : MagicMock,
                            patch_lambda_s3_class : MagicMock,
                            patch_lambda_dynamodb_class : MagicMock
                            ):

Arrange for the test by setting return values for the patched objects.

        patch_lambda_dynamodb_class.return_value = self.mocked_dynamodb_class
        patch_lambda_s3_class.return_value = self.mocked_s3_class

        return_value_200 = {"statusCode" : 200, "body":"OK"}
        patch_create_letter_in_s3.return_value = return_value_200

We need to provide event data when invoking the Lambda handler.  A good practice is to save test events as separate JSON files, rather than placing them inline as code. In the example project, test events are located in the folder “tests/events/”. During test execution, the event object is created from the JSON file using the utility function named load_sample_event_from_file.
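
A minimal sketch of such a helper, assuming the event files are stored as tests/events/<name>.json (the path and method name are illustrative):

import json
import os

def load_sample_event_from_file(self, test_event_name: str) -> dict:
    # Load a JSON test event from the tests/events/ folder
    event_path = os.path.join("tests", "events", f"{test_event_name}.json")
    with open(event_path, "r", encoding="utf-8") as event_file:
        return json.load(event_file)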

test_event = self.load_sample_event_from_file("sampleEvent1")

Act by calling the lambda_handler function.

test_return_value = lambda_handler(event=test_event, context=None)

Assert by ensuring the create_letter_in_s3 function is called with the expected parameters based on the event, and that the create_letter_in_s3 return value is passed back to the caller.  In our example, this value is simply passed through with no alterations.

    patch_create_letter_in_s3.assert_called_once_with(
        dynamo_db=self.mocked_dynamodb_class,
        s3=self.mocked_s3_class,
        doc_type=test_event["pathParameters"]["docType"],
        cust_id=test_event["pathParameters"]["customerId"])

    self.assertEqual(test_return_value, return_value_200)

Tear Down

The tearDown() method is called immediately after the test method has been run and the result is recorded.  In our example tearDown() method, we clean up any data or state created so the next test won’t be impacted.
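
A minimal tearDown() sketch, assuming the mocked table and bucket created in setUp() are the only state to clean up:

def tearDown(self) -> None:
    # Remove the mocked DynamoDB table and empty, then delete, the mocked S3 bucket
    self.mocked_dynamodb_class.table.delete()
    self.mocked_s3_class.bucket.objects.all().delete()
    self.mocked_s3_class.bucket.delete()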

Running the unit tests

The unittest unit testing framework can be run using the Python pytest utility.  To ensure network isolation and verify that the unit tests are not accidentally connecting to AWS resources, the pytest-socket project provides the ability to disable network communication during a test.

pytest -v --disable-socket -s tests/unit/src/

The pytest command results in a PASSED or FAILED status for each test.  A PASSED status verifies that your unit tests, as written, did not encounter errors or issues.

Conclusion

Unit testing is a software development process in which different parts of an application, called units, are individually and independently tested. Tests validate the quality of the code and confirm that it functions as expected. Other developers can gain familiarity with your code base by consulting the tests. Unit tests reduce future refactoring time, help engineers get up to speed on your code base more quickly, and provide confidence in the expected behaviour.

We’ve seen in this blog how to unit test AWS Lambda functions and mock AWS Services to isolate and test individual logic within our code.

AWS Lambda Powertools for Python is used in the project to validate handler events.   Powertools provides a suite of utilities for AWS Lambda functions to ease adopting best practices such as tracing, structured logging, custom metrics, idempotency, batching, and more.

Learn more about AWS Lambda testing in our prescriptive test guidance, and find additional test examples on GitHub.  For more serverless learning resources, visit Serverless Land.

About the authors:

Tom Romano

Tom Romano is a Solutions Architect for AWS World Wide Public Sector from Tampa, FL, and assists GovTech and EdTech customers as they create new solutions that are cloud-native, event driven, and serverless. He is an enthusiastic Python programmer for both application development and data analytics. In his free time, Tom flies remote control model airplanes and enjoys vacationing with his family around Florida and the Caribbean.

Kevin Hakanson

Kevin Hakanson is a Sr. Solutions Architect for AWS World Wide Public Sector based in Minnesota. He works with EdTech and GovTech customers to ideate, design, validate, and launch products using cloud-native technologies and modern development practices. When not staring at a computer screen, he is probably staring at another screen, either watching TV or playing video games with his family.

Realtime monitoring of microservices and cloud-native applications with IBM Instana SaaS on AWS

Post Syndicated from Eduardo Monich Fronza original https://aws.amazon.com/blogs/architecture/realtime-monitoring-of-microservices-and-cloud-native-applications-with-ibm-instana-saas-on-aws/

Customers are adopting microservices architecture to build innovative and scalable applications on Amazon Web Services (AWS). These microservices applications are deployed across multiple AWS services, and customers are looking for comprehensive observability solutions that can help them effectively monitor and manage the performance of their applications in real-time.

IBM Instana is a fully automated application performance management (APM) solution, available to customers as a fully managed software as a service (SaaS) solution on AWS. It is specifically designed to help customers address the challenges of monitoring microservices and cloud-native applications in real-time. It uses artificial intelligence and machine learning to provide detailed insights into the health and behavior of applications, allowing developers and IT teams to gain real-time insights into their microservices applications, optimize performance, and quickly identify and troubleshoot issues.

This post explains the capabilities of IBM Instana to automatically collect observability metrics, traces, and events from microservices deployed on AWS cloud, as well as on-premises, to provide full visibility into the performance of individual components and applications as a whole.

IBM Instana solution overview

IBM Instana is designed to be highly scalable and adaptable to changing microservices applications environments. Its architecture (Figure 1) consists of several components that work together to provide comprehensive monitoring for microservices and cloud-native applications.

Instana’s main building blocks are host agents and agent sensors that are deployed in a customer’s AWS account and responsible for collecting, aggregating, and sending detailed monitoring information of applications and AWS services to the Instana SaaS backend.

The Instana SaaS backend services provide several key components, including data collectors, storage services, analytics engines, and user interfaces. These allow customers to process and analyze data in real time, generate actionable insights, and gain a comprehensive view of their application and infrastructure performance, enabling them to quickly identify and resolve issues and improve their overall operations.

Figure 1. IBM Instana architecture on AWS

Monitoring data

Instana monitors and observes microservices and cloud-native applications by collecting beacons, traces, and one-second metrics:

  • Beacons are small monitoring payloads that are transmitted by a JavaScript agent to the Instana servers, modeling specific events occurring within the lifecycle of a page view of a website; for example, page loading, resource retrieval, and HTTP requests.
  • Traces are detailed records of the requests and transactions that flow through a microservice architecture. They record the sequence of events that occur when a request is processed, including the services that are involved, the duration of each service, and any errors or exceptions that occur. Instana automatically correlates traces across services to provide a complete view of an entire transaction. This allows for easy identification and diagnosis of performance issues.
  • Metrics are numerical values that represent the performance and resource utilization of a microservice or infrastructure component. Metrics are collected by Instana Agents and sent to the Instana backend at regular intervals. Instana Agents collect hundreds of different metrics, including (but not limited to) CPU usage, memory usage, network traffic, and disk I/O.

This information is captured by Instana agents and sensors, which also collect application configurations and events, plus discover application building blocks, including clusters, containers, and services.

IBM Instana agents and sensors

The Instana host agent is a lightweight software component that collects and aggregates data from various sensors before sending the data to the Instana backend. It can be deployed to AWS services, including Amazon Elastic Compute Cloud (Amazon EC2), Amazon Elastic Kubernetes Service (Amazon EKS), AWS Fargate, AWS Lambda, or Red Hat OpenShift Service on AWS (ROSA). A single host agent, one per host, is used to collect data from monitored systems.

Once Instana agents are running, they automatically detect applications and services, such as containers running on Amazon EKS, and processes like Nginx, NodeJS, Spring Boot, Postgres, Elasticsearch, or Cassandra. For each component detected, different Instana sensors are automatically downloaded, installed, and configured to monitor the environment.

Instana sensors are small programs that are designed to attach and monitor one specific technology and pass their data to the agent. They are automatically managed, updated, loaded, and unloaded by the host agent.

These sensors can monitor several different AWS services like Lambda, Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), Amazon Aurora, Amazon Simple Queue Service, and Amazon Managed Streaming for Apache Kafka. They collect data—like request and error rates, latency, CPU utilization—via AWS APIs and Amazon CloudWatch.

Instana also provides sensors to collect data from applications running on AWS, like IBM MQ, IBM Db2, or Red Hat OpenShift Container Platform. Review IBM’s full list of supported technologies and AWS services.

Instana also provides tracers, which are used with runtimes like Java, .NET, NodeJS, plus others. They modify code execution to capture logs, traces at request level, and send those back to the Instana agent.

With the use of sensors, the host agent collects configuration data and monitors the applications it has detected. The host agent also handles communications with the Instana SaaS backend services. It collects, aggregates, and sends logs, traces, and metrics (such as response times, error rates, and resource utilization) to the Instana SaaS backend every second, in real time, using secure and efficient communication protocols.

IBM Instana SaaS

The Instana SaaS backend is the heart of the Instana APM solution and responsible for processing, storing, and analyzing the monitoring data collected from the Instana agents and sensors installed in the customer’s infrastructure.

It consists of several components and services that work together to provide real-time monitoring and analysis of microservices applications, including:

  • Data collectors: Receive and process data from the Instana agents and sensors, and store it in the Instana backend for further analysis.
  • Analytics engine: Analyzes the data collected by the agents and sensors to provide insights into the performance and health of the microservices applications.
  • User interface: Web-based interface that customers use to view and analyze their monitoring data.
  • Alerting engine: Generates alerts when thresholds or anomalies are detected in the monitoring data.
  • Data storage: Time-series database that stores the monitoring data collected by the agents and sensors. Allows customers to query and analyze the data in real-time.
  • Integrations: Integrates with various third-party tools, such as Slack, PagerDuty, and ServiceNow, providing seamless alerting and incident management.

IBM Instana backend: making sense of the situation in real time

The Instana SaaS platform automatically ingests data from agents and continuously updates a dependency map (Figure 2). This map presents every dependency in context, giving users an easy way to understand the interrelationships between application components and services.

This understanding enables users to identify the upstream and downstream impacts of any issue, ensuring that they stay informed about any potential impacts.

Figure 2. An example of an IBM Instana dependency map

Instana traces every request end-to-end without sampling. The traces are analyzed in real-time, providing metrics that make any performance problems immediately visible. In the event of an incident, Instana can illustrate how a single issue can generate a ripple effect and impact a number of directly and indirectly connected services. Using the relationship information from the Dynamic Graph, Instana’s automatic root-cause analysis can precisely aggregate the individual issues into a single incident.

Figure 3. Applications monitoring with IBM Instana

Developers, IT operations, or site reliability engineers (SREs) can access the Instana application monitoring interface (Figure 3) or the end-user monitoring (EUM) interface (Figure 4) to view monitoring data of their workloads. These can be websites, mobile applications, AWS services, and infrastructure levels. From this UI, these personas can access service dashboards that show key performance indicators (KPIs), like response time and error rate.

Figure 4. End-user monitoring with IBM Instana

The following actions demonstrate how EUM can be set up for a JavaScript application deployed to Amazon S3:

  • Developers inject Instana JavaScript code (Figure 5) into the static website (HTML).
  • When a user visits the website, the JavaScript agent sends beacons to the Instana backend.
  • Dashboards show specific events of the website lifecycle, including page loading, JS errors, and HTTP requests.
  • Teams access the Instana UI to check performance metrics. They can configure Smart Alerts with custom alerting policies based on specific metrics and KPIs.
  • Smart Alerts can send alerts via various channels, such as email, Slack, or IBM Watson AIOps Webhook.
  • In case of an incident, teams can use Instana to retrieve various performance metrics for root-cause analysis.
  • Developers can resolve the issues and apply the patch.

Figure 5. IBM Instana EUM JavaScript agent

Instana also offers Smart Alerts (Figure 6) to provide a more intuitive process of managing alerts. With Smart Alerts, customers can automatically generate alerting configurations using relevant KPIs and automatic threshold detection for use cases like website slowness or website errors.

Figure 6. IBM Instana Smart Alerts

Conclusion

In this post, we discussed how IBM Instana provides a comprehensive monitoring solution with the right tools to help you implement a real-time observability and monitoring solution. It allows you to gain insight into your microservices and cloud-native applications, including visibility into AWS services, containers, on-premises infrastructure, and other technologies. Instana can quickly identify and resolve issues before they impact end-users, ensuring that your applications are performing optimally.

Whether you are an IT administrator, developer, or business owner, IBM Instana on AWS gives you a deeper understanding of your applications and helps you make data-driven decisions to improve overall performance.

Implementing an event-driven serverless story generation application with ChatGPT and DALL-E

Post Syndicated from David Boyne original https://aws.amazon.com/blogs/compute/implementing-an-event-driven-serverless-story-generation-application-with-chatgpt-and-dall-e/

This post demonstrates how to integrate AWS serverless services with artificial intelligence (AI) technologies, ChatGPT, and DALL-E. This full stack event-driven application showcases a method of generating unique bedtime stories for children by using predetermined characters and scenes as a prompt for ChatGPT.

Every night at bedtime, the serverless scheduler triggers the application, initiating an event-driven workflow to create and store new unique AI-generated stories with AI-generated images and supporting audio.

These datasets are used to showcase the story on a custom website built with Next.js hosted with AWS App Runner. After the story is created, a notification is sent to the user containing a URL to view and read the story to the children.

Example notification of a story hosted with Next.js and App Runner

By integrating AWS services with AI technologies, you can now create new and innovative ideas that were previously unimaginable.

The application mentioned in this blog post demonstrates examples of point-to-point messaging with Amazon EventBridge pipes, publish/subscribe patterns with Amazon EventBridge and reacting to change data capture events with DynamoDB Streams.

Understanding the architecture

The following image shows the serverless architecture used to generate stories:

Architecture diagram for Serverless bed time story generation with ChatGPT and DALL-E

A new children’s story is generated every day at a configured time using Amazon EventBridge Scheduler (Step 1). EventBridge Scheduler is a service capable of scaling to millions of schedules, with support for over 200 targets and over 6,000 API calls. This example application uses EventBridge Scheduler to trigger an AWS Lambda function every night at the same time (7:15pm). The Lambda function starts the generation of the story.

EventBridge scheduler triggers Lambda function every day at 7:15pm (bed time)

The “Scenes” and “Characters” Amazon DynamoDB tables contain the characters involved in the story and a scene that is randomly selected during its creation. As a result, ChatGPT receives a unique prompt each time. An example of the prompt may look like this:

    Write a title and a rhyming story on 2 main characters called Parker and Jackson. The story needs to be set within the scene haunted woods and be at least 200 words long

After the story is created, it is then saved in the “Stories” DynamoDB table (Step 2).
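
A minimal sketch of how the prompt could be assembled and the resulting story persisted; the table names, attribute keys, and helper names (Characters, Scenes, Stories, name, description) are placeholders, not the exact schema used by the sample:

import random
import uuid
import boto3

dynamodb = boto3.resource("dynamodb")
characters_table = dynamodb.Table("Characters")  # placeholder table names
scenes_table = dynamodb.Table("Scenes")
stories_table = dynamodb.Table("Stories")

def build_prompt():
    # Pick two random characters and one random scene for a unique prompt
    characters = random.sample(characters_table.scan()["Items"], 2)
    scene = random.choice(scenes_table.scan()["Items"])
    names = " and ".join(c["name"] for c in characters)
    return (f"Write a title and a rhyming story on 2 main characters called {names}. "
            f"The story needs to be set within the scene {scene['description']} "
            f"and be at least 200 words long")

def save_story(story_text):
    # Persist the generated story so DynamoDB Streams can trigger downstream processing
    stories_table.put_item(Item={"id": str(uuid.uuid4()), "story": story_text})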

Scheduler triggering Lambda function to generate the story and store story into DynamoDB

Once the story is created, this initiates a change data capture event using DynamoDB Streams (Step 3). The event flows through point-to-point messaging with EventBridge Pipes directly into EventBridge. Input transforms then convert the DynamoDB Streams event into a custom EventBridge event that downstream consumers can comprehend. Adopting this pattern is beneficial because it separates the contract from the DynamoDB event schema: downstream consumers do not have to conform to that schema structure, and the mapping keeps the application decoupled from implementation details.

EventBridge Pipes connecting DynamoDB streams directly into EventBridge.

Upon triggering the StoryCreated event in EventBridge, three targets are triggered to carry out several processes (Step 4). Firstly, AI Images are processed, followed by the creation of audio for the story. Finally, the end user is notified of the completed story through Amazon SNS and email subscriptions. This fan-out pattern enables these tasks to be run asynchronously and in parallel, allowing for faster processing times.

EventBridge pub/sub pattern used to start async processing of notifications, audio, and images.

An SNS topic is triggered by the `StoryCreated` event to send an email to the end user using email subscriptions (Step 5). The email consists of a URL with the id of the story that has been created. Clicking on the URL takes the user to the frontend application that is hosted with App Runner.

Using SNS to notify the user of a new story

Example email sent to the user

Amazon Polly is used to generate the audio files for the story (Step 6). Upon triggering the `StoryCreated` event, a Lambda function is triggered, and the story description is used and given to Amazon Polly. Amazon Polly then creates an audio file of the story, which is stored in Amazon S3. A presigned URL is generated and saved in DynamoDB against the created story. This allows the frontend application and browser to retrieve the audio file when the user views the page. The presigned URL has a validity of two days, after which it can no longer be accessed or listened to.
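
A minimal sketch of this step; the bucket, table, and attribute names are placeholders, and the two-day validity corresponds to an ExpiresIn of 172,800 seconds:

import boto3

polly = boto3.client("polly")
s3 = boto3.client("s3")
stories_table = boto3.resource("dynamodb").Table("Stories")  # placeholder names

def create_audio(story_id, story_text, bucket="stories-audio-bucket"):
    # Synthesize the story text into an MP3 stream and store it in S3
    speech = polly.synthesize_speech(Text=story_text, OutputFormat="mp3", VoiceId="Joanna")
    key = f"{story_id}.mp3"
    s3.put_object(Bucket=bucket, Key=key, Body=speech["AudioStream"].read())

    # Presigned URL valid for two days (172,800 seconds)
    audio_url = s3.generate_presigned_url(
        "get_object", Params={"Bucket": bucket, "Key": key}, ExpiresIn=172800)

    # Save the URL against the story so the frontend can retrieve the audio
    stories_table.update_item(
        Key={"id": story_id},
        UpdateExpression="SET audioUrl = :u",
        ExpressionAttributeValues={":u": audio_url})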

Lambda function to generate audio using Amazon Polly, store in S3 and update story with presigned URL

The `StoryCreated` event also triggers another Lambda function, which uses the OpenAI API to generate an AI image using DALL-E based on the generated story (Step 7). Once the image is generated, the image is downloaded and stored in Amazon S3. Similar to the audio file, the system generates a presigned URL for the image and saves it in DynamoDB against the story. The presigned URL is only valid for two days, after which it becomes inaccessible for download or viewing.

Lambda function to generate images, store in S3 and update story with presigned URL.

In the event of a failure in audio or image generation, the frontend application still loads the story, but does not display the missing image or audio. This ensures that the frontend can continue working and provide value. If you want more control and only want to trigger the user’s notification once all parallel tasks are complete, consider the aggregator messaging pattern.

Hosting the frontend Next.js application with AWS App Runner

The frontend application uses Next.js to serve server-side rendered (SSR) pages that access the stories from the DynamoDB table. The application is containerized and hosted with AWS App Runner.

Next.js application hosted with App Runner, with permissions into DynamoDB table.

AWS App Runner enables you to deploy containerized web applications and APIs securely, without needing any prior knowledge of containers or infrastructure. With App Runner, developers can concentrate on their application, while the service handles container startup, running, scaling, and load balancing. After deployment, App Runner provides a secure URL for clients to begin making HTTP requests against.

With App Runner, you have two primary options for deploying your container: source code connections or source images. Using source code connections grants App Runner permission to pull the image file directly from your source code, and with Automatic deployment configured, it can redeploy the application when changes are made. Alternatively, source images provide App Runner with the image’s location in an image registry, and this image is deployed by App Runner.

In this example application, CDK deploys the application using the DockerImageAsset construct with the App Runner construct. Once deployed, App Runner builds and uploads the frontend image to Amazon Elastic Container Registry (ECR) and deploys it. Downstream consumers can access the application using the secure URL provided by App Runner. In this example, the URL is used when the SNS notification is sent to the user when the story is ready to be viewed.

Giving the frontend container permission to DynamoDB table

To grant the Next.js application permission to obtain stories from the Stories DynamoDB table, App Runner instance roles are configured. These roles are optional and can provide the necessary permissions for the container to access AWS services required by the compute service.

If you want to learn more about AWS App Runner, you can explore the free workshop.

Design choices and assumptions

The DynamoDB Time to Live (TTL) feature is ideal for the short-lived nature of daily generated stories. DynamoDB deletes stories after two days based on the TTL attribute set on each story. Once a story is deleted, it becomes inaccessible through the generated story URLs.
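
A minimal sketch of setting the expiry when a story is written, assuming the table's TTL attribute is named ttl and the table name is a placeholder:

import time
import boto3

stories_table = boto3.resource("dynamodb").Table("Stories")  # placeholder table name

def put_story_with_ttl(story_id, story_text):
    # DynamoDB TTL expects an epoch timestamp in seconds; expire after two days
    expires_at = int(time.time()) + 2 * 24 * 60 * 60
    stories_table.put_item(Item={
        "id": story_id,
        "story": story_text,
        "ttl": expires_at,  # attribute name must match the table's TTL configuration
    })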

Using Amazon S3 presigned URLs is a method to grant temporary access to a file in S3. This application creates presigned URLs for the audio file and generated images that last for 2 days, after which the URLs for the S3 items become invalid.

Input transforms are used between DynamoDB Streams and EventBridge events to decouple the schemas and events consumed by downstream targets. Consuming the events as they are is known as the “conformist” pattern and couples downstream EventBridge consumers to implementation details of DynamoDB Streams. Using input transforms instead allows the application to remain decoupled from implementation details and stay flexible.

Conclusion

The adoption of artificial intelligence (AI) technology has significantly increased in various industries. ChatGPT, a large language model that can understand and generate human-like responses in natural language, and DALL-E, an image generation system that can create realistic images based on textual descriptions, are examples of such technology. These systems have demonstrated the potential for AI to provide innovative solutions and transform the way we interact with technology.

This blog post explores ways in which you can utilize AWS serverless services with ChatGPT and DALL-E to create a story generation application fronted by a Next.js application hosted with App Runner. EventBridge Scheduler triggers the story creation process, DynamoDB Streams and EventBridge Pipes react to the resulting change data capture events, and Amazon EventBridge fans out compute tasks to process notifications, images, and audio files.

You can find the documentation and the source code for this application in GitHub.

For more serverless learning resources, visit Serverless Land.

Managing sessions of anonymous users in WebSocket API-based applications

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/managing-sessions-of-anonymous-users-in-websocket-api-based-applications/

This post is written by Alexey Paramonov, Solutions Architect, ISV.

This blog post demonstrates how to reconnect anonymous users to WebSocket API without losing their session context. You learn how to link WebSocket API connection IDs to the logical user, what to do when the connection fails, and how to store and recover session information when the user reconnects.

WebSocket APIs are common in modern interactive applications: for example, watching stock prices, following live chat feeds, collaborating with others in productivity tools, or playing online games. Another example, described in the “Implementing reactive progress tracking for AWS Step Functions” blog post, uses WebSocket APIs to send progress back to the client.

The backend is aware of the client’s connection ID to find the right client to send data to. But if the connection temporarily fails, the client reconnects with a new connection ID. If the backend does not have a mechanism to associate a new connection ID with the same client, the interaction context is lost and the user must start over again. In a live chat feed that could mean skipping to the most recent message with a loss of some previous messages. And in case of AWS Step Functions progress tracking the user would lose progress updates.

Overview

The sample uses Amazon API Gateway WebSocket APIs. It uses AWS Lambda for WebSocket connection management and for mocking a teleprinter to generate a stateful stream of characters for testing purposes. There are two Amazon DynamoDB tables for storing connection IDs and user sessions, as well as an optional MediaWiki API for retrieving an article.

The following diagram outlines the architecture:

Reference architecture

  1. The browser generates a random user ID and stores it locally in the session storage. The client sends the user ID inside the Sec-WebSocket-Protocol header to WebSocket API.
  2. The default WebSocket API route OnConnect invokes OnConnect Lambda function and passes the connection ID and the user ID to it. OnConnect Lambda function determines if the user ID exists in the DynamoDB table and either creates a new item or updates the existing one.
  3. When the connection is open, the client sends the Read request to the Teleprinter Lambda function.
  4. The Teleprinter Lambda function downloads an article about WebSocket APIs from Wikipedia and stores it as a string in memory.
  5. The Teleprinter Lambda function checks if there is a previous state stored in the Sessions table in DynamoDB. If it is a new user, the Teleprinter Lambda function starts sending the article from the beginning character by character back to the client via WebSocket API. If it is a returning user, the Teleprinter Lambda function retrieves the last cursor position (the last successfully sent character) from the DynamoDB table and continues from there.
  6. The Teleprinter Lambda function sends 1 character every 100ms back to the client.
  7. The client receives the article and prints it out character by character as each character arrives.
  8. If the connection breaks, the WebSocket API calls the OnDisconnect Lambda function automatically. The function marks the connection ID for the given user ID as inactive.
  9. If the user does not return within 5 minutes, Amazon EventBridge scheduler invokes the OnDelete Lambda function, which deletes items with more than 5 minutes of inactivity from Connections and Sessions tables.

A teleprinter returns data one character at a time instead of a single response. When the Lambda function fetches an article, it feeds it character by character inside the for-loop with a delay of 100ms on every iteration. This demonstrates how the user can continue reading the article after reconnecting and not starting the feed all over again. The pattern could be useful for traffic limiting by slowing down user interactions with the backend.

Understanding sample code functionality

When the user connects to the WebSocket API, the client generates a user ID and saves it in the browser’s session storage. The user ID is a random string. When the client opens a WebSocket connection, the frontend sends the user ID inside the Sec-WebSocket-Protocol header, which is standard for WebSocket APIs. When using the JavaScript WebSocket library, there is no need to specify the header manually. To initialize a new connection, use:

const newWebsocket = new WebSocket(wsUri, userId);

wsUri is the deployed WebSocket API URL and userId is a random string generated in the browser. The userId goes to the Sec-WebSocket-Protocol header because WebSocket APIs generally offer less flexibility in choosing headers compared to RESTful APIs. Another way to pass the user ID to the backend could be a query string parameter.

The backend receives the connection request with the user ID and connection ID. The WebSocket API generates the connection ID automatically. It’s important to include the Sec-WebSocket-Protocol in the backend response, otherwise the connection closes immediately:

    return {
        'statusCode': 200,
        'headers': {
            'Sec-WebSocket-Protocol': userId
        }
    }

The Lambda function stores this information in the DynamoDB Connections table:

    ddb.put_item(
        TableName=table_name,
        Item={
            'userId': {'S': userId},
            'connectionId': {'S': connection_id},
            'domainName': {'S': domain_name},
            'stage': {'S': stage},
            'active': {'S': 'True'}
        }
    )

The active attribute indicates if the connection is up. In the case of inactivity over a specified time limit, the OnDelete Lambda function deletes the item automatically. The Put operation comes in handy here because you don’t need to query the DB to check if the user already exists. If it is a new user, Put creates a new item. If it is a reconnection, Put updates the connectionId and sets active to True again.

The primary key for the Connections table is userId, which helps locate users faster as they reconnect. The application relies on a global secondary index (GSI) for locating connectionId. This is necessary for marking the connection inactive when WebSocket API calls OnDisconnect function automatically.

Now that the application has connection management, you can retrieve session data. The Teleprinter Lambda function receives a connection ID from the WebSocket API. You can query the GSI of the Connections table to find if the user exists:

    def get_user_id(connection_id):
        response = ddb.query(
            TableName=connections_table_name,
            IndexName='connectionId-index',
            KeyConditionExpression='connectionId = :c',
            ExpressionAttributeValues={
                ':c': {'S': connection_id}
            }
        )

        items = response['Items']

        if len(items) == 1:
            return items[0]['userId']['S']

        return None

Another DynamoDB table called Sessions is used to check if the user has an existing session context. If yes, the Lambda function retrieves the cursor position and resumes teletyping. If it is a brand-new user, the Lambda function starts sending characters from the beginning. The function stores a new cursor position if the connection breaks. This makes it easier to detect stale connections and store the current cursor position in the Sessions table.
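
A minimal sketch of that session lookup, reusing the ddb client and sessions_table_name variable from the surrounding snippets (returning 0 for new users is an assumption):

def get_cursor_position(user_id):
    response = ddb.get_item(
        TableName=sessions_table_name,
        Key={'userId': {'S': user_id}}
    )
    item = response.get('Item')
    if item is None:
        return 0  # new user: start the article from the beginning
    return int(item['cursorPosition']['N'])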

    try:
        api_client.post_to_connection(
            ConnectionId=connection_id,
            Data=bytes(ch, 'utf-8')
        )
    except api_client.exceptions.GoneException as e:
        print(f"Found stale connection, persisting state")
        store_cursor_position(user_id, pos)
        return {
            'statusCode': 410
        }

    def store_cursor_position(user_id, pos):
        ddb.put_item(
            TableName=sessions_table_name,
            Item={
                'userId': {'S': user_id},
                'cursorPosition': {'N': str(pos)}
            }
        )

After this, the Teleprinter Lambda function ends.
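
On the next connection, the Teleprinter function needs the stored position back before it resumes. A minimal sketch of that lookup follows; the helper name and the default starting position of 0 are assumptions:

    def get_cursor_position(user_id):
        # Returns the stored position, or 0 for a brand-new user
        response = ddb.get_item(
            TableName=sessions_table_name,
            Key={'userId': {'S': user_id}}
        )
        item = response.get('Item')
        if item:
            return int(item['cursorPosition']['N'])
        return 0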

Serving authenticated users

The same mechanism also works for authenticated users. The main difference is that the backend takes the user ID from a JWT or another authentication mechanism instead of using a randomly generated one. Because the backend only needs an unambiguous user identifier, handling authenticated users usually requires only minor changes.
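
For example, if a Lambda authorizer on the $connect route validates the token and places the user ID into the request context (an assumption about how authentication is wired up), the OnConnect function could read it like this instead of generating a random string:

    def extract_user_id(event):
        # Hypothetical: a Lambda authorizer has added 'userId' to the
        # authorizer context; fall back to the protocol header otherwise
        authorizer = event.get('requestContext', {}).get('authorizer', {})
        return authorizer.get('userId') or event['headers'].get('Sec-WebSocket-Protocol')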

Deleting inactive users

The OnDisconnect function marks user connections as inactive and writes a timestamp to the ‘lastSeen’ attribute. If the user never reconnects, the application should purge inactive items from the Connections and Sessions tables. Depending on your operational requirements, there are two options.
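
A minimal sketch of that update, assuming the attribute types used earlier in this post (active stored as a string, lastSeen as an epoch number):

    from datetime import datetime, timezone

    def mark_inactive(user_id):
        ddb.update_item(
            TableName=connections_table_name,
            Key={'userId': {'S': user_id}},
            UpdateExpression='SET active = :a, lastSeen = :ts',
            ExpressionAttributeValues={
                ':a': {'S': 'False'},
                ':ts': {'N': str(int(datetime.now(timezone.utc).timestamp()))}
            }
        )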

Option 1: Using EventBridge Scheduler

The sample application uses EventBridge Scheduler to run the OnDelete function every 5 minutes. The following code shows how AWS SAM adds the scheduler to the OnDelete function:

  OnDeleteSchedulerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: app.handler
      Runtime: python3.9
      CodeUri: handlers/onDelete/
      MemorySize: 128
      Environment:
        Variables:
          CONNECTIONS_TABLE_NAME: !Ref ConnectionsTable
          SESSIONS_TABLE_NAME: !Ref SessionsTable
      Policies:
      - DynamoDBCrudPolicy:
          TableName: !Ref ConnectionsTable
      - DynamoDBCrudPolicy:
          TableName: !Ref SessionsTable
      Events:
        Schedule:
          Type: ScheduleV2
          Properties:
            ScheduleExpression: rate(5 minutes)

The Events section of the function definition is where AWS SAM sets up the scheduled execution. Change the ScheduleExpression value to meet your scheduling requirements.

The OnDelete function relies on the GSI to find inactive user IDs. The following code snippet shows how to query connections with more than 5 minutes of inactivity:

    from datetime import datetime, timedelta

    five_minutes_ago = int((datetime.now() - timedelta(minutes=5)).timestamp())

    stale_connection_items = table_connections.query(
        IndexName='lastSeen-index',
        KeyConditionExpression='active = :hk and lastSeen < :rk',
        ExpressionAttributeValues={
            ':hk': 'False',
            ':rk': five_minutes_ago
        }
    )

After that, the function iterates through the list of user IDs and deletes them from the Connections and Sessions tables:

    inactive_users = stale_connection_items['Items']

    # remove inactive connections
    with table_connections.batch_writer() as batch:
        for item in inactive_users:
            batch.delete_item(Key={'userId': item['userId']})

    # remove inactive sessions
    with table_sessions.batch_writer() as batch:
        for item in inactive_users:
            batch.delete_item(Key={'userId': item['userId']})

The sample uses batch_writer() to batch the deletes instead of sending a separate request for each user ID.

Option 2: Using DynamoDB TTL

DynamoDB provides a built-in mechanism for expiring items called Time to Live (TTL). This option is easier to implement. With TTL, there’s no need for the EventBridge scheduler, the OnDelete Lambda function, or an additional GSI to track inactivity. To set up TTL, designate an attribute that holds the item’s expiration time as a Unix epoch timestamp, for example ‘lastSeen’ plus a retention period. DynamoDB deletes the item after that time passes. When using AWS SAM or AWS CloudFormation templates, add TimeToLiveSpecification to your table definition.
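
As a sketch, you could also enable TTL with boto3; the expiresAt attribute name and the seven-day retention period are assumptions:

    import time
    import boto3

    ddb_client = boto3.client('dynamodb')

    # One-time setup: tell DynamoDB which attribute holds the expiration epoch
    ddb_client.update_time_to_live(
        TableName='Sessions',
        TimeToLiveSpecification={'Enabled': True, 'AttributeName': 'expiresAt'}
    )

    # When writing an item, derive expiresAt from the current time plus a retention period
    expires_at = int(time.time()) + 7 * 24 * 60 * 60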

TTL typically deletes expired items within 48 hours. If your operational requirements demand faster and more predictable timing, use option 1. For example, if your application aggregates data while the user is offline, use option 1. But if your application stores static session data, like the cursor position used in this sample, option 2 can be an easier alternative.

Deploying the sample

Prerequisites

Ensure you can manage AWS resources from your terminal.

  • AWS CLI and AWS SAM CLI installed.
  • You have an AWS account. If not, visit this page.
  • Your user has enough permissions to manage AWS resources.
  • Git is installed.
  • NPM is installed (only for local frontend deployment).

You can find the source code and README here.

The repository contains both the frontend and the backend code. To deploy the sample, follow this procedure:

  1. Open a terminal and clone the repository:
    git clone https://github.com/aws-samples/websocket-sessions-management.git
  2. Navigate to the root of the repository.
  3. Build and deploy the AWS SAM template:
    sam build && sam deploy --guided
  4. Note the value of WebSocketURL in the output. You need it later.

With the backend deployed, test it using the hosted frontend.

Testing the application

You can watch an animated demo here.

Notice that the app generates a random user ID on startup and shows it in the header.

  1. Paste the WebSocket URL into the text field. You can find the URL in the console output after you have successfully deployed your AWS SAM template. Alternatively, you can navigate to AWS Management Console (make sure you are in the right Region), select the API you have recently deployed, go to “Stages”, select the deployed stage and copy the “WebSocket URL” value.
  2. Choose Connect. The app opens a WebSocket connection.
  3. Choose Tele-read to start receiving the Wikipedia article character by character. New characters appear in the second half of the screen as they arrive.
  4. Choose Disconnect to close the WebSocket connection. Reconnect and choose Tele-read again. Your session resumes from where you stopped.
  5. Choose Reset Identity. The app closes the WebSocket connection and changes the user ID. Choose Connect and Tele-read again and your character feed starts from the beginning.

Cleaning up

To clean up the resources, in the root directory of the repository run:

sam delete

This removes all resources provisioned by the template.yml file.

Conclusion

In this post, you learned how to keep track of user sessions when using WebSocket APIs and how to preserve session context when the user reconnects. Apply the learnings from this example to improve the user experience of web-frontend and mobile applications that use WebSocket APIs, where internet connections may be unstable.

For more serverless learning resources, visit  Serverless Land.

Introducing AWS Lambda Powertools for .NET

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/introducing-aws-lambda-powertools-for-net/

This blog post is written by Amir Khairalomoum, Senior Solutions Architect.

Modern applications are built with modular architectural patterns, serverless operational models, and agile developer processes. They allow you to innovate faster, reduce risk, accelerate time to market, and decrease your total cost of ownership (TCO). A microservices architecture comprises many distributed parts that can introduce complexity to application observability. Modern observability must respond to this complexity, the increased frequency of software deployments, and the short-lived nature of AWS Lambda execution environments.

The Serverless Applications Lens for the AWS Well-Architected Framework focuses on how to design, deploy, and architect your serverless application workloads in the AWS Cloud. AWS Lambda Powertools for .NET translates some of the best practices defined in the serverless lens into a suite of utilities. You can use these in your application to apply structured logging, distributed tracing, and monitoring of metrics.

Following the community’s continued adoption of AWS Lambda Powertools for Python, Java, and TypeScript, AWS Lambda Powertools for .NET is now generally available.

This post shows how to use the new open source Powertools library to implement observability best practices with minimal coding. It walks through getting started, with the provided examples available in the Powertools GitHub repository.

About Powertools

Powertools for .NET is a suite of utilities that helps with implementing observability best practices without needing to write additional custom code. It currently supports Lambda functions written in C#, with support for runtime versions .NET 6 and newer. Powertools provides three core utilities:

  • Tracing provides a simpler way to send traces from functions to AWS X-Ray. It provides visibility into function calls, interactions with other AWS services, or external HTTP requests. You can add attributes to traces to allow filtering based on key information. For example, when using the Tracing attribute, it creates a ColdStart annotation. You can easily group and analyze traces to understand the initialization process.
  • Logging provides a custom logger that outputs structured JSON. It allows you to pass in strings or more complex objects, and takes care of serializing the log output. The logger handles common use cases, such as logging the Lambda event payload, and capturing cold start information. This includes appending custom keys to the logger.
  • Metrics simplifies collecting custom metrics from your application, without the need to make synchronous requests to external systems. This functionality allows capturing metrics asynchronously using Amazon CloudWatch Embedded Metric Format (EMF) which reduces latency and cost. This provides convenient functionality for common cases, such as validating metrics against CloudWatch EMF specification and tracking cold starts.

Getting started

The following steps explain how to use Powertools to implement structured logging, add custom metrics, and enable tracing with AWS X-Ray. The example application consists of an Amazon API Gateway endpoint, a Lambda function, and an Amazon DynamoDB table. It uses the AWS Serverless Application Model (AWS SAM) to manage the deployment.

When you send a GET request to the API Gateway endpoint, the Lambda function is invoked. This function calls a location API to find the IP address, stores it in the DynamoDB table, and returns it with a greeting message to the client.

Example application

The AWS Lambda Powertools for .NET utilities are available as NuGet packages, with a separate package for each core utility, so you can add only the packages you need. This keeps the Lambda deployment package smaller, which can improve performance.

To implement each of these core utilities in a separate example, use the Globals section of the AWS SAM template to configure Powertools environment variables and enable active tracing for all Lambda functions and Amazon API Gateway stages.

Sometimes resources that you declare in an AWS SAM template have common configurations. Instead of duplicating this information in every resource, you can declare them once in the Globals section and let your resources inherit them.

Logging

The following steps explain how to implement structured logging in an application. The logging example shows you how to use the logging feature.

To add the Powertools logging library to your project, install the packages from the NuGet gallery, from the Visual Studio editor, or by using the following .NET CLI command:

dotnet add package AWS.Lambda.Powertools.Logging

Use environment variables in the Globals section of the AWS SAM template to configure the logging library:

  Globals:
    Function:
      Environment:
        Variables:
          POWERTOOLS_SERVICE_NAME: powertools-dotnet-logging-sample
          POWERTOOLS_LOG_LEVEL: Debug
          POWERTOOLS_LOGGER_CASE: SnakeCase

Decorate the Lambda function handler method with the Logging attribute in the code. This enables the utility and allows you to use the Logger functionality to output structured logs by passing messages as a string. For example:

[Logging]
public async Task<APIGatewayProxyResponse> FunctionHandler
         (APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
{
  ...
  Logger.LogInformation("Getting ip address from external service");
  var location = await GetCallingIp();
  ...
}

Lambda sends the output to Amazon CloudWatch Logs as a JSON-formatted line.

{
  "cold_start": true,
  "xray_trace_id": "1-621b9125-0a3b544c0244dae940ab3405",
  "function_name": "powertools-dotnet-tracing-sampl-HelloWorldFunction-v0F2GJwy5r1V",
  "function_version": "$LATEST",
  "function_memory_size": 256,
  "function_arn": "arn:aws:lambda:eu-west-2:286043031651:function:powertools-dotnet-tracing-sample-HelloWorldFunction-v0F2GJwy5r1V",
  "function_request_id": "3ad9140b-b156-406e-b314-5ac414fecde1",
  "timestamp": "2022-02-27T14:56:39.2737371Z",
  "level": "Information",
  "service": "powertools-dotnet-sample",
  "name": "AWS.Lambda.Powertools.Logging.Logger",
  "message": "Getting ip address from external service"
}

Another common use case, especially when developing new Lambda functions, is to print a log of the event received by the handler. You can achieve this by enabling LogEvent on the Logging attribute. This is disabled by default to prevent potentially leaking sensitive event data into logs.

[Logging(LogEvent = true)]
public async Task<APIGatewayProxyResponse> FunctionHandler
         (APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
{
  ...
}

With logs available as structured JSON, you can perform searches on this structured data using CloudWatch Logs Insights. To search for all logs that were output during a Lambda cold start, and display the key fields in the output, run the following query:

filter cold_start = 1
| fields @timestamp, function_name, function_version, xray_trace_id
| sort @timestamp desc
| limit 20

CloudWatch Logs Insights query for cold starts

Tracing

Using the Tracing attribute, you can instruct the library to send traces and metadata from the Lambda function invocation to AWS X-Ray using the AWS X-Ray SDK for .NET. The tracing example shows you how to use the tracing feature.

When your application makes calls to AWS services, the SDK tracks downstream calls in subsegments. AWS services that support tracing, and resources that you access within those services, appear as downstream nodes on the service map in the X-Ray console.

You can instrument all of your AWS SDK for .NET clients by calling RegisterXRayForAllServices before you create them.

public class Function
{
  private static IDynamoDBContext _dynamoDbContext;
  public Function()
  {
    AWSSDKHandler.RegisterXRayForAllServices();
    ...
  }
  ...
}

To add the Powertools tracing library to your project, install the packages from the NuGet gallery, from the Visual Studio editor, or by using the following .NET CLI command:

dotnet add package AWS.Lambda.Powertools.Tracing

Use environment variables in the Globals section of the AWS SAM template to configure the tracing library.

  Globals:
    Function:
      Tracing: Active
      Environment:
        Variables:
          POWERTOOLS_SERVICE_NAME: powertools-dotnet-tracing-sample
          POWERTOOLS_TRACER_CAPTURE_RESPONSE: true
          POWERTOOLS_TRACER_CAPTURE_ERROR: true

Decorate the Lambda function handler method with the Tracing attribute to enable the utility. To provide more granular details for your traces, you can use the same attribute to capture the invocation of other functions outside of the handler. For example:

[Tracing]
public async Task<APIGatewayProxyResponse> FunctionHandler
         (APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
{
  ...
  var location = await GetCallingIp().ConfigureAwait(false);
  ...
}

[Tracing(SegmentName = "Location service")]
private static async Task<string?> GetCallingIp()
{
  ...
}

Once traffic is flowing, you see a generated service map in the AWS X-Ray console. Decorating the Lambda function handler method, or any other method in the chain with the Tracing attribute, provides an overview of all the traffic flowing through the application.

AWS X-Ray trace service view

You can also view the individual traces that are generated, along with a waterfall view of the segments and subsegments that comprise your trace. This data can help you pinpoint the root cause of slow operations or errors within your application.

AWS X-Ray waterfall trace view

You can also filter traces by annotation and create custom service maps with AWS X-Ray Trace groups. In this example, use the filter expression annotation.ColdStart = true to filter traces based on the ColdStart annotation. The Tracing attribute adds these automatically when used within the handler method.

View trace attributes

Metrics

CloudWatch offers a number of built-in metrics to help answer general questions about the application’s throughput, error rate, and resource utilization. However, to understand the behavior of the application better, you should also add custom metrics relevant to your workload.

The metrics utility creates custom metrics asynchronously by logging metrics to standard output using the Amazon CloudWatch Embedded Metric Format (EMF).

In the sample application, you want to understand how often your service is calling the location API to identify the IP addresses. The metrics example shows you how to use the metrics feature.

To add the Powertools metrics library to your project, install the packages from the NuGet gallery, from the Visual Studio editor, or by using the following .NET CLI command:

dotnet add package AWS.Lambda.Powertools.Metrics

Use environment variables in the Globals section of the AWS SAM template to configure the metrics library:

  Globals:
    Function:
      Environment:
        Variables:
          POWERTOOLS_SERVICE_NAME: powertools-dotnet-metrics-sample
          POWERTOOLS_METRICS_NAMESPACE: AWSLambdaPowertools

To create custom metrics, decorate the Lambda function with the Metrics attribute. This ensures that all metrics are properly serialized and flushed to logs when the function finishes its invocation.

You can then emit custom metrics by calling AddMetric or push a single metric with a custom namespace, service and dimensions by calling PushSingleMetric. You can also enable the CaptureColdStart on the attribute to automatically create a cold start metric.

[Metrics(CaptureColdStart = true)]
public async Task<APIGatewayProxyResponse> FunctionHandler
         (APIGatewayProxyRequest apigProxyEvent, ILambdaContext context)
{
  ...
  // Push a single metric counting calls to the location API
  Metrics.PushSingleMetric(
        metricName: "CallingIP",
        value: 1,
        unit: MetricUnit.Count,
        service: "lambda-powertools-metrics-example",
        defaultDimensions: new Dictionary<string, string>
        {
            { "Metric Type", "Single" }
        });
  ...
}

Conclusion

CloudWatch and AWS X-Ray offer functionality that provides comprehensive observability for your applications. AWS Lambda Powertools for .NET is now generally available. The library helps implement observability when running Lambda functions based on .NET 6 and newer while reducing the amount of custom code.

It simplifies implementing the observability best practices defined in the Serverless Applications Lens for the AWS Well-Architected Framework for a serverless application and allows you to focus more time on the business logic.

You can find the full documentation and the source code for Powertools in GitHub. We welcome contributions via pull request, and encourage you to create an issue if you have any feedback for the project. Happy building with AWS Lambda Powertools for .NET.

For more serverless learning resources, visit Serverless Land.

Implementing reactive progress tracking for AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/implementing-reactive-progress-tracking-for-aws-step-functions/

This blog post is written by Alexey Paramonov, Solutions Architect, ISV, and Maximilian Schellhorn, Solutions Architect, ISV.

This blog post demonstrates a solution based on AWS Step Functions and Amazon API Gateway WebSockets to track execution progress of a long running workflow. The solution updates the frontend regularly and users are able to track the progress and receive detailed status messages.

Websites with long-running processes often don’t provide feedback to users, leading to a poor customer experience. You might have experienced this when booking tickets, searching for hotels, or buying goods online. These sites often call multiple backend and third-party endpoints and aggregate the results to complete your request, causing the delay. In these long running scenarios, a transparent progress tracking solution can create a better user experience.

Overview

The example provided uses:

  • AWS Serverless Application Model (AWS SAM) for deployment: an open-source framework for building serverless applications.
  • AWS Step Functions for orchestrating the workflow.
  • AWS Lambda for mocking long running processes.
  • API Gateway to provide a WebSocket API for bidirectional communications between clients and the backend.
  • Amazon DynamoDB for storing connection IDs from the clients.

The example provides different options to report the progress back to the WebSocket connection by using Step Functions SDK integration, Lambda integrations, or Amazon EventBridge.

The following diagram outlines the example:

  1. The user opens a connection to WebSocket API. The OnConnect and OnDisconnect Lambda functions in the “WebSocket Connection Management” section persist this connection in DynamoDB (see documentation). The connection is bidirectional, meaning that the user can send requests through the open connection and the backend can respond with a new progress status whenever it is available.
  2. The user sends a new order through the WebSocket API. API Gateway routes the request to the “OnOrder” AWS Lambda function, which starts the state machine execution.
  3. As the request propagates through the state machine, we send progress updates back to the user via the WebSocket API using AWS SDK service integrations.
  4. For more customized status responses, we can use a centralized AWS Lambda function “ReportProgress” that updates the WebSocket API.

How to respond to the client?

To send the status updates back to the client via the WebSocket API, three options are explored:

Option 1: AWS SDK integration with API Gateway invocation

As the diagram shows, the workflow tasks starting with the prefix “Report:” invoke API Gateway directly to send responses to the client via the WebSocket API. This is an example of the state machine definition for this step:

          'Report: Workflow started':
            Type: Task
            Resource: arn:aws:states:::apigateway:invoke
            ResultPath: $.Params
            Parameters:
              ApiEndpoint: !Join [ '.',[ !Ref ProgressTrackingWebsocket, execute-api, !Ref 'AWS::Region', amazonaws.com ] ]
              Method: POST
              Stage: !Ref ApiStageName
              Path.$: States.Format('/@connections/{}', $.ConnectionId)
              RequestBody:
                Message: 🥁 Workflow started
                Progress: 10
              AuthType: IAM_ROLE
            Next: 'Mock: Inventory check'

This option reports the progress directly without using any additional Lambda functions. This limits the system complexity, reduces latency between the progress update and the response delivered to the client, and potentially reduces costs by reducing Lambda execution duration. Potential drawbacks are the limited ability to customize the response and the need to become familiar with the state machine definition language.

Option 2: Using a Lambda function for reporting the progress status

To further customize response logic, create a Lambda function for reporting. As shown in point 4 of the diagram, you can also invoke a “ReportProgress” function directly from the state machine. This Python code snippet reports the progress status back to the WebSocket API:

import json
import boto3

# api_url is the WebSocket API management endpoint; connection_id and event
# are provided by the surrounding Lambda handler
apigw_management_api_client = boto3.client('apigatewaymanagementapi', endpoint_url=api_url)
apigw_management_api_client.post_to_connection(
    ConnectionId=connection_id,
    Data=bytes(json.dumps(event), 'utf-8')
)

This option allows for more customizations and integration into the business logic of other Lambda functions to track progress in more detail. For example, execution of loops and reporting back on every iteration. The tradeoff is that you must handle exceptions and retries in your code. It also increases overall system complexity and additional costs associated with Lambda execution.
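
For illustration, a sketch of such handling inside the “ReportProgress” function might look like the following; api_url is the WebSocket API management endpoint as in the snippet above, and the cleanup helper is a hypothetical placeholder for whatever your application does with stale connection IDs:

import json
import boto3

apigw = boto3.client('apigatewaymanagementapi', endpoint_url=api_url)

def report_progress(connection_id, message):
    try:
        apigw.post_to_connection(
            ConnectionId=connection_id,
            Data=bytes(json.dumps(message), 'utf-8')
        )
    except apigw.exceptions.GoneException:
        # The client disconnected; stop reporting to this connection
        remove_stale_connection(connection_id)  # hypothetical cleanup helper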

Option 3: Using EventBridge

You can combine option 2 with EventBridge to provide a centralized solution for reporting the progress status. The solution also handles retries with back-off if the “ReportProgress” function can’t communicate with the WebSocket API.

You can also use AWS SDK integrations from the state machine to EventBridge instead of using API Gateway. This has the additional benefit of a loosely coupled and resilient system, but you could experience increased latency due to the additional services used. The combination of EventBridge and the Lambda function adds minimal latency, but it might not be acceptable for short-lived workflows. However, if the workflow takes tens of seconds to complete and involves numerous steps, option 3 may be more suitable.

This is the architecture:

  1. As before.
  2. As before.
  3. AWS SDK integration sends the status message to EventBridge.
  4. The message propagates to the “ReportProgress” Lambda function.
  5. The Lambda function sends the processed message through the WebSocket API back to the client.

Deploying the example

Prerequisites

Make sure you can manage AWS resources from your terminal.

  • AWS CLI and AWS SAM CLI installed.
  • You have an AWS account. If not, visit this page.
  • Your user has sufficient permissions to manage AWS resources.
  • Git is installed.
  • NPM is installed (only for local frontend deployment).

To view the source code and documentation, visit the GitHub repo. This contains both the frontend and backend code.

To deploy:

  1. Clone the repository:
    git clone "https://github.com/aws-samples/aws-step-functions-progress-tracking.git"
  2. Navigate to the root of the repository.
  3. Build and deploy the AWS SAM template:
    sam build && sam deploy --guided
  4. Copy the value of WebSocketURL in the output for later.
  5. The backend is now running. To test it, use a hosted frontend.

Alternatively, you can deploy the React-based frontend on your local machine:

  1. Navigate to “progress-tracker-frontend/”:
    cd progress-tracker-frontend
  2. Launch the react app:
    npm start
  3. The command opens the React app in your default browser. If it does not happen automatically, navigate to http://localhost:3000/ manually.

Now the application is ready to test.

Testing the example application

  1. The webpage requests a WebSocket URL – this is the value from the AWS SAM template deployment. Paste it into the Enter WebSocket URL field in the browser and choose Connect.
  2. On the next page, choose Send Order and watch how the progress changes.

    This sends the new order request to the state machine. As it progresses, you receive status messages back through the WebSocket API.
  3. Optionally, you can inspect the raw messages arriving to the client. Open the Developer tools in your browser and navigate to the Network tab. Filter for WS (stands for WebSocket) and refresh the page. Specify the WebSocket URL, choose Connect and then choose Send Order.

Cleaning up

The services used in this solution are eligible for AWS Free Tier. To clean up the resources, in the root directory of the repository run:

sam delete

This removes all resources provisioned by the template.yml file.

Conclusion

In this post, you learn how to augment your Step Functions workflows with low latency progress tracking via API Gateway WebSockets. Consider adding the progress tracking to your long running workflows to improve the customer experience and provide a reactive look and feel for your application.

Navigate to the GitHub repository and review the implementation to see how your solution could become more user friendly and responsive. Start with examining the template.yml and the state machine’s definition and see how the frontend handles WebSocket communication and message visualization.

For more serverless learning resources, visit  Serverless Land.

Behind the Scenes at AWS – DynamoDB UpdateTable Speedup

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/behind-the-scenes-at-aws-dynamodb-updatetable-speedup/

We often talk about the Pace of Innovation at AWS, and share the results in this blog, in the AWS What’s New page, and in our weekly AWS on Air streams. Today I would like to talk about a slightly different kind of innovation, the kind that happens behind the scenes.

Each AWS customer uses a different mix of services, and uses those services in unique ways. Every service is instrumented and monitored, and the team responsible for designing, building, running, scaling, and evolving the service pays continuous attention to all of the resulting metrics. The metrics provide insights into how the service is being used, how it performs under load, and in many cases highlights areas for optimization in pursuit of higher availability, better performance, and lower costs.

Once an area for improvement has been identified, a plan is put into place, changes are made and tested in pre-production environments, then deployed to multiple AWS Regions. This happens routinely, and (to date) without fanfare. Each part of AWS gets better and better, with no action on your part.

DynamoDB UpdateTable
In late 2021 we announced the Standard-Infrequent Access table class for Amazon DynamoDB. As Marcia noted in her post, using this class can reduce your storage costs by 60% compared to the existing (Standard) class. She also showed you how you could modify a table to use the new class. The modification operation calls the UpdateTable function, and that function is the topic of this post!

As is the case with just about every AWS launch, customers began to make use of the new table class right away. They created new tables and modified existing ones, benefiting from the lower pricing as soon as the modification was complete.

DynamoDB uses a highly distributed storage architecture. Each table is split into multiple partitions; operations such as changing the storage class are done in parallel across the partitions. After looking at a lot of metrics, the DynamoDB team found ways to increase parallelism and to reduce the amount of time spent managing the parallel operations.

This change had a dramatic effect on Amazon DynamoDB tables over 500 GB in size, reducing the time to update the table class by up to 97%.

Each time we make a change like this, we capture the “before” and “after” metrics, and share the results internally so that other teams can learn from the experience while they are in the process of making similar improvements of their own. Even better, each change that we make opens the door to other ones, creating a positive feedback loop that (once again) benefits everyone that uses a particular service or feature.

Every DynamoDB user can take advantage of this increased performance right away without the need for a version upgrade or downtime for maintenance (DynamoDB does not even have maintenance windows).

Incremental performance and operational improvements like this one are made routinely and without much fanfare. However, it is always good to hear back from our customers when their own measurements indicate that some part of AWS became better or faster.

Leadership Principles
As I was thinking about this change while getting ready to write this post, several Amazon Leadership Principles came to mind. The DynamoDB team showed Customer Obsession by implementing a change that would benefit any DynamoDB user with tables over 500 GB in size. To do this they had to Invent and Simplify, coming up with a better way to implement the UpdateTable function.

While you, as an AWS customer, get the benefits with no action needed on your part, this does not mean that you have to wait until we decide to pay special attention to your particular use case. If you are pushing any aspect of AWS to the limit (or want to), I recommend that you make contact with the appropriate service team and let them know what’s going on. You might be running into a quota or other limit, or pushing bandwidth, memory, or other resources to extremes. Whatever the case, the team would love to hear from you!

Stay Tuned
I have a long list of other internal improvements that we have made, and will be working with the teams to share more of them throughout the year.

Jeff;

Text analytics on AWS: implementing a data lake architecture with OpenSearch

Post Syndicated from Francisco Losada original https://aws.amazon.com/blogs/architecture/text-analytics-on-aws-implementing-a-data-lake-architecture-with-opensearch/

Text data is a common type of unstructured data found in analytics. It is often stored without a predefined format and can be hard to obtain and process.

For example, web pages contain text data that data analysts collect through web scraping and pre-process using lowercasing, stemming, and lemmatization. After pre-processing, the cleaned text is analyzed by data scientists and analysts to extract relevant insights.

This blog post covers how to effectively handle text data using a data lake architecture on Amazon Web Services (AWS). We explain how data teams can independently extract insights from text documents using OpenSearch as the central search and analytics service. We also discuss how to index and update text data in OpenSearch and evolve the architecture towards automation.

Architecture overview

This architecture outlines the use of AWS services to create an end-to-end text analytics solution, starting from the data collection and ingestion up to the data consumption in OpenSearch (Figure 1).

Figure 1. Data lake architecture with OpenSearch

  1. Collect data from various sources, such as SaaS applications, edge devices, logs, streaming media, and social networks.
  2. Use tools like AWS Database Migration Service (AWS DMS), AWS DataSync, Amazon Kinesis, Amazon Managed Streaming for Apache Kafka (Amazon MSK), AWS IoT Core, and Amazon AppFlow to ingest the data into the AWS data lake, depending on the data source type.
  3. Store the ingested data in the raw zone of the Amazon Simple Storage Service (Amazon S3) data lake—a temporary area where data is kept in its original form.
  4. Validate, clean, normalize, transform, and enrich the data through a series of pre-processing steps using AWS Glue or Amazon EMR.
  5. Place the data that is ready to be indexed in the indexing zone.
  6. Use AWS Lambda to index the documents into OpenSearch and store them back in the data lake with a unique identifier.
  7. Use the clean zone as the source of truth for teams to consume the data and calculate additional metrics.
  8. Develop, train, and generate new metrics using machine learning (ML) models with Amazon SageMaker or artificial intelligence (AI) services like Amazon Comprehend.
  9. Store the new metrics in the enriching zone along with the identifier of the OpenSearch document.
  10. Use the identifier column from the initial indexing phase to identify the correct documents and update them in OpenSearch with the newly calculated metrics using AWS Lambda.
  11. Use OpenSearch to search through the documents and visualize them with metrics using OpenSearch Dashboards.

Considerations

Data lake orchestration among teams

This architecture allows data teams to work independently on text documents at different stages of their lifecycles. The data engineering team manages the raw and indexing zones and also handles data ingestion and pre-processing for indexing in OpenSearch.

The cleaned data is stored in the clean zone, where data analysts and data scientists generate insights and calculate new metrics. These metrics are stored in the enrich zone and indexed as new fields in the OpenSearch documents by the data engineering team (Figure 2).

Figure 2. Data lake orchestration among teams

Let’s explore an example. Consider a company that periodically retrieves blog site comments and performs sentiment analysis using Amazon Comprehend. In this case:

  1. The comments are ingested into the raw zone of the data lake.
  2. The data engineering team processes the comments and stores them in the indexing zone.
  3. A Lambda function indexes the comments into OpenSearch, enriches the comments with the OpenSearch document ID, and saves it in the clean zone.
  4. The data science team consumes the comments and performs sentiment analysis using Amazon Comprehend.
  5. The sentiment analysis metrics are stored in the enrich zone of the data lake. A second Lambda function updates the comments in OpenSearch with the new metrics.

If the raw data does not require any preprocessing steps, the indexing and clean zones can be combined. You can explore this specific example, along with code implementation, in the AWS samples repository.

Schema evolution

As your data progresses through data lake stages, the schema changes and gets enriched accordingly. Continuing with our previous example, Figure 3 explains how the schema evolves.

Figure 3. Schema evolution through the data lake stages

  1. In the raw zone, there is a raw text field received directly from the ingestion phase. It’s best practice to keep a raw version of the data as a backup, or in case the processing steps need to be repeated later.
  2. In the indexing zone, the clean text field replaces the raw text field after being processed.
  3. In the clean zone, we add a new ID field that is generated during indexing and identifies the OpenSearch document of the text field.
  4. In the enrich zone, the ID field is required. Other fields with metric names are optional and represent new metrics calculated by other teams that will be added to OpenSearch.

Consumption layer with OpenSearch

In OpenSearch, data is organized into indices, which can be thought of as tables in a relational database. Each index consists of documents—similar to table rows—and multiple fields, similar to table columns. You can add documents to an index by indexing and updating them using various client APIs for popular programming languages.

Now, let’s explore how our architecture integrates with OpenSearch in the indexing and updating stage.

Indexing and updating documents using Python

The index document API operation allows you to index a document with a custom ID, or assigns one if none is provided. To speed up indexing, we can use the bulk index API to index multiple documents in one call.

We need to store the IDs back from the index operation to later identify the documents we’ll update with new metrics. Let’s explore two ways of doing this:

  • Use the requests library to call the REST Bulk Index API (preferred): the response returns the auto-generated IDs we need (see the sketch after this list).
  • Use the Python Low-Level Client for OpenSearch: The IDs are not returned and need to be pre-assigned to later store them. We can use an atomic counter in Amazon DynamoDB to do so. This allows multiple Lambda functions to index documents in parallel without ID collisions.
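
As an illustration of the first option, a minimal sketch using the requests library could look like the following; the endpoint URL, index name, and basic authentication are placeholders for your own setup:

import json
import requests

OPENSEARCH_URL = 'https://my-domain.example.com'  # placeholder endpoint
INDEX_NAME = 'comments'                           # placeholder index name

def bulk_index(documents):
    # Build the NDJSON payload: one action line plus one document line per document
    lines = []
    for doc in documents:
        lines.append(json.dumps({'index': {'_index': INDEX_NAME}}))
        lines.append(json.dumps(doc))
    payload = '\n'.join(lines) + '\n'

    response = requests.post(
        f'{OPENSEARCH_URL}/_bulk',
        data=payload,
        headers={'Content-Type': 'application/x-ndjson'},
        auth=('user', 'password'),  # replace with your authentication mechanism
        timeout=30
    )
    response.raise_for_status()

    # Collect the auto-generated IDs in the same order as the input documents
    return [item['index']['_id'] for item in response.json()['items']]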

As in Figure 4, the Lambda function:

  1. Increases the atomic counter by the number of documents that will index into OpenSearch.
  2. Gets the value of the counter back from the API call.
  3. Indexes the documents using IDs from the reserved range, which runs from the current counter value minus the number of documents (exclusive) up to the current counter value (inclusive); see the sketch after Figure 4.

Figure 4. Storing the IDs back from the bulk index operation using the Python Low-Level Client for OpenSearch
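
A minimal sketch of the counter increment using DynamoDB’s atomic ADD update; the Counters table and its attribute names are hypothetical:

import boto3

ddb = boto3.client('dynamodb')

def reserve_ids(num_documents):
    # Atomically add num_documents to the counter and read the new value back
    response = ddb.update_item(
        TableName='Counters',
        Key={'counterName': {'S': 'opensearch-ids'}},
        UpdateExpression='ADD currentValue :n',
        ExpressionAttributeValues={':n': {'N': str(num_documents)}},
        ReturnValues='UPDATED_NEW'
    )
    upper = int(response['Attributes']['currentValue']['N'])
    # The reserved ID range is (upper - num_documents, upper]
    return range(upper - num_documents + 1, upper + 1)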

Data flow automation

As architectures evolve towards automation, the data flow between data lake stages becomes event-driven. Following our previous example, we can automate the processing steps of the data when moving from the raw to the indexing zone (Figure 5).

Figure 5. Event-driven automation for data flow

With Amazon EventBridge and AWS Step Functions, we can automatically trigger our pre-processing AWS Glue jobs so our data gets pre-processed without manual intervention.

The same approach can be applied to the other data lake stages to achieve a fully automated architecture. Explore this implementation for an automated language use case.

Conclusion

In this blog post, we covered designing an architecture to effectively handle text data using a data lake on AWS. We explained how different data teams can work independently to extract insights from text documents at different lifecycle stages using OpenSearch as the search and analytics service.

Genomics workflows, Part 4: processing archival data

Post Syndicated from Rostislav Markov original https://aws.amazon.com/blogs/architecture/genomics-workflows-part-4-processing-archival-data/

Genomics workflows analyze data at petabyte scale. After processing is complete, data is often archived in cold storage classes. In some cases, like studies on the association of DNA variants against larger datasets, archived data is needed for further processing. This means manually initiating the restoration of each archived object and monitoring the progress. Scientists require a reliable process for on-demand archival data restoration so their workflows do not fail.

In Part 4 of this series, we look into genomics workloads processing data that is archived with Amazon Simple Storage Service (Amazon S3). We design a reliable data restoration process that informs the workflow when data is available so it can proceed. We build on top of the design pattern laid out in Parts 1-3 of this series. We use event-driven and serverless principles to provide the most cost-effective solution.

Use case

Our use case focuses on data in Amazon Simple Storage Service Glacier (Amazon S3 Glacier) storage classes. The S3 Glacier Instant Retrieval storage class provides the lowest-cost storage for long-lived data that is rarely accessed but requires retrieval in milliseconds.

The S3 Glacier Flexible Retrieval and S3 Glacier Deep Archive storage classes provide further cost savings, with retrieval times ranging from minutes to hours. We focus on these classes in order to provide the most cost-effective solution.

You must first restore the objects before accessing them. Our genomics workflow will pause until the data restore completes. The requirements for this workflow are:

  • Reliable launch of the restore so our workflow doesn’t fail (due to S3 Glacier service quotas, or because not all objects were restored)
  • Event-driven design to mirror the event-driven nature of genomics workflows and perform the retrieval upon request
  • Cost-effective and easy-to-manage by using serverless services
  • Upfront detection of archived data when formulating the genomics workflow task, avoiding idle computational tasks that incur cost
  • Scalable and elastic to meet the restore needs of large, archived datasets

Solution overview

Genomics workflows take multiple input parameters to prepare the initiation, such as launch ID, data path, workflow endpoint, and workflow steps. We store this data, including workflow configurations, in an S3 bucket. An AWS Fargate task reads from the S3 bucket and prepares the workflow. It detects if the input parameters include S3 Glacier URLs.

We use Amazon Simple Queue Service (Amazon SQS) to decouple S3 Glacier index creation from object restore actions (Figure 1). This increases the reliability of our process.

Figure 1. Solution architecture for S3 Glacier object restore

An AWS Lambda function creates the index of all objects in the specified S3 bucket URLs and submits them as an SQS message.

Another Lambda function polls the SQS queue and submits the request(s) to restore the S3 Glacier objects to S3 Standard storage class.

The function writes the job ID of each S3 Glacier restore request to Amazon DynamoDB. After the restore is complete, Lambda sets the status of the workflow to READY. Only then can any computing jobs start, such as with AWS Batch.

Implementation considerations

We consider the use case of Snakemake with Tibanna, which we detailed in Part 2 of this series. This allows us to dive deeper on launch details.

Snakemake is an open-source utility for whole-genome-sequence mapping in directed acyclic graph format. Snakemake uses Snakefiles to declare workflow steps and commands. Tibanna is an open-source, AWS-native software that runs bioinformatics data pipelines. It supports Snakefile syntax, plus other workflow languages, including Common Workflow Language and Workflow Description Language (WDL).

We recommend using Amazon Genomics CLI if Tibanna is not needed for your use case, or Amazon Omics if your workflow definitions are compliant with the supported WDL and Nextflow specifications.

Formulate the restore request

The Snakemake Fargate launch container detects if the S3 objects under the requested S3 bucket URLs are stored in S3 Glacier. The Fargate launch container generates and puts a JSON binary base call (BCL) configuration file into an S3 bucket and exits successfully. This file includes the launch ID of the workflow, corresponding with the DynamoDB item key, plus the S3 URLs to restore.

Query the S3 URLs

Once the JSON BCL configuration file lands in this S3 bucket, the S3 Event Notification PutObject event invokes a Lambda function. This function parses the configuration file and recursively queries for all S3 object URLs to restore.
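
A minimal sketch of that query, assuming the configuration file resolves to bucket and prefix pairs and that objects in the Flexible Retrieval and Deep Archive storage classes are the restore candidates:

import boto3

s3 = boto3.client('s3')
ARCHIVE_CLASSES = {'GLACIER', 'DEEP_ARCHIVE'}  # Flexible Retrieval and Deep Archive

def list_archived_objects(bucket, prefix):
    # Page through every object under the prefix and keep the archived ones
    archived = []
    paginator = s3.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            if obj.get('StorageClass') in ARCHIVE_CLASSES:
                archived.append(f"s3://{bucket}/{obj['Key']}")
    return archived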

Initiate the restore

The main Lambda function then submits messages to the SQS queue that contains the full list of S3 URLs that need to be restored. SQS messages also include the launch ID of the workflow. This is to ensure we can bind specific restoration jobs to specific workflow launches. If all S3 Glacier objects belong to Flexible Retrieval storage class, the Lambda function puts the URLs in a single SQS message, enabling restoration with Bulk Glacier Job Tier. The Lambda function also sets the status of the workflow to WAITING in the corresponding DynamoDB item. The WAITING state is used to notify the end user that the job is waiting on the data-restoration process and will continue once the data restoration is complete.

A secondary Lambda function polls for new messages landing in the SQS queue. This Lambda function submits the restoration request(s)—for example, as a free-of-charge Bulk retrieval—using the RestoreObject API. The function subsequently writes the S3 Glacier Job ID of each request in our DynamoDB table. This allows the main Lambda function to check if all Job IDs associated with a workflow launch ID are complete.
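
A minimal sketch of the restore request for a single object, assuming the free Bulk retrieval tier and a one-day availability window for the restored copy:

import boto3

s3 = boto3.client('s3')

def request_restore(bucket, key):
    # Ask S3 to make a temporary copy of the archived object available for one day
    s3.restore_object(
        Bucket=bucket,
        Key=key,
        RestoreRequest={
            'Days': 1,  # assumption: how long the restored copy stays available
            'GlacierJobParameters': {'Tier': 'Bulk'}
        }
    )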

Update status

The status of our workflow launch will remain WAITING as long as the Glacier object restore is incomplete. The AWS CloudTrail logs of completed S3 Glacier Job IDs invoke our main Lambda function (via an Amazon EventBridge rule) to update the status of the restoration job in our DynamoDB table. With each invocation, the function checks if all Job IDs associated with a workflow launch ID are complete.

After all objects have been restored, the function updates the workflow launch with status READY. This launches the workflow with the same launch ID prior to the restore.

Conclusion

In this blog post, we demonstrated how life-science research teams can make use of their archival data for genomic studies. We designed an event-driven S3 Glacier restore process, which retrieves data upon request. We discussed how to reliably launch the restore so our workflow doesn’t fail. Also, we determined upfront if an S3 Glacier restore is needed and used the WAITING state to prevent our workflow from failing.

With this solution, life-science research teams can save money using Amazon S3 Glacier without worrying about their day-to-day work or manually administering S3 Glacier object restores.

Related information