Tag Archives: Kinesis Data Streams

Amazon Personalize customer outreach on your ecommerce platform

Post Syndicated from Sridhar Chevendra original https://aws.amazon.com/blogs/architecture/amazon-personalize-customer-outreach-on-your-ecommerce-platform/

In the past, brick-and-mortar retailers leveraged native marketing and advertisement channels to engage with consumers. They have promoted their products and services through TV commercials, and magazine and newspaper ads. Many of them have started using social media and digital advertisements. Although marketing approaches are beginning to modernize and expand to digital channels, businesses still depend on expensive marketing agencies and inefficient manual processes to measure campaign effectiveness and understand buyer behavior. The recent pandemic has forced many retailers to take their businesses online. Those who are ready to embrace these changes have embarked on a technological and digital transformation to connect to their customers. As a result, they have begun to see greater business success compared to their peers.

Digitizing a business can be a daunting task, due to lack of expertise and high infrastructure costs. By using Amazon Web Services (AWS), retailers are able to quickly deploy their products and services online with minimal overhead. They don’t have to manage their own infrastructure. With AWS, retailers have no upfront costs, have minimal operational overhead, and have access to enterprise-level capabilities that scale elastically, based on their customers’ demands. Retailers can gain a greater understanding of customers’ shopping behaviors and personal preferences. Then, they are able to conduct effective marketing and advertisement campaigns, and develop and measure customer outreach. This results in increased satisfaction, higher retention, and greater customer loyalty. With AWS you can manage your supply chain and directly influence your bottom line.

Building a personalized shopping experience

Let’s dive into the components involved in building this experience. The first step in a retailer’s digital transformation journey is to create an ecommerce platform for their customers. This platform enables the organization to capture their customers’ actions, also referred to as ‘events’. Some examples of events are clicking on the shopping site to browse product categories, searching for a particular product, adding an item to the shopping cart, and purchasing a product. Each of these events gives the organization information about their customer’s intent, which is invaluable in creating a personalized experience for that customer. For instance, if a customer is browsing the “baby products” category, it indicates their interest in that category even if a purchase is not made. These insights are typically difficult to capture in an in-store experience. Online shopping makes gaining this knowledge much more straightforward and scalable.

The proposed solution outlines the use of AWS services to create a digital experience for a retailer and consumers. The three key areas are: 1) capturing customer interactions, 2) making real-time recommendations using AWS managed Artificial Intelligence/Machine Learning (AI/ML) services, and 3) creating an analytics platform to detect patterns and adjust customer outreach campaigns. Figure 1 illustrates the solution architecture.

Figure 1. Digital shopping experience architecture

For this use case, let’s assume that you are the owner of a local pizzeria, and you manage deliveries through an ecommerce platform like Shopify or WooCommerce. We will walk you through how to best serve your customer with a personalized experience based on their preferences.

The proposed solution consists of the following components:

  1. Data collection
  2. Promotion campaigns
  3. Recommendation engine
  4. Data analytics
  5. Customer reachability

Let’s explore each of these components separately.

Data collection with Amazon Kinesis Data Streams

When a customer uses your web/mobile application to order a pizza, the application captures their activity as click-stream ‘events’. These events provide valuable insights about your customers’ behavior. You can use these insights to understand the trends and browsing pattern of prospects who visited your web/mobile app, and use the data collected for creating promotion campaigns. As your business scales, you’ll need a durable system to preserve these events against system failures, and scale based on unpredictable traffic on your platform.

Amazon Kinesis is a Multi-AZ, managed streaming service that provides resiliency, scalability, and durability to capture an unlimited number of events without any additional operational overhead. Using Kinesis producers (Kinesis Agent, Kinesis Producer Library, and the Kinesis API), you can configure applications to capture your customer activity. You can ingest these events from the frontend, and then publish them to Amazon Kinesis Data Streams.
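As a minimal sketch of the Kinesis API route, the snippet below serializes a click-stream event and publishes it with the boto3 `put_record` call. The stream name `pizza-clickstream`, the event fields, and the helper names `build_event` and `publish_event` are assumptions for illustration and are not part of the original solution.

```python
import json
import time


def build_event(customer_id, event_type, item_id=None):
    """Build a click-stream event as a plain dict (field names are hypothetical)."""
    return {
        "customer_id": customer_id,
        "event_type": event_type,      # e.g. "browse", "search", "add_to_cart"
        "item_id": item_id,
        "timestamp": int(time.time()),
    }


def publish_event(kinesis_client, stream_name, event):
    """Send one event to a Kinesis data stream.

    Using the customer ID as the partition key routes a customer's events
    to the same shard, which preserves their relative order.
    """
    return kinesis_client.put_record(
        StreamName=stream_name,
        Data=json.dumps(event).encode("utf-8"),
        PartitionKey=str(event["customer_id"]),
    )


# Usage (requires AWS credentials and an existing stream):
#   import boto3
#   kinesis = boto3.client("kinesis")
#   publish_event(kinesis, "pizza-clickstream",
#                 build_event("c-42", "add_to_cart", "79323P"))
```

Passing the client in as an argument keeps the helper easy to exercise with a stub before pointing it at a real stream.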

Let us start by setting up Amazon Kinesis Data Streams to capture real-time sales transactions from online channels such as a portal or mobile app. For this blog post, we used a public Kaggle dataset as a reference. Figure 2 shows a snapshot of the sample data used to build personalized recommendations for a customer.

Figure 2. Sample sales transaction data

Promotion campaigns with AWS Lambda

One way to increase customer conversion is by offering discounts. When the customer adds a pizza to their cart, you want to make sure they are receiving the best deal. Let’s assume that by adding an additional item, your customer will receive the best possible discount. Just by knowing the total cost of the items in the cart, you can provide relevant promotions to this customer.

For this scenario, AWS Lambda polls the Kinesis data stream and reads the events in the stream. The Lambda function then matches the events against your criteria for items in the cart and reads your up-to-date promotions stored in Amazon DynamoDB. Optionally, caching recent or popular promotions improves your application response time and the customer experience on your platform. Amazon DynamoDB Accelerator (DAX) is an in-memory cache for DynamoDB that can serve the most recent or popular promotions or items.

For example, when the customer adds items to their shopping cart, Lambda sends them promotion details based on the purchase amount. This can be free shipping or a discount of a certain percentage. Figure 3 shows a snapshot of sample promotions.

Figure 3. Promotions table in DynamoDB
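The promotion step can be sketched as a Lambda handler that decodes the Kinesis records and picks an offer from the cart total. This is an illustrative sketch only: the event fields (`event_type`, `cart_total`, `customer_id`) and the promotion thresholds are hypothetical, and the promotions are hard-coded here, whereas the solution reads them from DynamoDB (optionally through DAX).

```python
import base64
import json
from decimal import Decimal

# Hypothetical promotions, shaped like rows read from the DynamoDB table.
PROMOTIONS = [
    {"min_total": Decimal("20"), "offer": "Free shipping"},
    {"min_total": Decimal("40"), "offer": "10% off"},
]


def best_promotion(cart_total, promotions):
    """Return the offer with the highest threshold the cart total meets, or None."""
    eligible = [p for p in promotions if cart_total >= p["min_total"]]
    if not eligible:
        return None
    return max(eligible, key=lambda p: p["min_total"])["offer"]


def lambda_handler(event, context):
    """Decode Kinesis records and pick a promotion for each add-to-cart event."""
    results = []
    for record in event["Records"]:
        # Kinesis delivers each payload to Lambda base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        if payload.get("event_type") != "add_to_cart":
            continue
        total = Decimal(str(payload["cart_total"]))
        results.append({
            "customer_id": payload["customer_id"],
            "offer": best_promotion(total, PROMOTIONS),
        })
    return results
```

Keeping the selection logic in a pure function (`best_promotion`) makes it possible to test the promotion rules without a stream or a table.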

Recommendations engine with Amazon Personalize

In addition to sharing these promotions with your customer, you may also want to share the recommended add-ons. In order to understand your customer preferences, you must gather historical datasets to determine patterns and generate relevant recommendations. Since web activity consists of millions of events, this would be a daunting task for humans to review, determine the patterns, and make recommendations. And since user preferences change, you need a system that can use all this volume of data and provide accurate predictions.

Amazon Personalize is a managed AI/ML service that helps you train an ML model based on your datasets. It provides an inference endpoint for real-time recommendations without requiring prior ML experience. Based on the datasets, Amazon Personalize also provides recipes to generate recommendations. As your customers interact on the ecommerce platform, your frontend application calls Amazon Personalize inference endpoints and retrieves a set of personalized recommendations based on your customer preferences.

Here is sample Python code that requests recommendations from a recommender, followed by the sample output.

import boto3
import json

# Connect to the Personalize runtime for the customer recommendations
recomm_endpoint = boto3.client('personalize-runtime')

# Request five recommendations for item 79323P from the recommender
response = recomm_endpoint.get_recommendations(
  itemId='79323P',
  recommenderArn='arn:aws:personalize:us-east-1::recommender/my-items',
  numResults=5)

print(json.dumps(response['itemList'], indent=2))

[
  {
    "itemId": "79323W"
  },
  {
    "itemId": "79323GR"
  },
  {
    "itemId": "79323LP"
  },
  {
    "itemId": "79323B"
  },
  {
    "itemId": "79323G"
  }
]

You can use Amazon Kinesis Data Firehose to read the data in near real time from the Kinesis data stream that collects events from the front-end applications. You can then store this data in Amazon Simple Storage Service (Amazon S3). Amazon S3 provides petabyte-scale storage and acts as a repository and single source of truth. We use the S3 data as seed data to build a personalized recommendation engine using Amazon Personalize. As your customers interact on the ecommerce platform, call the Amazon Personalize inference endpoint to make personalized recommendations based on user preferences.

Customer reachability with Amazon Pinpoint

If a customer adds products to their cart but never checks out, you may want to send them a reminder. You can also schedule an email suggesting that they reorder some time after their first order. Or you may want to send them promotions based on their preferences. And as your customers’ behavior changes, you probably want to adapt your messaging accordingly.

Your customer may have a communication preference, such as phone, email, SMS, or in-app notifications. If an order has an issue, you can inform the customer as soon as possible using their preferred method of communication, and perhaps follow it up with a discount.

Amazon Pinpoint is a flexible and scalable outbound and inbound marketing communications service. You can add users to Audience Segments, create reusable content templates integrated with Amazon Personalize, and run scheduled campaigns. With Amazon Pinpoint journeys, you can send action or time-based notifications to your users.

The workflow shown in Figure 4 illustrates customer communication for a promotion. A journey is created for a cohort of college students: a “Free Drink” promotion is offered with a new order. You can send this promotion over email. If the student opens the email, you can immediately send them a push notification reminding them to place an order. But if they don’t open the email, you can wait three days and follow up with a text message.

Figure 4. Promotion workflow in Amazon Pinpoint

Data analytics with Amazon Athena and Amazon QuickSight

To understand the effectiveness of your campaigns, you can use S3 data as a source for Amazon Athena. Athena is an interactive query service that analyzes data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

There are different ways to create visualizations in Amazon QuickSight. For instance, you can use Amazon S3 as a data lake. One option is to import your data into SPICE (Super-fast, Parallel, In-memory Calculation Engine) to provide high performance and concurrency. You can also create a direct connection to the underlying data source. For this use case, we choose to import to SPICE, which provides faster visualization in a production setup. Schedule consistent refreshes to help ensure that dashboards are referring to the most current data.

Once your data is imported into SPICE, review QuickSight’s visualization dashboard. Here, you’ll be able to choose from a wide variety of charts and tables, and add interactive features like drill-downs and filters.

The following process illustrates how to create a customer outreach strategy using ZIP codes and allocate budgets to marketing campaigns accordingly. First, we ran this sample SQL command in Athena to query for the top 10 pizza providers. The results are shown in Figure 5.

SELECT name, count(*) as total_count FROM "analyticsdemodb"."fooddatauswest2"
group by name
order by total_count desc
limit 10

Figure 5. Athena query results for top 10 pizza providers
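If you want to run the same query programmatically rather than from the Athena console, a minimal sketch with the boto3 Athena client might look like the following. The helper name `run_athena_query`, the polling interval, and the S3 results location are assumptions for illustration.

```python
import time


def run_athena_query(athena_client, sql, database, output_location, poll_seconds=1.0):
    """Start an Athena query, wait for it to finish, and return (id, final state)."""
    execution_id = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    while True:
        status = athena_client.get_query_execution(QueryExecutionId=execution_id)
        state = status["QueryExecution"]["Status"]["State"]
        # QUEUED and RUNNING are the non-terminal states.
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return execution_id, state
        time.sleep(poll_seconds)


TOP_PROVIDERS_SQL = """
SELECT name, count(*) AS total_count
FROM "analyticsdemodb"."fooddatauswest2"
GROUP BY name
ORDER BY total_count DESC
LIMIT 10
"""

# Usage (requires AWS credentials; the S3 results location is a placeholder):
#   import boto3
#   athena = boto3.client("athena")
#   run_athena_query(athena, TOP_PROVIDERS_SQL, "analyticsdemodb",
#                    "s3://your_bucket/athena-results/")
```

Once the state is `SUCCEEDED`, the rows can be fetched with `get_query_results` or read from the results location in S3.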

Second, here is the sample SQL command that we ran in Athena to find total pizza counts by postal code (ZIP code). Figure 6 shows a visualization that helps create a customer outreach strategy per ZIP code and budget the marketing campaigns accordingly.

SELECT postalcode, count(*) as total_count FROM "analyticsdemodb"."fooddatauswest2"
where postalcode is not null
group by postalcode
order by total_count desc limit 50;

Figure 6. QuickSight visualization showing pizza orders by zip codes

Conclusion

AWS enables you to build an ecommerce platform and scale your existing business with minimal operational overhead and no upfront costs. You can augment your ecommerce platform by building personalized recommendations and effective marketing campaigns based on your customer needs. The solution approach provided in this post will help organizations build a reusable architecture pattern and personalization using AWS managed services.

How NerdWallet uses AWS and Apache Hudi to build a serverless, real-time analytics platform

Post Syndicated from Kevin Chun original https://aws.amazon.com/blogs/big-data/how-nerdwallet-uses-aws-and-apache-hudi-to-build-a-serverless-real-time-analytics-platform/

This is a guest post by Kevin Chun, Staff Software Engineer in Core Engineering at NerdWallet.

NerdWallet’s mission is to provide clarity for all of life’s financial decisions. This covers a diverse set of topics: from choosing the right credit card, to managing your spending, to finding the best personal loan, to refinancing your mortgage. As a result, NerdWallet offers powerful capabilities that span across numerous domains, such as credit monitoring and alerting, dashboards for tracking net worth and cash flow, machine learning (ML)-driven recommendations, and many more for millions of users.

To build a cohesive and performant experience for our users, we need to be able to use large volumes of varying user data sourced by multiple independent teams. This requires a strong data culture along with a set of data infrastructure and self-serve tooling that enables creativity and collaboration.

In this post, we outline a use case that demonstrates how NerdWallet is scaling its data ecosystem by building a serverless pipeline that enables streaming data from across the company. We iterated on two different architectures. We explain the challenges we ran into with the initial design and the benefits we achieved by using Apache Hudi and additional AWS services in the second design.

Problem statement

NerdWallet captures a sizable amount of spending data. This data is used to build helpful dashboards and actionable insights for users. The data is stored in an Amazon Aurora cluster. Even though the Aurora cluster works well as an Online Transaction Processing (OLTP) engine, it’s not suitable for large, complex Online Analytical Processing (OLAP) queries. As a result, we can’t expose direct database access to analysts and data engineers. The data owners have to solve requests with new data derivations on read replicas. As the data volume and the diversity of data consumers and requests grow, this process gets more difficult to maintain. In addition, data scientists mostly require data files access from an object store like Amazon Simple Storage Service (Amazon S3).

We decided to explore alternatives where all consumers can independently fulfill their own data requests safely and scalably using open-standard tooling and protocols. Drawing inspiration from the data mesh paradigm, we designed a data lake based on Amazon S3 that decouples data producers from consumers while providing a self-serve, security-compliant, and scalable set of tooling that is easy to provision.

Initial design

The following diagram illustrates the architecture of the initial design.

The design included the following key components:

  1. We chose AWS Database Migration Service (AWS DMS) because it’s a managed service that facilitates the movement of data from various data stores such as relational and NoSQL databases into Amazon S3. AWS DMS allows one-time migration and ongoing replication with change data capture (CDC) to keep the source and target data stores in sync.
  2. We chose Amazon S3 as the foundation for our data lake because of its scalability, durability, and flexibility. You can seamlessly increase storage from gigabytes to petabytes, paying only for what you use. It’s designed to provide 11 9s of durability. It supports structured, semi-structured, and unstructured data, and has native integration with a broad portfolio of AWS services.
  3. AWS Glue is a fully managed data integration service. AWS Glue makes it easier to categorize, clean, transform, and reliably transfer data between different data stores.
  4. Amazon Athena is a serverless interactive query engine that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets, high concurrency, and complex queries.

This architecture works fine with small testing datasets. However, the team quickly ran into complications with the production datasets at scale.

Challenges

The team encountered the following challenges:

  • Long batch processing time and complex transformation logic – A single run of the Spark batch job took 2–3 hours to complete, and we ended up getting a fairly large AWS bill when testing against billions of records. The core problem was that we had to reconstruct the latest state and rewrite the entire set of records per partition for every job run, even if the incremental change was a single record in the partition. When we scaled that to thousands of unique transactions per second, we quickly saw the degradation in transformation performance.
  • Increased complexity with a large number of clients – This workload contained millions of clients, and one common query pattern was to filter by single client ID. There were numerous optimizations that we were forced to tack on, such as predicate pushdowns, tuning the Parquet file size, using a bucketed partition scheme, and more. As more data owners adopted this architecture, we would have to customize each of these optimizations for their data models and consumer query patterns.
  • Limited extensibility for real-time use cases – This batch extract, transform, and load (ETL) architecture wasn’t going to scale to handle hourly updates of thousands of record upserts per second. In addition, it would be challenging for the data platform team to keep up with the diverse real-time analytical needs. Incremental queries, time-travel queries, improved latency, and so on would require heavy investment over a long period of time. Improving on this issue would open up possibilities like near-real-time ML inference and event-based alerting.

With all these limitations of the initial design, we decided to go all-in on a real incremental processing framework.

Solution

The following diagram illustrates our updated design. To support real-time use cases, we added Amazon Kinesis Data Streams, AWS Lambda, Amazon Kinesis Data Firehose and Amazon Simple Notification Service (Amazon SNS) into the architecture.

The updated components are as follows:

  1. Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams. We set up a Kinesis data stream as a target for AWS DMS. The data stream collects the CDC logs.
  2. We use a Lambda function to transform the CDC records. We apply schema validation and data enrichment at the record level in the Lambda function. The transformed results are published to a second Kinesis data stream for the data lake consumption and an Amazon SNS topic so that changes can be fanned out to various downstream systems.
  3. Downstream systems can subscribe to the Amazon SNS topic and take real-time actions (within seconds) based on the CDC logs. This can support use cases like anomaly detection and event-based alerting.
  4. To solve the problem of long batch processing time, we use the Apache Hudi format to store the data and perform streaming ETL using AWS Glue streaming jobs. Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi integrates well with various AWS analytics services such as AWS Glue, Amazon EMR, and Athena, which makes it a straightforward extension of our previous architecture. While Apache Hudi solves the record-level update and delete challenges, AWS Glue streaming jobs convert the long-running batch transformations into low-latency micro-batch transformations. We use the AWS Glue Connector for Apache Hudi to import the Apache Hudi dependencies in the AWS Glue streaming job and write transformed data to Amazon S3 continuously. Hudi does all the heavy lifting of record-level upserts, while we simply configure the writer and write the transformed data to a Hudi Copy-on-Write table. With Hudi on AWS Glue streaming jobs, we reduce the data freshness latency for our core datasets from hours to under 15 minutes.
  5. To solve the partition challenges for high cardinality UUIDs, we use the bucketing technique. Bucketing groups data based on specific columns together within a single partition. These columns are known as bucket keys. When you group related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thereby improving query performance and reducing cost. Our existing queries are filtered on the user ID already, so we significantly improve the performance of our Athena usage without having to rewrite queries by using bucketed user IDs as the partition scheme. For example, the following code shows total spending per user in specific categories:
    SELECT ID, SUM(AMOUNT) SPENDING
    FROM "{{DATABASE}}"."{{TABLE}}"
    WHERE CATEGORY IN (
    'ENTERTAINMENT',
    'SOME_OTHER_CATEGORY')
    AND ID_BUCKET ='{{ID_BUCKET}}'
    GROUP BY ID;

  6. Our data scientist team can access the dataset and perform ML model training using Amazon SageMaker.
  7. We maintain a copy of the raw CDC logs in Amazon S3 via Amazon Kinesis Data Firehose.
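The Hudi writer configuration and the bucketing technique described in the list above can be sketched as follows. This is not NerdWallet’s actual code: the hash-based `id_bucket` function, the bucket count, and the table and column names (`spending`, `id`, `updated_at`, `id_bucket`) are hypothetical. The option keys are standard Hudi datasource write settings.

```python
import hashlib

NUM_BUCKETS = 64  # hypothetical bucket count


def id_bucket(user_id, num_buckets=NUM_BUCKETS):
    """Map a high-cardinality ID to a stable bucket number in [0, num_buckets)."""
    digest = hashlib.md5(str(user_id).encode("utf-8")).hexdigest()
    return int(digest, 16) % num_buckets


def hudi_write_options(table_name, record_key, precombine_field, partition_field):
    """Hudi datasource options for record-level upserts into a Copy-on-Write table."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.datasource.write.partitionpath.field": partition_field,
    }


options = hudi_write_options("spending", "id", "updated_at", "id_bucket")

# Inside a Glue streaming micro-batch, the options would be passed to the
# Spark writer, for example (the S3 path is a placeholder):
#   batch_df.write.format("hudi").options(**options).mode("append") \
#       .save("s3://your_bucket/hudi/spending/")
```

Because the bucket is derived deterministically from the ID, a query that filters on a single user can also filter on that user’s bucket, which lets Athena skip the other partitions.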

Conclusion

In the end, we landed on a serverless stream processing architecture that can scale to thousands of writes per second within minutes of freshness on our data lakes. We’ve rolled out to our first high-volume team! At our current scale, the Hudi job is processing roughly 1.75 MiB per second per AWS Glue worker, which can automatically scale up and down (thanks to AWS Glue auto scaling). We’ve also observed an outstanding improvement of end-to-end freshness at less than 5 minutes due to Hudi’s incremental upserts vs. our first attempt.

With Hudi on Amazon S3, we’ve built a high-leverage foundation to personalize our users’ experiences. Teams that own data can now share their data across the organization with reliability and performance characteristics built into a cookie-cutter solution. This enables our data consumers to build more sophisticated signals to provide clarity for all of life’s financial decisions.

We hope that this post will inspire your organization to build a real-time analytics platform using serverless technologies to accelerate your business goals.


About the authors

Kevin Chun is a Staff Software Engineer in Core Engineering at NerdWallet. He builds data infrastructure and tooling to help NerdWallet provide clarity for all of life’s financial decisions.

Dylan Qu is a Specialist Solutions Architect focused on big data and analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Best practices to optimize cost and performance for AWS Glue streaming ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/

AWS Glue streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or any other Apache Kafka cluster. It uses the Spark Structured Streaming framework to perform data processing in near-real time.

This post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.

We discuss the following topics:

  • Development tools that help you code faster using our newly launched AWS Glue Studio notebooks
  • How to monitor and tune your streaming jobs
  • Best practices for sizing and scaling your AWS Glue cluster, using our newly launched features like auto scaling and the small worker type G.025X

Development tools

AWS Glue Studio notebooks can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.

Before you run any code in the notebook (which would start the session), you need to set some important configurations.

The magic %streaming creates the session cluster using the same runtime as AWS Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.

Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.

See the following configuration:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

For additional configuration options such as version or number of workers, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to Launching the Spark History Server for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

The AWS Glue API helps you create the DataFrame by doing schema detection and auto decompression, depending on the format. You can also build it yourself using the Spark API directly:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

After you run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.

History server home page

From the notebook, you can take a sample of the streaming data. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

Monitor the cluster with Structured Streaming

The best way to monitor and tune your AWS Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the Structured Streaming tab and the details of each individual micro-batch processing job.

Overall view of the streaming job

On the Structured Streaming tab, you can see a summary of the streams running in the cluster, as in the following example.

Normally there is just one streaming query, representing a streaming ETL. If you start multiple queries in parallel, it helps to give each one a recognizable name by calling queryName() if you use the writeStream API directly on the DataFrame.

After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use the Avg Input/sec column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, Avg Process/sec, is similar but often shows a higher number. The difference is that the process rate tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For Input Rate, each data point is calculated by dividing the number of events read for the batch by the time elapsed between the current batch start and the previous batch start. In a healthy system that is able to keep up, this elapsed time is equal to the configured trigger interval (in the GlueContext.forEachBatch() API, this is set using the option windowSize).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the Batch Duration (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).

Be careful: if you set a limit per batch that is constantly hit, you could be silently building a backlog while everything looks good in the job metrics. To monitor this, track a latency metric that measures the difference between the time a message is created and the time it’s processed.
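One way to compute such a latency metric for a micro-batch is sketched below. It assumes each message carries a timezone-aware creation timestamp. The function name and the choice of reporting the maximum and average are illustrative rather than prescribed by AWS Glue.

```python
from datetime import datetime, timezone


def batch_latency_seconds(created_at_list, processed_at=None):
    """Return (max, average) latency in seconds for one micro-batch.

    created_at_list holds timezone-aware datetimes taken from each message;
    processed_at defaults to the current UTC time.
    """
    if not created_at_list:
        return 0.0, 0.0
    processed_at = processed_at or datetime.now(timezone.utc)
    latencies = [(processed_at - created).total_seconds() for created in created_at_list]
    return max(latencies), sum(latencies) / len(latencies)
```

A maximum latency that keeps growing from batch to batch suggests that a backlog is building, even when the input rate graph looks flat.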

Process Rate is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages and the trigger interval is 10 seconds, but the batch only needed 5 seconds to process, the process rate would be 1000/5 = 200 msg/sec, while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.
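The arithmetic above can be written out as a small worked example. The function names are illustrative, and `utilization` is an extra helper (not a Spark UI metric) that expresses how much of the trigger interval the batch consumed.

```python
def input_rate(rows_in_batch, seconds_since_previous_batch_start):
    """Messages per second arriving: rows divided by time between batch starts."""
    return rows_in_batch / seconds_since_previous_batch_start


def process_rate(rows_in_batch, batch_duration_seconds):
    """Messages per second the code handles: rows divided by processing time."""
    return rows_in_batch / batch_duration_seconds


def utilization(trigger_interval_seconds, batch_duration_seconds):
    """Fraction of the trigger interval spent processing the batch."""
    return batch_duration_seconds / trigger_interval_seconds


print(process_rate(1000, 5))   # 200.0 msg/sec
print(input_rate(1000, 10))    # 100.0 msg/sec
print(utilization(10, 5))      # 0.5
```

As utilization approaches 1.0, the input rate and process rate converge and the job has little headroom left for additional traffic.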

This metric is useful to measure how efficient our code processing the batch is, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.
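
The arithmetic behind the two rates can be written out as a short sketch. The function and variable names are made up for illustration; the numbers are the ones from the example in the text.

```python
def input_rate(num_messages, seconds_since_previous_batch_start):
    """Messages read divided by the time between consecutive batch starts."""
    return num_messages / seconds_since_previous_batch_start


def process_rate(num_messages, batch_duration_seconds):
    """Messages in the batch divided by the time spent processing it."""
    return num_messages / batch_duration_seconds


# The example from the text: 1,000 messages, 10-second interval, 5 seconds of work
rate_in = input_rate(1000, 10)      # 100.0 msg/sec
rate_out = process_rate(1000, 5)    # 200.0 msg/sec
# Fraction of the interval the batch used; values near 1.0 leave no headroom
utilization = rate_in / rate_out    # 0.5
```

When the system keeps up, the ratio of input rate to process rate approximates the share of the interval spent processing, so it can serve as a rough headroom indicator.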

Input Rows shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the GlueContext.forEachBatch() API.

The last two rows tell us how long it takes to process each batch and how that time is spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing batch size (if the limit allows taking the extra messages pending).

In Operation Duration, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest is fixed overhead; therefore, the smaller the batch, the larger the percentage of time that overhead takes. This represents the trade-off between small batches with lower latency and bigger batches that are more computationally efficient.

Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in Operation Duration, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.

Monitor individual batch processing

On the Jobs tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the Jobs tab of the Spark UI of our streaming job.

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by spark.task.maxFailures).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using GlueContext.forEachBatch(), it has a number of retries as per the batchMaxRetries parameter (three by default).

These failures are important because they have two effects:

  • They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
  • They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is Amazon DynamoDB, JDBC, Amazon OpenSearch Service, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).
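
To illustrate what an idempotent destination means here, the following sketch uses a plain dictionary as a stand-in for the destination system and upserts records by a key. The message_id field and the function name are hypothetical; a real sink would rely on its own key or upsert mechanism.

```python
def idempotent_write(store, records, key_field="message_id"):
    """Upsert records into `store` (a dict standing in for the destination).

    Writing the same record twice leaves the store unchanged, so a retried
    task that resends part of a batch doesn't create duplicates.
    """
    for record in records:
        store[record[key_field]] = record
    return store


destination = {}
batch = [{"message_id": "a1", "value": 10}, {"message_id": "b2", "value": 20}]
idempotent_write(destination, batch[:1])   # first attempt fails after sending one record
idempotent_write(destination, batch)       # the retry resends the whole batch
```

After the retry, the destination holds two records instead of three, because the record that was sent twice overwrote itself.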

Choosing the description link takes you to the Stages tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or does it take multiple?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because it doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.

Sizing and scaling

Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of AWS Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle that volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

  • For Kinesis, specify the limit using kinesis.executor.maxFetchRecordsPerShard, and revise this number if the number of shards changes substantially. You might need to increase kinesis.executor.maxFetchTimeInMs as well, in order to allow more time to read the batch and make sure it’s not truncated.
  • For Kafka, set maxOffsetsPerTrigger, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):

kafka_properties = {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  # Maximum number of messages per batch, shared among the topic partitions
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
# (spark is the job's SparkSession)
streaming_df = spark.readStream.format("kafka").options(**kafka_properties).load()

Initial benchmark

If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single-partition source (one Kafka partition or a Kinesis stream with one shard) with data preloaded into it, and running batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.
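
One way to make the checkpoint location dynamic is to add a timestamp to the path, as in the following sketch. The helper name and the bucket path are placeholders, not part of any AWS Glue API.

```python
from datetime import datetime, timezone


def benchmark_checkpoint_path(base_path, now=None):
    """Build a unique checkpoint location for each benchmark run."""
    now = now or datetime.now(timezone.utc)
    return f"{base_path.rstrip('/')}/run-{now:%Y%m%dT%H%M%S}/"


# "s3://my-bucket/checkpoints/benchmark" is a placeholder location
checkpoint_path = benchmark_checkpoint_path("s3://my-bucket/checkpoints/benchmark")
```

The resulting path can then be passed as the checkpointLocation option of the streaming job, so each rerun starts without a previous checkpoint and reads the same preloaded data again.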

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead that each batch incurs to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

  • If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
  • In the case where the throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.
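
The two selection rules above can be sketched as a small helper that works on the benchmark results you recorded. The function name and the sample numbers are made up for illustration.

```python
def pick_batch_limit(benchmarks, latency_sla_seconds=None):
    """Pick a batch limit from benchmark results.

    benchmarks: list of (limit, batch_duration_seconds) tuples, one per test run.
    latency_sla_seconds: target end-to-end latency, or None to favor throughput.
    Returns the chosen (limit, batch_duration_seconds), or None if nothing fits.
    """
    if latency_sla_seconds is None:
        # Throughput first: highest process rate (messages per second of work)
        return max(benchmarks, key=lambda b: b[0] / b[1])
    # Latency first: largest batch whose duration is under half the SLA
    fitting = [b for b in benchmarks if b[1] < latency_sla_seconds / 2]
    return max(fitting, key=lambda b: b[0]) if fitting else None


# Made-up benchmark results: (limit, observed batch duration in seconds)
results = [(1000, 2.0), (5000, 6.0), (20000, 18.0), (50000, 50.0)]
latency_choice = pick_batch_limit(results, latency_sla_seconds=30)
throughput_choice = pick_batch_limit(results)
```

With these sample numbers, a 30-second SLA selects the 5,000-message limit (6 seconds is under the 15-second budget), whereas the throughput-first rule selects the 20,000-message limit because it has the highest process rate.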

Now you have an idea of the number of messages per core our ETL can handle and the latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can use the messages per core obtained to divide the total number of messages per second to process and get the minimum number of Spark partitions needed (each core handles one partition in parallel).

With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

| AWS Glue Version | Worker Type | vCores | Spark Cores per Worker |
|---|---|---|---|
| 2 | G.1X | 4 | 8 |
| 2 | G.2X | 8 | 16 |
| 3 | G.025X | 2 | 2 |
| 3 | G.1X | 4 | 4 |
| 3 | G.2X | 8 | 8 |
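
As a rough sketch of this calculation, the following helper turns a benchmark result into an estimated number of Spark cores and nodes, using the values from the table above. The function name and the sample rates are assumptions for illustration, and the result is only a starting point to validate with tests.

```python
import math

# Spark cores per worker, copied from the table above: (Glue version, worker type)
SPARK_CORES_PER_WORKER = {
    (2, "G.1X"): 8,
    (2, "G.2X"): 16,
    (3, "G.025X"): 2,
    (3, "G.1X"): 4,
    (3, "G.2X"): 8,
}


def estimate_cluster(total_msgs_per_sec, msgs_per_sec_per_core, glue_version, worker_type):
    """Return (spark_cores, workers, total_nodes), where total_nodes includes the driver."""
    cores = math.ceil(total_msgs_per_sec / msgs_per_sec_per_core)
    per_worker = SPARK_CORES_PER_WORKER[(glue_version, worker_type)]
    workers = math.ceil(cores / per_worker)
    return cores, workers, workers + 1  # one extra node for the driver


# Made-up rates: 16,000 msg/sec in total at 500 msg/sec per core
cores, workers, nodes = estimate_cluster(16000, 500, 3, "G.1X")
```

With these sample rates, the estimate is 32 Spark cores, which on AWS Glue 3.0 with G.1X workers means 8 workers plus the driver.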

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.

Spark cores are equivalent to threads; therefore, you can have more (or fewer) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is necessarily going to be faster if they’re not backed by physical cores; it just means you have more parallelism competing for the same CPU.

Sizing the cluster when you control the input message system

This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial AWS Glue cluster size and configure Kafka or Kinesis with the estimated number of partitions or shards, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an AWS Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or fewer nodes depending on the cluster size, whether the destination system can keep up, the complexity of the transformations, or other parameters.

Sizing the cluster when you don’t control the message system configuration

In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside AWS Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an AWS Glue 3.0 cluster with 3 G.1X nodes to have 8 worker cores available. In the code, repartitioning the batch distributes the messages randomly (as evenly as possible) among those cores:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than there are Spark cores available, but this might cause inefficiencies because the additional partitions are queued and won’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

When the messages have processing interdependencies

If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (job in Spark UI) to see where the time is spent and if there are imbalances by checking the task duration percentiles on the Stages tab (you can also reach this page by choosing a stage on the Jobs tab).

Auto scaling

Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where AWS Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, the number you determined is needed for peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

Splitting workloads

You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough in most cases. As the cluster grows, there is still a single driver and the nodes have to wait for the others to complete the batch before they can take additional work. If it reaches a point where increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual subscribe or subscribePattern where the topics are listed, use the property assign to assign using JSON a subset of the topic partitions that the job will handle (for example, {"topic1": [0,1,2]}). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.
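
The following sketch shows one way to build that partition list dynamically. The helper name and the partition ranges are made up for illustration; the resulting string is what you would pass as the assign option instead of subscribe.

```python
import json


def kafka_assign_option(topic, first_partition, last_partition):
    """Return the JSON string for the Kafka `assign` option covering an inclusive partition range."""
    partitions = list(range(first_partition, last_partition + 1))
    return json.dumps({topic: partitions})


# Job 1 handles partitions 0-2 of "topic1"; a second job could take 3-5
kafka_properties = {
    "kafka.bootstrap.servers": "bootstrapserver1:9092",
    "assign": kafka_assign_option("topic1", 0, 2),
    "startingOffsets": "latest",
}
```

Each job would get its own range (for example, through a job parameter), so the jobs never process the same partition.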

Sizing down

For low volumes of traffic, AWS Glue Streaming has a special type of small node: G.025X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

  • Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns topic and streamName, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.
  • If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
  • The preceding two cases require all the sources to have the same batch interval and link their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of each stream’s DataFrame using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handling errors.
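
For the last option, the thread management could look like the following sketch. It is written in plain Python and is independent of Spark: each runner is assumed to be a zero-argument function that starts one stream's processing and blocks until it ends (for instance, a function wrapping that stream's forEachBatch call with its own checkpoint location). The helper name is hypothetical.

```python
import threading


def run_streams_in_threads(stream_runners):
    """Run each blocking stream runner in its own thread and collect errors.

    stream_runners: dict mapping a stream name to a zero-argument callable that
    starts the stream processing and blocks until it ends.
    Returns a dict of stream name -> exception for the runners that failed.
    """
    errors = {}

    def guarded(name, runner):
        try:
            runner()
        except Exception as exc:  # record the failure instead of losing it in the thread
            errors[name] = exc

    threads = [
        threading.Thread(target=guarded, args=(name, runner), name=name)
        for name, runner in stream_runners.items()
    ]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return errors
```

Capturing the exceptions matters because an error raised inside a thread doesn't stop the main program by itself; the caller can inspect the returned dictionary and decide whether to fail the whole job.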

Settings

In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

| Property | Applies to | Remarks |
|---|---|---|
| maxOffsetsPerTrigger | Kafka | Limit of messages per batch. Divides the limit evenly among partitions. |
| kinesis.executor.maxFetchRecordsPerShard | Kinesis | Limit per shard; therefore, it should be revised if the number of shards changes. |
| kinesis.executor.maxFetchTimeInMs | Kinesis | When increasing the batch size (either by increasing the batch interval or the previous property), the executor might need more time, which this property allots. |
| startingOffsets | Kafka | Normally you want to read all the data available and therefore use earliest. However, if there is a big backlog, the system might take a long time to catch up; in that case, use latest to skip the history. |
| startingPosition | Kinesis | Similar to startingOffsets; in this case, the values to use are TRIM_HORIZON to backload and LATEST to start processing from now on. |
| includeHeaders | Kafka | Enable this flag if you need to merge and split multiple topics in the same job (see the previous section for details). |
| kinesis.executor.maxconnections | Kinesis | When writing to Kinesis, by default it uses a single connection. Increasing this might improve performance. |
| kinesis.client.avoidEmptyBatches | Kinesis | It’s best to set it to true to avoid wasting resources (for example, generating empty files) when there is no data (like the Kafka connector does). GlueContext.forEachBatch prevents empty batches by default. |

Further optimizations

In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, AWS Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to Adding Streaming ETL Jobs in AWS Glue.

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the GlueContext.forEachBatch function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as Amazon S3 files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set persistDataFrame as false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame":  "false"
    }
)

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is small enough to fit in memory, consider using a broadcast join. However, bear in mind it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.

If you need to shuffle, consider enabling the Kryo serializer (if using custom serializable classes you need to register them first to use it).

As in any AWS Glue job, avoid using custom udf() if you can do the same with the provided API like Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine) and in the case of Python, it forces the movement of data between processes.

Avoid generating too many small files (especially columnar formats like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartitioning based on the partition columns can significantly reduce the number of output files created.

Conclusion

In this post, you saw how to approach sizing and tuning an AWS Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

Analyzing Amazon SES event data with AWS Analytics Services

Post Syndicated from Oscar Mendoza original https://aws.amazon.com/blogs/messaging-and-targeting/analyzing-amazon-ses-event-data-with-aws-analytics-services/

In this post, we walk through using AWS services such as Amazon Kinesis Data Firehose, Amazon Athena, and Amazon QuickSight to monitor Amazon SES email sending events with the granularity and level of detail required to get insights into how your customers engage with the emails you send.

Nowadays, email marketers rely on internal applications to create their campaigns or any other communications, such as newsletters or promotional content. From those activities, they need to collect as much information as possible to analyze and improve their pipeline and get better interaction with their customers. Data such as bounces, rejections, successful receptions, delivery delays, complaints, or open rates can be a powerful tool to understand the customers. Usually, applications work with high-level data points, without the detailed logging or granular information that could help further improve the effectiveness of their campaigns.

Amazon Simple Email Service (SES) is a smart tool for companies that want a cost-effective, flexible, and scalable email service solution that integrates easily with their own products. Amazon SES provides methods to control your sending activity with built-in integration with Amazon CloudWatch Metrics, and also provides a mechanism to collect the email sending event data.

In this post, we propose an architecture and step-by-step guide to track your email sending activities at a granular level, where you can configure several types of email sending events, including sends, deliveries, opens, clicks, bounces, complaints, rejections, rendering failures, and delivery delays. We will use the configuration set feature of Amazon SES to send detailed logging to our analytics services to store, query and create dashboards for a detailed view.

Overview of solution

This architecture uses Amazon SES built-in features and AWS analytics services to provide a quick and cost-effective solution to address your mail tracking requirements. The following services will be implemented or configured:

The following diagram shows the architecture of the solution:

Figure 1. Serverless Architecture to Analyze Amazon SES events

The flow of events starts when a customer uses Amazon SES to send an email. Each of those send events is captured by the configuration set feature, which forwards the events to a Kinesis Data Firehose delivery stream that buffers them and stores them in an Amazon S3 bucket.

After the events are stored, you need to create a database and table schema and store it in the AWS Glue Data Catalog so that Amazon Athena can properly query those events on S3. Finally, we use Amazon QuickSight to create interactive dashboards to search and visualize all your sending activity at an email level of detail.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Walkthrough

Step 1: Use AWS CloudFormation to deploy some additional prerequisites

You can get started with our sample AWS CloudFormation template that includes some prerequisites. This template creates an Amazon S3 bucket, a Kinesis Data Firehose delivery stream, and the IAM role that Amazon SES needs to access Amazon Kinesis Data Firehose.

To download the CloudFormation template, run one of the following commands, depending on your operating system:

In Windows:

curl https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml -o SES-Blog-PreRequisites.yml

In macOS:

wget https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml

To deploy the template, use the following AWS CLI command:

aws cloudformation deploy --template-file ./SES-Blog-PreRequisites.yml --stack-name ses-dashboard-prerequisites --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you see the IAM Service role and the Delivery Stream on the stack Outputs tab. You are going to use these resources in the following steps.

IAM Service role and Delivery Stream created by CloudFormation template

Figure 2. CloudFormation template outputs

Step 2: Creating a configuration set in SES and setting the default configuration set for a verified identity

SES can track the number of send, delivery, open, click, bounce, and complaint events for each email you send. You can use event publishing to send information about these events to other AWS services. In this case, we are going to send the events to Kinesis Data Firehose. To do this, a configuration set is required.

To create a configuration set, complete the following steps:

  1. On the AWS Console, choose the Amazon Simple Email Service.
  2. Choose Configuration sets.
  3. Click on Create set.

    Create a configuration set in Amazon SES

    Figure 3. Amazon SES Create Configuration Set

  4. Set a Configuration set name.
  5. Leave the other settings at their default values.

    Write a name for your configuration set

    Figure 4. Configuration Set Name

  6. Once the configuration set is created, select Event destinations.

    Figure 5. Configuration set created successfully

  7. Click on Add destination.
  8. Select the event types you would like to analyze and then click on next.

    Figure 6. Sending Events to analyze

  9. Select Amazon Kinesis Data Firehose as the destination, choose the delivery stream and the IAM role created previously, click on next and in the review page, click on Add destination.

    Figure 7. Destination for Amazon SES sending events

  10. Once you have created the configuration set and added the event destination, you can define the Default configuration set for the verified identity (domain or email address). In the SES console, choose Verified identities.

    Figure 8. Amazon SES Verified Identity

  11. Choose the verified identity from which you want to collect events and select Configuration set. Click on Edit.

    Figure 9. Edit Configuration Set for Verified Identity

  12. Click on the checkbox Assign a default configuration set and choose the configuration set created previously.

    Figure 10. Assign default configuration set

  13. Once you have completed the previous steps, your events will be sent to Amazon S3. Due to the buffer configuration on the Kinesis Data Firehose delivery stream, the data is loaded to Amazon S3 every 5 minutes or every 5 MiB. You can check the structure created in the bucket and see the JSON logs with SES event data.

    Figure 11. Amazon S3 bucket structure

Step 3: Using Amazon Athena to query the SES event logs

Amazon SES publishes email sending event records to Amazon Kinesis Data Firehose in JSON format. The top-level JSON object contains an eventType string, a mail object, and either a Bounce, Complaint, Delivery, Send, Reject, Open, Click, Rendering Failure, or DeliveryDelay object, depending on the type of event.
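
To get a feel for this structure, the following sketch parses one record and extracts the event type and the message ID. The sample record is hand-written and trimmed to a few of the fields that also appear in the table definition in the next step; it is not real SES output, and the function name is hypothetical.

```python
import json


def summarize_ses_event(raw_record):
    """Extract the event type and message ID from one SES event record (a JSON string)."""
    event = json.loads(raw_record)
    return {
        "eventType": event.get("eventType"),
        "messageId": event.get("mail", {}).get("messageId"),
    }


# Hand-written sample record, trimmed to the fields used here; not real SES output
sample = json.dumps({
    "eventType": "Delivery",
    "mail": {"messageId": "EXAMPLE-MESSAGE-ID", "source": "sender@example.com"},
    "delivery": {"recipients": ["recipient@example.com"]},
})
summary = summarize_ses_event(sample)
```

The messageId inside the mail object is what ties together the different events (send, delivery, open, and so on) generated for the same email.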

  1. In order to simplify the analysis of email sending events, create the sesmaster table by running the following script in Amazon Athena. Don’t forget to change the location in the following script with your own bucket containing the data of email sending events.
    CREATE EXTERNAL TABLE sesmaster (
    
    eventType string,
    
    complaint struct<arrivaldate:string,
    complainedrecipients:array<struct<emailaddress:string>>,
    complaintfeedbacktype:string,
    feedbackid:string,
    `timestamp`:string,
    useragent:string>,
    
    bounce struct<bouncedrecipients:array<struct<action:string,
    diagnosticcode:string,
    emailaddress:string,
    status:string>>,
    bouncesubtype:string,
    bouncetype:string,
    feedbackid:string,
    reportingmta:string,
    `timestamp`:string>,
    
    mail struct<`timestamp`:string,
    source:string,
    sourceArn:string,
    sendingAccountId:string,
    messageId:string,
    destination:string,
    headersTruncated:boolean,
    headers:array<struct<name:string,
    value:string>>,
    commonHeaders:struct<`from`:array<string>,
    to:array<string>,
    messageId:string,
    subject:string>,
    tags:struct<ses_configurationset:string,
    ses_source_ip:string,
    ses_outgoing_ip:string,
    ses_from_domain:string,
    ses_caller_identity:string> >,
    
    send string,
    
    delivery struct<processingtimemillis:int,
    recipients:array<string>,
    reportingmta:string,
    smtpresponse:string,
    `timestamp`:string>,
    
    open struct<ipaddress:string,
    `timestamp`:string,
    userAgent:string>,
    
    reject struct<reason:string>,
    
    click struct<ipAddress:string,
    `timestamp`:string,
    userAgent:string,
    link:string>
    
    ) 
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    "mapping.ses_configurationset"="ses:configuration-set",
    "mapping.ses_source_ip"="ses:source-ip",
    "mapping.ses_from_domain"="ses:from-domain",
    "mapping.ses_caller_identity"="ses:caller-identity",
    "mapping.ses_outgoing_ip"="ses:outgoing-ip" )
    LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/'

    The sesmaster table uses the org.openx.data.jsonserde.JsonSerDe SerDe library to deserialize the JSON data.

    We rely on the support for JSON arrays, maps, and nested data structures, which simplifies preparing and visualizing the data.

    In the sesmaster table, the following mappings were applied to avoid errors caused by JSON field names that contain colons.

    • "mapping.ses_configurationset"="ses:configuration-set"
    • "mapping.ses_source_ip"="ses:source-ip"
    • "mapping.ses_from_domain"="ses:from-domain"
    • "mapping.ses_caller_identity"="ses:caller-identity"
    • "mapping.ses_outgoing_ip"="ses:outgoing-ip"
  2. Once the sesmaster table is ready, it is a good strategy to create curated views of its data. The first view called vwSESMaster contains all the records of email sending events and all the fields which are unique on each event. Create the vwSESMaster view by running the following script in Amazon Athena.
    CREATE OR REPLACE VIEW vwSESMaster AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , delivery.processingtimemillis as deliveryprocessingtimemillis
    , delivery.reportingmta as deliveryreportingmta
    , delivery.smtpresponse as deliverysmtpresponse
    , delivery.timestamp as deliverytimestamp
    , delivery.recipients[1] as deliveryrecipient
    , open.ipaddress as openipaddress
    , open.timestamp as opentimestamp
    , open.userAgent as openuseragent
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , click.ipAddress as clickipaddress
    , click.timestamp as clicktimestamp
    , click.userAgent as clickuseragent
    , click.link as clicklink
    , complaint.timestamp as complainttimestamp
    , complaint.userAgent as complaintuseragent
    , complaint.complaintFeedbackType as complaintcomplaintfeedbacktype
    , complaint.arrivalDate as complaintarrivaldate
    , reject.reason as rejectreason
    FROM
    sesmaster

    The sesmaster table contains some fields that are represented by nested arrays, so they must be flattened into multiple rows. The following event types have fields that need to be flattened:

    • Event type SEND: field mail.commonHeaders.to
    • Event type BOUNCE: field bounce.bouncedrecipients
    • Event type COMPLAINT: field complaint.complainedrecipients

    To flatten those arrays into multiple rows, we use CROSS JOIN in conjunction with the UNNEST operator, following the same strategy for all three event types:

    • Create a temporary view with mail.messageId and the field to be flattened.
    • Create another temporary view with the array flattened into multiple rows.
    • Create the final view by joining the sesmaster table with the second temporary view on event type and mail.messageId.

    To create those views, complete the following steps.

  3. Run the following scripts in Amazon Athena to flatten the mail.commonHeaders.to array for the SEND event type.
    CREATE OR REPLACE VIEW vwSendMailTmpSendTo AS 
    SELECT
    mail.messageId as messageid
    , mail.commonHeaders.to as recipients
    FROM
    sesmaster
    WHERE 
    eventtype='Send'
    
    CREATE OR REPLACE VIEW vwsendmailrecipients AS 
    SELECT
    messageid
    , recipient
    FROM
    ("vwSendMailTmpSendTo"
    CROSS JOIN UNNEST(recipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwSentMails AS
    SELECT 
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , dest.recipient as mailto
    FROM
    sesmaster as sm
    ,vwsendmailrecipients as dest
    WHERE
    sm.eventtype = 'Send'
    and sm.mail.messageid = dest.messageid
  4. Run the following scripts in Amazon Athena to flatten the bounce.bouncedrecipients array for the BOUNCE event type.
    CREATE OR REPLACE VIEW vwbouncemailtmprecipients AS 
    SELECT
    mail.messageId as messageid
    , bounce.bouncedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Bounce')
    
    CREATE OR REPLACE VIEW vwbouncemailrecipients AS 
    SELECT
    messageid
    , recipient.action
    , recipient.diagnosticcode
    , recipient.emailaddress
    FROM
    (vwbouncemailtmprecipients
    CROSS JOIN UNNEST(bouncedrecipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwBouncedMails AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , bd.action as bounceaction
    , bd.diagnosticcode as bouncediagnosticcode
    , bd.emailaddress as bounceemailaddress
    FROM
    sesmaster as sm
    ,vwbouncemailrecipients as bd
    WHERE
    sm.eventtype = 'Bounce'
    and sm.mail.messageid = bd.messageid
    
  5. Run the following scripts in Amazon Athena to flatten the complaint.complainedrecipients array for the COMPLAINT event type.
    CREATE OR REPLACE VIEW vwcomplainttmprecipients AS 
    SELECT
    mail.messageId as messageid
    , complaint.complainedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Complaint')
    
    CREATE OR REPLACE VIEW vwcomplainedrecipients AS 
    SELECT
    messageid
    , recipient.emailaddress
    FROM
    (vwcomplainttmprecipients 
    CROSS JOIN UNNEST(complainedrecipients) t (recipient))
    

    At the end we have one table and four views which can be used in Amazon QuickSight to analyze email sending events:

    • Table sesmaster
    • View vwSESMaster
    • View vwSentMails
    • View vwBouncedMails
    • View vwComplainedemails
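
The flattening used in steps 3 to 5 (CROSS JOIN with UNNEST) can be pictured outside Athena as well. The following Python sketch is only an analogy of what the second temporary view does; `unnest` and the sample rows are hypothetical and not part of the solution.

```python
def unnest(rows, array_field, element_name):
    """Yield one output row per array element, similar to CROSS JOIN UNNEST."""
    for row in rows:
        for element in row[array_field]:
            # Copy the scalar columns and add the single array element.
            flat = {k: v for k, v in row.items() if k != array_field}
            flat[element_name] = element
            yield flat


# Hypothetical stand-in for the rows of vwSendMailTmpSendTo.
send_rows = [
    {"messageid": "m-1", "recipients": ["a@example.com", "b@example.com"]},
    {"messageid": "m-2", "recipients": ["c@example.com"]},
]

flattened = list(unnest(send_rows, "recipients", "recipient"))
for row in flattened:
    print(row)
```

A message with two recipients becomes two rows that share the same messageid, which is why the final views join back to sesmaster on the message ID. As with CROSS JOIN UNNEST, a row whose array is empty produces no output rows.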

Step 4: Analyze and visualize data with Amazon QuickSight

In this blog post, we use Amazon QuickSight to analyze and visualize email sending events from the sesmaster table and the four curated views created previously. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first need to select our table and our views to create new data sources from Athena, and then we use these data sources to populate the visualization. The visualization we create here is only an example; feel free to create your own based on your information needs.

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

  1. On the Amazon QuickSight home page, choose Datasets from the menu on the left side, choose New dataset in the upper-right corner, and pick Athena as the data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

    Figure 12. Create New Athena Data Source

  2. In the following dialog box, select the Catalog and the Database containing your sesmaster table and curated views. Let’s use the sesmaster table to create some basic key performance indicators (KPIs). Select the sesmaster table and click Select.

    Figure 13. Select Sesmaster Table

  3. Our sesmaster table now is a data source for Amazon QuickSight and we can turn to visualizing the data.

    Figure 14. QuickSight Visualize Data

  4. You can see the list of fields on the left. The canvas on the right is still empty. Before we populate it with data, let’s select Key Performance Indicator from the available visual types.

    Figure 15. QuickSight Visual Types

  5. To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the field send onto the value well and use count as aggregation.

    Figure 16. Add Send field to visualization

  6. Add another visual from the upper-left side and select Key Performance Indicator as the visual type.

    Figure 17. Add a new visual

    Figure 18. Key Performance Indicator Visual Type

  7. Put the field Delivery onto the value well and use count as aggregation.

    Figure 19. Add Delivery Field to visualization

  8. Repeat the same procedure (steps 6 and 7) to count the number of Open, Click, Bounce, Complaint, and Reject events. After resizing and rearranging the visuals, you should get an analysis like the one shown in the following image.

    Figure 20. Preview of Key Performance Indicators

  9. Let’s add another dataset by clicking the pencil icon to the right of the current dataset.

    Figure 21. Add a New Dataset

  10. In the following dialog box, select Add Dataset.

    Figure 22. Add a New Dataset

  11. Select the view called vwsesmaster and click Select.

    Figure 23. Add vwsesmaster dataset

    Now you can see all the available fields of the vwsesmaster view.

    Figure 24. New fields from vwsesmaster dataset

  12. Let’s create a new visual and select the Table visual type.

    Figure 25. QuickSight Visual Types

  13. Drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the fields eventtype, mailmessageid, and mailsubject onto the Group By well, but you can add as many fields as you need.

    Figure 26. Add eventtype, mailmessageid and mailsubject fields

  14. Now let’s create a filter for this visual in order to filter by type of event. Be sure you select the table and then click on Filter on the left menu.

    Figure 27. Add a Filter

  15. Click on Create One and select the field eventtype on the popup window. Now select the eventtype filter to see the following options.

    Figure 28. Create eventtype filter

  16. Click on the dots on the right of the eventtype filter and select Add to Sheet.

    Figure 29. Add filter to sheet

  17. Leave all the default values, scroll down, and select Apply.

    Figure 30. Apply filters with default values

  18. Now you can filter the vwsesmaster view by eventtype.

    Figure 31. Filter vwsesmaster view by eventtype

  19. You can continue customizing your visualization with all the available data in the sesmaster table, the vwsesmaster view and even add more datasets to include data from the vwSentMails, vwBouncedMails, and vwComplainedemails views. Below, you can see some other visualizations created from those views.

    Figure 32. Final visualization 1

    Figure 33. Final visualization 2

    Figure 34. Final visualization 3

Clean up

To avoid ongoing charges, clean up the resources you created as part of this post:

  1. Delete the visualizations created in Amazon QuickSight.
  2. Unsubscribe from Amazon QuickSight if you are not using it for other projects.
  3. Delete the views and tables created in Amazon Athena.
  4. Delete the Amazon SES configuration set.
  5. Delete the Amazon SES events stored in S3.
  6. Delete the CloudFormation stack in order to delete the Amazon Kinesis Delivery Stream.

Conclusion

In this post, we showed how you can use AWS native services and features to quickly create an email tracking solution based on Amazon SES events, giving you a more detailed view of your sending activity. The solution uses a fully serverless architecture, so you don’t have to manage any underlying infrastructure, and it gives you the flexibility to handle small, medium, or intense Amazon SES usage.

We showed you some sample dashboards and analyses that cover most customer requirements, but you can evolve this solution and customize it to your needs by adding or removing charts, filters, or events on the dashboard. Refer to the Amazon SES and Amazon QuickSight documentation for the available Amazon SES events, their structure, and how to create analyses and dashboards in Amazon QuickSight.

From a performance and cost-efficiency perspective, there are still several configurations that can improve the solution, for example using a columnar file format such as Parquet, compressing with Snappy, or setting your S3 partition strategy according to your email sending usage. Another improvement could be importing data into SPICE to read data in Amazon QuickSight. With SPICE, the data is loaded from Athena only once, until it is refreshed either manually or automatically on a schedule.

You can use this walkthrough to configure your first SES dashboard and start visualizing event details. You can adjust the services described in this post according to your company’s requirements.

About the authors

Oscar Mendoza is a Solutions Architect at AWS based in Bogotá, Colombia. Oscar works with our customers to provide guidance on architectural best practices and to build Well-Architected solutions on the AWS platform. He enjoys spending time with his family and his dog and playing music.
Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies build their businesses using the AWS cloud platform. He has a great interest in analytics and has been leading the analytics track of the AWS Podcast in Spanish.
Santiago Benavídez is a Solutions Architect at AWS based in Buenos Aires, Argentina, with more than 13 years of experience in IT. He currently helps DNB/ISV customers achieve their business goals using the breadth and depth of AWS services, designing highly available, resilient, and cost-effective architectures.

Stream change data to Amazon Kinesis Data Streams with AWS DMS

Post Syndicated from Sukhomoy Basak original https://aws.amazon.com/blogs/big-data/stream-change-data-to-amazon-kinesis-data-streams-with-aws-dms/

In this post, we discuss how to use AWS Database Migration Service (AWS DMS) native change data capture (CDC) capabilities to stream changes into Amazon Kinesis Data Streams.

AWS DMS is a cloud service that makes it easy to migrate relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use AWS DMS to migrate your data into the AWS Cloud or between combinations of cloud and on-premises setups. AWS DMS also helps you replicate ongoing changes to keep sources and targets in sync.

CDC refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in real time to a downstream system. Capturing every change from transactions in a source database and moving them to the target in real time keeps the systems synchronized, and helps with real-time analytics use cases and zero-downtime database migrations.

Kinesis Data Streams is a fully managed streaming data service. You can continuously add various types of data such as clickstreams, application logs, and social media to a Kinesis stream from hundreds of thousands of sources. Within seconds, the data will be available for your Kinesis applications to read and process from the stream.

AWS DMS can do both replication and migration. Kinesis Data Streams is most valuable in the replication use case because it lets you react to replicated data changes in other integrated AWS systems.

This post is an update to the post Use the AWS Database Migration Service to Stream Change Data to Amazon Kinesis Data Streams. This new post includes steps required to configure AWS DMS and Kinesis Data Streams for a CDC use case. With Kinesis Data Streams as a target for AWS DMS, we make it easier for you to stream, analyze, and store CDC data. AWS DMS uses best practices to automatically collect changes from a data store and stream them to Kinesis Data Streams.

With the addition of Kinesis Data Streams as a target, we’re helping customers build data lakes and perform real-time processing on change data from their data stores. You can use AWS DMS in your data integration pipelines to replicate data in near-real time directly into Kinesis Data Streams. With this approach, you can build a decoupled and eventually consistent view of your database without having to build applications on top of a database, which is expensive. For more details on data ingestion patterns, refer to the AWS whitepaper AWS Cloud Data Ingestion Patterns and Practices.

AWS DMS sources for real-time change data

The following diagram illustrates that AWS DMS can use many of the most popular database engines as a source for data replication to a Kinesis Data Streams target. The database source can be a self-managed engine running on an Amazon Elastic Compute Cloud (Amazon EC2) instance or an on-premises database, or it can be on Amazon Relational Database Service (Amazon RDS), Amazon Aurora, or Amazon DocumentDB (with MongoDB compatibility).

Kinesis Data Streams can collect, process, and store data streams at any scale in real time and write to AWS Glue, which is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. You can use Amazon EMR for big data processing, Amazon Kinesis Data Analytics to process and analyze streaming data, Amazon Kinesis Data Firehose to run ETL (extract, transform, and load) jobs on streaming data, and AWS Lambda as a serverless compute for further processing, transformation, and delivery of data for consumption.

You can store the data in a data warehouse like Amazon Redshift, which is a cloud-scale data warehouse, and in an Amazon Simple Storage Service (Amazon S3) data lake for consumption. You can use Kinesis Data Firehose to capture the data streams and load the data into S3 buckets for further analytics.

Once the data is available in Kinesis Data Streams targets (as shown in the following diagram), you can visualize it using Amazon QuickSight; run ad hoc queries using Amazon Athena; access, process, and analyze it using an Amazon SageMaker notebook instance; and efficiently query and retrieve structured and semi-structured data from files in Amazon S3 without having to load the data into Amazon Redshift tables using Amazon Redshift Spectrum.

Solution overview

In this post, we describe how to use AWS DMS to load data from a database to Kinesis Data Streams in real time. We use a SQL Server database as an example, but other databases like Oracle, Microsoft Azure SQL, PostgreSQL, MySQL, SAP ASE, MongoDB, Amazon DocumentDB, and IBM DB2 also support this configuration.

You can use AWS DMS to capture data changes on the database and then send this data to Kinesis Data Streams. After the streams are ingested in Kinesis Data Streams, they can be consumed by different services like Lambda, Kinesis Data Analytics, Kinesis Data Firehose, and custom consumers using the Kinesis Client Library (KCL) or the AWS SDK.

The following are some use cases that can use AWS DMS and Kinesis Data Streams:

  • Triggering real-time event-driven applications – This use case integrates Lambda and Amazon Simple Notification Service (Amazon SNS).
  • Simplifying and decoupling applications – For example, moving from monolith to microservices. This solution integrates Lambda and Amazon API Gateway.
  • Cache invalidation, and updating or rebuilding indexes – Integrates Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) and Amazon DynamoDB.
  • Data integration across multiple heterogeneous systems – This solution sends data to DynamoDB or another data store.
  • Aggregating data and pushing it to downstream systems – This solution uses Kinesis Data Analytics to analyze and integrate different sources and load the results in another data store.

To facilitate the understanding of the integration between AWS DMS, Kinesis Data Streams, and Kinesis Data Firehose, we have defined a business case that you can solve. In this use case, you are the data engineer of an energy company. This company uses Amazon Relational Database Service (Amazon RDS) to store their end customer information, billing information, and also electric meter and gas usage data. Amazon RDS is their core transaction data store.

You run a batch job weekly to collect all the transactional data and send it to the data lake for reporting, forecasting, and even sending billing information to customers. You also have a trigger-based system to send emails and SMS periodically to the customer about their electricity usage and monthly billing information.

Because the company has millions of customers, processing massive amounts of data every day and sending emails or SMS messages was slowing down the core transactional system. Additionally, the weekly batch jobs for analytics weren’t giving accurate, up-to-date results for the forecasting you want to do on customer gas and electricity usage. Initially, your team considered rebuilding the entire platform to avoid these issues, but the core application is complex in design and has been running in production for many years, so rebuilding it would take years and cost millions.

So, you took a new approach. Instead of running batch jobs on the core transactional database, you started capturing data changes with AWS DMS and sending that data to Kinesis Data Streams. Then you use Lambda to listen to a particular data stream and generate emails or SMS using Amazon SNS to send to the customer (for example, sending monthly billing information or notifying when their electricity or gas usage is higher than normal). You also use Kinesis Data Firehose to send all transaction data to the data lake, so your company can run forecasting immediately and accurately.
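
As a rough illustration of the Lambda step, the following Python sketch decodes Kinesis records and picks out invoices with unusually high usage. It assumes that AWS DMS writes each change as a JSON document with `data` and `metadata` keys and that Lambda passes the payload base64-encoded under `Records[].kinesis.data`. The threshold, the function names, and the message text are hypothetical, and the call that would publish to Amazon SNS is deliberately left out to keep the example free of SDK dependencies.

```python
import base64
import json

USAGE_THRESHOLD_KWH = 45  # hypothetical cut-off for "higher than normal" usage


def find_high_usage(event):
    """Return one alert message per changed invoice row above the threshold."""
    alerts = []
    for record in event["Records"]:
        # Lambda hands over Kinesis payloads base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        # Assumed AWS DMS message layout: {"data": {...}, "metadata": {...}}.
        if payload.get("metadata", {}).get("record-type") != "data":
            continue  # skip anything that is not a row change
        row = payload.get("data", {})
        usage = row.get("monthly_kwh_use") or 0
        if usage > USAGE_THRESHOLD_KWH:
            alerts.append(f"Customer {row.get('customer_id')}: {usage} kWh this month")
    return alerts


def handler(event, context):
    # A real function would publish each alert through Amazon SNS here.
    return find_high_usage(event)


def make_record(message):
    """Wrap a message the way it would arrive from Kinesis (for local testing)."""
    encoded = base64.b64encode(json.dumps(message).encode("utf-8")).decode("ascii")
    return {"kinesis": {"data": encoded}}


sample_event = {
    "Records": [
        make_record({"data": {"customer_id": 1226431, "monthly_kwh_use": 47},
                     "metadata": {"record-type": "data", "operation": "insert"}}),
        make_record({"data": {"customer_id": 1306894, "monthly_kwh_use": 16},
                     "metadata": {"record-type": "data", "operation": "insert"}}),
    ]
}
print(handler(sample_event, None))
```

Keeping the filtering in `find_high_usage` separate from the handler makes the rule easy to test locally with hand-built events before wiring it to a stream.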

The following diagram illustrates the architecture.

In the following steps, you configure your database to replicate changes to Kinesis Data Streams, using AWS DMS. Additionally, you configure Kinesis Data Firehose to load data from Kinesis Data Streams to Amazon S3.

It’s simple to set up Kinesis Data Streams as a change data target in AWS DMS and start streaming data. For more information, see Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.

To get started, you first create a Kinesis data stream in Kinesis Data Streams, then an AWS Identity and Access Management (IAM) role with minimal access as described in Prerequisites for using a Kinesis data stream as a target for AWS Database Migration Service. After you define your IAM policy and role, you set up your source and target endpoints and replication instance in AWS DMS. Your source is the database that you want to move data from, and the target is the database that you’re moving data to. In our case, the source database is a SQL Server database on Amazon RDS, and the target is the Kinesis data stream. The replication instance processes the migration tasks and requires access to the source and target endpoints inside your VPC.

A Kinesis delivery stream (created in Kinesis Data Firehose) is used to load the records from the database to the data lake hosted on Amazon S3. Kinesis Data Firehose can also load data to Amazon Redshift, Amazon OpenSearch Service, an HTTP endpoint, Datadog, Dynatrace, LogicMonitor, MongoDB Cloud, New Relic, Splunk, and Sumo Logic.

Configure the source database

For testing purposes, we use the database democustomer, which is hosted on a SQL Server on Amazon RDS. Use the following command and script to create the database and table, and insert 10 records:

create database democustomer

use democustomer

create table invoices (
	invoice_id INT,
	customer_id INT,
	billing_date DATE,
	due_date DATE,
	balance INT,
	monthly_kwh_use INT,
	total_amount_due VARCHAR(50)
);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (1, 1219578, '4/15/2022', '4/30/2022', 25, 6, 28);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (2, 1365142, '4/15/2022', '4/28/2022', null, 41, 20.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (3, 1368834, '4/15/2022', '5/5/2022', null, 31, 15.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (4, 1226431, '4/15/2022', '4/28/2022', null, 47, 23.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (5, 1499194, '4/15/2022', '5/1/2022', null, 39, 19.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (6, 1221240, '4/15/2022', '5/2/2022', null, 38, 19);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (7, 1235442, '4/15/2022', '4/27/2022', null, 50, 25);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (8, 1306894, '4/15/2022', '5/2/2022', null, 16, 8);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (9, 1343570, '4/15/2022', '5/3/2022', null, 39, 19.5);
insert into invoices (invoice_id, customer_id, billing_date, due_date, balance, monthly_kwh_use, total_amount_due) values (10, 1465198, '4/15/2022', '5/4/2022', null, 47, 23.5);

To capture the new records added to the table, enable MS-CDC (Microsoft Change Data Capture) using the following commands at the database level. Replace SchemaName and TableName with your own values (in this example, dbo and invoices). This is required if ongoing replication is configured on the migration task in AWS DMS.

EXEC msdb.dbo.rds_cdc_enable_db 'democustomer';
GO
EXECUTE sys.sp_cdc_enable_table @source_schema = N'SchemaName', @source_name =N'TableName', @role_name = NULL;
GO
EXEC sys.sp_cdc_change_job @job_type = 'capture' ,@pollinginterval = 3599;
GO

You can use ongoing replication (CDC) for a self-managed SQL Server database on premises or on Amazon Elastic Compute Cloud (Amazon EC2), or a cloud database such as Amazon RDS or an Azure SQL managed instance. SQL Server must be configured for full backups, and you must perform a backup before beginning to replicate data.

For more information, see Using a Microsoft SQL Server database as a source for AWS DMS.

Configure the Kinesis data stream

Next, we configure our Kinesis data stream. For full instructions, see Creating a Stream via the AWS Management Console. Complete the following steps:

  1. On the Kinesis Data Streams console, choose Create data stream.
  2. For Data stream name, enter a name.
  3. For Capacity mode, select On-demand. When you choose on-demand capacity mode, Kinesis Data Streams instantly accommodates your workloads as they ramp up or down. For more information, refer to Choosing the Data Stream Capacity Mode.
  4. Choose Create data stream.
  5. When the data stream is active, copy the ARN.

Configure the IAM policy and role

Next, you configure your IAM policy and role.

  1. On the IAM console, choose Policies in the navigation pane.
  2. Choose Create policy.
  3. Select JSON and use the following policy as a template, replacing the data stream ARN:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecord",
                    "kinesis:PutRecords",
                    "kinesis:DescribeStream"
                ],
                "Resource": "<streamArn>"
            }
        ]
    }

  4. In the navigation pane, choose Roles.
  5. Choose Create role.
  6. Select AWS DMS, then choose Next: Permissions.
  7. Select the policy you created.
  8. Assign a role name and then choose Create role.
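
If you prefer to generate the policy document rather than edit the template by hand, a small script can fill in the stream ARN. The following Python sketch builds the same three-action policy shown in step 3; the function name and the example ARN are hypothetical placeholders.

```python
import json


def dms_kinesis_policy(stream_arn):
    """Build the policy from step 3 for a specific Kinesis data stream ARN."""
    # Reject obviously unfilled placeholders such as "<streamArn>".
    if not stream_arn.startswith("arn:") or ":kinesis:" not in stream_arn:
        raise ValueError(f"not a Kinesis stream ARN: {stream_arn!r}")
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "kinesis:PutRecord",
                    "kinesis:PutRecords",
                    "kinesis:DescribeStream",
                ],
                "Resource": stream_arn,
            }
        ],
    }


# Hypothetical ARN; use the one you copied from the Kinesis console.
EXAMPLE_STREAM_ARN = "arn:aws:kinesis:us-east-1:111122223333:stream/example-stream"
policy = dms_kinesis_policy(EXAMPLE_STREAM_ARN)
print(json.dumps(policy, indent=4))
```

The printed JSON can be pasted into the JSON tab of the policy editor.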

Configure the Kinesis delivery stream

We use a Kinesis delivery stream to load the information from the Kinesis data stream to Amazon S3. To configure the delivery stream, complete the following steps:

  1. On the Kinesis console, choose Delivery streams.
  2. Choose Create delivery stream.
  3. For Source, choose Amazon Kinesis Data Streams.
  4. For Destination, choose Amazon S3.
  5. For Kinesis data stream, enter the ARN of the data stream.
  6. For Delivery stream name, enter a name.
  7. Leave the transform and convert options at their defaults.
  8. Provide the destination bucket and specify the bucket prefixes for the events and errors.
  9. Under Buffer hints, compression and encryption, change the buffer size to 1 MB and buffer interval to 60 seconds.
  10. Leave the other configurations at their defaults.
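
The two buffer hints in step 9 work together: a batch is delivered when either limit is reached, whichever happens first. The following Python sketch is a simplified model of that rule, not the actual Kinesis Data Firehose implementation; `plan_flushes` and the sample arrival times are hypothetical.

```python
BUFFER_SIZE_BYTES = 1 * 1024 * 1024  # 1 MB buffer size from step 9
BUFFER_INTERVAL_SECONDS = 60         # 60-second buffer interval from step 9


def plan_flushes(records):
    """Model when batches are delivered.

    records: iterable of (arrival_time_seconds, size_bytes), sorted by time.
    Returns a list of (flush_time_seconds, batch_bytes).
    """
    batches = []
    buffered = 0
    window_start = None  # arrival time of the first record in the open batch
    for arrival, size in records:
        # Time limit: the open batch is delivered once the interval has elapsed.
        if window_start is not None and arrival - window_start >= BUFFER_INTERVAL_SECONDS:
            batches.append((window_start + BUFFER_INTERVAL_SECONDS, buffered))
            buffered, window_start = 0, None
        if window_start is None:
            window_start = arrival
        buffered += size
        # Size limit: the batch is delivered as soon as the buffer is full.
        if buffered >= BUFFER_SIZE_BYTES:
            batches.append((arrival, buffered))
            buffered, window_start = 0, None
    if buffered:
        # Whatever is left goes out when its interval expires.
        batches.append((window_start + BUFFER_INTERVAL_SECONDS, buffered))
    return batches


busy = plan_flushes([(0, 400_000), (10, 400_000), (20, 400_000), (100, 1_000)])
quiet = plan_flushes([(0, 10), (70, 10)])
print(busy)
print(quiet)
```

In the busy case the size limit triggers the first delivery after 20 seconds, while in the quiet case every delivery is triggered by the 60-second interval. Smaller values reduce latency to Amazon S3 at the cost of more, smaller objects.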

Configure AWS DMS

We use an AWS DMS instance to connect to the SQL Server database and then replicate the table and future transactions to a Kinesis data stream. In this section, we create a replication instance, source endpoint, target endpoint, and migration task. For more information about endpoints, refer to Creating source and target endpoints.

  1. Create a replication instance in a VPC with connectivity to the SQL Server database and associate a security group with sufficient permissions to access the database.
  2. On the AWS DMS console, choose Endpoints in the navigation pane.
  3. Choose Create endpoint.
  4. Select Source endpoint.
  5. For Endpoint identifier, enter a label for the endpoint.
  6. For Source engine, choose Microsoft SQL Server.
  7. For Access to endpoint database, select Provide access information manually.
  8. Enter the endpoint database information.
  9. Test the connectivity to the source endpoint.
    Now we create the target endpoint.
  10. On the AWS DMS console, choose Endpoints in the navigation pane.
  11. Choose Create endpoint.
  12. Select Target endpoint.
  13. For Endpoint identifier, enter a label for the endpoint.
  14. For Target engine, choose Amazon Kinesis.
  15. Provide the AWS DMS service role ARN and the data stream ARN.
  16. Test the connectivity to the target endpoint.

    The final step is to create a database migration task. This task replicates the existing data from the SQL Server table to the data stream and replicates the ongoing changes. For more information, see Creating a task.
  17. On the AWS DMS console, choose Database migration tasks.
  18. Choose Create task.
  19. For Task identifier, enter a name for your task.
  20. For Replication instance, choose your instance.
  21. Choose the source and target database endpoints you created.
  22. For Migration type, choose Migrate existing data and replicate ongoing changes.
  23. In Task settings, use the default settings.
  24. In Table mappings, add a new selection rule and specify the schema and table name of the SQL Server database. In this case, our schema name is dbo and the table name is invoices.
  25. For Action, choose Include.
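
Behind the console form, the table mapping is a JSON document of selection rules. The following sketch shows what the rule for the dbo.invoices table could look like; the rule ID and rule name are arbitrary values chosen for illustration.

```python
import json


def selection_rule(schema, table, rule_id=1):
    """Build one AWS DMS selection rule that includes a single table."""
    return {
        "rule-type": "selection",
        "rule-id": str(rule_id),
        "rule-name": str(rule_id),
        "object-locator": {"schema-name": schema, "table-name": table},
        "rule-action": "include",
    }


# Table mappings document for the schema and table used in this post.
table_mappings = json.dumps({"rules": [selection_rule("dbo", "invoices")]}, indent=2)

if __name__ == "__main__":
    print(table_mappings)
```

If you create the task through the API instead of the console, this string is the table mappings value, and the migration type Migrate existing data and replicate ongoing changes corresponds to full-load-and-cdc.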

When the task is ready, the migration starts.

After the data has been loaded, the table statistics are updated and you can see the 10 records created initially.

As the Kinesis delivery stream reads the data from Kinesis Data Streams and loads it in Amazon S3, the records are available in the bucket you defined previously.

To check that AWS DMS ongoing replication and CDC are working, use this script to add 1,000 records to the table.

You can see 1,000 inserts on the Table statistics tab for the database migration task.

After about 1 minute, you can see the records in the S3 bucket.

At this point, replication is active, and a Lambda function can start consuming the data stream to send emails or SMS messages to customers through Amazon SNS. For more information, refer to Using AWS Lambda with Amazon Kinesis.
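
As a rough illustration of that consumer, the following sketch decodes the records a Kinesis-triggered Lambda function receives and publishes one notification per inserted row. It assumes the AWS DMS message layout of a JSON object with data and metadata keys, where CDC inserts carry the operation value insert; the CUSTOMER_TOPIC_ARN environment variable and the injectable publish argument are hypothetical names added so the logic can be exercised without AWS access.

```python
import base64
import json
import os

# Hypothetical environment variable that holds the SNS topic ARN.
TOPIC_ARN = os.environ.get("CUSTOMER_TOPIC_ARN", "")


def extract_inserts(event):
    """Return the row payloads of insert operations in a Kinesis event."""
    rows = []
    for record in event.get("Records", []):
        # Kinesis delivers each record payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        metadata = payload.get("metadata", {})
        if metadata.get("record-type") == "data" and metadata.get("operation") == "insert":
            rows.append(payload["data"])
    return rows


def lambda_handler(event, context, publish=None):
    """Publish one message per inserted row; publish defaults to Amazon SNS."""
    if publish is None:
        import boto3  # imported lazily so the parsing logic has no dependencies

        sns = boto3.client("sns")

        def publish(message):
            sns.publish(TopicArn=TOPIC_ARN, Message=message)

    rows = extract_inserts(event)
    for row in rows:
        publish(json.dumps(row))
    return {"published": len(rows)}
```

Filtering on the operation keeps the initial full load, updates, and control records from triggering customer notifications; adjust the filter if your use case should react to those as well.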

Conclusion

With Kinesis Data Streams as an AWS DMS target, you now have a powerful way to stream change data from a database directly into a Kinesis data stream. You can use this method to stream change data from any source supported by AWS DMS to perform real-time data processing. Happy streaming!

If you have any questions or suggestions, please leave a comment.


About the Authors

Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.

Sukhomoy Basak is a Solutions Architect at Amazon Web Services, with a passion for Data and Analytics solutions. Sukhomoy works with enterprise customers to help them architect, build, and scale applications to achieve their business outcomes.

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web; they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This, coupled with multiple fulfillment options like buy online and pick up at store, ship from store, or ship from distribution centers, increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.
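
A simulator along these lines could be as small as the following sketch. The field names ordernumber, status, and orderdatetime and the date format match the ones used later in the QuickSight section, but the rest of the payload, the stream name orders-stream, and the choice of partition key are assumptions for illustration rather than the code deployed by the stack.

```python
import json
import random
import uuid
from datetime import datetime, timezone

# Order lifecycle stages described above.
ORDER_STATUSES = ["Created", "Scheduled", "Released", "Shipped", "Delivered"]


def build_order_record(status=None, now=None):
    """Build the keyword arguments for one Kinesis PutRecord call."""
    now = now or datetime.now(timezone.utc)
    order = {
        "ordernumber": str(uuid.uuid4()),
        "status": status or random.choice(ORDER_STATUSES),
        "orderdatetime": now.strftime("%m/%d/%Y %H:%M:%S"),
    }
    return {
        "Data": json.dumps(order).encode("utf-8"),
        # Partitioning by order number keeps updates to one order on one shard.
        "PartitionKey": order["ordernumber"],
    }


def lambda_handler(event, context):
    """Entry point for the scheduled rule; sends one simulated order."""
    import boto3  # imported lazily so the record builder has no dependencies

    kinesis = boto3.client("kinesis")
    kinesis.put_record(StreamName="orders-stream", **build_order_record())
```

Because records with the same partition key land on the same shard, status changes for a given order are read by consumers in the order they were written.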

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • A customer browsing products needs to see the real-time availability of each item
  • A customer interacting with Alexa wants to know the status of an order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores them in a DynamoDB table.
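
One detail worth knowing when writing such a function in Python is that the boto3 DynamoDB resource API does not accept float values, so numeric fields need to be parsed as Decimal. The following sketch shows one way to handle that; the table name orders and the shape of the order payload are assumptions, not the code the stack deploys.

```python
import base64
import json
from decimal import Decimal


def records_to_items(event):
    """Decode Kinesis records into dictionaries that DynamoDB can store."""
    items = []
    for record in event.get("Records", []):
        raw = base64.b64decode(record["kinesis"]["data"])
        # Parse JSON floats as Decimal because boto3 rejects Python floats.
        items.append(json.loads(raw, parse_float=Decimal))
    return items


def lambda_handler(event, context):
    """Write every record in the batch to the (hypothetical) orders table."""
    import boto3  # imported lazily so the decoding logic has no dependencies

    table = boto3.resource("dynamodb").Table("orders")
    # batch_writer groups puts into batches and resends unprocessed items.
    with table.batch_writer() as batch:
        for item in records_to_items(event):
            batch.put_item(Item=item)
    return {"written": len(event.get("Records", []))}
```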

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora is a fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.

Prerequisites

Complete the following prerequisite steps:

  1. Create an AWS account if you don’t already have one.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

CREATE TABLE INVENTORY (
    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25),
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)
);
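
To show how rows might be written to this table through the RDS Data API (the same API the query editor and the rds-data VPC endpoint rely on), the following sketch builds a parameterized upsert. The choice of columns and the upsert-on-primary-key behavior are assumptions for illustration; the inventory Lambda function in the stack may write data differently.

```python
# Columns populated by this sketch; ETA and UpdatedDate are left NULL.
COLUMNS = ["ItemID", "ShipNode", "SupplyType", "SupplyDemandType",
           "ItemName", "UOM", "Quantity"]


def inventory_upsert(item):
    """Return (sql, parameters) for an RDS Data API ExecuteStatement call."""
    names = ", ".join(COLUMNS)
    placeholders = ", ".join(f":{column}" for column in COLUMNS)
    sql = (
        f"INSERT INTO INVENTORY ({names}) VALUES ({placeholders}) "
        # On a primary key collision, overwrite the stored quantity.
        "ON DUPLICATE KEY UPDATE Quantity = VALUES(Quantity)"
    )
    parameters = []
    for column in COLUMNS:
        value = item[column]
        # The Data API wraps each value in a typed field.
        kind = "longValue" if isinstance(value, int) else "stringValue"
        parameters.append({"name": column, "value": {kind: value}})
    return sql, parameters
```

You could then call boto3.client("rds-data").execute_statement(resourceArn=..., secretArn=..., database=..., sql=sql, parameters=parameters) with the cluster ARN and the Secrets Manager ARN from the stack outputs. Named parameters keep values out of the SQL string, which avoids quoting problems and SQL injection.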

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.
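
For reference, both endpoints can also be described as Amazon EC2 CreateVpcEndpoint requests. The following sketch only builds the request parameters; the Region, VPC ID, and route table ID are placeholders, and the interface endpoint omits subnets and security groups, which you may need to set explicitly in your own VPC.

```python
def rds_data_endpoint_params(region, vpc_id):
    """Interface endpoint for the RDS Data API (service name ends in rds-data)."""
    return {
        "VpcEndpointType": "Interface",
        "ServiceName": f"com.amazonaws.{region}.rds-data",
        "VpcId": vpc_id,
    }


def s3_gateway_endpoint_params(region, vpc_id, route_table_ids):
    """Gateway endpoint for Amazon S3, attached to the given route tables."""
    return {
        "VpcEndpointType": "Gateway",
        "ServiceName": f"com.amazonaws.{region}.s3",
        "VpcId": vpc_id,
        "RouteTableIds": list(route_table_ids),
    }


if __name__ == "__main__":
    # Placeholder identifiers; substitute the values from your default VPC.
    print(rds_data_endpoint_params("us-east-1", "vpc-0123456789abcdef0"))
    print(s3_gateway_endpoint_params("us-east-1", "vpc-0123456789abcdef0",
                                     ["rtb-0123456789abcdef0"]))
```

Each dictionary can be passed to boto3.client("ec2").create_vpc_endpoint(**params) if you have boto3 and credentials configured.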

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.
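
If you would rather enable the rules from a script, a small helper like the following would do. It assumes the rules are named exactly as they appear in the console steps above and that you pass in an EventBridge client, for example boto3.client("events").

```python
# Rule names as shown in the console steps; verify them in your own account.
RULE_NAMES = [
    "Ingest-Inventory-Update-Schedule-Rule",
    "Ingest-Order-Schedule-Rule",
]


def enable_rules(events_client, rule_names=RULE_NAMES):
    """Enable each scheduled rule and return the names that were enabled."""
    enabled = []
    for name in rule_names:
        events_client.enable_rule(Name=name)
        enabled.append(name)
    return enabled
```

Taking the client as an argument keeps the helper independent of any particular Region or credential setup.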

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.
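
Beyond previewing a table, you can run your own SQL against the catalog. The following sketch builds the parameters for an Athena StartQueryExecution call that counts distinct orders per status. It assumes the database is named orderdatalake, that the crawler created a table named orders_clean with status and ordernumber columns, and that the output location is a bucket you own.

```python
def orders_by_status_query(database, output_location):
    """Build StartQueryExecution parameters for a count of orders per status."""
    sql = (
        "SELECT status, COUNT(DISTINCT ordernumber) AS orders "
        "FROM orders_clean "
        "GROUP BY status "
        "ORDER BY orders DESC"
    )
    return {
        "QueryString": sql,
        "QueryExecutionContext": {"Database": database},
        # Athena writes the result files to this S3 location.
        "ResultConfiguration": {"OutputLocation": output_location},
    }


if __name__ == "__main__":
    # Placeholder bucket; use the query result location you configured.
    print(orders_by_status_query("orderdatalake", "s3://example-bucket/athena-results/"))
```

With boto3, the dictionary can be passed to boto3.client("athena").start_query_execution(**params). The query runs asynchronously, so results are fetched in a separate call once the execution succeeds.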

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting), sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same Region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type, choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes. To avoid generating a large amount of data, make sure to delete the stack after you finish testing.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.
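
The stack deletion described in this section can also be scripted. The following sketch assumes you pass in a CloudFormation client, for example boto3.client("cloudformation"), and the name you gave the stack; it blocks until the stack reaches DELETE_COMPLETE or the waiter gives up.

```python
def delete_stack_and_wait(cfn_client, stack_name):
    """Delete a CloudFormation stack and wait for the deletion to finish."""
    cfn_client.delete_stack(StackName=stack_name)
    # The waiter polls the stack status until it reports DELETE_COMPLETE.
    waiter = cfn_client.get_waiter("stack_delete_complete")
    waiter.wait(StackName=stack_name)
    return stack_name
```

If the waiter raises an error, check the stack’s Events tab for the resource that failed to delete and remove it manually, as described above.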

Conclusion

In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.

Introducing Protocol buffers (protobuf) schema support in Amazon Glue Schema Registry

Post Syndicated from Vikas Bajaj original https://aws.amazon.com/blogs/big-data/introducing-protocol-buffers-protobuf-schema-support-in-amazon-glue-schema-registry/

AWS Glue Schema Registry now supports Protocol buffers (protobuf) schemas in addition to JSON and Avro schemas. This allows application teams to use protobuf schemas to govern the evolution of streaming data and centrally control data quality from data streams to data lake. AWS Glue Schema Registry provides an open-source library that includes Apache-licensed serializers and deserializers for protobuf that integrate with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Kinesis Data Streams, and Kafka Streams. Similar to Avro and JSON schemas, Protocol buffers schemas also support compatibility modes, schema sourcing via metadata, auto-registration of schemas, and AWS Identity and Access Management (IAM) compatibility.

In this post, we focus on Protocol buffers schema support in AWS Glue Schema Registry and how to use Protocol buffers schemas in stream processing Java applications that integrate with Apache Kafka, Amazon Managed Streaming for Apache Kafka, and Amazon Kinesis Data Streams.

Introduction to Protocol buffers

Protocol buffers is a language- and platform-neutral, extensible mechanism for serializing and deserializing structured data for use in communications protocols and data storage. A protobuf message format is defined in the .proto file. Protobuf is recommended over other data formats when you need language interoperability, faster serialization and deserialization, type safety, schema adherence between data producer and consumer applications, and reduced coding effort. With protobuf, you can use the protobuf compiler (protoc) to generate code from the schema and then easily write and read your data to and from data streams using a variety of languages. You can also use build tool plugins for Maven and Gradle to generate code from protobuf schemas as part of your CI/CD pipelines. We use the following schema for code examples in this post, which defines an employee with a gRPC service definition to find an employee by ID:

Employee.proto

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}

AWS Glue Schema Registry supports both proto2 and proto3 syntax. The preceding protobuf schema uses proto2 syntax and defines the message types Employee, Team, and Project (plus EmployeeSearchParams for the gRPC request) using scalar, composite, and enumeration data types. Each field in the message definitions has a unique number, which is used to identify fields in the message binary format, and should not be changed once your message type is in use. In a proto2 message, a field can be required, optional, or repeated; in proto3, the options are repeated and optional. The package declaration makes sure generated code is namespaced to avoid any collisions. In addition to scalar, composite, and enumeration types, AWS Glue Schema Registry also supports protobuf schemas with common types such as Money, PhoneNumber, Timestamp, Duration, and nullable types such as BoolValue and Int32Value. It also supports protobuf schemas with gRPC service definitions, such as EmployeeSearch in the preceding schema, with compatibility rules. To learn more about Protocol buffers, refer to its documentation.
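
To see why field numbers must stay stable, it helps to look at how they end up in the binary format. The following sketch hand-encodes a single int32 field the way the protobuf wire format does: a tag that combines the field number with a wire type, followed by the value as a varint. It is a teaching aid for non-negative values only, not a replacement for the generated code or the protobuf runtime.

```python
def encode_varint(value: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint."""
    if value < 0:
        raise ValueError("this sketch only handles non-negative values")
    out = bytearray()
    while True:
        byte = value & 0x7F          # take the low 7 bits
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow: set the high bit
        else:
            out.append(byte)
            return bytes(out)


def encode_int32_field(field_number: int, value: int) -> bytes:
    """Encode one int32 field as tag + varint value."""
    wire_type_varint = 0
    tag = (field_number << 3) | wire_type_varint
    return encode_varint(tag) + encode_varint(value)


if __name__ == "__main__":
    # Field 1 (for example, Employee.id) with value 150 -> 08 96 01
    print(encode_int32_field(1, 150).hex())
```

The encoded bytes carry the field number but not the field name, so renaming a field is harmless, whereas renumbering it makes existing consumers misread the data.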

Supported Protocol buffers specification and features

AWS Glue Schema Registry supports all the features of Protocol buffers for versions 2 and 3 except for groups, extensions, and importing definitions. AWS Glue Schema Registry APIs and its open-source library support the latest protobuf runtime version. The protobuf schema operations in AWS Glue Schema Registry are supported via the AWS Management Console, AWS Command Line Interface (AWS CLI), AWS Glue Schema Registry API, AWS SDK, and AWS CloudFormation.

How AWS Glue Schema Registry works

The following diagram illustrates a high-level view of how AWS Glue Schema Registry works. AWS Glue Schema Registry allows you to register and evolve JSON, Apache Avro, and Protocol buffers schemas with compatibility modes. You can register multiple versions of each schema as the business needs or stream processing application’s requirements evolve. The AWS Glue Schema Registry open-source library provides JSON, Avro, and protobuf serializers and deserializers that you configure in producer and consumer stream processing applications, as shown in the following diagram. The open-source library also supports optional compression and caching configuration to save on data transfers.
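
To give a feel for what optional compression saves, the following sketch compresses and restores a payload with the JDK's zlib classes. It is only an illustration of zlib itself: the bytes the open-source library actually puts around a compressed record are not reproduced here, and the class name and sample payload are assumptions.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

// Illustrative zlib round trip using only the JDK.
class ZlibRoundTripSketch {
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int n = deflater.deflate(buffer);
            out.write(buffer, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buffer);
                if (n == 0 && inflater.needsInput()) {
                    // Avoid looping forever on an incomplete stream.
                    throw new IllegalStateException("Truncated zlib stream");
                }
                out.write(buffer, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Not a valid zlib stream", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // A repetitive sample payload, standing in for a batch of similar records.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 64; i++) {
            sb.append("Solutions Architects;Australia;");
        }
        byte[] original = sb.toString().getBytes(StandardCharsets.UTF_8);
        byte[] compressed = compress(original);
        System.out.println("original bytes:   " + original.length);
        System.out.println("compressed bytes: " + compressed.length);
        System.out.println("round trip ok:    " + java.util.Arrays.equals(original, decompress(compressed)));
    }
}
```

Compression pays off most when records repeat the same values; very small or already compact payloads may not shrink at all.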

To accommodate various business use cases, AWS Glue Schema Registry supports multiple compatibility modes. For example, if a consumer application is updated to a new schema version but can still consume and process messages based on the previous version of the same schema, then the schema is backward-compatible. Conversely, if the producer application moves to a new schema version and a consumer application that has not been updated yet can still consume and process both old and new messages, then the schema is forward-compatible. For more information, refer to How the Schema Registry Works.
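
The difference between the two directions can be sketched with a toy model. The following code is not the rule set that AWS Glue Schema Registry applies; it is a deliberately simplified check, with hypothetical names, that treats a schema as a map from field number to a type and a required flag, and asks whether a reader on one version can process data written with another.

```java
import java.util.Map;

// Toy model of schema compatibility; the real registry checks are more detailed.
class CompatibilitySketch {
    // Hypothetical, simplified field description: a type name plus a required flag.
    static final class Field {
        final String type;
        final boolean required;

        Field(String type, boolean required) {
            this.type = type;
            this.required = required;
        }
    }

    // True if a reader using readerSchema can process data produced with writerSchema.
    static boolean canRead(Map<Integer, Field> readerSchema, Map<Integer, Field> writerSchema) {
        for (Map.Entry<Integer, Field> entry : readerSchema.entrySet()) {
            Field expected = entry.getValue();
            Field written = writerSchema.get(entry.getKey());
            if (written == null) {
                if (expected.required) {
                    return false; // the reader needs a field the writer never sends
                }
            } else {
                if (!written.type.equals(expected.type)) {
                    return false; // same field number, different type
                }
                if (expected.required && !written.required) {
                    return false; // the writer is allowed to omit a field the reader needs
                }
            }
        }
        return true; // fields unknown to the reader are skipped
    }

    // Backward: consumers on the new version can still read data written with the old one.
    static boolean isBackwardCompatible(Map<Integer, Field> newSchema, Map<Integer, Field> oldSchema) {
        return canRead(newSchema, oldSchema);
    }

    // Forward: consumers still on the old version can read data written with the new one.
    static boolean isForwardCompatible(Map<Integer, Field> newSchema, Map<Integer, Field> oldSchema) {
        return canRead(oldSchema, newSchema);
    }

    public static void main(String[] args) {
        // Team as defined in the schema: name = 1 and location = 2, both required.
        Map<Integer, Field> v1 = Map.of(1, new Field("string", true), 2, new Field("string", true));
        // Hypothetical next version that adds a new required field with number 3.
        Map<Integer, Field> v2 = Map.of(1, new Field("string", true), 2, new Field("string", true),
                3, new Field("string", true));
        System.out.println("backward: " + isBackwardCompatible(v2, v1));
        System.out.println("forward:  " + isForwardCompatible(v2, v1));
    }
}
```

In this model, adding a required field breaks backward compatibility because older records never carry it, while removing a required field breaks forward compatibility because older consumers still expect it.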

Create a Protocol buffers schema in AWS Glue Schema Registry

In this section, we create a protobuf schema in AWS Glue Schema Registry via the console and AWS CLI.

Create a schema via the console

Make sure you have the required AWS Glue Schema Registry IAM permissions.

  1. On the AWS Glue console, choose Schema registries in the navigation pane.
  2. Click Add registry.
  3. For Registry name, enter employee-schema-registry.
  4. Click Add Registry.
  5. After the registry is created, click Add schema to register a new schema.
  6. For Schema name, enter Employee.proto.

The schema name must be either Employee.proto or Employee if the protobuf schema doesn’t set the options option java_multiple_files = true; and option java_outer_classname = "<Outer class name>"; and you decide to use protobuf schema-generated code (POJOs) in your stream processing applications. We cover this with an example in a subsequent section of this post. For more information on protobuf options, refer to Options.

  7. For Registry, choose the registry employee-schema-registry.
  8. For Data format, choose Protocol buffers.
  9. For Compatibility mode, choose Backward.

You can choose other compatibility modes as per your use case.

  10. For First schema version, enter the preceding protobuf schema, then click Create schema and version.

After the schema is registered successfully, its status will be Available, as shown in the following screenshot.

Create a schema via the AWS CLI

Make sure you have IAM credentials with AWS Glue Schema Registry permissions.

  1. Run the following AWS CLI command to create a schema registry employee-schema-registry (for this post, we use the Region us-east-2):
    aws glue create-registry \
    --registry-name employee-schema-registry \
    --region us-east-2

The AWS CLI command returns the newly created schema registry ARN in response.

  2. Copy the RegistryArn value from the response to use in the following AWS CLI command.
  3. In the following command, use the preceding protobuf schema and schema name Employee.proto:
    aws glue create-schema --schema-name Employee.proto \
    --registry-id RegistryArn=<Schema Registry ARN that you copied from response of create registry CLI command> \
    --compatibility BACKWARD \
    --data-format PROTOBUF \
    --schema-definition file:///<project-directory>/Employee.proto \
    --region us-east-2

You can also use AWS CloudFormation to create schemas in AWS Glue Schema Registry.

Using a Protocol buffers schema with Amazon MSK and Kinesis Data Streams

Like Apache Avro’s SpecificRecord and GenericRecord, protobuf also supports working with POJOs to ensure type safety and DynamicMessage to create generic data producer and consumer applications. The following examples showcase the use of a protobuf schema registered in AWS Glue Schema Registry with Kafka and Kinesis Data Streams producer and consumer applications.

Use a protobuf schema with Amazon MSK

Create an Amazon MSK or Apache Kafka cluster with a topic called protobuf-demo-topic. If creating an Amazon MSK cluster, you can use the console. For instructions, refer to Getting Started Using Amazon MSK.

Use protobuf schema-generated POJOs

To use protobuf schema-generated POJOs, complete the following steps:

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it to the PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We use the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Add the following dependencies to your application’s pom.xml file:
    <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
    <dependency>
       <groupId>com.google.protobuf</groupId>
       <artifactId>protobuf-java</artifactId>
       <version>3.19.4</version>
    </dependency>
    
    <!-- https://mvnrepository.com/artifact/software.amazon.glue/schema-registry-serde -->
    <dependency>
       <groupId>software.amazon.glue</groupId>
       <artifactId>schema-registry-serde</artifactId>
       <version>1.1.9</version>
    </dependency>	

  4. Create a schema registry employee-schema-registry in AWS Glue Schema Registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure the schema file is in the ${basedir}/src/main/resources/proto directory, or change the <protoSourceRoot> value in your application’s pom.xml to match your directory structure:
    mvn clean compile
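
The producer and consumer code in this post refers to the generated message class as EmployeeOuterClass.Employee. The following sketch shows where that outer class name comes from when the schema sets neither java_multiple_files nor java_outer_classname. It is a simplified reconstruction of protoc's default Java naming as we understand it, with a hypothetical class and method name, so treat the generated sources as the authority.

```java
import java.util.Set;

// Simplified reconstruction of the default outer class name for generated Java code.
class OuterClassNameSketch {
    static String deriveOuterClassName(String protoFileName, Set<String> topLevelTypeNames) {
        // Keep only the file name, without directories or the .proto extension.
        String base = protoFileName.substring(protoFileName.lastIndexOf('/') + 1);
        if (base.endsWith(".proto")) {
            base = base.substring(0, base.length() - ".proto".length());
        }
        // Convert to camel case: separators are dropped and the next letter is capitalized.
        StringBuilder camel = new StringBuilder();
        boolean capitalizeNext = true;
        for (char c : base.toCharArray()) {
            if (Character.isLetter(c)) {
                camel.append(capitalizeNext ? Character.toUpperCase(c) : c);
                capitalizeNext = false;
            } else if (Character.isDigit(c)) {
                camel.append(c);
                capitalizeNext = true;
            } else {
                capitalizeNext = true;
            }
        }
        String name = camel.toString();
        // A clash with a type declared in the file is resolved by appending OuterClass.
        return topLevelTypeNames.contains(name) ? name + "OuterClass" : name;
    }

    public static void main(String[] args) {
        Set<String> types = Set.of("Employee", "Team", "Project", "Role");
        System.out.println(deriveOuterClassName("Employee.proto", types));
        // Hypothetical file name, for comparison.
        System.out.println(deriveOuterClassName("employee_records.proto", types));
    }
}
```

Because the file is named Employee.proto and also declares a message called Employee, the wrapper class becomes EmployeeOuterClass, which is why the schema name registered in AWS Glue Schema Registry has to line up with the file name.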

Next, we configure the Kafka producer publishing protobuf messages to the Kafka topic on Amazon MSK.

  1. Configure the Kafka producer properties:
    private Properties getProducerConfig() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
        props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        return props;
    }

The VALUE_SERIALIZER_CLASS_CONFIG configuration specifies the AWS Glue Schema Registry serializer, which serializes the protobuf message.

  1. Use the schema-generated code (POJOs) to create a protobuf message:
    public EmployeeOuterClass.Employee createEmployeeRecord(int employeeId){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                        .setId(employeeId)
                        .setName("Dummy")
                        .setAddress("Melbourne, Australia")
                        .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                        .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                        .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                        .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                        .setRole(EmployeeOuterClass.Role.ARCHITECT)
                        .setProject(EmployeeOuterClass.Project.newBuilder()
                                .setName("Protobuf Schema Demo")
                                .setState("GA").build())
                        .setTotalAwardValue(Money.newBuilder()
                                            .setCurrencyCode("USD")
                                            .setUnits(5)
                                            .setNanos(50000).build())
                        .setTeam(EmployeeOuterClass.Team.newBuilder()
                                .setName("Solutions Architects")
                                .setLocation("Australia").build()).build();
        return employee;
    }

  2. Publish the protobuf messages to the protobuf-demo-topic topic on Amazon MSK:
    public void startProducer() throws InterruptedException {
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, EmployeeOuterClass.Employee> producer = new KafkaProducer<String, EmployeeOuterClass.Employee>(getProducerConfig());
        logger.info("Starting to send records...");
        int employeeId = 0;
        while(employeeId < 100)
        {
            EmployeeOuterClass.Employee person = createEmployeeRecord(employeeId);
            String key = "key-" + employeeId;
            ProducerRecord<String,  EmployeeOuterClass.Employee> record = new ProducerRecord<String,  EmployeeOuterClass.Employee>(topic, key, person);
            producer.send(record, new ProducerCallback());
            employeeId++;
        }
        // Flush buffered records and release resources before the application exits
        producer.flush();
        producer.close();
    }
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e){
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            }
            else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Start the Kafka producer:
    public static void main(String args[]) throws InterruptedException {
        ProducerProtobuf producer = new ProducerProtobuf();
        producer.startProducer();
    }

  4. In the Kafka consumer application’s pom.xml, add the same plugin and dependencies as the Kafka producer’s pom.xml.
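
A note on the Money value built in createEmployeeRecord: google.type.Money stores the whole part of an amount in units and the fractional part in nanos, where one nano is 10^-9 of a unit. The following sketch converts between that pair and a decimal amount using only the JDK; the class and method names are hypothetical.

```java
import java.math.BigDecimal;

// Converts between a (units, nanos) pair and a decimal amount.
class MoneyAmountSketch {
    // nanos are billionths of a unit, so they are placed at scale 9.
    static BigDecimal toDecimal(long units, int nanos) {
        return BigDecimal.valueOf(units).add(BigDecimal.valueOf(nanos, 9));
    }

    // Whole part of the amount, truncated toward zero.
    static long unitsOf(BigDecimal amount) {
        return amount.longValue();
    }

    // Fractional part expressed in billionths; fails if the amount needs more than 9 decimals.
    static int nanosOf(BigDecimal amount) {
        BigDecimal fraction = amount.subtract(BigDecimal.valueOf(amount.longValue()));
        return fraction.movePointRight(9).intValueExact();
    }

    public static void main(String[] args) {
        // The values used in the example record: units = 5, nanos = 50000.
        System.out.println(toDecimal(5, 50000).stripTrailingZeros().toPlainString());
        // The pair needed to represent 5.50 instead.
        BigDecimal fiveFifty = new BigDecimal("5.50");
        System.out.println(unitsOf(fiveFifty) + " units, " + nanosOf(fiveFifty) + " nanos");
    }
}
```

With units 5 and nanos 50000 the example award is 5.00005 USD; an amount of 5.50 USD would need nanos of 500000000.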

Next, we configure the Kafka consumer consuming protobuf messages from the Kafka topic on Amazon MSK.

  1. Configure the Kafka consumer properties:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.POJO.getName());
        return props;
    }

The VALUE_DESERIALIZER_CLASS_CONFIG config specifies the AWS Glue Schema Registry deserializer that deserializes the protobuf messages.

  1. Consume the protobuf message (as a POJO) from the protobuf-demo-topic topic on Amazon MSK:
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, EmployeeOuterClass.Employee> consumer = new KafkaConsumer<String, EmployeeOuterClass.Employee>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, EmployeeOuterClass.Employee> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, EmployeeOuterClass.Employee> record : records) {
                final EmployeeOuterClass.Employee employee = record.value();
                logger.info("Employee Id: " + employee.getId() + " | Name: " + employee.getName() + " | Address: " + employee.getAddress() +
                        " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                        " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany().getSeconds() +
                        " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                        " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                        " | Project Name: " + employee.getProject().getName() + " | Award currency code: " + employee.getTotalAwardValue().getCurrencyCode() +
                        " | Award units: " + employee.getTotalAwardValue().getUnits() + " | Award nanos: " + employee.getTotalAwardValue().getNanos());
            }
        }
    }

  2. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use protobuf’s DynamicMessage

You can use DynamicMessage to create generic producer and consumer applications without generating the code from the protobuf schema. To use DynamicMessage, you first need to create a protobuf schema file descriptor.

  1. Generate a file descriptor from the protobuf schema using the following command:
    protoc --include_imports --proto_path=proto --descriptor_set_out=proto/Employeeproto.desc proto/Employee.proto

The option --descriptor_set_out specifies the name of the descriptor file that this command generates. The protobuf schema Employee.proto is in the proto directory.

  1. Make sure you have created a schema registry and registered the preceding protobuf schema with it.

Now we configure the Kafka producer publishing DynamicMessage to the Kafka topic on Amazon MSK.

  1. Create the Kafka producer configuration. The PROTOBUF_MESSAGE_TYPE configuration is DYNAMIC_MESSAGE instead of POJO.
    private Properties getProducerConfig() {
       Properties props = new Properties();
       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
       props.put(ProducerConfig.ACKS_CONFIG, "-1");
       props.put(ProducerConfig.CLIENT_ID_CONFIG,"protobuf-dynamicmessage-record-producer");
       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,GlueSchemaRegistryKafkaSerializer.class.getName());
       props.put(AWSSchemaRegistryConstants.DATA_FORMAT, DataFormat.PROTOBUF.name());
       props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
       props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "employee-schema-registry");
       props.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "Employee.proto");
       props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
       return props;
    }

  2. Create protobuf dynamic messages and publish them to the Kafka topic on Amazon MSK:
    public void startProducer() throws Exception {
        Descriptor desc = getDescriptor();
        String topic = "protobuf-demo-topic";
        KafkaProducer<String, DynamicMessage> producer = new KafkaProducer<String, DynamicMessage>(getProducerConfig());
        logger.info("Starting to send records...");
        int i = 0;
        while (i < 100) {
            DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                    .setField(desc.findFieldByName("id"), 1234)
                    .setField(desc.findFieldByName("name"), "Dummy Name")
                    .setField(desc.findFieldByName("address"), "Melbourne, Australia")
                    .setField(desc.findFieldByName("employee_age"), Int32Value.newBuilder().setValue(32).build())
                    .setField(desc.findFieldByName("start_date"), Timestamp.newBuilder().setSeconds(235234532434L).build())
                    .setField(desc.findFieldByName("total_time_span_in_company"), Duration.newBuilder().setSeconds(3453245345L).build())
                    .setField(desc.findFieldByName("is_certified"), BoolValue.newBuilder().setValue(true).build())
                    .setField(desc.findFieldByName("total_award_value"), Money.newBuilder().setCurrencyCode("USD")
                            .setUnits(1).setNanos(50000).build())
                    .setField(desc.findFieldByName("team"), createTeam(desc.findFieldByName("team").getMessageType()))
                    .setField(desc.findFieldByName("project"), createProject(desc.findFieldByName("project").getMessageType()))
                    .setField(desc.findFieldByName("role"), desc.findFieldByName("role").getEnumType().findValueByName("ARCHITECT"))
                    .build();
            String key = "key-" + i;
            ProducerRecord<String, DynamicMessage> record = new ProducerRecord<String, DynamicMessage>(topic, key, dynMessage);
            producer.send(record, new ProtobufProducer.ProducerCallback());
            Thread.sleep(1000);
            i++;
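        }
        // Flush buffered records and release resources before the application exits
        producer.flush();
        producer.close();
    }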
    private static DynamicMessage createTeam(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Solutions Architects")
                .setField(desc.findFieldByName("location"), "Australia")
                .build();
        return dynMessage;
    }
    
    private static DynamicMessage createProject(Descriptor desc) {
        DynamicMessage dynMessage = DynamicMessage.newBuilder(desc)
                .setField(desc.findFieldByName("name"), "Protobuf Schema Demo")
                .setField(desc.findFieldByName("state"), "GA")
                .build();
        return dynMessage;
    }
    
    private class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                logger.info("Received new metadata. \n" +
                        "Topic:" + recordMetaData.topic() + "\n" +
                        "Partition: " + recordMetaData.partition() + "\n" +
                        "Offset: " + recordMetaData.offset() + "\n" +
                        "Timestamp: " + recordMetaData.timestamp());
            } else {
                logger.info("There's been an error from the Producer side");
                e.printStackTrace();
            }
        }
    }

  3. Create a descriptor using the Employeeproto.desc file that we generated from the Employee.proto schema file in the previous steps:
    private Descriptor getDescriptor() throws Exception {
        InputStream inStream = ProtobufProducer.class.getClassLoader().getResourceAsStream("proto/Employeeproto.desc");
        DescriptorProtos.FileDescriptorSet fileDescSet = DescriptorProtos.FileDescriptorSet.parseFrom(inStream);
        Map<String, DescriptorProtos.FileDescriptorProto> fileDescProtosMap = new HashMap<String, DescriptorProtos.FileDescriptorProto>();
        List<DescriptorProtos.FileDescriptorProto> fileDescProtos = fileDescSet.getFileList();
        for (DescriptorProtos.FileDescriptorProto fileDescProto : fileDescProtos) {
            fileDescProtosMap.put(fileDescProto.getName(), fileDescProto);
        }
        DescriptorProtos.FileDescriptorProto fileDescProto = fileDescProtosMap.get("Employee.proto");
        FileDescriptor[] dependencies = getProtoDependencies(fileDescProtosMap, fileDescProto);
        FileDescriptor fileDesc = FileDescriptor.buildFrom(fileDescProto, dependencies);
        Descriptor desc = fileDesc.findMessageTypeByName("Employee");
        return desc;
    }
    
    public static FileDescriptor[] getProtoDependencies(Map<String, FileDescriptorProto> fileDescProtos, 
    				  FileDescriptorProto fileDescProto) throws Exception {
    
        if (fileDescProto.getDependencyCount() == 0)
            return new FileDescriptor[0];
    
        ProtocolStringList dependencyList = fileDescProto.getDependencyList();
        String[] dependencyArray = dependencyList.toArray(new String[0]);
        int noOfDependencies = dependencyList.size();
    
        FileDescriptor[] dependencies = new FileDescriptor[noOfDependencies];
        for (int i = 0; i < noOfDependencies; i++) {
            FileDescriptorProto dependencyFileDescProto = fileDescProtos.get(dependencyArray[i]);
            FileDescriptor dependencyFileDesc = FileDescriptor.buildFrom(dependencyFileDescProto, 
    					     getProtoDependencies(fileDescProtos, dependencyFileDescProto));
            dependencies[i] = dependencyFileDesc;
        }
        return dependencies;
    }

  4. Start the Kafka producer:
    public static void main(String args[]) throws Exception {
        ProtobufProducer producer = new ProtobufProducer();
        producer.startProducer();
    }
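
The getProtoDependencies method shown earlier builds each imported file recursively, so a file that is imported from several places is rebuilt every time it is reached. The following sketch shows the same dependency-first idea on plain file names, building each file once and detecting circular imports. It uses only the JDK; the class name is hypothetical, and the import list in main is an assumption based on the types the Employee schema uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Orders .proto files so that every file comes after the files it imports.
class DescriptorOrderSketch {
    static List<String> buildOrder(String root, Map<String, List<String>> imports) {
        Set<String> ordered = new LinkedHashSet<>();
        visit(root, imports, ordered, new LinkedHashSet<>());
        return new ArrayList<>(ordered);
    }

    private static void visit(String file, Map<String, List<String>> imports,
                              Set<String> ordered, Set<String> inProgress) {
        if (ordered.contains(file)) {
            return; // already handled once, nothing to rebuild
        }
        if (!inProgress.add(file)) {
            throw new IllegalStateException("Circular import involving " + file);
        }
        for (String dependency : imports.getOrDefault(file, List.of())) {
            visit(dependency, imports, ordered, inProgress);
        }
        inProgress.remove(file);
        ordered.add(file);
    }

    public static void main(String[] args) {
        // Assumed imports for Employee.proto; the well-known types import nothing themselves.
        Map<String, List<String>> imports = Map.of(
                "Employee.proto", List.of(
                        "google/protobuf/wrappers.proto",
                        "google/protobuf/timestamp.proto",
                        "google/protobuf/duration.proto",
                        "google/type/money.proto"));
        for (String file : buildOrder("Employee.proto", imports)) {
            System.out.println(file);
        }
    }
}
```

In a descriptor-based version, the ordered set would become a map from file name to the FileDescriptor already built for it, so shared imports are reused rather than rebuilt.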

Now we configure the Kafka consumer consuming dynamic messages from the Kafka topic on Amazon MSK.

  1. Enter the following Kafka consumer configuration:
    private Properties getConsumerConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "protobuf-record-consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
        props.put(AWSSchemaRegistryConstants.AWS_REGION,"us-east-2");
        props.put(AWSSchemaRegistryConstants.PROTOBUF_MESSAGE_TYPE, ProtobufMessageType.DYNAMIC_MESSAGE.getName());
        return props;
    }

  2. Consume protobuf dynamic messages from the Kafka topic protobuf-demo-topic. Because we’re using DYNAMIC_MESSAGE, the retrieved objects are of type DynamicMessage.
    public void startConsumer() {
        logger.info("starting consumer...");
        String topic = "protobuf-demo-topic";
        KafkaConsumer<String, DynamicMessage> consumer = new KafkaConsumer<String, DynamicMessage>(getConsumerConfig());
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
            final ConsumerRecords<String, DynamicMessage> records = consumer.poll(Duration.ofMillis(1000));
            for (final ConsumerRecord<String, DynamicMessage> record : records) {
                for (Descriptors.FieldDescriptor field : record.value().getAllFields().keySet()) {
                    logger.info(field.getName() + ": " + record.value().getField(field));
                }
            }
        }
    }

  3. Start the Kafka consumer:
    public static void main(String args[]){
        ConsumerProtobuf consumer = new ConsumerProtobuf();
        consumer.startConsumer();
    }

Use a protobuf schema with Kinesis Data Streams

You can use the protobuf schema-generated POJOs with the Kinesis Producer Library (KPL) and Kinesis Client Library (KCL).

  1. Install the protobuf compiler (protoc) on your local machine from GitHub and add it to the PATH variable.
  2. Add the following plugin configuration to your application’s pom.xml file. We’re using the xolstice protobuf Maven plugin for this post to generate code from the protobuf schema.
    <plugin>
       <!-- https://www.xolstice.org/protobuf-maven-plugin/usage.html -->
       <groupId>org.xolstice.maven.plugins</groupId>
       <artifactId>protobuf-maven-plugin</artifactId>
       <version>0.6.1</version>
       <configuration>
           <protoSourceRoot>${basedir}/src/main/resources/proto</protoSourceRoot>
           <outputDirectory>${basedir}/src/main/java</outputDirectory>
           <clearOutputDirectory>false</clearOutputDirectory>
       </configuration>
       <executions>
           <execution>
               <goals>
                   <goal>compile</goal>
               </goals>
           </execution>
       </executions>
    </plugin>

  3. Because the latest versions of the KPL and KCL include the AWS Glue Schema Registry open-source library (schema-registry-serde) and the protobuf runtime (protobuf-java), you only need to add the following dependencies to your application’s pom.xml:
    <!-- https://mvnrepository.com/artifact/com.amazonaws/amazon-kinesis-producer -->
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-producer</artifactId>
        <version>0.14.11</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/software.amazon.kinesis/amazon-kinesis-client -->
    <dependency>
        <groupId>software.amazon.kinesis</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>2.4.0</version>
    </dependency>

  4. Create a schema registry employee-schema-registry and register the Employee.proto protobuf schema with it. Name your schema Employee.proto (or Employee).
  5. Run the following command to generate the code from Employee.proto. Make sure the schema file is in the ${basedir}/src/main/resources/proto directory, or change the <protoSourceRoot> value in your application’s pom.xml to match your directory structure:
    mvn clean compile

The following Kinesis producer code with the KPL uses the Schema Registry open-source library to publish protobuf messages to Kinesis Data Streams.

  1. Start the Kinesis Data Streams producer:
    private static final String PROTO_SCHEMA_FILE = "proto/Employee.proto";
    private static final String SCHEMA_NAME = "Employee.proto";
    private static String REGION_NAME = "us-east-2";
    private static String REGISTRY_NAME = "employee-schema-registry";
    private static String STREAM_NAME = "employee_data_stream";
    private static int NUM_OF_RECORDS = 100;
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws Exception {
        ProtobufKPLProducer producer = new ProtobufKPLProducer();
        producer.startProducer();
    }

  2. Configure the Kinesis producer:
public void startProducer() throws Exception {
    logger.info("Starting KPL client with Glue Schema Registry Integration...");
    GlueSchemaRegistryConfiguration schemaRegistryConfig = new GlueSchemaRegistryConfiguration(REGION_NAME);
    schemaRegistryConfig.setCompressionType(AWSSchemaRegistryConstants.COMPRESSION.ZLIB);
    schemaRegistryConfig.setSchemaAutoRegistrationEnabled(false);
    schemaRegistryConfig.setCompatibilitySetting(Compatibility.BACKWARD);
    schemaRegistryConfig.setEndPoint(REGISTRY_ENDPOINT);
    schemaRegistryConfig.setProtobufMessageType(ProtobufMessageType.POJO);
    schemaRegistryConfig.setRegistryName(REGISTRY_NAME);
	
    //Setting Glue Schema Registry configuration in Kinesis Producer Configuration along with other configs
    KinesisProducerConfiguration config = new KinesisProducerConfiguration()
                                        .setRecordMaxBufferedTime(3000)
                                        .setMaxConnections(1)
                                        .setRequestTimeout(60000)
                                        .setRegion(REGION_NAME)
                                        .setRecordTtl(60000)
                                        .setGlueSchemaRegistryConfiguration(schemaRegistryConfig);

    FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {
        @Override public void onFailure(Throwable t) {
              t.printStackTrace();
        };
        @Override public void onSuccess(UserRecordResult result) {
            logger.info("record sent successfully. Sequence Number: " + result.getSequenceNumber() + " | Shard Id : " + result.getShardId());
        };
    };
    
    //Creating schema definition object from the Employee.proto schema file.
    Schema gsrSchema = getSchemaDefinition();
    final KinesisProducer producer = new KinesisProducer(config);
    //Tracking the future of every record so that we can wait for all of them to complete
    List<Future<UserRecordResult>> putFutures = new LinkedList<>();
    int employeeCount = 1;
    while(true) {
        //Creating and serializing schema generated POJO object (protobuf message)

        EmployeeOuterClass.Employee employee = createEmployeeRecord(employeeCount);
        byte[] serializedBytes = employee.toByteArray();
        ByteBuffer data = ByteBuffer.wrap(serializedBytes);
        Instant timestamp = Instant.now();

        //Publishing protobuf message to the Kinesis Data Stream
        ListenableFuture<UserRecordResult> f =
                    producer.addUserRecord(STREAM_NAME,
                                        Long.toString(timestamp.toEpochMilli()),
                                        new BigInteger(128, new Random()).toString(10),
                                        data,
                                        gsrSchema);
        Futures.addCallback(f, myCallback, MoreExecutors.directExecutor());
        putFutures.add(f);
        employeeCount++;
        if(employeeCount > NUM_OF_RECORDS)
            break;
    }
    //Waiting for all the records to be published
    for (Future<UserRecordResult> future : putFutures) {
        UserRecordResult userRecordResult = future.get();
        logger.info(userRecordResult.getShardId() + userRecordResult.getSequenceNumber());
    }
}

  1. Create a protobuf message using schema-generated code (POJOs):
    public EmployeeOuterClass.Employee createEmployeeRecord(int count){
        EmployeeOuterClass.Employee employee =
                EmployeeOuterClass.Employee.newBuilder()
                .setId(count)
                .setName("Dummy")
                .setAddress("Melbourne, Australia")
                .setEmployeeAge(Int32Value.newBuilder().setValue(32).build())
                .setStartDate(Timestamp.newBuilder().setSeconds(235234532434L).build())
                .setTotalTimeSpanInCompany(Duration.newBuilder().setSeconds(3453245345L).build())
                .setIsCertified(BoolValue.newBuilder().setValue(true).build())
                .setRole(EmployeeOuterClass.Role.ARCHITECT)
                .setProject(EmployeeOuterClass.Project.newBuilder()
                            .setName("Protobuf Schema Demo")
                            .setState("GA").build())
                .setTotalAwardValue(Money.newBuilder()
                            .setCurrencyCode("USD")
                            .setUnits(5)
                            .setNanos(50000).build())
                .setTeam(EmployeeOuterClass.Team.newBuilder()
                            .setName("Solutions Architects")
                            .setLocation("Australia").build()).build();
        return employee;
    }

  2. Create the schema definition from Employee.proto:
    private Schema getSchemaDefinition() throws IOException {
        InputStream inputStream = ProtobufKPLProducer.class.getClassLoader().getResourceAsStream(PROTO_SCHEMA_FILE);
        StringBuilder resultStringBuilder = new StringBuilder();
        try (BufferedReader br = new BufferedReader(new InputStreamReader(inputStream))) {
            String line;
            while ((line = br.readLine()) != null) {
                resultStringBuilder.append(line).append("\n");
            }
        }
        String schemaDefinition = resultStringBuilder.toString();
        logger.info("Schema Definition " + schemaDefinition);
        Schema gsrSchema =
                new Schema(schemaDefinition, DataFormat.PROTOBUF.toString(), SCHEMA_NAME);
        return gsrSchema;
    }

The following Kinesis consumer code uses the KCL with the Schema Registry open-source library to consume protobuf messages from Kinesis Data Streams.

  1. Initialize the application:
    public void run(){
        logger.info("Starting KCL client with Glue Schema Registry Integration...");
        Region region = Region.of(ObjectUtils.firstNonNull(REGION_NAME, "us-east-2"));
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
    
        EmployeeRecordProcessorFactory employeeRecordProcessorFactory = new EmployeeRecordProcessorFactory();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(STREAM_NAME,
                        APPLICATION_NAME,
                        kinesisClient,
                        dynamoClient,
                        cloudWatchClient,
                        APPLICATION_NAME,
                        employeeRecordProcessorFactory);
    
        //Creating Glue Schema Registry configuration and Glue Schema Registry Deserializer object.
        GlueSchemaRegistryConfiguration gsrConfig = new GlueSchemaRegistryConfiguration(region.toString());
        gsrConfig.setEndPoint(REGISTRY_ENDPOINT);
        gsrConfig.setProtobufMessageType(ProtobufMessageType.POJO);
        GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
                new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), gsrConfig);
        /*
         Setting Glue Schema Registry deserializer in the Retrieval Config for
         Kinesis Client Library to use it while deserializing the protobuf messages.
         */
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(STREAM_NAME, kinesisClient));
        retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);
    
        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig);
    
        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    
        logger.info("Press enter to shutdown");
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        try {
            reader.readLine();
            Future<Boolean> gracefulShutdownFuture = scheduler.startGracefulShutdown();
            logger.info("Waiting up to 20 seconds for shutdown to complete.");
            gracefulShutdownFuture.get(20, TimeUnit.SECONDS);
        } catch (Exception e) {
            logger.info("Interrupted while waiting for graceful shutdown. Continuing.");
        }
        logger.info("Completed, shutting down now.");
    }

  2. Consume protobuf messages from Kinesis Data Streams:
    public static class EmployeeRecordProcessorFactory implements ShardRecordProcessorFactory {
        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new EmployeeRecordProcessor();
        }
    }
    public static class EmployeeRecordProcessor implements ShardRecordProcessor {
        private static final Logger logger = Logger.getLogger(EmployeeRecordProcessor.class.getSimpleName());
        public void initialize(InitializationInput initializationInput) {}
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            try {
                logger.info("Processing " + processRecordsInput.records().size() + " record(s)");
                for (KinesisClientRecord r : processRecordsInput.records()) {
    			
                    // Deserialize the protobuf message into the schema-generated POJO.
                    // parseFrom(ByteBuffer) avoids array(), which throws on read-only buffers.
                    EmployeeOuterClass.Employee employee = EmployeeOuterClass.Employee.parseFrom(r.data());

                    logger.info("Processed record: " + employee);
                    logger.info("Employee Id: " + employee.getId() + " | Name: " + employee.getName() + " | Address: " + employee.getAddress() +
                            " | Age: " + employee.getEmployeeAge().getValue() + " | Startdate: " + employee.getStartDate().getSeconds() +
                            " | TotalTimeSpanInCompany: " + employee.getTotalTimeSpanInCompany() +
                            " | IsCertified: " + employee.getIsCertified().getValue() + " | Team: " + employee.getTeam().getName() +
                            " | Role: " + employee.getRole().name() + " | Project State: " + employee.getProject().getState() +
                            " | Project Name: " + employee.getProject().getName() + " | Award currency code: " +
                            employee.getTotalAwardValue().getCurrencyCode() + " | Award units: " + employee.getTotalAwardValue().getUnits() +
                            " | Award nanos: " + employee.getTotalAwardValue().getNanos());
                }
            } catch (Exception e) {
                logger.info("Failed while processing records. Aborting: " + e);
                Runtime.getRuntime().halt(1);
            }
        }
        public void leaseLost(LeaseLostInput leaseLostInput) {. . .}
        public void shardEnded(ShardEndedInput shardEndedInput) {. . .}
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {. . .}
    }

  3. Start the Kinesis Data Streams consumer:
    private static final Logger logger = Logger.getLogger(ProtobufKCLConsumer.class.getSimpleName());
    private static String REGION_NAME = "us-east-2";
    private static String STREAM_NAME = "employee_data_stream";
    private static final String APPLICATION_NAME =  "protobuf-demo-kinesis-kpl-consumer";
    private static String REGISTRY_ENDPOINT = "https://glue.us-east-2.amazonaws.com";
    
    public static void main(String[] args) throws ParseException {
        new ProtobufKCLConsumer().run();
    }
    

Enhance your protobuf schema

We covered examples of data producer and consumer applications integrating with Amazon MSK, Apache Kafka, and Kinesis Data Streams, and using a Protocol buffers schema registered with AWS Glue Schema Registry. You can further enhance these examples with schema evolution using the compatibility rules that AWS Glue Schema Registry supports. For example, the following protobuf schema is a backward-compatible updated version of Employee.proto. We have added another RPC method, CreateEmployee, to the EmployeeSearch service and added an optional field, title, to the Employee message type. If you upgrade the consumer application with this version of the protobuf schema, the consumer application can still consume old and new protobuf messages.

Employee.proto (version-2)

syntax = "proto2";
package gsr.proto.post;

import "google/protobuf/wrappers.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/empty.proto";
import "google/type/money.proto";

service EmployeeSearch {
    rpc FindEmployee(EmployeeSearchParams) returns (Employee);
    rpc CreateEmployee(EmployeeSearchParams) returns (google.protobuf.Empty);
}
message EmployeeSearchParams {
    required int32 id = 1;
}
message Employee {
    required int32 id = 1;
    required string name = 2;
    required string address = 3;
    required google.protobuf.Int32Value employee_age = 4;
    required google.protobuf.Timestamp start_date = 5;
    required google.protobuf.Duration total_time_span_in_company = 6;
    required google.protobuf.BoolValue is_certified = 7;
    required Team team = 8;
    required Project project = 9;
    required Role role = 10;
    required google.type.Money total_award_value = 11;
    optional string title = 12;
}
message Team {
    required string name = 1;
    required string location = 2;
}
message Project {
    required string name = 1;
    required string state = 2;
}
enum Role {
    MANAGER = 0;
    DEVELOPER = 1;
    ARCHITECT = 2;
}
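
The reason a consumer keeps working across this change is that protobuf readers identify fields by number and skip numbers they don't know, and an optional field that is absent is simply left unset. The following is a minimal, hand-rolled sketch of that idea, not the protobuf library or the AWS Glue Schema Registry deserializer. The class WireFormatSketch and its methods are hypothetical, and it handles only varint and length-delimited fields (no nested messages).

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified illustration of field skipping in a tag-based wire format. */
final class WireFormatSketch {

    // Writes an unsigned varint: 7 payload bits per byte, high bit marks continuation.
    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80));
            value >>>= 7;
        }
        out.write((int) value);
    }

    // Writes an integer field (wire type 0).
    static void writeInt(ByteArrayOutputStream out, int fieldNumber, long value) {
        writeVarint(out, ((long) fieldNumber << 3) | 0);
        writeVarint(out, value);
    }

    // Writes a string field (wire type 2, length-delimited).
    static void writeString(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, ((long) fieldNumber << 3) | 2);
        writeVarint(out, bytes.length);
        out.write(bytes, 0, bytes.length);
    }

    static long readVarint(byte[] data, int[] pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos[0]++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
            shift += 7;
        }
    }

    // Decodes every field but keeps only field numbers up to maxKnownField,
    // which stands in for "the fields this reader's schema version declares".
    static Map<Integer, Object> read(byte[] data, int maxKnownField) {
        Map<Integer, Object> fields = new HashMap<>();
        int[] pos = {0};
        while (pos[0] < data.length) {
            long tag = readVarint(data, pos);
            int fieldNumber = (int) (tag >>> 3);
            int wireType = (int) (tag & 7);
            Object value;
            if (wireType == 0) {
                value = readVarint(data, pos);
            } else if (wireType == 2) {
                int length = (int) readVarint(data, pos);
                value = new String(data, pos[0], length, StandardCharsets.UTF_8);
                pos[0] += length;
            } else {
                throw new IllegalArgumentException("Wire type not handled in this sketch: " + wireType);
            }
            if (fieldNumber <= maxKnownField) {
                fields.put(fieldNumber, value); // unknown field numbers are skipped
            }
        }
        return fields;
    }
}
```

In this sketch, a reader that knows fields 1 through 11 ignores field 12 (title) in a new message, and a reader that knows field 12 finds it absent in an old message instead of failing.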

Conclusion

In this post, we introduced Protocol buffers schema support in AWS Glue Schema Registry. AWS Glue Schema Registry now supports Apache Avro, JSON, and Protocol buffers schemas with different compatible modes. The examples in this post demonstrated how to use Protocol buffers schemas registered with AWS Glue Schema Registry in stream processing applications integrated with Apache Kafka, Amazon MSK, and Kinesis Data Streams. We used the schema-generated POJOs for type safety and protobuf’s DynamicMessage to create generic producer and consumer applications. The examples in this post contain the basic components of the stream processing pattern; you can adapt these examples to your use case needs.


About the Author

Vikas Bajaj is a Principal Solutions Architect at AWS. Vikas works with digital native customers and advises them on technology architecture and solutions to meet strategic business objectives.

Mainframe offloading and modernization: Using mainframe data to build cloud native services with AWS

Post Syndicated from Malathi Pinnamaneni original https://aws.amazon.com/blogs/architecture/mainframe-offloading-and-modernization-using-mainframe-data-to-build-cloud-native-services-with-aws/

Many companies in the financial services and insurance industries rely on mainframes for their most business-critical applications and data. But mainframe workloads typically lack agility. This is one reason that organizations struggle to innovate, iterate, and pivot quickly to develop new applications or release new capabilities. Unlocking this mainframe data can be the first step in your modernization journey.

In this blog post, we will discuss some typical offloading patterns. Whether your goal is developing new applications using mainframe data or modernizing with the Strangler Fig Application pattern, you might want some guidance on how to begin.

Refactoring mainframe applications to the cloud

Refactoring mainframe applications to cloud-native services on AWS is a common industry pattern and a long-term goal for many companies to remain competitive. But this takes an investment of time, money, and organizational change management to realize the full benefits. We see customers start their modernization journey by offloading data from the mainframe to AWS to reduce risks and create new capabilities.

The mainframe data offloading patterns that we will discuss in this post use software services that facilitate data replication to Amazon Web Services (AWS):

  • File-based data synchronization
  • Change data capture
  • Event-sourced replication

Once data is liberated from the mainframe, you can develop new agile applications for deeper insights using analytics and machine learning (ML). You could create a microservices-based or voice-based mobile application. For example, if a bank could access its historical mainframe data to analyze customer behavior, it could develop a new solution based on profiles to use for loan recommendations.

The patterns we illustrate can be used as a reference to begin your modernization efforts with reduced risk. The long-term goal is to rewrite the mainframe applications and modernize them workload by workload.

Solution overview: Mainframe offloading and modernization

This figure shows the flow of data being replicated from mainframe using integration services and consumed in AWS

Figure 1. Mainframe offloading and modernization conceptual flow

Mainframe modernization: Architecture reference patterns

File-based batch integration

Modernization scenarios often require replicating files to AWS, or synchronizing between on-premises and AWS. Use cases include:

  • Analyzing current and historical data to enhance business analytics
  • Providing data for further processing on downstream or upstream dependent systems. This is necessary for exchanging data between applications running on the mainframe and applications running on AWS
This diagram shows a file-based integration pattern on how data can be replicated to AWS for interactive data analytics

Figure 2. File-based batch ingestion pattern for interactive data analytics

File-based batch integration – Batch ingestion for interactive data analytics (Figure 2)

  1. Data ingestion. In this example, we show how data can be ingested to Amazon S3 using AWS Transfer Family or AWS DataSync. Mainframe data is typically encoded in Extended Binary Coded Decimal Interchange Code (EBCDIC) format. Prescriptive guidance exists to convert EBCDIC to ASCII format.
  2. Data transformation. Before moving data to AWS data stores, transformation of the data may be necessary to use it for analytics. AWS services like AWS Glue and AWS Lambda can be used to transform the data. For large volume processing, use Apache Spark on Amazon EMR, or a custom Spring Boot application running on Amazon EC2 to perform these transformations. This process can be orchestrated using AWS Step Functions or AWS Data Pipeline.
  3. Data store. Data is transformed into a consumable format that can be stored in Amazon S3.
  4. Data consumption. You can use AWS analytics services like Amazon Athena for interactive ad-hoc query access, Amazon QuickSight for analytics, and Amazon Redshift for complex reporting and aggregations.
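
The EBCDIC conversion mentioned in the ingestion step can be sketched as follows for plain text fields. This is a minimal illustration rather than the prescriptive guidance referenced above. It assumes the data uses code page 037 (the JDK charset name IBM037, which is part of the JDK's extended charsets), and the class EbcdicDecoder is hypothetical. Packed-decimal and binary fields need copybook-aware conversion and are not covered here.

```java
import java.nio.charset.Charset;

/** Hypothetical helper that decodes EBCDIC text bytes into a Java String. */
final class EbcdicDecoder {

    // Assumption: the mainframe file uses code page 037 (US/Canada EBCDIC).
    private static final Charset EBCDIC = Charset.forName("IBM037");

    // Decodes character data only; numeric COMP/COMP-3 fields must not be passed here.
    static String decode(byte[] ebcdicBytes) {
        return new String(ebcdicBytes, EBCDIC);
    }

    public static void main(String[] args) {
        // The EBCDIC bytes for the text HELLO.
        byte[] sample = {(byte) 0xC8, (byte) 0xC5, (byte) 0xD3, (byte) 0xD3, (byte) 0xD6};
        System.out.println(decode(sample));
    }
}
```

Once decoded, the resulting string can be written out as UTF-8 or ASCII before the transformation step.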
This diagram shows a file-based integration pattern on how data can be replicated to AWS for further processing by downstream systems

Figure 3. File upload to operational data stores for further processing

File-based batch integration – File upload to operational data stores for further processing (Figure 3)

  1. Using AWS Transfer Family, upload CSV files to Amazon S3.
  2. Once the files are uploaded, an S3 event notification can invoke an AWS Lambda function to load the data into Amazon Aurora. For low latency data access requirements, you can use a scalable serverless import pattern with AWS Lambda and Amazon SQS to load the data into Amazon DynamoDB.
  3. Once the data is in data stores, it can be consumed for further processing.

Transactional replication-based integration (Figure 4)

Several modernization scenarios require continuous near-real-time replication of relational data to keep a copy of the data in the cloud. Change Data Capture (CDC) for near-real-time transactional replication works by capturing change log activity to drive changes in the target dataset. Use cases include:

  • Command Query Responsibility Segregation (CQRS) architectures that use AWS to service all read-only and retrieve functions
  • On-premises systems with tightly coupled applications that require a phased modernization
  • Real-time operational analytics
This diagram shows a transaction-based replication (CDC) integration pattern on how data can be replicated to AWS for building reporting and read-only functions

Figure 4. Transactional replication (CDC) pattern

  1. Partner CDC tools in the AWS Marketplace can be used to manage real-time data movement between the mainframe and AWS.
  2. You can use a fan-out pattern to read once from the mainframe to reduce processing requirements and replicate data to multiple data stores based on your requirements:
    • For low latency requirements, replicate to Amazon Kinesis Data Streams and use AWS Lambda to store in Amazon DynamoDB.
    • For critical business functionality with complex logic, use Amazon Aurora or Amazon Relational Database Service (RDS) as targets.
    • To build a data lake or use S3 as an intermediary for ETL processing, replicate to Amazon S3 as the target.
  3. Once the data is in AWS, customers can build agile microservices for read-only functions.
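
To make the CDC idea concrete, the sketch below replays an ordered change log against a target dataset, which is the core of what a replication consumer does after the CDC tool captures changes. It is a simplified illustration under assumptions: the ChangeEvent shape, the Op values, and the in-memory map standing in for a target such as DynamoDB or Aurora are all hypothetical and are not the format of any specific CDC product.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: applying captured change events to a target key-value dataset. */
final class CdcApplySketch {

    enum Op { INSERT, UPDATE, DELETE }

    /** One captured change; value is ignored for DELETE. */
    static final class ChangeEvent {
        final Op op;
        final String key;
        final String value;

        ChangeEvent(Op op, String key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    // Replays the change log in order so the target converges to the source state.
    static Map<String, String> apply(Map<String, String> target, List<ChangeEvent> changeLog) {
        for (ChangeEvent event : changeLog) {
            switch (event.op) {
                case INSERT:
                case UPDATE:
                    target.put(event.key, event.value); // upsert keeps replays idempotent
                    break;
                case DELETE:
                    target.remove(event.key);
                    break;
            }
        }
        return target;
    }
}
```

Treating inserts and updates as upserts is a design choice in this sketch: it lets the same event be applied twice without corrupting the target, which helps when a stream consumer retries a batch.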

Message-oriented middleware (event sourcing) integration (Figure 5)

With message-oriented middleware (MOM) systems like IBM MQ on mainframe, several modernization scenarios require integrating with cloud-based streaming and messaging services. These act as a buffer to keep your data in sync. Use cases include:

  • Consume data from AWS data stores to enable new communication channels. Examples of new channels can be mobile or voice-based applications and can be innovations based on ML
  • Migrate the producer (senders) and consumer (receivers) applications communicating with on-premises MOM platforms to AWS with an end goal to retire on-premises MOM platform
This diagram shows an event-sourcing integration reference pattern for customers using middleware systems like IBM MQ on-premises with AWS services

Figure 5. Event-sourcing integration pattern

  1. Mainframe transactions from IBM MQ can be read using a connector or a bridge solution. They can then be published to Amazon MQ queues or Amazon Managed Streaming for Apache Kafka (Amazon MSK) topics.
  2. Once the data is published to the queue or topic, consumers implemented as AWS Lambda functions or running on Amazon compute services can process, map, transform, or filter the messages. They can store the data in Amazon RDS, Amazon ElastiCache, S3, or DynamoDB.
  3. Now that the data resides in AWS, you can build new cloud-native applications.

Conclusion

Mainframe offloading and modernization using AWS services enables you to reduce cost, modernize your architectures, and integrate your mainframe and cloud-native technologies. You’ll be able to inform your business decisions with improved analytics, and create new opportunities for innovation and the development of modern applications.

Make data available for analysis in seconds with Upsolver low-code data pipelines, Amazon Redshift Streaming Ingestion, and Amazon Redshift Serverless

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/make-data-available-for-analysis-in-seconds-with-upsolver-low-code-data-pipelines-amazon-redshift-streaming-ingestion-and-amazon-redshift-serverless/

Amazon Redshift is the most widely used cloud data warehouse. Amazon Redshift makes it easy and cost-effective to perform analytics on vast amounts of data. Amazon Redshift launched Streaming Ingestion for Amazon Kinesis Data Streams, which enables you to load data into Amazon Redshift with low latency and without having to stage the data in Amazon Simple Storage Service (Amazon S3). This new capability enables you to build reports and dashboards and perform analytics using fresh and current data, without needing to manage custom code that periodically loads new data.

Upsolver is an AWS Advanced Technology Partner that enables you to ingest data from a wide range of sources, transform it, and load the results into your target of choice, such as Kinesis Data Streams and Amazon Redshift. Data analysts, engineers, and data scientists define their transformation logic using SQL, and Upsolver automates the deployment, scheduling, and maintenance of the data pipeline. It’s pipeline ops simplified!

There are multiple ways to stream data to Amazon Redshift and in this post we will cover two options that Upsolver can help you with: First, we show you how to configure Upsolver to stream events to Kinesis Data Streams that are consumed by Amazon Redshift using Streaming Ingestion. Second, we demonstrate how to write event data to your data lake and consume it using Amazon Redshift Serverless so you can go from raw events to analytics-ready datasets in minutes.

Prerequisites

Before you get started, you need to install Upsolver. You can sign up for Upsolver and deploy it directly into your VPC to securely access Kinesis Data Streams and Amazon Redshift.

Configure Upsolver to stream events to Kinesis Data Streams

The following diagram represents the architecture to write events to Kinesis Data Streams and Amazon Redshift.

To implement this solution, you complete the following high-level steps:

  1. Configure the source Kinesis data stream.
  2. Execute the data pipeline.
  3. Create an Amazon Redshift external schema and materialized view.

Configure the source Kinesis data stream

For the purpose of this post, you create an Amazon S3 data source that contains sample retail data in JSON format. Upsolver ingests this data as a stream; as new objects arrive, they’re automatically ingested and streamed to the destination.

  1. On the Upsolver console, choose Data Sources in the navigation sidebar.
  2. Choose New.
  3. Choose Amazon S3 as your data source.
  4. For Bucket, you can use the bucket with the public dataset or a bucket with your own data.
  5. Choose Continue to create the data source.
  6. Create a data stream in Kinesis Data Streams, as shown in the following screenshot.

This is the output stream Upsolver uses to write events that are consumed by Amazon Redshift.

Next, you create a Kinesis connection in Upsolver. Creating a connection enables you to define the authentication method Upsolver uses—for example, an AWS Identity and Access Management (IAM) access key and secret key or an IAM role.

  1. On the Upsolver console, choose More in the navigation sidebar.
  2. Choose Connections.
  3. Choose New Connection.
  4. Choose Amazon Kinesis.
  5. For Region, enter your AWS Region.
  6. For Name, enter a name for your connection (for this post, we name it upsolver_redshift).
  7. Choose Create.

Before you can consume the events in Amazon Redshift, you must write them to the output Kinesis data stream.

  1. On the Upsolver console, navigate to Outputs and choose Kinesis.
  2. For Data Sources, choose the data source you created earlier (in this post, the Amazon S3 data source).
  3. Depending on the structure of your event data, you have two choices:
    1. If the event data you’re writing to the output doesn’t contain any nested fields, select Tabular. Upsolver automatically flattens nested data for you.
    2. To write your data in a nested format, select Hierarchical.
  4. Because we’re working with Kinesis Data Streams, select Hierarchical.

Execute the data pipeline

Now that the stream is connected from the source to an output, you must select which fields of the source event you wish to pass through. You can also choose to apply transformations to your data—for example, adding correct timestamps, masking sensitive values, and adding computed fields. For more information, refer to Quick guide: SQL data transformation.

After adding the columns you want to include in the output and applying transformations, choose Run to start the data pipeline. As new events arrive in the source, Upsolver automatically transforms them and forwards the results to the output stream. There is no need to schedule or orchestrate the pipeline; it’s always on.

Create an Amazon Redshift external schema and materialized view

First, create an IAM role with the appropriate permissions (for more information, refer to Streaming ingestion). Now you can use the Amazon Redshift query editor, AWS Command Line Interface (AWS CLI), or API to run the following SQL statements.

  1. Create an external schema that is backed by Kinesis Data Streams. The following command requires you to include the IAM role you created earlier:
    CREATE EXTERNAL SCHEMA upsolver
    FROM KINESIS
    IAM_ROLE 'arn:aws:iam::123456789012:role/redshiftadmin';

  2. Create a materialized view that allows you to run a SELECT statement against the event data that Upsolver produces:
    CREATE MATERIALIZED VIEW mv_orders AS
    SELECT ApproximateArrivalTimestamp, SequenceNumber,
       json_extract_path_text(from_varbyte(Data, 'utf-8'), 'orderId') as order_id,
       json_extract_path_text(from_varbyte(Data, 'utf-8'), 'shipmentStatus') as shipping_status
    FROM upsolver.upsolver_redshift;

  3. Instruct Amazon Redshift to materialize the results to a table called mv_orders:
    REFRESH MATERIALIZED VIEW mv_orders;

  4. You can now run queries against your streaming data, such as the following:
    SELECT * FROM mv_orders;

Use Upsolver to write data to a data lake and query it with Amazon Redshift Serverless

The following diagram represents the architecture to write events to your data lake and query the data with Amazon Redshift.

To implement this solution, you complete the following high-level steps:

  1. Configure the source Kinesis data stream.
  2. Connect to the AWS Glue Data Catalog and update the metadata.
  3. Query the data lake.

Configure the source Kinesis data stream

We already completed this step earlier in the post, so you don’t need to do anything different.

Connect to the AWS Glue Data Catalog and update the metadata

To update the metadata, complete the following steps:

  1. On the Upsolver console, choose More in the navigation sidebar.
  2. Choose Connections.
  3. Choose the AWS Glue Data Catalog connection.
  4. For Region, enter your Region.
  5. For Name, enter a name (for this post, we call it redshift serverless).
  6. Choose Create.
  7. Create a Redshift Spectrum output, following the same steps from earlier in this post.
  8. Select Tabular, because we’re writing table-formatted output to Amazon Redshift.
  9. Map the data source fields to the Redshift Spectrum output.
  10. Choose Run.
  11. On the Amazon Redshift console, create an Amazon Redshift Serverless endpoint.
  12. Make sure you associate your Upsolver role to Amazon Redshift Serverless.
  13. When the endpoint launches, open the new Amazon Redshift query editor to create an external schema that points to the AWS Glue Data Catalog (see the following screenshot).

This enables you to run queries against data stored in your data lake.

Query the data lake

Now that your Upsolver data is being automatically written and maintained in your data lake, you can query it using your preferred tool and the Amazon Redshift query editor, as shown in the following screenshot.

Conclusion

In this post, you learned how to use Upsolver to stream event data into Amazon Redshift using streaming ingestion for Kinesis Data Streams. You also learned how you can use Upsolver to write the stream to your data lake and query it using Amazon Redshift Serverless.

Upsolver makes it easy to build data pipelines using SQL and automates the complexity of pipeline management, scaling, and maintenance. Upsolver and Amazon Redshift enable you to quickly and easily analyze data in real time.

If you have any questions, or wish to discuss this integration or explore other use cases, start the conversation in our Upsolver Community Slack channel.


About the Authors

Roy Hasson is the Head of Product at Upsolver. He works with customers globally to simplify how they build, manage and deploy data pipelines to deliver high quality data as a product. Previously, Roy was a Product Manager for AWS Glue and AWS Lake Formation.

Mei Long is a Product Manager at Upsolver. She is on a mission to make data accessible, usable and manageable in the cloud. Previously, Mei played an instrumental role working with the teams that contributed to the Apache Hadoop, Spark, Zeppelin, Kafka, and Kubernetes projects.

Maneesh Sharma is a Senior Database Engineer  at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

How Cynamics built a high-scale, near-real-time, streaming AI inference system using AWS

Post Syndicated from Aviv Yehezkel original https://aws.amazon.com/blogs/big-data/how-cynamics-built-a-high-scale-near-real-time-streaming-ai-inference-system-using-aws/

This post is co-authored by Dr. Yehezkel Aviv, Co-Founder and CTO of Cynamics and Sapir Kraus, Head of Engineering at Cynamics.

Cynamics provides a new paradigm of cybersecurity: predicting attacks long before they hit by collecting small network samples (less than 1%), inferring from them how the full network (100%) behaves, and predicting threats using unique AI breakthroughs. The sample approach allows Cynamics to be generic, agnostic, and work for any client’s network architecture, no matter how messy the mix between legacy, private, and public clouds. Furthermore, the solution is scalable and provides full coverage of the client’s network, no matter how large it is in volume and size. Moreover, because any network gateway (physical or virtual, legacy or cloud) supports one of the standard sampling protocols and APIs, Cynamics requires no installation of appliances or agents and no network changes, and onboarding usually takes less than an hour.

In the crowded cybersecurity market, Cynamics is the first-ever solution based on small network samples, which has been considered a hard and unsolved challenge in academia (our academic paper “Network anomaly detection using transfer learning based on auto-encoders loss normalization” was recently presented in ACM CCS AISec 2021) and industry to this day.

The problem Cynamics faced

Early in the process, with the growth of our customer base, we were required to seamlessly support the increased scale and network throughput by our unique AI algorithms. We faced a few different challenges:

  • How can we perform near-real-time analysis on our streaming clients’ incoming data into our AI inference system to predict threats and attacks?
  • How can we seamlessly auto scale our solution to be cost-efficient with no impact on the platform ingestion rate?
  • Because many of our customers are from the public sector, how can we do this while supporting both AWS commercial and government environments (GovCloud)?

This post shows how we used AWS managed services and in particular Amazon Kinesis Data Streams and Amazon EMR to build a near-real-time streaming AI inference system serving hundreds of production customers in both AWS commercial and government environments, while seamlessly auto scaling.

Overview of solution

The following diagram illustrates our solution architecture:

To provide a cost-efficient, highly available solution that scales easily with user growth, while having no impact on near-real-time performance, we turned to Amazon EMR.

We currently process over 50 million records per day, which translates to just over 5 billion flows, and keeps growing on a daily basis. Using Amazon EMR along with Kinesis Data Streams provided the scalability we needed to achieve inference times of just a few seconds.

Although this technology was new to us, we minimized our learning curve by turning to the available guides from AWS for best practices on scale, partitioning, and resource management.

Workflow

Our workflow contains the following steps:

  1. Flow samples are sent by the client’s network devices directly to the Cynamics cloud. A network flow (or connection) is a set of packets with the same five-tuple ID: source-IP-address, destination-IP-address, source-port, destination-port, and protocol.
  2. The samples are analyzed by Network Load Balancers, which forward them into an auto scaling group of stateless flow transformers running on Graviton-powered Amazon Elastic Compute Cloud (Amazon EC2) instances. With Graviton-based processors in the flow transformers, we reduced our operational costs by over 30%.
  3. The flows are transformed to the Cynamics data format and enriched with additional information from Cynamics’ databases and in-house sources such as IP resolutions, intelligence, and reputation.
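
To make the five-tuple definition in step 1 concrete, the following is a minimal sketch of grouping sampled packets into flows. The packet dictionary layout (`src_ip`, `dst_ip`, `src_port`, `dst_port`, `protocol`, `size`) and the helper name `group_into_flows` are assumptions for illustration; they are not the Cynamics data format.

```python
from collections import defaultdict
from typing import NamedTuple


class FiveTuple(NamedTuple):
    """Flow identifier: packets that share all five fields belong to one flow."""
    src_ip: str
    dst_ip: str
    src_port: int
    dst_port: int
    protocol: str


def group_into_flows(packets):
    """Aggregate packet dicts (hypothetical layout) into per-flow counters."""
    flows = defaultdict(lambda: {"packets": 0, "bytes": 0})
    for pkt in packets:
        key = FiveTuple(pkt["src_ip"], pkt["dst_ip"],
                        pkt["src_port"], pkt["dst_port"], pkt["protocol"])
        flows[key]["packets"] += 1
        flows[key]["bytes"] += pkt["size"]
    return dict(flows)
```

Because the key is a plain tuple, the same grouping could be applied per batch in a stateless transformer without shared state between instances.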

The following figures show the network scale for a single flow transformer machine over a week. The first figure illustrates incoming network packets for a single flow transformer machine.

The following shows outgoing network packets for a single flow transformer machine.

The following shows incoming network bytes for a single flow transformer machine.

The following shows outgoing network bytes for a single flow transformer machine.

  4. The flows are sent using Kinesis Data Streams to the real-time analysis engine.
  5. The Amazon EMR-based real-time engine consumes records in batches of a few seconds using YARN/Spark. The sampling rate of each client is dynamically tuned according to its throughput to ensure a fixed incoming data rate for all clients. We achieved this using Amazon EMR Managed Scaling with a custom policy (available with Amazon EMR versions 5.30.1 and later), which allows us to scale EMR nodes in or out based on Amazon CloudWatch metrics, with two different rules for scale-out and scale-in. The metric we created is based on the Amazon EMR running time, because our real-time AI threat detection runs on a sliding window interval of a few seconds.
    1. The scale-out policy tracks the average running time over a period of 10 minutes, and scales out the EMR nodes if the average is longer than 95% of the required interval. This allows us to prevent processing delays.
    2. Similarly, the scale-in policy uses the same metric but measures the average over a 30-minute period, and scales the cluster in accordingly. This enables us to optimize cluster costs and reduce the number of EMR nodes in off-hours.
  6. To optimize and seamlessly scale our AI inference calls, we made them available through an Application Load Balancer (ALB) and another auto scaling group of servers (the AI model service).
  7. We use Amazon DynamoDB as a fast and highly available state table.
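
The two scaling rules can be sketched as a small decision function. This is only an illustration of the logic described above, not the CloudWatch or Amazon EMR configuration itself. The 10-minute and 30-minute windows and the 95% scale-out threshold come from the text; the scale-in threshold (`scale_in_ratio`) is an assumption, because the post does not state it.

```python
from statistics import mean


def average_over(samples, now, window_s):
    """Mean running time of (timestamp_s, run_time_s) samples in the last window_s seconds."""
    recent = [run_time for ts, run_time in samples if now - window_s <= ts <= now]
    return mean(recent) if recent else None


def scaling_decision(samples, now, interval_s, scale_in_ratio=0.5):
    """Return 'scale_out', 'scale_in', or 'hold' for the current metric history.

    scale_in_ratio is a hypothetical threshold chosen for illustration.
    """
    out_avg = average_over(samples, now, 10 * 60)   # scale-out rule: 10-minute average
    in_avg = average_over(samples, now, 30 * 60)    # scale-in rule: 30-minute average
    if out_avg is not None and out_avg > 0.95 * interval_s:
        return "scale_out"
    if in_avg is not None and in_avg < scale_in_ratio * interval_s:
        return "scale_in"
    return "hold"
```

Using a shorter window for scale-out than for scale-in makes the cluster react quickly to rising load while avoiding premature removal of nodes.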

The following figure shows the number of records processed by the Kinesis data stream over a single day.

The following shows the Kinesis data streams records rate per minute.

AI predictions and threat detections are sent to continued processing and alerting, and are saved in Amazon DocumentDB (with MongoDB compatibility).

Conclusion

With the approach described in this post, Cynamics has been providing threat prediction based on near-real-time analysis of its unique AI algorithms for a constantly growing customer base in a seamless and automatically scalable way. Since first implementing the solution, we’ve managed to easily and linearly scale our architecture, and were able to further optimize our costs by transitioning to Graviton-based processors in the flow transformers, which reduced our flow transformer costs by over 30%.

We’re considering the following next steps:

  • An automatic machine learning lifecycle using an Amazon SageMaker Studio pipeline, which includes the following steps:
  • Additional cost reduction by moving the EMR instances to be Graviton-based as well, which should yield an additional 20% reduction.

About the Authors

Dr. Yehezkel Aviv is the co-founder and CTO of Cynamics, leading the company innovation and technology. Aviv holds a PhD in Computer Science from the Technion, specializing in cybersecurity, AI, and ML.

Sapir Kraus is Head of Engineering at Cynamics, where his core focus is managing the software development lifecycle. His responsibilities also include software architecture and providing technical guidance to team members. Outside of work, he enjoys roasting coffee and barbecuing.

Omer Haim is a Startup Solutions Architect at Amazon Web Services. He helps startups with their cloud journey, and is passionate about containers and ML. In his spare time, Omer likes to travel, and occasionally game with his son.

Automating Anomaly Detection in Ecommerce Traffic Patterns

Post Syndicated from Aditya Pendyala original https://aws.amazon.com/blogs/architecture/automating-anomaly-detection-in-ecommerce-traffic-patterns/

Many organizations with large ecommerce presences have procedures to detect major anomalies in their user traffic. Often, these processes use static alerts or manual monitoring. However, detecting minor anomalies in traffic patterns in near-real time can be challenging. Early detection of these minor anomalies in ecommerce traffic (such as website page visits and order completions) helps organizations take corrective actions to address issues. This decreases negative impacts to business key performance indicators (KPIs).

In this blog post, we will demonstrate an artificial intelligence/machine learning (AI/ML) solution using AWS services. We’ll show how Amazon Kinesis and Amazon Lookout for Metrics can be used to detect major and minor anomalies in near-real time, based on historical and current traffic trends.

The inconsistency of ecommerce traffic

Ecommerce traffic (and the number of orders placed) varies based on season, month, date, and time of day. For example, ecommerce websites experience higher traffic during weekday evening hours than during morning hours. Similarly, there is a spike in web traffic on weekends, compared to weekdays. However, ecommerce traffic during holiday events (for example, Black Friday and Cyber Monday) does not follow this trend. Due to such dynamic and varying patterns, detecting minor anomalies in user traffic in near-real time becomes difficult.

We need a smart solution that can detect the smallest deviation in user traffic based on historical data (date and time). As you can imagine, programming these trends based on static rules is time-intensive. In the next section, we discuss a solution that can help organizations automate and detect minor (and major) anomalies while still accounting for varying traffic trends.

The components of our anomaly detection solution

The architecture consists of three functional components:

  • The ecommerce application that customers use for interaction
  • The data ingesting, transforming, and storage platform
  • Anomaly detection and notification

This solution automates data ingestion and anomaly detection, and provides a graphical user interface to interact, tweak, and filter anomalies based on severity.

Figure 1 illustrates the architecture of this solution:

Figure 1. Architecture diagram of an anomaly detection solution for ecommerce traffic

Let’s look at the individual components of this architecture before reviewing the overall solution.

The ecommerce application that customers use for interaction 

A customer’s journey of purchasing a product online involves user actions that include:

  • Searching for and viewing the product on the “Product Display Page” (PDP)
  • Adding to the “cart”
  • Completing the purchase on the “checkout” page

The traffic on these pages is broken down into chunks based on time intervals. These serve as the data points that we can use to understand traffic patterns.
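
As a rough illustration of breaking traffic into time-interval chunks, the sketch below counts events per page in fixed intervals. The event shape (a timestamp and a page name), the five-minute default, and the function names are assumptions made for this example.

```python
from collections import Counter
from datetime import datetime


def bucket_start(ts: datetime, interval_minutes: int) -> datetime:
    """Round a timestamp down to the start of its interval.

    Assumes interval_minutes evenly divides 60.
    """
    minute = ts.minute - ts.minute % interval_minutes
    return ts.replace(minute=minute, second=0, microsecond=0)


def count_by_interval(events, interval_minutes=5):
    """Count (timestamp, page) events per (interval start, page) pair."""
    counts = Counter()
    for ts, page in events:
        counts[(bucket_start(ts, interval_minutes), page)] += 1
    return counts
```

Each (interval, page) count is one data point in the time series that an anomaly detector can learn from.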

The data ingesting, transforming, and storage platform

Ecommerce applications generate data in multiple formats and in different volumes. This data must be fed into a streaming platform that can ingest and collect data continuously. Typically, the data must be transformed and stored for analysis and machine learning purposes. To satisfy these requirements, we will use Amazon Kinesis Data Streams as a streaming platform for data ingestion. Amazon Kinesis Data Firehose with AWS Lambda can transform the data. And we’ll store the data in Amazon Simple Storage Service (S3).
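
A Kinesis Data Firehose transformation Lambda function receives base64-encoded records and must return each record ID with a result status and the transformed data. The following is a minimal sketch of such a handler. The payload fields (`timestamp`, `page`, `views`) are hypothetical and stand in for whatever the ecommerce application emits.

```python
import base64
import json


def lambda_handler(event, context):
    """Keep only the fields needed for analysis and emit newline-delimited JSON."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            transformed = {
                "timestamp": payload["timestamp"],   # hypothetical field names
                "page": payload["page"],
                "views": int(payload["views"]),
            }
            line = (json.dumps(transformed) + "\n").encode("utf-8")
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(line).decode("utf-8"),
            })
        except (ValueError, KeyError, TypeError):
            # Return the original data and mark the record as failed.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Marking malformed records as `ProcessingFailed` instead of raising an exception lets the rest of the batch proceed.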

Anomaly detection and notification in near-real time

Once our data is ready, we must analyze it in near-real time to identify anomalies. We must notify the concerned team about any anomaly so that they can take corrective actions, if needed. We will use Lookout for Metrics and Amazon Simple Notification Service (SNS) to satisfy these requirements.

Lookout for Metrics can detect and diagnose anomalies in traffic patterns using ML. Amazon Lookout for Metrics accepts feedback on detected anomalies and tunes the results to improve accuracy over time. Lookout for Metrics is also capable of integrating with Amazon SNS, which can send notifications via SMS, mobile push, and emails.

Monitoring ecommerce traffic with Lookout for Metrics

As shown in Figure 1, data from user traffic and user interactions with the ecommerce application is captured as a function of time, and ingested into Kinesis Data Streams. Using Kinesis Data Firehose and Lambda, data is transformed and stored in an S3 bucket. We then create a detector in Lookout for Metrics and use the S3 bucket as the data source. Because of the seamless integration between S3 and Lookout for Metrics, data from the S3 bucket is automatically ingested into the detector we created.

Once the detector is activated, Lookout for Metrics starts monitoring the data and identifying anomalies in near-real time. Lookout for Metrics also provides a mechanism to adjust the severity threshold on a scale of 0-100, which helps decrease false positives as much as desired. In addition, it integrates with SNS, and can publish notifications to an SNS topic. An email, SMS, or mobile push subscription can be created on this topic to notify users about any current anomalies.

Conclusion

In this post, we discussed why minor anomalies in an organization’s ecommerce traffic are hard to detect in near-real time. We also discussed the services that can be used to monitor these anomalies, such as Lookout for Metrics. Use this architecture to help you monitor traffic, detect anomalies in near-real time, and reduce any negative impact to your business KPIs.

Automate Amazon Connect Data Streaming using AWS CDK

Post Syndicated from Tarik Makota original https://aws.amazon.com/blogs/architecture/automate-amazon-connect-data-streaming-using-aws-cdk/

Many customers want to provision Amazon Web Services (AWS) cloud resources quickly and consistently with lifecycle management, by treating infrastructure as code (IaC). Commonly used services are AWS CloudFormation and HashiCorp Terraform. Currently, customers set up Amazon Connect data streaming manually, as the service is not available under CloudFormation resource types. Customers may want to extend it to retrieve real-time contact and agent data. Integration is done manually and can result in issues with IaC.

Amazon Connect contact trace records (CTRs) capture the events associated with a contact in the contact center. Amazon Connect agent event streams are Amazon Kinesis Data Streams that provide near real-time reporting of agent activity within the Amazon Connect instance. The events published to the stream include these contact control panel (CCP) events:

  • Agent login
  • Agent logout
  • Agent connects with a contact
  • Agent status change, such as available to handle contacts, on break, or in training
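
As a sketch of how a downstream consumer might read these events, the following Lambda handler decodes Kinesis records and counts them by event type. It assumes each record is a JSON document with an `EventType` field (for example `LOGIN`, `LOGOUT`, or `STATE_CHANGE`); verify the field names against the Amazon Connect agent event stream data model before relying on them.

```python
import base64
import json
from collections import Counter


def lambda_handler(event, context):
    """Count agent events by type for one Kinesis-triggered invocation."""
    counts = Counter()
    for record in event["Records"]:
        # Kinesis delivers the record payload base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        counts[payload.get("EventType", "UNKNOWN")] += 1
    return dict(counts)
```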

In this blog post, we will show you how to automate Amazon Connect data streaming using AWS Cloud Development Kit (AWS CDK). AWS CDK is an open source software development framework to define your cloud application resources using familiar programming languages. We will create a custom CDK resource, which in turn uses the Amazon Connect API. This can be used as a template to automate other parts of Amazon Connect, or for other AWS services that don’t expose their full functionality through CloudFormation.

Overview of Amazon Connect automation solution

Amazon Connect is an omnichannel cloud contact center that helps you provide superior customer service. We will stream Amazon Connect agent activity and contact trace records to Amazon Kinesis. We will assume that data will then be used by other services or third-party integrations for processing. Here are the high-level steps and AWS services that we are going to use (see Figure 1):

  1. Amazon Connect: We will create an instance and enable data streaming
  2. AWS Cloud Development Kit (AWS CDK): We will create a custom resource and orchestrate automation
  3. Amazon Kinesis Data Streams and Amazon Kinesis Data Firehose: To stream data out of Connect
  4. AWS Identity and Access Management (IAM): To govern access and permissible actions across all AWS services
  5. Third-party tool or Amazon S3: Used as a destination of Connect data via Amazon Kinesis
Figure 1. Connect data streaming automation workflow

Walkthrough and deployment tasks

Sample code for this solution is provided in this GitHub repo. The code is packaged as a CDK application, so the solution can be deployed in minutes. The deployment tasks are as follows:

  • Deploy the CDK app
  • Update Amazon Connect instance settings
  • Import the demo flow and data

Custom resources enable you to write custom logic in your CloudFormation deployment. You implement the creation, update, and deletion logic to define the custom resource deployment.

CDK implements AwsCustomResource, which is an AWS Lambda-backed custom resource that uses the AWS SDK to provision your resources. This means that the CDK stack deploys a provisioning Lambda function. Upon deployment, it calls the AWS SDK API operations that you defined for the resource lifecycle (create, update, and delete).
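
To illustrate what the create step of such a custom resource might pass to the AWS SDK, the sketch below builds the request parameters for the Amazon Connect `AssociateInstanceStorageConfig` operation. The helper name `storage_config_params` is hypothetical, and the sample repository may structure this differently; the parameter names reflect our understanding of the Connect API and should be checked against its reference.

```python
def storage_config_params(instance_id, resource_type, stream_arn,
                          stream_type="KINESIS_STREAM"):
    """Build parameters for associating a stream with a Connect instance.

    resource_type is expected to be 'AGENT_EVENTS' or 'CONTACT_TRACE_RECORDS'.
    """
    if stream_type == "KINESIS_STREAM":
        storage = {
            "StorageType": "KINESIS_STREAM",
            "KinesisStreamConfig": {"StreamArn": stream_arn},
        }
    elif stream_type == "KINESIS_FIREHOSE":
        storage = {
            "StorageType": "KINESIS_FIREHOSE",
            "KinesisFirehoseConfig": {"FirehoseArn": stream_arn},
        }
    else:
        raise ValueError(f"unsupported stream type: {stream_type}")
    return {
        "InstanceId": instance_id,
        "ResourceType": resource_type,
        "StorageConfig": storage,
    }
```

A matching delete step would call the corresponding disassociate operation with the association ID returned on create, so that removing the stack also removes the streaming configuration.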

Prerequisites

For this walkthrough, you need the following prerequisites:

Deploy and verify

1. Deploy the CDK application.

The resources required for this demo are packaged as a CDK app. Before proceeding, confirm you have command line interface (CLI) access to the AWS account where you would like to deploy your solution.

  • Open a terminal window and clone the GitHub repository in a directory of your choice:
    git clone git@github.com:aws-samples/connect-cdk-blog
  • Navigate to the cdk-app directory and follow the deployment instructions. The default Region is usually us-east-1. If you would like to deploy in another Region, you can run:
    export AWS_DEFAULT_REGION=eu-central-1

2. Create the CloudFormation stack by initiating the following commands.

source .env/bin/activate
pip install -r requirements.txt
cdk synth
cdk bootstrap
cdk deploy --parameters instanceId={YOUR-AMAZON-CONNECT-INSTANCE-ID} \
  --parameters ctrStreamName={CTRStream} \
  --parameters agentStreamName={AgentStream}

Note: By default, the stack will create the contact trace records stream [ctrStreamName] as a Kinesis data stream. If you want to use an Amazon Kinesis Data Firehose delivery stream instead, you can modify this behavior by going to cdk.json and adding "ctr_stream_type": "KINESIS_FIREHOSE" as a parameter under "context".

Once the status of the CloudFormation stack is updated to CREATE_COMPLETE, the following resources are created:

  • Kinesis Data Stream
  • IAM roles
  • Lambda

3. Verify the integration.

  • Kinesis Data Streams are added to the Amazon Connect instance
Figure 2. Screenshot of Amazon Connect with Data Streaming enabled

Cleaning up

You can remove all resources provisioned for the CDK app by running the following command under connect-app directory:

cdk destroy

This will not remove your Amazon Connect instance. You can remove it by navigating to the AWS Management Console -> Services -> Amazon Connect. Find your Connect instance and click Delete.

Conclusion

In this blog, we demonstrated how to maintain Amazon Connect as Infrastructure as Code (IaC). Using a custom resource of AWS CDK, we have shown how to automate configuring Amazon Kinesis Data Streams for data streaming in Amazon Connect. The same approach can be extended to automate setting other Amazon Connect properties such as Amazon Lex, AWS Lambda, Amazon Polly, and Customer Profiles. This approach will help you to integrate Amazon Connect with your Workflow Management Application in a faster and more consistent manner, and reduce manual configuration.

For more information, refer to Enable Data Streaming for your instance.

Stream Apache HBase edits for real-time analytics

Post Syndicated from Amir Shenavandeh original https://aws.amazon.com/blogs/big-data/stream-apache-hbase-edits-for-real-time-analytics/

Apache HBase is a non-relational database. To use the data, applications need to query the database to pull the data and changes from tables. In this post, we introduce a mechanism to stream Apache HBase edits into streaming services such as Apache Kafka or Amazon Kinesis Data Streams. In this approach, changes to data are pushed and queued into a streaming platform such as Kafka or Kinesis Data Streams for real-time processing, using a custom Apache HBase replication endpoint.

We start with a brief technical background on HBase replication and review a use case in which we store IoT sensor data into an HBase table and enrich rows using periodic batch jobs. We demonstrate how this solution enables you to enrich the records in real time and in a serverless way using AWS Lambda functions.

Common scenarios and use cases of this solution are as follows:

  • Auditing data, triggers, and anomaly detection using Lambda
  • Shipping WALEdits via Kinesis Data Streams to Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) to index records asynchronously
  • Triggers on Apache HBase bulk loads
  • Processing streamed data using Apache Spark Streaming, Apache Flink, or Amazon Kinesis Data Analytics
  • HBase edits or change data capture (CDC) replication into other storage platforms, such as Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), and Amazon DynamoDB
  • Incremental HBase migration by replaying the edits from any point in time, based on configured retention in Kafka or Kinesis Data Streams

This post progresses into some common use cases you might encounter, along with their design options and solutions. We review and expand on these scenarios in separate sections, in addition to the considerations and limits in our application designs.

Introduction to HBase replication

At a very high level, the principle of HBase replication is based on replaying transactions from a source cluster to the destination cluster. This is done by replaying WALEdits or Write Ahead Log entries on the RegionServers of the source cluster into the destination cluster. To explain WALEdits, in HBase, all the mutations in data like PUT or DELETE are written to MemStore of their specific region and are appended to a WAL file as WALEdits or entries. Each WALEdit represents a transaction and can carry multiple write operations on a row. Because the MemStore is an in-memory entity, in case a region server fails, the lost data can be replayed and restored from the WAL files. Having a WAL is optional, and some operations may not require WALs or can request to bypass WALs for quicker writes. For example, records in a bulk load aren’t recorded in WAL.
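
The relationship between the MemStore and the WAL can be illustrated with a toy model. This is a conceptual sketch only: the class `ToyRegion` and its methods are invented for illustration, and it ignores flushes to Store files, WAL rolling, and everything else a real RegionServer does.

```python
class ToyRegion:
    """Toy model of one region: a volatile MemStore plus a durable WAL."""

    def __init__(self):
        self.wal = []        # append-only log of edits (survives a crash)
        self.memstore = {}   # in-memory state (lost on a crash)

    def put(self, row, value, skip_wal=False):
        if not skip_wal:
            self.wal.append(("PUT", row, value))
        self.memstore[row] = value

    def delete(self, row, skip_wal=False):
        if not skip_wal:
            self.wal.append(("DELETE", row, None))
        self.memstore.pop(row, None)

    def crash(self):
        """Simulate a server failure: in-memory data is gone."""
        self.memstore = {}

    def recover(self):
        """Rebuild the MemStore by replaying WAL entries in order."""
        for op, row, value in self.wal:
            if op == "PUT":
                self.memstore[row] = value
            else:
                self.memstore.pop(row, None)
```

In this model, any write made with `skip_wal=True` is absent after recovery, which mirrors why operations that bypass the WAL are also invisible to WAL-based replication.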

HBase replication is based on transferring WALEdits to the destination cluster and replaying them, so any operation that bypasses the WAL isn’t replicated.

When setting up a replication in HBase, a ReplicationEndpoint implementation needs to be selected in the replication configuration when creating a peer, and on every RegionServer, an instance of ReplicationEndpoint runs as a thread. In HBase, a replication endpoint is pluggable for more flexibility in replication and shipping WALEdits to different versions of HBase. You can also use this to build replication endpoints for sending edits to different platforms and environments. For more information about setting up replication, see Cluster Replication.

HBase bulk load replication HBASE-13153

In HBase, bulk loading is a method to directly import HFiles or Store files into RegionServers. This avoids the normal write path and WALEdits. As a result, far less CPU and network resources are used when importing big portions of data into HBase tables.

You can also use HBase bulk loads to recover data when an error or outage causes the cluster to lose track of regions and Store files.

Because bulk loads skip WAL creation, none of the newly loaded records are replicated to the secondary cluster. HBASE-13153 is an enhancement that represents a bulk load as a bulk load event carrying the location of the imported files. You can activate this by setting hbase.replication.bulkload.enabled to true and setting hbase.replication.cluster.id to a unique value as a prerequisite.

Custom streaming replication endpoint

We can use HBase’s pluggable endpoints to stream records into platforms such as Kinesis Data Streams or Kafka. Transferred records can be consumed by Lambda functions, processed by a Spark Streaming application or Apache Flink on Amazon EMR, Kinesis Data Analytics, or any other big data platform.

In this post, we demonstrate an implementation of a custom replication endpoint that allows replicating WALEdits in Kinesis Data Streams or Kafka topics.

In our example, we built upon the BaseReplicationEndpoint abstract class, which implements the ReplicationEndpoint interface.

The main method to implement and override is the replicate method. This method replicates a set of items it receives every time it’s called and blocks until all those records are replicated to the destination.

For our implementation and configuration options, see our GitHub repository.

Use case: Enrich records in real time

We now use the custom streaming replication endpoint implementation to stream HBase edits to Kinesis Data Streams.

The provided AWS CloudFormation template demonstrates how we can set up an EMR cluster with replication to either Kinesis Data Streams or Apache Kafka and consume the replicated records, using a Lambda function to enrich data asynchronously, in real time. In our sample project, we launch an EMR cluster with an HBase database. A sample IoT traffic generator application runs as a step in the cluster and puts records, containing a registration number and an instantaneous speed, into a local HBase table. Records are replicated in real time into a Kinesis stream or Kafka topic based on the selected option at launch, using our custom HBase replication endpoint. When the step starts putting the records into the stream, a Lambda function is provisioned and starts digesting records from the beginning of the stream and catches up with the stream. The function calculates a score per record, based on a formula on variation from minimum and maximum speed limits in the use case, and persists the result as a score qualifier into a different column family, out of replication scope in the source table, by running HBase puts on RowKey.

The following diagram illustrates this architecture.

To launch our sample environment, you can use our template on GitHub.

The template creates a VPC, public and private subnets, Lambda functions to consume records and prepare the environment, an EMR cluster with HBase, and a Kinesis data stream or an Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster depending on the selected parameters when launching the stack.

Architectural design patterns

Traditionally, Apache HBase tables are considered as data stores, where consumers get or scan the records from tables. It’s very common in modern databases to react to database logs or CDC for real-time use cases and triggers. With our streaming HBase replication endpoint, we can project table changes into message delivery systems like Kinesis Data Streams or Apache Kafka.

We can trigger Lambda functions to consume messages and records from Apache Kafka or Kinesis Data Streams in a serverless design, or use the wider Amazon Kinesis ecosystem, such as Kinesis Data Analytics or Amazon Kinesis Data Firehose, for delivery into Amazon S3. You could also pipe the records into Amazon OpenSearch Service.

A wide range of consumer ecosystems, such as Apache Spark, AWS Glue, and Apache Flink, is available to consume from Kafka and Kinesis Data Streams.

Let’s review a few other common use cases.

Index HBase rows

Apache HBase rows are retrievable by RowKey. Writing a row into HBase with the same RowKey overwrites or creates a new version of the row. To retrieve a row, it needs to be fetched by the RowKey or a range of rows needs to be scanned if the RowKey is unknown.

In some use cases, scanning the table for a specific qualifier or value is expensive. We can avoid the scan by asynchronously indexing our rows in a parallel system such as Elasticsearch, so that applications can use the index to find the RowKey. Without this solution, a periodic job has to scan the table and write the rows into an indexing service like Elasticsearch to hydrate the index, or the producing application has to write to both HBase and Elasticsearch directly, which adds overhead to the producer.
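
The idea of maintaining an index from streamed edits can be sketched with an in-memory stand-in for the external index. The class `QualifierIndex` and the edit shape (RowKey, qualifier, value) are assumptions for illustration; a real consumer would parse replicated WALEdits and write to a service such as Elasticsearch instead.

```python
from collections import defaultdict


class QualifierIndex:
    """In-memory stand-in for an external index keyed by (qualifier, value)."""

    def __init__(self):
        self._by_value = defaultdict(set)   # (qualifier, value) -> RowKeys
        self._current = {}                  # (row_key, qualifier) -> latest value

    def apply_edit(self, row_key, qualifier, value):
        """Apply one streamed put, replacing any older value for the same cell."""
        old = self._current.get((row_key, qualifier))
        if old is not None:
            self._by_value[(qualifier, old)].discard(row_key)
        self._current[(row_key, qualifier)] = value
        self._by_value[(qualifier, value)].add(row_key)

    def find_row_keys(self, qualifier, value):
        """Look up RowKeys by value without scanning the table."""
        return sorted(self._by_value.get((qualifier, value), set()))
```

Once the RowKeys are known, the application can fetch the rows from HBase with direct gets instead of a scan.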

Enrich and audit data

A very common use case for HBase streaming endpoints is enriching data and storing the enriched records in a data store, such as Amazon S3 or RDS databases. In this scenario, a custom HBase replication endpoint streams the records into a message distribution system such as Apache Kafka or Kinesis Data Streams. Records can be serialized, using the AWS Glue Schema Registry for schema validation. A consumer on the other end of the stream reads the records, enriches them, and validates against a machine learning model in Amazon SageMaker for anomaly detection. The consumer persists the records in Amazon S3 and potentially triggers an alert using Amazon Simple Notification Service (Amazon SNS). Stored data on Amazon S3 can be further digested on Amazon EMR, or we can create a dashboard on Amazon QuickSight, interfacing Amazon Athena for queries.

The following diagram illustrates our architecture.

Store and archive data lineage

Apache HBase comes with the snapshot feature. You can freeze the state of tables into snapshots and export them to any distributed file system like HDFS or Amazon S3. Recovering snapshots restores the entire table to the snapshot point.

Apache HBase also supports versioning at the row level. You can configure column families to keep row versions, and the default versioning is based on timestamps.

However, when using this approach to stream records into Kafka or Kinesis Data Streams, records are retained inside the stream, and you can partially replay a period. Recovering snapshots only recovers up to the snapshot point and the future records aren’t present.

In Kinesis Data Streams, by default records of a stream are accessible for up to 24 hours from the time they are added to the stream. This limit can be increased to up to 7 days by enabling extended data retention, or up to 365 days by enabling long-term data retention. See Quotas and Limits for more information.

In Apache Kafka, record retention is limited only by the resources and disk space available on the Kafka cluster, and can be configured through the log.retention.* broker settings (for example, log.retention.hours).

Trigger on HBase bulk load

The HBase bulk load feature uses a MapReduce job to output table data in HBase’s internal data format, and then directly loads the generated Store files into the running cluster. Using bulk load uses less CPU and network resources than loading via the HBase API, as HBase bulk load bypasses WALs in the write path and the records aren’t seen by replication. However, since HBASE-13153, you can configure HBase to replicate a meta record as an indication of a bulk load event.

A Lambda function processing replicated WALEdits can listen to this event to trigger actions, such as automatically refreshing a read replica HBase cluster on Amazon S3 whenever a bulk load happens. The following diagram illustrates this workflow.

Considerations for replication into Kinesis Data Streams

Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources with very low latency. Kinesis is fully managed and runs your streaming applications without requiring you to manage any infrastructure. It’s durable, because records are synchronously replicated across three Availability Zones, and you can increase data retention to 365 days.

When considering Kinesis Data Streams for any solution, it’s important to consider service limits. For instance, as of this writing, the maximum size of the data payload of a record before base64-encoding is up to 1 MB, so we must make sure the records or serialized WALEdits remain within the Kinesis record size limit. To be more efficient, you can enable the hbase.replication.compression-enabled attribute to GZIP compress the records before sending them to the configured stream sink.
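
The size check and the effect of compression can be sketched as follows. JSON serialization and the helper name `prepare_payload` are assumptions for this example; the actual replication endpoint may serialize WALEdits differently.

```python
import gzip
import json

KINESIS_MAX_RECORD_BYTES = 1024 * 1024  # 1 MB payload limit noted above


def prepare_payload(edit: dict, compress: bool = True) -> bytes:
    """Serialize an edit, optionally GZIP it, and enforce the record size limit."""
    raw = json.dumps(edit, separators=(",", ":")).encode("utf-8")
    payload = gzip.compress(raw) if compress else raw
    if len(payload) > KINESIS_MAX_RECORD_BYTES:
        raise ValueError(
            f"payload is {len(payload)} bytes, above the {KINESIS_MAX_RECORD_BYTES}-byte limit"
        )
    return payload
```

Consumers of compressed records need to decompress them (for example with `gzip.decompress`) before parsing. Compression helps most with repetitive data; it does not guarantee that every edit fits within the limit.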

Kinesis Data Streams retains the order of the records within the shards as they arrive, and records can be read or processed in the same order. However, in this sample custom replication endpoint, a random partition key is used so that the records are evenly distributed between the shards. We can also use a hash function to generate a partition key when putting records into the stream, for example based on the HBase region ID, so that all the WALEdits from the same region land in the same shard and consumers can assume region locality per shard.
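
The difference between the two partitioning strategies can be sketched by modeling how a partition key maps to a shard. Kinesis Data Streams hashes partition keys with MD5 into a 128-bit key space. The sketch assumes the shards split that space evenly, which holds for a newly created stream but not necessarily after resharding; the function names are illustrative.

```python
import hashlib
import uuid

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit hash key


def shard_for(partition_key: str, shard_count: int) -> int:
    """Index of the shard that owns this key, assuming evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")
    return hash_key * shard_count // HASH_SPACE


def random_partition_key() -> str:
    """Spreads records across shards; gives up ordering between related edits."""
    return uuid.uuid4().hex


def region_partition_key(region_id: str) -> str:
    """Keeps every edit from one HBase region on the same shard, in order."""
    return region_id
```

With a region-based key, a hot region can concentrate traffic on a single shard, so the choice is a trade-off between ordering and even distribution.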

For delivering records in KinesisSinkImplemetation, we use the Amazon Kinesis Producer Library (KPL) to put records into Kinesis data streams. The KPL simplifies producer application development; we can achieve high write throughput to a Kinesis data stream. We can use the KPL in either synchronous or asynchronous use cases. We suggest using the higher performance of the asynchronous interface unless there is a specific reason to use synchronous behavior. KPL is very configurable and has retry logic built in. You can also perform record aggregation for maximum throughput. In KinesisSinkImplemetation, by default records are asynchronously replicated to the stream. We can change to synchronous mode by setting hbase.replication.kinesis.syncputs to true. We can enable record aggregation by setting hbase.replication.kinesis.aggregation-enabled to true.

The KPL can incur an additional processing delay because it buffers records before sending them to the stream based on a user-configurable attribute of RecordMaxBufferedTime. Larger values of RecordMaxBufferedTime result in higher packing efficiencies and better performance. However, applications that can’t tolerate this additional delay may need to use the AWS SDK directly.

Kinesis Data Streams and the Kinesis family are fully managed and easily integrate with the rest of the AWS ecosystem with minimum development effort with services such as the AWS Glue Schema Registry and Lambda. We recommend considering Kinesis Data Streams for low-latency, real-time use cases on AWS.

Considerations for replication into Apache Kafka

Apache Kafka is a high-throughput, scalable, and highly available open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

AWS offers Amazon MSK as a fully managed Kafka service. Amazon MSK provides the control plane operations and runs open-source versions of Apache Kafka. Existing applications, tooling, and plugins from partners and the Apache Kafka community are supported without requiring changes to application code.

You can configure this sample project for Apache Kafka brokers directly or just point towards an Amazon MSK ARN for replication.

Although there is virtually no limit on the size of the messages in Kafka, the maximum message size is set to 1 MB by default, so we must make sure the records, or serialized WALEdits, remain within the maximum message size for topics.

The Kafka producer tries to batch records together whenever possible to limit the number of requests for more efficiency. This is configurable by setting batch.size, linger.ms, and delivery.timeout.ms.
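
The interplay between a size limit and a linger time can be illustrated with a toy batcher. This is a simplified model, not the Kafka producer: the real client batches per partition and sends from a background thread. The class `ToyBatcher` and its parameters are invented for this sketch and only loosely mirror batch.size and linger.ms.

```python
class ToyBatcher:
    """Flush a batch when it would exceed a byte limit or has lingered too long."""

    def __init__(self, batch_size_bytes: int, linger_ms: int):
        self.batch_size_bytes = batch_size_bytes
        self.linger_ms = linger_ms
        self.sent = []          # batches that were "sent", in order
        self._batch = []
        self._bytes = 0
        self._first_ms = 0      # arrival time of the oldest buffered record

    def flush(self):
        if self._batch:
            self.sent.append(self._batch)
            self._batch, self._bytes = [], 0

    def send(self, record: bytes, now_ms: int):
        # Time-based flush: the oldest buffered record has waited long enough.
        if self._batch and now_ms - self._first_ms >= self.linger_ms:
            self.flush()
        # Size-based flush: adding this record would overflow the batch.
        if self._batch and self._bytes + len(record) > self.batch_size_bytes:
            self.flush()
        if not self._batch:
            self._first_ms = now_ms
        self._batch.append(record)
        self._bytes += len(record)
```

A longer linger time yields fuller batches and fewer requests at the cost of added latency, which is the same trade-off that applies to any buffering producer.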

In Kafka, topics are partitioned, and partitions are distributed between different Kafka brokers. This distributed placement allows for load balancing of consumers and producers. When a new event is published to a topic, it’s appended to one of the topic’s partitions. Events with the same event key are written to the same partition, and Kafka guarantees that any consumer of a given topic partition can always read that partition’s events in exactly the same order as they were written. KafkaSinkImplementation uses a random partition key to distribute the messages evenly between the partitions. If the order of the WALEdits or record locality is important to the consumers, the partition key could instead be derived from a heuristic function, for example one based on Region ID.
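The difference between a random key and a Region-based key can be sketched as follows. The function names are hypothetical, and CRC32 is only a stand-in hash chosen for illustration; Kafka's own partitioner uses a different hash function.

```python
import uuid
import zlib


def random_key() -> str:
    """Random key: spreads records evenly but gives up per-key ordering."""
    return uuid.uuid4().hex


def region_key(region_id: str) -> str:
    """Heuristic key: all edits from one HBase Region share a partition."""
    return region_id


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in hash (CRC32), used only to show that equal keys map to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_partitions
```

With region_key, every edit from the same Region lands on the same partition and is read back in write order, at the cost of possible hot partitions when a few Regions receive most of the writes.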

Semantic guarantees

As with any streaming application, it’s important to consider the semantic guarantees: how the producer acknowledges or fails the delivery of messages to the message queue, and how the consumer checkpoints its progress. Based on our use cases, we need to consider the following:

  • At most once delivery – Messages are never delivered more than once, and there is a chance of losing messages
  • At least once delivery – Messages can be delivered more than once, with no loss of messages
  • Exactly once delivery – Every message is delivered only once, and there is no loss of messages

After changes are persisted as WALs and in MemStore, the replicate method in ReplicationEndpoint is called to replicate a collection of WAL entries and returns a Boolean (true/false) value. If the returned value is true, the entries are considered successfully replicated by HBase and the replicate method is called for the next batch of WAL entries. Depending on configuration, both KPL and Kafka producers might buffer the records for longer if configured for asynchronous writes. Failures can cause loss of entries, retries, and duplicate delivery of records to the stream, which could be detrimental for synchronous or asynchronous message delivery.

If your operations aren’t idempotent, you can checkpoint or check for unique sequence numbers on the consumer side. For a simple HBase record replication, RowKey operations are idempotent and they carry a timestamp and sequence ID.
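A consumer-side duplicate check can be sketched in a few lines of Python. The record shape (source ID, sequence ID, payload) is a hypothetical simplification, and the sketch assumes that sequence IDs increase monotonically per source and that records from one source arrive in order.

```python
from typing import Dict, Iterable, Iterator, Tuple

# Hypothetical record shape: (source id, sequence id, payload).
Record = Tuple[str, int, bytes]


def deduplicate(records: Iterable[Record]) -> Iterator[Record]:
    """Drop records whose sequence ID was already seen for the same source."""
    last_seen: Dict[str, int] = {}
    for source, seq, payload in records:
        if seq <= last_seen.get(source, -1):
            continue  # duplicate or replayed delivery
        last_seen[source] = seq
        yield source, seq, payload
```

In a real consumer, the last_seen map would need to be persisted together with the checkpoint so that the check survives restarts.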

Summary

Replication of HBase WALEdits into streams is a powerful tool that you can use in multiple use cases and in combination with other AWS services. You can create practical solutions to further process records in real time, audit the data, detect anomalies, set triggers on ingested data, or archive data in streams to be replayed on other HBase databases or storage services from a point in time. This post outlined some common use cases and solutions, along with some best practices when implementing your custom HBase streaming replication endpoints.

Review, clone, and try our HBase replication endpoint implementation from our GitHub repository and launch our sample CloudFormation template.

We’d like to learn about your use cases. If you have questions or suggestions, please leave a comment.


About the Authors

Amir Shenavandeh is a Senior Hadoop systems engineer and Amazon EMR subject matter expert at Amazon Web Services. He helps customers with architectural guidance and optimization. He leverages his experience to help people bring their ideas to life, focusing on distributed processing and big data architectures.

Maryam Tavakoli is a Cloud Engineer and Amazon OpenSearch subject matter expert at Amazon Web Services. She helps customers with their Analytics and Streaming workload optimization and is passionate about solving complex problems with simplistic user experience that can empower customers to be more productive.

Integrate Etleap with Amazon Redshift Streaming Ingestion (preview) to make data available in seconds

Post Syndicated from Caius Brindescu original https://aws.amazon.com/blogs/big-data/integrate-etleap-with-amazon-redshift-streaming-ingestion-preview-to-make-data-available-in-seconds/

Amazon Redshift is a fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using SQL and your extract, transform, and load (ETL), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads.

Etleap is an AWS Advanced Technology Partner with the AWS Data & Analytics Competency and Amazon Redshift Service Ready designation. Etleap ETL removes the headaches experienced building data pipelines. A cloud-native platform that seamlessly integrates with AWS infrastructure, Etleap ETL consolidates data without the need for coding. Automated issue detection pinpoints problems so data teams can stay focused on business initiatives, not data pipelines.

In this post, we show how Etleap customers are integrating with the new streaming ingestion feature in Amazon Redshift (currently in limited preview) to load data directly from Amazon Kinesis Data Streams. This reduces load times from minutes to seconds and helps you gain faster data insights.

Amazon Redshift streaming ingestion with Kinesis Data Streams

Traditionally, you had to use Amazon Kinesis Data Firehose to land your stream into Amazon Simple Storage Service (Amazon S3) files and then employ a COPY command to move the data into Amazon Redshift. This method incurs latencies in the order of minutes.

Now, the native streaming ingestion feature in Amazon Redshift lets you ingest data directly from Kinesis Data Streams. The new feature enables you to ingest hundreds of megabytes of data per second and query it at exceptionally low latency—in many cases only 10 seconds after entering the data stream.

Configure Amazon Redshift streaming ingestion with SQL queries

Amazon Redshift streaming ingestion uses SQL to connect with one or more Kinesis data streams simultaneously. In this section, we walk through the steps to configure streaming ingestion.

Create an external schema

We begin by creating an external schema referencing Kinesis using syntax adapted from Redshift’s support for Federated Queries:

CREATE EXTERNAL SCHEMA MySchema
FROM Kinesis
IAM_ROLE { default | 'iam-role-arn' };

This external schema command creates an object inside Amazon Redshift that acts as a proxy to Kinesis Data Streams, specifically to the collection of data streams that are accessible via the AWS Identity and Access Management (IAM) role. You can use either the default Amazon Redshift cluster IAM role or a specified IAM role that has been attached to the cluster previously.

Create a materialized view

You can use Amazon Redshift materialized views to materialize a point-in-time view of a Kinesis data stream, as accumulated up to the time it is queried. The following command creates a materialized view over a stream from the previously defined schema:

CREATE MATERIALIZED VIEW MyView AS
SELECT *
FROM MySchema.MyStream;

Note the use of the dot syntax to pick out the particular stream desired. The attributes of the stream include a timestamp field, partition key, sequence number, and a VARBYTE data payload.

Although the previous materialized view definition simply performs a SELECT *, more sophisticated processing is possible, for instance, applying filtering conditions or shredding JSON data into columns. To demonstrate, consider the following Kinesis data stream with JSON payloads:

{
 "player" : "alice 127",
 "region" : "us-west-1",
 "action" : "entered shop"
}

The following materialized view shreds the JSON into columns, focusing only on the entered shop action:

CREATE MATERIALIZED VIEW ShopEntrances AS
SELECT ApproximateArrivalTimestamp, SequenceNumber,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'player') as Player,
   json_extract_path_text(from_varbyte(Data, 'utf-8'), 'region') as Region
FROM MySchema.Actions
WHERE json_extract_path_text(from_varbyte(Data, 'utf-8'), 'action') = 'entered shop';

On the Amazon Redshift leader node, the view definition is parsed and analyzed. On success, it is added to the system catalogs. No further communication with Kinesis Data Streams occurs until the initial refresh.
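To make the effect of the ShopEntrances view concrete, the same filtering and shredding can be approximated in plain Python. This sketch only mirrors the logic of the SQL and is not how Amazon Redshift evaluates the view; the input record shape (a dict with ApproximateArrivalTimestamp, SequenceNumber, and a bytes Data field) is an assumption based on the stream attributes described earlier.

```python
import json
from typing import Dict, Iterable, List


def shop_entrances(records: Iterable[Dict]) -> List[Dict]:
    """Keep only 'entered shop' events and split the JSON payload into columns."""
    rows = []
    for rec in records:
        payload = json.loads(rec["Data"].decode("utf-8"))
        if payload.get("action") != "entered shop":
            continue
        rows.append({
            "ApproximateArrivalTimestamp": rec["ApproximateArrivalTimestamp"],
            "SequenceNumber": rec["SequenceNumber"],
            "Player": payload.get("player"),
            "Region": payload.get("region"),
        })
    return rows
```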

Refresh the materialized view

The following command pulls data from Kinesis Data Streams into Amazon Redshift:

REFRESH MATERIALIZED VIEW MyView;

You can initiate it manually (via the preceding SQL command) or automatically via a scheduled query. In either case, it uses the IAM role associated with the stream. Each refresh is incremental and massively parallel, storing its progress for each Kinesis shard in the system catalogs so that it is ready for the next refresh.

With this process, you can now query near-real-time data from your Kinesis data stream through Amazon Redshift.

Use Amazon Redshift streaming ingestion with Etleap

Etleap pulls data from databases, applications, file stores, and event streams, and transforms it before loading it into an AWS data repository. Data ingestion pipelines typically process batches every 5–60 minutes, so when you query your data in Amazon Redshift, it’s at least 5 minutes out of date. For many use cases, such as ad hoc queries and BI reporting, this latency time is acceptable.

But what about when your team demands more up-to-date data? An example is operational dashboards, where you need to track KPIs in near-real time. Amazon Redshift load times are bottlenecked by COPY commands that move data from Amazon S3 into Amazon Redshift, as mentioned earlier.

This is where streaming ingestion comes in: by staging the data in Kinesis Data Streams rather than Amazon S3, Etleap can reduce data latency in Amazon Redshift to less than 10 seconds. To preview this feature, we ingest data from SQL databases such as MySQL and Postgres that support change data capture (CDC). The data flow is shown in the following diagram.

Etleap manages the end-to-end data flow through AWS Database Migration Service (AWS DMS) and Kinesis Data Streams, and creates and schedules Amazon Redshift queries, providing up-to-date data.

AWS DMS consumes the replication logs from the source, and produces insert, update, and delete events. These events are written to a Kinesis data stream that has multiple shards in order to handle the event load. Etleap transforms these events according to user-specified rules, and writes them to another data stream. Finally, a sequence of Amazon Redshift commands loads data from the stream into a destination table. This procedure takes less than 10 seconds in real-world scenarios.

Configure Amazon Redshift streaming ingestion with Etleap

Previously, we explored how data in Kinesis Data Streams can be accessed in Amazon Redshift using SQL queries. In this section, we see how Etleap uses the streaming ingestion feature to mirror a table from MySQL into Amazon Redshift, and the end-to-end latency we can achieve.

Etleap customers that are part of the Streaming Ingestion Preview Program can ingest data into Amazon Redshift directly from an Etleap-managed Kinesis data stream. All pipelines from a CDC-enabled source automatically use this feature.

The destination table in Amazon Redshift is Type 1, a mirror of the table in the source database.

For example, say you want to mirror a MySQL table in Amazon Redshift. The table represents the online shopping carts that users have open. In this case, low latency is critical so that the platform marketing strategists can instantly identify abandoned carts and high demand items.

The cart table has the following structure:

CREATE TABLE cart (
id INT PRIMARY KEY AUTO_INCREMENT,
user_id INT,
current_price DECIMAL(6,2),
no_items INT,
checked_out TINYINT(1),
update_date TIMESTAMP
);

Changes from the source table are captured using AWS DMS and then sent to Etleap via a Kinesis data stream. Etleap transforms these records and writes them to another data stream using the following structure:

{
    "id": 8322,
    "user_id": 443,
    "current_price": 22.98,
    "no_items": 3,
    "checked_out": 0,
    "update_date": "2021-11-05 23:11",
    "op": "U"
}

The structure encodes the row that was modified or inserted, as well as the operation type (represented by the op column), which can have three values: I (insert), U (update) or D (delete).

This information is then materialized in Amazon Redshift from the data stream:

CREATE EXTERNAL SCHEMA etleap_stream
FROM KINESIS
IAM_ROLE '<redacted>';

CREATE MATERIALIZED VIEW cart_staging
DISTSTYLE KEY
	DISTKEY(id)
	SORTKEY(etleap_sequence_no)
AS SELECT
	CAST(PartitionKey as bigint) AS etleap_sequence_no,
	CAST(JSON_EXTRACT_PATH_TEXT(from_varbyte(Data, 'utf-8'), 'id') as bigint) AS id,
	JSON_PARSE(FROM_VARBYTE(Data, 'utf-8')) AS Data
FROM etleap_stream."cart";

In the materialized view, we expose the following columns:

  • PartitionKey represents an Etleap sequence number, to ensure that updates are processed in the correct order.
  • We shred the primary keys of the table (id in the preceding example) from the payload, using them as a distribution key to improve the update performance.
  • The Data column is parsed out into a SUPER type from the JSON object in the stream. This is shredded into the corresponding columns in the cart table when the data is inserted.

With this staging materialized view, Etleap then updates the destination table (cart) that has the following schema:

CREATE TABLE cart ( 
id BIGINT PRIMARY KEY,
user_id BIGINT,
current_price DECIMAL(6,2),
no_items INT,
checked_out BOOLEAN,
update_date VARCHAR(64)
)
DISTSTYLE key
distkey(id);

To update the table, Etleap runs the following queries, selecting only the changed rows from the staging materialized view, and applies them to the cart table:

BEGIN;

REFRESH MATERIALIZED VIEW cart_staging;

UPDATE _etleap_si SET end_sequence_no = (
	SELECT COALESCE(MIN(etleap_sequence_no), (SELECT MAX(etleap_sequence_no) FROM cart_staging)) FROM 
	(
		SELECT 
			etleap_sequence_no, 
			LEAD(etleap_sequence_no, 1) OVER (ORDER BY etleap_sequence_no) - etleap_sequence_no AS diff
		FROM cart_staging 
		WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart')
	)
	WHERE diff > 1
) WHERE table_name = 'cart';



DELETE FROM cart
WHERE id IN (
	SELECT id
	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
);

INSERT INTO cart
SELECT 
	DISTINCT(id),
	CAST(Data."timestamp" as timestamp),
	CAST(Data.payload as varchar(256)),
	CAST(Data.etleap_sequence_no as bigint) from
  	(SELECT id, 
  	JSON_PARSE(LAST_VALUE(JSON_SERIALIZE(Data)) OVER (PARTITION BY id ORDER BY etleap_sequence_no ASC ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)) AS Data
   	FROM cart_staging
	WHERE etleap_sequence_no > (SELECT start_sequence_no FROM _etleap_si WHERE table_name = 'cart') 
	AND etleap_sequence_no <= (SELECT end_sequence_no FROM _etleap_si WHERE table_name = 'cart')
	AND Data.op != 'D'
);


UPDATE _etleap_si SET start_sequence_no = end_sequence_no WHERE table_name = 'cart';

COMMIT;

We run the following sequence of queries:

  1. Refresh the cart_staging materialized view to get new records from the cart stream.
  2. Delete all records from the cart table that were updated or deleted since the last time we ran the update sequence.
  3. Insert all the updated and newly inserted records from the cart_staging materialized view into the cart table.
  4. Update the _etleap_si bookkeeping table with the current position. Etleap uses this table to optimize the query in the staging materialized view.
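The delete-then-insert pattern in steps 2 and 3 can be sketched in Python over in-memory data. This is a simplified model rather than Etleap's implementation: the function name, the staged record shape, and the choice to take the latest change per id and skip it when it is a delete are all assumptions made for illustration, and the SQL above may differ in detail.

```python
from typing import Dict, List


def apply_changes(cart: Dict[int, dict], staged: List[dict],
                  start_seq: int, end_seq: int) -> None:
    """Apply staged changes with start_seq < sequence number <= end_seq to cart, in place."""
    window = [r for r in staged if start_seq < r["etleap_sequence_no"] <= end_seq]

    # Step 2: delete every row that was touched in this window.
    for r in window:
        cart.pop(r["id"], None)

    # Step 3: re-insert the latest version of each touched row, unless it was deleted.
    latest: Dict[int, dict] = {}
    for r in sorted(window, key=lambda r: r["etleap_sequence_no"]):
        latest[r["id"]] = r
    for row_id, r in latest.items():
        if r["op"] != "D":
            cart[row_id] = dict(r["data"])
```

After a call, the caller would advance start_seq to end_seq, which corresponds to step 4 of the sequence.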

This update sequence runs continuously to minimize end-to-end latency. To measure performance, we simulated the change stream from a database table that has up to 100,000 inserts, updates, and deletes. We tested target table sizes of up to 1.28 billion rows. Testing was done on a 2-node ra3.xlplus Amazon Redshift cluster and a Kinesis data stream with 32 shards.

The following figure shows how long the update sequence takes on average over 5 runs in different scenarios. Even in the busiest scenario (100,000 changes to a 1.28 billion row table), the sequence takes just over 10 seconds to run. In our experiment, the refresh time was independent of the delta size, and took 3.7 seconds with a standard deviation of 0.4 seconds.

This shows that the update process can keep up with source database tables that have 1 billion rows and 10,000 inserts, updates, and deletes per second.

Summary

In this post, you learned about the native streaming ingestion feature in Amazon Redshift and how it achieves latency in seconds while ingesting data from Kinesis Data Streams into Amazon Redshift. You also learned about the architecture of Amazon Redshift with the streaming ingestion feature enabled, how to configure it using SQL commands, and how to use the capability in Etleap.

To learn more about Etleap, take a look at the Etleap ETL on AWS Quick Start, or visit their listing on AWS Marketplace.


About the Authors

Caius Brindescu is an engineer at Etleap with over 3 years of experience in developing ETL software. In addition to development work, he helps customers make the most out of Etleap and Amazon Redshift. He holds a PhD from Oregon State University and one AWS certification (Big Data – Specialty).

Todd J. Green is a Principal Engineer with AWS Redshift. Before joining Amazon, TJ worked at innovative database startups including LogicBlox and RelationalAI, and was an Assistant Professor of Computer Science at UC Davis. He received his PhD in Computer Science from UPenn. In his career as a researcher, TJ won a number of awards, including the 2017 ACM PODS Test-of-Time Award.

Maneesh Sharma is a Senior Database Engineer with Amazon Redshift. He works and collaborates with various Amazon Redshift Partners to drive better integration. In his spare time, he likes running, playing ping pong, and exploring new travel destinations.

Jobin George is a Big Data Solutions Architect with more than a decade of experience designing and implementing large-scale big data and analytics solutions. He provides technical guidance, design advice, and thought leadership to some of the key AWS customers and big data partners.

Amazon Kinesis Data Streams On-Demand – Stream Data at Scale Without Managing Capacity

Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/amazon-kinesis-data-streams-on-demand-stream-data-at-scale-without-managing-capacity/

Today we are launching Amazon Kinesis Data Streams On-demand, a new capacity mode. This capacity mode eliminates capacity provisioning and management for streaming workloads.

Kinesis Data Streams is a fully managed, serverless service for real-time processing of streamed data at a massive scale. Kinesis Data Streams can take any amount of data, from any number of sources, and scale up and down as needed. Creating a new data stream has been easy since we announced Kinesis Data Streams in November 2013. To get started, you only need to specify the number of shards with which to provision your stream.

Shards are the way to define capacity in Kinesis Data Streams. Each shard can ingest 1 MB/s and 1,000 records/second and egress up to 2 MB/s. You can add or remove shards of the stream using Kinesis Data Streams APIs to adjust the stream capacity according to the throughput needs of your workloads. This lets you make sure that producer and consumer applications don’t experience any throttling.
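The per-shard limits above translate into a simple sizing calculation for provisioned mode. The following sketch uses only the numbers quoted in this post; the function name is hypothetical, and it ignores headroom for traffic spikes and uneven partition key distribution.

```python
import math


def shards_needed(write_mb_per_s: float, records_per_s: float, read_mb_per_s: float) -> int:
    """Smallest shard count that satisfies all three per-shard limits."""
    by_bytes_in = write_mb_per_s / 1.0     # 1 MB/s ingest per shard
    by_records = records_per_s / 1000.0    # 1,000 records/s ingest per shard
    by_bytes_out = read_mb_per_s / 2.0     # 2 MB/s egress per shard
    return max(1, math.ceil(max(by_bytes_in, by_records, by_bytes_out)))
```

For example, a workload writing 0.5 MB/s as 3,500 small records per second is bound by the record limit, not the byte limit, and needs 4 shards.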

As customers adopt data streaming broadly, workloads with data traffic that can increase by millions of events in a few minutes are becoming more common. For these volatile traffic patterns, customers carefully plan capacity, monitor throughput, and in some cases develop processes that automatically change the Kinesis Data Streams stream capacity.

Kinesis Data Streams On-Demand Mode
That is why today we are announcing Kinesis Data Streams On-demand. This new capacity mode eliminates the need for provisioning and managing the capacity for streaming data. Kinesis Data Streams On-demand automatically scales the capacity in response to varying data traffic. Customers are charged per gigabyte of data written, read, and stored in the stream, in a pay-per-throughput fashion.

Data streams in the on-demand mode have the same high durability, high availability, low latency, security, and deep AWS integrations that Kinesis Data Streams already provides. Moreover, there are no new APIs to write or read data. All existing Kinesis Data Streams integrations work in the on-demand mode.

Kinesis Data Streams uses the partition key to distribute data across shards. That is why when using Kinesis Data Streams On-demand, you still must specify a partition key for each record to write data into a data stream, as you do today in Kinesis Data Streams using the provisioned mode. In Kinesis Data Streams On-demand, the data stream automatically adapts to handle uneven data distribution patterns. But you must be careful that no partition key exceeds a shard’s limits. If this happens, then you will receive write throttles, and then you can retry these requests.
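The reason a single hot partition key cannot be spread out is visible in how keys map to shards. Kinesis Data Streams hashes each partition key with MD5 into a 128-bit integer and routes the record to the shard whose hash key range contains that value. The sketch below assumes, for illustration only, that the shards split the hash space evenly; the function names are hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 produces a 128-bit value


def hash_key(partition_key: str) -> int:
    """Map a partition key to its 128-bit hash key."""
    return int.from_bytes(hashlib.md5(partition_key.encode("utf-8")).digest(), "big")


def shard_index(partition_key: str, shard_count: int) -> int:
    """Index of the shard that owns the key, assuming evenly split hash ranges."""
    range_size = HASH_SPACE // shard_count
    return min(hash_key(partition_key) // range_size, shard_count - 1)
```

Because the mapping is deterministic, every record with the same partition key lands on the same shard no matter how many shards the stream has, so that key's traffic is bounded by one shard's limits.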

When a new data stream is created using Kinesis Data Streams On-demand, it gets created with the default capacity of 4 MB/s and 4,000 records per second for writes. Kinesis Data Streams On-demand can automatically scale up to 200 MB/s and 200,000 records per second for writes.

Kinesis Data Streams On-demand accommodates up to double its previous peak write throughput observed in the last 30 days. As your data stream’s write throughput hits a new peak, Kinesis Data Streams automatically scales the stream’s capacity.

For example, if your data stream has a write throughput that varies between 10 MB/s and 40 MB/s, Kinesis Data Streams will make sure that you can easily burst to double the peak—80 MB/s. And, if later on that same data stream reaches a new peak of 50 MB/s, then Kinesis Data Streams will make sure that there is enough capacity to ingest 100 MB/s. However, write throttling can occur if your traffic grows more than double the previous peak in less than 15 minutes.
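The scaling rule described above fits in one function. This sketch only restates the numbers from this post (4 MB/s default, 200 MB/s maximum, double the 30-day peak); the function and constant names are hypothetical, and it models write capacity in MB/s only, not the records-per-second limits.

```python
DEFAULT_WRITE_MBPS = 4.0    # capacity of a newly created on-demand stream
MAX_WRITE_MBPS = 200.0      # upper bound for automatic scaling


def on_demand_write_capacity(peak_mbps_last_30_days: float) -> float:
    """Write capacity accommodated for a given 30-day peak, in MB/s."""
    doubled = 2.0 * peak_mbps_last_30_days
    return min(MAX_WRITE_MBPS, max(DEFAULT_WRITE_MBPS, doubled))
```

A peak of 40 MB/s gives 80 MB/s of accommodated capacity and a new peak of 50 MB/s gives 100 MB/s, matching the example above.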

When to Use Kinesis Data Streams On-demand
On-demand mode is great for customers that have an unknown or variable workload, or who simply don’t want to deal with capacity management. On-demand mode works best for workloads that have even partition key distribution. For example, you run a mobile game that has variable traffic through the week or day, as customers play mostly on nights or weekends. Or, you run a streaming platform that hosts live shows, and you see a sudden increase in demand depending on the guests you have.

In addition, you can switch between on-demand and provisioned mode twice a day. For example, you run an e-commerce site with predictable traffic. But, starting next month, there will be many marketing campaigns launched globally. You don’t know the impact that those will have on the site traffic. Switch your Kinesis Data Streams to on-demand mode, and now you can enjoy the automated capacity planning and management for your data streams.

Get Started with Kinesis Data Streams On-demand
Create a new data stream with Kinesis Data Streams On-demand from the AWS console, AWS SDKs, AWS Command Line Interface (CLI), and AWS CloudFormation.

To create one from the console, visit the Kinesis console and Create data stream. When selecting the capacity mode, select On-demand.

Creating a data stream

At the end of the page, all of the settings for the new data stream are presented. These settings can be changed after the data stream has been created.

Data stream settings

Let’s See This in Action!
For this demo, I want to show you how the new Kinesis Data Streams capability works. This situation is best described if you look at the following Amazon CloudWatch graphs. The green line represents the bytes ingested successfully into the stream, and the red line shows the percentage of traffic that is throttled.

First, we will start with a stream provisioned with five shards. For the first three minutes, we are sending a load of 4 MB/s. You can see that the stream can handle the load.

At the time stamp 21:19, we increase the load to 12 MB/s. Now the stream cannot handle the load, and the throttles start (the red line starts climbing up to 60 percent of requests being throttled).

Increase the load on a provisioned stream

At the time stamp 21:23, we change the stream capacity from provisioned to on-demand. You can do that on-the-fly without affecting the stream. See that it takes a very short time for the stream to handle the load when converting from one capacity mode to the other.

In a few minutes (time stamp 21:24) the throttles start to drop as the stream starts scaling up. The stream capacity doubles to 10 shards first (time stamp 21:26), and the stream keeps scaling up until each shard has a load of less than 0.5 MB/s. In this way, if the stream suddenly receives double the amount of load, then it has the capacity ready to handle it.

Change to on-demand mode

At the time stamp 21:26, the load in the stream is increased to 18 MB/s. You can see the green line climbing to 350,000 records – there are no throttles, and the stream ends this demo with 40 open shards. This means that if suddenly the stream receives a load of 40 MB/s, then it could handle it with no problem.

Increase the load

Available Now!
Amazon Kinesis Data Streams On-demand is available globally in all commercial Regions.

You can learn more about the capacity modes in the Amazon Kinesis Data Streams Developer Guide.

Marcia

Field Notes: How to Enable Cross-Account Access for Amazon Kinesis Data Streams using Kinesis Client Library 2.x

Post Syndicated from Uday Narayanan original https://aws.amazon.com/blogs/architecture/field-notes-how-to-enable-cross-account-access-for-amazon-kinesis-data-streams-using-kinesis-client-library-2-x/

Businesses today are dealing with vast amounts of real-time data they need to process and analyze to generate insights. Real-time delivery of data and insights enable businesses to quickly make decisions in response to sensor data from devices, clickstream events, user engagement, and infrastructure events, among many others.

Amazon Kinesis Data Streams offers a managed service that lets you focus on building and scaling your streaming applications for near real-time data processing, rather than managing infrastructure. Customers can write Kinesis Data Streams consumer applications to read data from Kinesis Data Streams and process them per their requirements.

Often, the Kinesis Data Streams and consumer applications reside in the same AWS account. However, there are scenarios where customers follow a multi-account approach resulting in Kinesis Data Streams and consumer applications operating in different accounts. Some reasons for using the multi-account approach are to:

  • Allocate AWS accounts to different teams, projects, or products for rapid innovation, while still maintaining unique security requirements.
  • Simplify AWS billing by mapping AWS costs specific to product or service line.
  • Isolate accounts for specific security or compliance requirements.
  • Scale resources and mitigate hard AWS service limits constrained to a single account.

The following options allow you to access Kinesis Data Streams across accounts.

  • Amazon Kinesis Client Library (KCL) for Java or using the MultiLang Daemon for KCL.
  • Amazon Kinesis Data Analytics for Apache Flink – Cross-account access is supported for both Java and Python. For detailed implementation guidance, review the AWS documentation page for Kinesis Data Analytics.
  • AWS Glue Streaming – The documentation for AWS Glue describes how to configure AWS Glue streaming ETL jobs to access cross-account Kinesis Data Streams.
  • AWS Lambda – Lambda currently does not support cross-account invocations from Amazon Kinesis, but a workaround can be used.

In this blog post, we will walk you through the steps to configure KCL for Java and Python for cross-account access to Kinesis Data Streams.

Overview of solution

As shown in Figure 1, Account A has the Kinesis data stream and Account B has the KCL instances consuming from the Kinesis data stream in Account A. For the purposes of this blog post the KCL code is running on Amazon Elastic Compute Cloud (Amazon EC2).

Figure 1. Steps to access a cross-account Kinesis data stream

The steps to access a Kinesis data stream in one account from a KCL application in another account are:

Step 1 – Create an AWS Identity and Access Management (IAM) role in Account A to access the Kinesis data stream, with a trust relationship with Account B.

Step 2 – Create an IAM role in Account B to assume the role in Account A. This role is attached to the EC2 fleet running the KCL application.

Step 3 – Update the KCL application code to assume the role in Account A to read the Kinesis data stream in Account A.

Prerequisites

  • KCL for Java version 2.3.4 or later.
  • AWS Security Token Service (AWS STS) SDK.
  • Create a Kinesis data stream named StockTradeStream in Account A and a producer to load data into the stream. If you do not have a producer, you can use the Amazon Kinesis Data Generator to send test data into your Kinesis data stream.

Walkthrough

Step 1 – Create IAM policies and IAM role in Account A

First, we will create an IAM role in Account A, with permissions to access the Kinesis data stream created in the same account. We will also add Account B as a trusted entity to this role.

  1. Create the IAM policy kds-stock-trade-stream-policy to access the Kinesis data stream in Account A using the following policy definition. This policy restricts access to a specific Kinesis data stream.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords",
                "kinesis:ListShards",
                "kinesis:DescribeStreamSummary",
                "kinesis:RegisterStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:Account-A-AccountNumber:stream/StockTradeStream"
            ]
        },
        {
            "Sid": "Stmt234",
            "Effect": "Allow",
            "Action": [
                "kinesis:SubscribeToShard",
                "kinesis:DescribeStreamConsumer"
            ],
            "Resource": [
                "arn:aws:kinesis:us-east-1:Account-A-AccountNumber:stream/StockTradeStream/*"
            ]
        }
    ]
}

Note: The above policy assumes the name of the Kinesis data stream is StockTradeStream.

  2. Create the IAM role kds-stock-trade-stream-role in Account A.
aws iam create-role --role-name kds-stock-trade-stream-role --assume-role-policy-document "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"AWS\":[\"arn:aws:iam::Account-B-AccountNumber:root\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
  3. Attach the kds-stock-trade-stream-policy IAM policy to the kds-stock-trade-stream-role role.
aws iam attach-role-policy --policy-arn arn:aws:iam::Account-A-AccountNumber:policy/kds-stock-trade-stream-policy --role-name kds-stock-trade-stream-role

In the above steps, replace Account-A-AccountNumber with the AWS account number of the account that has the Kinesis data stream, and replace Account-B-AccountNumber with the AWS account number of the account that has the KCL application.
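The escaped JSON in the create-role command is easy to get wrong by hand. One option is to generate the trust policy document with a short script, as in the following sketch; the function name and the sample account number are hypothetical.

```python
import json


def trust_policy_for_account(account_id: str) -> str:
    """Build a trust policy that lets the given AWS account assume the role."""
    policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": [f"arn:aws:iam::{account_id}:root"]},
            "Action": ["sts:AssumeRole"],
        }],
    }
    return json.dumps(policy, indent=2)


if __name__ == "__main__":
    # Hypothetical account number for Account B; replace it with your own.
    print(trust_policy_for_account("111122223333"))
```

You can save the output to a file and pass it to the AWS CLI as --assume-role-policy-document file://trust-policy.json instead of an inline escaped string.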

Step 2 – Create IAM policies and IAM role in Account B

We will now create an IAM role in account B to assume the role created in Account A in Step 1. This role will also grant the KCL application access to Amazon DynamoDB and Amazon CloudWatch in Account B. For every KCL application, a DynamoDB table is used to keep track of the shards in a Kinesis data stream that are being leased and processed by the workers of the KCL consumer application. The name of the DynamoDB table is the same as the KCL application name. Similarly, the KCL application needs access to emit metrics to CloudWatch. Because the KCL application is running in Account B, we want to maintain the DynamoDB table and the CloudWatch metrics in the same account as the application code. For this blog post, our KCL application name is StockTradesProcessor.

  1. Create IAM policy kcl-stock-trader-app-policy, with permissions to access DynamoDB and CloudWatch in Account B, and to assume the kds-stock-trade-stream-role role created in Account A.
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AssumeRoleInSourceAccount",
            "Effect": "Allow",
            "Action": "sts:AssumeRole",
            "Resource": "arn:aws:iam::Account-A-AccountNumber:role/kds-stock-trade-stream-role"
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DescribeTable",
                "dynamodb:Scan",
                "dynamodb:PutItem",
                "dynamodb:GetItem",
                "dynamodb:UpdateItem",
                "dynamodb:DeleteItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:us-east-1:Account-B-AccountNumber:table/StockTradesProcessor"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

The above policy gives access to the DynamoDB table StockTradesProcessor. If you change your KCL application name, make sure you change the above policy to reflect the corresponding DynamoDB table name.
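Because the table ARN has to track the KCL application name, it can help to generate the policy rather than edit it by hand. The following is a minimal sketch under that assumption. The function build_kcl_app_policy and its parameters are hypothetical names, and the statements mirror the policy shown above.

```python
import json


def build_kcl_app_policy(source_account_id, app_account_id, application_name,
                         region="us-east-1",
                         stream_role_name="kds-stock-trade-stream-role"):
    """Build the Account B policy; the lease table name follows the KCL application name."""
    lease_table_arn = (
        f"arn:aws:dynamodb:{region}:{app_account_id}:table/{application_name}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AssumeRoleInSourceAccount",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": f"arn:aws:iam::{source_account_id}:role/{stream_role_name}",
            },
            {
                "Sid": "Stmt456",
                "Effect": "Allow",
                "Action": [
                    "dynamodb:CreateTable",
                    "dynamodb:DescribeTable",
                    "dynamodb:Scan",
                    "dynamodb:PutItem",
                    "dynamodb:GetItem",
                    "dynamodb:UpdateItem",
                    "dynamodb:DeleteItem",
                ],
                "Resource": [lease_table_arn],
            },
            {
                "Sid": "Stmt789",
                "Effect": "Allow",
                "Action": ["cloudwatch:PutMetricData"],
                "Resource": ["*"],
            },
        ],
    }


if __name__ == "__main__":
    # Both account numbers below are placeholders for illustration.
    policy = build_kcl_app_policy("111111111111", "222222222222", "StockTradesProcessor")
    print(json.dumps(policy, indent=4))
```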

  1. Create role kcl-stock-trader-app-role in Account B to assume role in Account A.
aws iam create-role --role-name kcl-stock-trader-app-role --assume-role-policy-document "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":[\"ec2.amazonaws.com\"]},\"Action\":[\"sts:AssumeRole\"]}]}"
  1. Attach the policy kcl-stock-trader-app-policy to the kcl-stock-trader-app-role.
aws iam attach-role-policy --policy-arn arn:aws:iam::Account-B-AccountNumber:policy/kcl-stock-trader-app-policy --role-name kcl-stock-trader-app-role
  1. Create an instance profile named kcl-stock-trader-app-role.
aws iam create-instance-profile --instance-profile-name kcl-stock-trader-app-role
  1. Attach the kcl-stock-trader-app-role role to the instance profile.
aws iam add-role-to-instance-profile --instance-profile-name kcl-stock-trader-app-role --role-name kcl-stock-trader-app-role
  1. Attach the kcl-stock-trader-app-role to the EC2 instances that are running the KCL code.
aws ec2 associate-iam-instance-profile --iam-instance-profile Name=kcl-stock-trader-app-role --instance-id <your EC2 instance id>

In the above steps, replace Account-A-AccountNumber with the AWS account number of the account that has the Kinesis data stream, Account-B-AccountNumber with the AWS account number of the account that has the KCL application, and <your EC2 instance id> with the ID of the EC2 instance running the KCL application. Add this instance profile to any new EC2 instances that are started for the KCL application.

Step 3 – Update KCL stock trader application to access cross-account Kinesis data stream

KCL application in Java

To demonstrate the setup for cross-account access for KCL using Java, we have used the KCL stock trader application as the starting point and modified it to enable access to a Kinesis data stream in another AWS account.

After the IAM policies and roles have been created and attached to the EC2 instance running the KCL application, we will update the main class of the consumer application to enable cross-account access.

Setting up the integrated development environment (IDE)

To download and build the code for the stock trader application, follow these steps:

  1. Clone the source code from the GitHub repository to your computer.
$  git clone https://github.com/aws-samples/amazon-kinesis-learning
Cloning into 'amazon-kinesis-learning'...
remote: Enumerating objects: 169, done.
remote: Counting objects: 100% (77/77), done.
remote: Compressing objects: 100% (37/37), done.
remote: Total 169 (delta 16), reused 56 (delta 8), pack-reused 92
Receiving objects: 100% (169/169), 45.14 KiB | 220.00 KiB/s, done.
Resolving deltas: 100% (29/29), done.
  1. Create a project in your integrated development environment (IDE) with the source code you downloaded in the previous step. For this blog post, we are using Eclipse as our IDE, so the instructions are specific to an Eclipse project.
    1. Open Eclipse IDE. Select File -> Import.
      A dialog box will open, as shown in Figure 2.

Figure 2. Create an Eclipse project

  1. Select Maven -> Existing Maven Projects, and select Next. You will then be prompted to select a folder location for the stock trader application.

Figure 3. Select the folder for your project

Select Browse, and navigate to the downloaded source code folder location. The IDE will automatically detect the Maven pom.xml file.

Select Finish to complete the import. The IDE takes 2–3 minutes to download all libraries and complete the setup of the stock trader project.

  1. After the setup is complete, the IDE will look similar to Figure 4.
Figure 4. Final view of pom.xml file after setup is complete

  1. Open pom.xml, and replace it with the following content. This will add all the prerequisites and dependencies required to build and package the jar application.
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-learning</artifactId>
    <packaging>jar</packaging>
    <name>Amazon Kinesis Tutorial</name>
    <version>0.0.1</version>
    <description>Tutorial and examples for aws-kinesis-client
    </description>
    <url>https://aws.amazon.com/kinesis</url>

    <scm>
        <url>https://github.com/awslabs/amazon-kinesis-learning.git</url>
    </scm>

    <licenses>
        <license>
            <name>Amazon Software License</name>
            <url>https://aws.amazon.com/asl</url>
            <distribution>repo</distribution>
        </license>
    </licenses>

    <properties>
        <aws-kinesis-client.version>2.3.4</aws-kinesis-client.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>software.amazon.kinesis</groupId>
            <artifactId>amazon-kinesis-client</artifactId>
            <version>2.3.4</version>
        </dependency>
        <dependency>
            <groupId>commons-logging</groupId>
            <artifactId>commons-logging</artifactId>
            <version>1.2</version>
        </dependency>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>sts</artifactId>
            <version>2.16.74</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>amazon-kinesis-learning</finalName>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.1</version>

                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>

                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
    
            </plugin>
        </plugins>
    </build>


</project>

Update the main class of consumer application

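The updated code for the StockTradesProcessor.java class is shown as follows. The changes made to the class to enable cross-account access are the AWS STS imports, the roleCredentialsProvider method, the two additional arguments (role ARN and role session name), and the credentials provider passed to the Kinesis client.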

package com.amazonaws.services.kinesis.samples.stocktrades.processor;

import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.sts.StsClient;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.KinesisClientUtil;
import software.amazon.kinesis.coordinator.Scheduler;

/**
 * Uses the Kinesis Client Library (KCL) 2.x to continuously consume and process stock trade
 * records from the stock trades stream. KCL monitors the number of shards and creates
 * record processor instances to read and process records from each shard. KCL also
 * load balances shards across all the instances of this processor.
 *
 */
public class StockTradesProcessor {

    private static final Log LOG = LogFactory.getLog(StockTradesProcessor.class);

    private static final Logger ROOT_LOGGER = Logger.getLogger("");
    private static final Logger PROCESSOR_LOGGER =
            Logger.getLogger("com.amazonaws.services.kinesis.samples.stocktrades.processor.StockTradeRecordProcessor");

    private static void checkUsage(String[] args) {
        if (args.length != 5) {
            System.err.println("Usage: " + StockTradesProcessor.class.getSimpleName()
                    + " <application name> <stream name> <region> <role arn> <role session name>");
            System.exit(1);
        }
    }

    /**
     * Sets the global log level to WARNING and the log level for this package to INFO,
     * so that we only see INFO messages for this processor. This is just for the purpose
     * of this tutorial, and should not be considered as best practice.
     *
     */
    private static void setLogLevels() {
        ROOT_LOGGER.setLevel(Level.WARNING);
        // Set this to INFO for logging at INFO level. Suppressed for this example as it can be noisy.
        PROCESSOR_LOGGER.setLevel(Level.WARNING);
    }
    
    private static AwsCredentialsProvider roleCredentialsProvider(String roleArn, String roleSessionName, Region region) {
        // Describe the role in Account A to assume and the session to create.
        AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
                .roleArn(roleArn)
                .roleSessionName(roleSessionName)
                .durationSeconds(900)
                .build();
        LOG.warn("Initializing assume role request session: " + assumeRoleRequest.roleSessionName());

        StsClient stsClient = StsClient.builder().region(region).build();
        // Renew the assumed-role credentials in the background before they expire.
        StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider = StsAssumeRoleCredentialsProvider
                .builder()
                .stsClient(stsClient)
                .refreshRequest(assumeRoleRequest)
                .asyncCredentialUpdateEnabled(true)
                .build();
        LOG.warn("Initializing sts role credential provider: " + stsAssumeRoleCredentialsProvider.prefetchTime().toString());
        return stsAssumeRoleCredentialsProvider;
    }

    public static void main(String[] args) throws Exception {
        checkUsage(args);

        setLogLevels();

        String applicationName = args[0];
        String streamName = args[1];
        Region region = Region.of(args[2]);
        String roleArn = args[3];
        String roleSessionName = args[4];
        
        if (region == null) {
            System.err.println(args[2] + " is not a valid AWS region.");
            System.exit(1);
        }
        
        AwsCredentialsProvider awsCredentialsProvider = roleCredentialsProvider(roleArn, roleSessionName, region);
        KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
                KinesisAsyncClient.builder().region(region).credentialsProvider(awsCredentialsProvider));
        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        StockTradeRecordProcessorFactory shardRecordProcessor = new StockTradeRecordProcessorFactory();
        ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), shardRecordProcessor);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
        );
        int exitCode = 0;
        try {
            scheduler.run();
        } catch (Throwable t) {
            LOG.error("Caught throwable while processing data.", t);
            exitCode = 1;
        }
        System.exit(exitCode);

    }

}

Let’s review the changes made to the code to understand the key parts of how the cross-account access works.

AssumeRoleRequest assumeRoleRequest = AssumeRoleRequest.builder()
        .roleArn(roleArn)
        .roleSessionName(roleSessionName)
        .durationSeconds(900)
        .build();

The AssumeRoleRequest class describes the request for temporary credentials to access the Kinesis data stream in Account A using the role that was created in Step 1. The assumeRoleRequest variable is then passed to the StsAssumeRoleCredentialsProvider.

StsClient stsClient = StsClient.builder().region(region).build();
StsAssumeRoleCredentialsProvider stsAssumeRoleCredentialsProvider = StsAssumeRoleCredentialsProvider
        .builder()
        .stsClient(stsClient)
        .refreshRequest(assumeRoleRequest)
        .asyncCredentialUpdateEnabled(true)
        .build();

StsAssumeRoleCredentialsProvider periodically sends an AssumeRoleRequest to AWS Security Token Service (AWS STS) to maintain short-lived sessions to use for authentication. The request passed to refreshRequest is reused for each renewal, and the sessions are updated as they get close to expiring. Because asynchronous refresh is not enabled by default, we explicitly set asyncCredentialUpdateEnabled to true so that the update happens in the background.
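To make the refresh behavior concrete, here is a conceptual sketch in Python. It is not the AWS SDK implementation: the class name RefreshingCredentials, the fetch callable, and the 60-second prefetch window are assumptions made for illustration. It caches a short-lived credential and requests a new one once the cached value is close to expiring. For simplicity it refreshes when the credential is requested, rather than in a background thread.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Credentials:
    token: str
    expires_at: float  # epoch seconds


class RefreshingCredentials:
    """Caches short-lived credentials and renews them shortly before expiry."""

    def __init__(self, fetch: Callable[[], Credentials],
                 prefetch_seconds: float = 60.0,
                 clock: Callable[[], float] = time.time):
        self._fetch = fetch                    # stands in for the assume-role call
        self._prefetch_seconds = prefetch_seconds
        self._clock = clock
        self._cached: Optional[Credentials] = None

    def resolve(self) -> Credentials:
        now = self._clock()
        # Renew when nothing is cached or the cached session is about to expire.
        if self._cached is None or self._cached.expires_at - now <= self._prefetch_seconds:
            self._cached = self._fetch()
        return self._cached


if __name__ == "__main__":
    # A fake fetch that issues 900-second sessions, matching durationSeconds(900).
    provider = RefreshingCredentials(
        lambda: Credentials(token="example", expires_at=time.time() + 900))
    print(provider.resolve().token)
```

Callers always go through resolve(), so they never hold on to a credential past its expiry. The Kinesis client uses the credentials provider in the same way.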

AwsCredentialsProvider awsCredentialsProvider = roleCredentialsProvider(roleArn, roleSessionName, region);
KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(
        KinesisAsyncClient.builder().region(region).credentialsProvider(awsCredentialsProvider));

  • KinesisAsyncClient is the client for accessing Kinesis asynchronously. We create an instance of KinesisAsyncClient by passing it the credentials provider that accesses the Kinesis data stream in Account A through the assumed role. The Kinesis, DynamoDB, and CloudWatch clients, along with the stream name and application name, are used to create a ConfigsBuilder instance.
  • The ConfigsBuilder instance is used to create the KCL scheduler (also known as KCL worker in KCL versions 1.x).
  • The scheduler creates a new thread for each shard (assigned to this consumer instance), which continuously loops to read records from the data stream. It then invokes the record processor instance (StockTradeRecordProcessor in this example) to process each batch of records received. This is the class that contains your record processing logic. The Developing Custom Consumers with Shared Throughput Using KCL section of the documentation provides more details on how KCL works.

KCL application in Python

In this section we will show you how to configure a KCL application written in Python to access a cross-account Kinesis data stream.

A.      Steps 1 and 2 from earlier remain the same and will need to be completed before moving ahead. After the IAM roles and policies have been created, log into the EC2 instance and clone the amazon-kinesis-client-python repository using the following command.

git clone https://github.com/awslabs/amazon-kinesis-client-python.git

B.      Navigate to the amazon-kinesis-client-python directory and run the following commands.

sudo yum install python-pip
sudo pip install virtualenv
virtualenv /tmp/kclpy-sample-env
source /tmp/kclpy-sample-env/bin/activate
pip install amazon_kclpy

C.      Next, navigate to amazon-kinesis-client-python/samples and open the sample.properties file. The properties file has properties such as streamName, application name, and credential information that lets you customize the configuration for your use case.

D.      We will modify the properties file to change the stream name and application name, and to add the credentials to enable access to a Kinesis data stream in a different account. You can replace the contents of the sample.properties file with the following snippet. The changes we have made are the streamName and applicationName values and the three AWSCredentialsProvider settings.

# The script that abides by the multi-language protocol. This script will
# be executed by the MultiLangDaemon, which will communicate with this script
# over STDIN and STDOUT according to the multi-language protocol.
executableName = sample_kclpy_app.py

# The name of an Amazon Kinesis stream to process.
streamName = StockTradeStream

# Used by the KCL as the name of this application. Will be used as the name
# of an Amazon DynamoDB table which will store the lease and checkpoint
# information for workers with this application name
applicationName = StockTradesProcessor

# Users can change the credentials provider the KCL will use to retrieve credentials.
# The DefaultAWSCredentialsProviderChain checks several other providers, which is
# described here:
# http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html
#AWSCredentialsProvider = DefaultAWSCredentialsProviderChain
AWSCredentialsProvider = STSAssumeRoleSessionCredentialsProvider|arn:aws:iam::Account-A-AccountNumber:role/kds-stock-trade-stream-role|kinesiscrossaccount
AWSCredentialsProviderDynamoDB = DefaultAWSCredentialsProviderChain
AWSCredentialsProviderCloudWatch = DefaultAWSCredentialsProviderChain

# Appended to the user agent of the KCL. Does not impact the functionality of the
# KCL in any other way.
processingLanguage = python/2.7

# Valid options at TRIM_HORIZON or LATEST.
# See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax
initialPositionInStream = LATEST

# The following properties are also available for configuring the KCL Worker that is created
# by the MultiLangDaemon.

# The KCL defaults to us-east-1
#regionName = us-east-1

In the above step, replace Account-A-AccountNumber with the AWS account number of the account that has the Kinesis data stream.

We use the STSAssumeRoleSessionCredentialsProvider class and pass to it the role created in Account A, which has permissions to access the Kinesis data stream. This gives the KCL application in Account B permissions to read the Kinesis data stream in Account A. The DynamoDB lease table and the CloudWatch metrics are in Account B. Hence, we can use the DefaultAWSCredentialsProviderChain for AWSCredentialsProviderDynamoDB and AWSCredentialsProviderCloudWatch in the properties file. You can now save the sample.properties file.
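The AWSCredentialsProvider value packs three pipe-separated fields into one line: the provider class, the role ARN, and the role session name. As a small sanity check before starting the application, the following sketch splits the value and confirms that the placeholder account number has been replaced. The helper split_provider_setting is hypothetical and is not part of amazon_kclpy or the MultiLangDaemon. It only illustrates how the fields line up.

```python
from typing import NamedTuple


class ProviderSetting(NamedTuple):
    provider_class: str
    role_arn: str
    role_session_name: str


def split_provider_setting(value: str) -> ProviderSetting:
    """Split '<provider>|<role ARN>|<role session name>' into its three fields."""
    parts = [part.strip() for part in value.split("|")]
    if len(parts) != 3:
        raise ValueError("expected <provider>|<role ARN>|<role session name>")
    setting = ProviderSetting(*parts)
    if "Account-A-AccountNumber" in setting.role_arn:
        raise ValueError("replace Account-A-AccountNumber with the real account number")
    return setting


if __name__ == "__main__":
    # 111111111111 is a placeholder account number for illustration.
    print(split_provider_setting(
        "STSAssumeRoleSessionCredentialsProvider"
        "|arn:aws:iam::111111111111:role/kds-stock-trade-stream-role"
        "|kinesiscrossaccount"))
```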

E.      Next, we will change the application code to print the data read from the Kinesis data stream to standard output (STDOUT). Edit the sample_kclpy_app.py under the samples directory. You will add all your application code logic in the process_record method. This method is called for every record in the Kinesis data stream. For this blog post, we will add a single line to the method to print the records to STDOUT, as shown in Figure 5.

Figure 5. Add custom code to process_record method

F.       Save the file, and run the following command to build the project with the changes you just made.

cd amazon-kinesis-client-python/
python setup.py install

G.     Now you are ready to run the application. To start the KCL application, run the following command from the amazon-kinesis-client-python directory.

amazon_kclpy_helper.py --print_command --java /usr/bin/java --properties samples/sample.properties

This will start the application. Your application is now ready to read the data from the Kinesis data stream in another account and display the contents of the stream on STDOUT. When the producer starts writing data to the Kinesis data stream in Account A, you will start seeing those results being printed.

Clean Up

Once you are done testing the cross-account access, make sure you clean up your environment to avoid incurring costs. As part of the cleanup, we recommend you delete the Kinesis data stream (StockTradeStream), the EC2 instances that the KCL application is running on, and the DynamoDB table that was created by the KCL application.

Conclusion

In this blog post, we discussed the techniques to configure your KCL applications written in Java and Python to access a Kinesis data stream in a different AWS account. We also provided sample code and configurations which you can modify and use in your application code to set up the cross-account access. Now you can continue to build a multi-account strategy on AWS, while being able to easily access your Kinesis data streams from applications in multiple AWS accounts.

Building well-architected serverless applications: Optimizing application costs

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-optimizing-application-costs/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

COST 1. How do you optimize your serverless application costs?

Design, implement, and optimize your application to maximize value. Asynchronous design patterns and performance practices ensure efficient resource use and directly impact the value per business transaction. By optimizing your serverless application performance and its code patterns, you can directly impact the value it provides, while making more efficient use of resources.

Serverless architectures are easier to manage in terms of correct resource allocation compared to traditional architectures. Due to its pay-per-value pricing model and scale based on demand, a serverless approach effectively reduces the capacity planning effort. As covered in the operational excellence and performance pillars, optimizing your serverless application has a direct impact on the value it produces and its cost. For general serverless optimization guidance, see the AWS re:Invent talks, “Optimizing your Serverless applications” Part 1 and Part 2, and “Serverless architectural patterns and best practices”.

Required practice: Minimize external calls and function code initialization

AWS Lambda functions may call other managed services and third-party APIs. Functions may also use application dependencies that may not be suitable for ephemeral environments. Understanding and controlling what your function accesses while it runs can have a direct impact on value provided per invocation.

Review code initialization

I explain the Lambda initialization process with cold and warm starts in “Optimizing application performance – part 1”. Lambda reports the time it takes to initialize application code in Amazon CloudWatch Logs. As Lambda functions are billed by request and duration, you can use this to track costs and performance. Consider reviewing your application code and its dependencies to improve the overall execution time to maximize value.

You can take advantage of Lambda execution environment reuse to make external calls to resources and use the results for subsequent invocations. Use TTL mechanisms inside your function handler code. This ensures that you can prevent additional external calls that incur additional execution time, while preemptively fetching data that isn’t stale.

Review third-party application deployments and permissions

When using Lambda layers or applications provisioned by AWS Serverless Application Repository, be sure to understand any associated charges that these may incur. When deploying functions packaged as container images, understand the charges for storing images in Amazon Elastic Container Registry (ECR).

Ensure that your Lambda function only has access to what its application code needs. Regularly review that your function has a predicted usage pattern so you can factor in the cost of other services, such as Amazon S3 and Amazon DynamoDB.

Required practice: Optimize logging output and its retention

Consider reviewing your application logging level. Ensure that logging output and log retention are set appropriately for your operational needs to prevent unnecessary logging and data retention. This helps you keep the minimum log retention needed to investigate operational and performance inquiries when necessary.

Emit and capture only what is necessary to understand and operate your component as intended.

With Lambda, any standard output statements are sent to CloudWatch Logs. Capture and emit business and operational events that are necessary to help you understand your function, its integration, and its interactions. Use a logging framework and environment variables to dynamically set a logging level. When applicable, sample debugging logs for a percentage of invocations.
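As a minimal sketch using only the Python standard library, the following handler reads the logging level from an environment variable and promotes a fraction of invocations to DEBUG. LOG_LEVEL matches the variable name used in this post. DEBUG_SAMPLE_RATE and the function choose_log_level are hypothetical names chosen for illustration, and a logging framework can provide the same behavior for you.

```python
import logging
import os
import random

logging.basicConfig()
logger = logging.getLogger("booking")

LEVELS = {
    "DEBUG": logging.DEBUG,
    "INFO": logging.INFO,
    "WARNING": logging.WARNING,
    "ERROR": logging.ERROR,
    "CRITICAL": logging.CRITICAL,
}


def choose_log_level(env=os.environ, rng=random.random):
    """Use LOG_LEVEL, but switch to DEBUG for a sampled fraction of invocations."""
    sample_rate = float(env.get("DEBUG_SAMPLE_RATE", "0"))  # hypothetical variable
    if rng() < sample_rate:
        return logging.DEBUG
    # Fall back to INFO when LOG_LEVEL is missing or not a known level name.
    return LEVELS.get(env.get("LOG_LEVEL", "INFO").upper(), logging.INFO)


def lambda_handler(event, context):
    logger.setLevel(choose_log_level())
    logger.debug("full event: %s", event)  # emitted only for sampled invocations
    logger.info("booking confirmed")
    return {"status": "ok"}
```

With DEBUG_SAMPLE_RATE set to 0.1, roughly one invocation in ten writes the detailed debug output, and the rest log at the configured level.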

In the serverless airline example used in this series, the booking service Lambda functions use Lambda Powertools as a logging framework with output structured as JSON.

Lambda Powertools is added to the Lambda functions as a shared Lambda layer in the AWS Serverless Application Model (AWS SAM) template. The layer ARN is stored in Systems Manager Parameter Store.

Parameters:
  SharedLibsLayer:
    Type: AWS::SSM::Parameter::Value<String>
    Description: Project shared libraries Lambda Layer ARN
Resources:
    ConfirmBooking:
        Type: AWS::Serverless::Function
        Properties:
            FunctionName: !Sub ServerlessAirline-ConfirmBooking-${Stage}
            Handler: confirm.lambda_handler
            CodeUri: src/confirm-booking
            Layers:
                - !Ref SharedLibsLayer
            Runtime: python3.7
…

The LOG_LEVEL and other Powertools settings are configured in the Globals section as Lambda environment variables for all functions.

Globals:
    Function:
        Environment:
            Variables:
                POWERTOOLS_SERVICE_NAME: booking
                POWERTOOLS_METRICS_NAMESPACE: ServerlessAirline
                LOG_LEVEL: INFO 

For Amazon API Gateway, there are two types of logging in CloudWatch: execution logging and access logging. Execution logs contain information that you can use to identify and troubleshoot API errors. API Gateway manages the CloudWatch Logs, creating the log groups and log streams. Access logs contain details about who accessed your API and how they accessed it. You can create your own log group or choose an existing log group that could be managed by API Gateway.

Enable access logs, and selectively review the output format and request fields that might be necessary. For more information, see “Setting up CloudWatch logging for a REST API in API Gateway”.

API Gateway logging

Enable AWS AppSync logging which uses CloudWatch to monitor and debug requests. You can configure two types of logging: request-level and field-level. For more information, see “Monitoring and Logging”.

AWS AppSync logging

Define and set a log retention strategy

Define a log retention strategy to satisfy your operational and business needs. Set log expiration for each CloudWatch log group as they are kept indefinitely by default.

For example, in the booking service AWS SAM template, log groups are explicitly created for each Lambda function with a parameter specifying the retention period.

Parameters:
    LogRetentionInDays:
        Type: Number
        Default: 14
        Description: CloudWatch Logs retention period
Resources:
    ConfirmBookingLogGroup:
        Type: AWS::Logs::LogGroup
        Properties:
            LogGroupName: !Sub "/aws/lambda/${ConfirmBooking}"
            RetentionInDays: !Ref LogRetentionInDays

The Serverless Application Repository application, auto-set-log-group-retention, can update the retention policy for new and existing CloudWatch log groups to the specified number of days.

For log archival, you can export CloudWatch Logs to S3 and store them in Amazon S3 Glacier for more cost-effective retention. You can use CloudWatch Logs subscriptions for custom processing, analysis, or loading to other systems. Lambda extensions allow you to process, filter, and route logs directly from Lambda to a destination of your choice.

Good practice: Optimize function configuration to reduce cost

Benchmark your function using different memory sizes

For Lambda functions, memory is the capacity unit for controlling the performance and cost of a function. You can configure the amount of memory allocated to a Lambda function, between 128 MB and 10,240 MB. The amount of memory also determines the amount of virtual CPU available to a function. Benchmark your AWS Lambda functions with differing amounts of memory allocated. Adding more memory and proportional CPU may lower the duration and reduce the cost of each invocation.
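The trade-off can be illustrated with some arithmetic. The following sketch assumes a duration price of $0.0000166667 per GB-second (check the current Lambda pricing for your Region and architecture) and uses made-up benchmark durations. It ignores the per-request charge, which does not depend on memory.

```python
PRICE_PER_GB_SECOND = 0.0000166667  # assumed rate; check current Lambda pricing


def invocation_cost(memory_mb, duration_ms, price_per_gb_second=PRICE_PER_GB_SECOND):
    """Duration cost of one invocation: allocated GB x seconds x price."""
    gb = memory_mb / 1024
    seconds = duration_ms / 1000
    return gb * seconds * price_per_gb_second


# Hypothetical benchmark results: (memory in MB, average duration in ms).
benchmarks = [(128, 2400), (512, 500), (1024, 260), (2048, 250)]

for memory_mb, duration_ms in benchmarks:
    cost = invocation_cost(memory_mb, duration_ms)
    print(f"{memory_mb:>5} MB  {duration_ms:>5} ms  "
          f"${cost * 1_000_000:.2f} per million invocations")

# Pick the configuration with the lowest duration cost.
cheapest = min(benchmarks, key=lambda b: invocation_cost(*b))
print("cheapest:", cheapest)
```

With these made-up numbers, 512 MB is the cheapest setting at 0.25 GB-seconds per invocation, compared with 0.30 GB-seconds at 128 MB, because the duration drops by more than the memory increases. Beyond that point the duration stops improving, so extra memory only adds cost.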

In “Optimizing application performance – part 2”, I cover using AWS Lambda Power Tuning to automate the memory testing process to balance performance and cost.

Best practice: Use cost-aware usage patterns in code

Reduce the time your function runs by reducing job-polling or task coordination. This avoids overpaying for unnecessary compute time.

Decide whether your application can fit an asynchronous pattern

Avoid scenarios where your Lambda functions wait for external activities to complete. I explain the difference between synchronous and asynchronous processing in “Optimizing application performance – part 1”. You can use asynchronous processing to aggregate queues, streams, or events for more efficient processing time per invocation. This reduces wait times and latency from requesting apps and functions.

Long polling or waiting increases the costs of Lambda functions and also reduces overall account concurrency. This can impact the ability of other functions to run.

Consider using other services such as AWS Step Functions to help reduce code and coordinate asynchronous workloads. You can build workflows using state machines with long polling and failure handling. Step Functions also supports direct service integrations, such as DynamoDB, without having to use Lambda functions.

In the serverless airline example used in this series, Step Functions is used to orchestrate the Booking microservice. The ProcessBooking state machine handles all the necessary steps to create bookings, including payment.

Booking service state machine

To reduce costs and improve performance with CloudWatch, create custom metrics asynchronously. You can use the embedded metric format to write logs, rather than the PutMetricData API call. I cover using the embedded metric format in “Understanding application health” – part 1 and part 2.
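As a minimal sketch of what such a log entry looks like, the following function builds an embedded metric format record by hand and prints it, so the metric is created from the log line instead of a synchronous API call. The helper emf_log_line is a hypothetical name. The namespace, metric, and dimension values are taken from the booking example in this post, and libraries such as Lambda Powertools can generate this structure for you.

```python
import json
import time


def emf_log_line(namespace, metric_name, value, unit="Count",
                 dimensions=None, timestamp_ms=None):
    """Build one embedded metric format record as a JSON string."""
    dimensions = dimensions or {}
    record = {
        "_aws": {
            "Timestamp": timestamp_ms if timestamp_ms is not None else int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": namespace,
                    "Dimensions": [list(dimensions.keys())],
                    "Metrics": [{"Name": metric_name, "Unit": unit}],
                }
            ],
        },
        # The metric value and the dimension values sit at the top level of the record.
        metric_name: value,
    }
    record.update(dimensions)
    return json.dumps(record)


def lambda_handler(event, context):
    # Standard output from a Lambda function is sent to CloudWatch Logs.
    print(emf_log_line("ServerlessAirline", "BookingSuccessful", 1,
                       dimensions={"service": "booking"}))
    return {"status": "ok"}
```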

For example, once a booking is made, the logs are visible in the CloudWatch console. You can select a log stream and find the custom metric as part of the structured log entry.

Custom metric structured log entry

CloudWatch automatically creates metrics from these structured logs. You can create graphs and alarms based on them. For example, here is a graph based on a BookingSuccessful custom metric.

CloudWatch metrics custom graph

Consider asynchronous invocations and review runaway functions where applicable

Take advantage of Lambda’s event-based model. Lambda functions can be triggered based on events ingested into Amazon Simple Queue Service (SQS) queues, S3 buckets, and Amazon Kinesis Data Streams. AWS manages the polling infrastructure on your behalf at no additional cost. Avoid code that polls third-party software as a service (SaaS) providers. Instead, use Amazon EventBridge to integrate with SaaS providers when possible.

Carefully consider and review recursion, and establish timeouts to prevent runaway functions.

Conclusion

Design, implement, and optimize your application to maximize value. Asynchronous design patterns and performance practices ensure efficient resource use and directly impact the value per business transaction. By optimizing your serverless application performance and its code patterns, you can reduce costs while making more efficient use of resources.

In this post, I cover minimizing external calls and function code initialization. I show how to optimize logging output with the embedded metrics format, and log retention. I recap optimizing function configuration to reduce cost and highlight the benefits of asynchronous event-driven patterns.

This post wraps up the series, building well-architected serverless applications, where I cover the AWS Well-Architected Tool with the Serverless Lens. See the introduction post for links to all the blog posts.

For more serverless learning resources, visit Serverless Land.

Field Notes: How to Scale OpenTravel Messaging Architecture with Amazon Kinesis Data Streams

Post Syndicated from Danny Gagne original https://aws.amazon.com/blogs/architecture/field-notes-how-to-scale-opentravel-messaging-architecture-with-amazon-kinesis-data-streams/

The travel industry relies on OpenTravel messaging systems to collect and distribute travel data—like hotel inventory and pricing—to many independent ecommerce travel sites. These travel sites need immediate access to the most current hotel inventory and pricing data. This allows shoppers access to the available rooms at the right prices. Each time a room is reserved, unreserved, or there is a price change, an ordered message with minimal latency must be sent from the hotel system to the travel site.

Overview of Solution

In this blog post, we will describe the architectural considerations needed to scale a low latency FIFO messaging system built with Amazon Kinesis Data Streams.

The architecture must satisfy the following constraints:

  1. Messages must arrive in order to each destination, with respect to their hotel code.
  2. Messages are delivered to multiple destinations independently.

Kinesis Data Streams is a managed service that enables real-time processing of streaming records. The service provides ordering of records, as well as the ability to read and replay records in the same order to multiple Amazon Kinesis applications. Kinesis data stream applications can also consume data in parallel from a stream through parallelization of consumers.

We will start with an architecture that supports one hotel with one destination, and scale it to support 1,000 hotels and 100 destinations (38,051 records per second). With the OpenTravel use case, the order of messages matters only within a stream of messages from a given hotel.

Smallest scale: One hotel with one processed delivery destination. One million messages a month.

Figure 1: Architecture showing a system sending messages from 1 hotel to 1 destination (.3805 RPS)

Full scale: 1,000 hotels with 100 processed delivery destinations. 100 billion messages a month.

Figure 2: Architecture showing a system sending messages from 1,000 hotels to 100 destinations (38,051 RPS)

In the example application, OpenTravel XML message data is the record payload. The record is written into the stream with a producer. The record is read from the stream shard by the consumer and sent to several HTTP destinations.

Each Kinesis data stream shard supports writes up to 1,000 records per second, up to a maximum data write total of 1 MB per second. Each shard supports reads up to five transactions per second, up to a maximum data read total of 2 MB per second. There is no upper quota on the number of streams you can have in an account.

| Streams | Shards | Writes/Input limit | Reads/Output limit |
| --- | --- | --- | --- |
| 1 | 1 | 1 MB per second; 1,000 records per second | 2 MB per second |
| 1 | 500 | 500 MB per second; 500,000 records per second | 1,000 MB per second |

OpenTravel message

In the following OpenTravel message, there is only one field that is important to our use case: HotelCode. In OTA (OpenTravel Alliance) messaging, order matters, but it only matters in the context of a given hotel, specified by the HotelCode. As we scale up our solution, we will use the HotelCode as a partition key. Each destination receives the same messages. If we are sending to 100 destinations, then we will send the message 100 times. The average message size is 4 KB.

<HotelAvailNotif>
    <request>
        <POS>
            <Source>
                <RequestorID ID = "1000">
                </RequestorID>
                <BookingChannel>
                    <CompanyName Code = "ClientTravelAgency1">
                    </CompanyName>
                </BookingChannel>
            </Source>
        </POS>
        <AvailStatusMessages HotelCode="100">
            <AvailStatusMessage BookingLimit="9">
                <StatusApplicationControl Start="2013-12-20" End="2013-12-25" RatePlanCode="BAR" InvCode="APT" InvType="ROOM" Mon="true" Tue="true" Weds="true" Thur="false" Fri="true" Sat="true" Sun="true"/>
                <LengthsOfStay ArrivalDateBased="true">
                    <LengthOfStay Time="2" TimeUnit="Day" MinMaxMessageType="MinLOS"/>
                    <LengthOfStay Time="8" TimeUnit="Day" MinMaxMessageType="MaxLOS"/>
                </LengthsOfStay>
                <RestrictionStatus Status="Open" SellThroughOpenIndicator="false" MinAdvancedBookingOffset="5"/>
            </AvailStatusMessage>
        </AvailStatusMessages>
    </request>
</HotelAvailNotif>

Source: https://github.com/XML-Travelgate/xtg-content-articles-pub/blob/master/docs/hotel-push/HotelPUSH.md 

Producer and consumer message error handling

Asynchronous message processing presents challenges with error handling, because an error can be logged to the producer or consumer application logs. Short-lived errors can be resolved with a delayed retry. However, there are cases when the data is bad and the message will always cause an error; these messages should be added to a dead letter queue (DLQ).

Producer retries and consumer retries are some of the reasons why records may be delivered more than once. Kinesis Data Streams does not automatically remove duplicate records. If the destination needs a strict guarantee of uniqueness, the message must have a primary key to remove duplicates when processed in the client (Handling Duplicate Records). In most cases, the destination can mitigate duplicate messages by processing the same message multiple times in a way that produces the same result (idempotence). If a destination endpoint becomes unavailable or is not performant, the consumer should back off and retry until the endpoint is available. The failure of a single destination endpoint should not impact the performance of delivery of messages to other destinations.
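
The duplicate handling described above can be sketched as follows. This is a minimal illustration that assumes each message carries a unique `message_id` (a hypothetical field, not part of the OpenTravel payload shown earlier) and that an in-memory set stands in for whatever durable store a real destination would use.

```python
from typing import Callable, Dict, Iterable, Set


def deliver_once(records: Iterable[Dict[str, str]],
                 send: Callable[[str], None],
                 seen: Set[str]) -> int:
    """Send each record's payload unless its key was already delivered."""
    delivered = 0
    for record in records:
        key = record["message_id"]  # hypothetical unique key per message
        if key in seen:
            continue  # duplicate caused by a producer or consumer retry
        send(record["payload"])
        seen.add(key)  # mark as delivered only after send succeeds
        delivered += 1
    return delivered


sent = []
seen_ids: Set[str] = set()
batch = [
    {"message_id": "100-0001", "payload": "<HotelAvailNotif>A</HotelAvailNotif>"},
    {"message_id": "100-0002", "payload": "<HotelAvailNotif>B</HotelAvailNotif>"},
    {"message_id": "100-0001", "payload": "<HotelAvailNotif>A</HotelAvailNotif>"},
]
count = deliver_once(batch, sent.append, seen_ids)
print(count, len(sent))
```

Recording the key only after `send` returns means a failed delivery is retried rather than silently dropped, at the cost of a possible repeat if the process stops between the two steps.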

Messaging with one hotel and one destination

When starting at the smallest scale, one hotel and one destination, the system must support one million messages per month. This volume corresponds to 4 GB of message data per month at a rate of 0.3805 records per second, or .0015 MB per second, for both input and output. For the system to process one hotel to one destination, it needs at least one producer, one stream, one shard, and a single consumer.
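
The rates quoted in this post can be reproduced with a short calculation. The sketch below assumes an average month of 365/12 days and 1 MB = 1,024 KB, which are the assumptions that appear to match the published figures; the function name `throughput` is illustrative only.

```python
SECONDS_PER_MONTH = 365 / 12 * 24 * 60 * 60  # assumed average month (about 2,628,000 s)
RECORD_KB = 4  # average OpenTravel message size from the text


def throughput(messages_per_month: float) -> tuple:
    """Return (records per second, MB per second) for a monthly volume."""
    records_per_second = messages_per_month / SECONDS_PER_MONTH
    mb_per_second = records_per_second * RECORD_KB / 1024  # assumes 1 MB = 1,024 KB
    return records_per_second, mb_per_second


for label, volume in [("1 hotel, 1 destination", 1e6),
                      ("1,000 hotels, 1 destination", 1e9),
                      ("1,000 hotels, 100 destinations (output)", 100e9)]:
    rps, mbps = throughput(volume)
    print(f"{label}: {rps:,.4f} records/s, {mbps:.4f} MB/s")
```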

| Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
| --- | --- | --- | --- | --- | --- |
| 1 hotel, 1 destination (4-KB average record size) | 1 | 1 | 1 | 0.3805 records per second, or .0015 MB per second | 0.3805 records per second, or .0015 MB per second |
| Maximum stream limits | 1 | 1 | 1 | 250 (4-KB records) per second; 1 MB per second (per stream) | 500 (4-KB records) per second; 2 MB per second (per stream) |

With this design, the single shard supports writes of up to 250 records per second, up to a maximum data write total of 1 MB per second. A PutRecords request supports up to 500 records, and each record in the request can be as large as 1 MB. Because the average message size is 4 KB, the 1 MB per second limit is reached at about 250 records per second, well below the shard’s limit of 1,000 records per second.

Figure 3: Architecture using Amazon Kinesis Data Streams (1 hotel, 1 destination)

The single shard can also support consumer reads up to five transactions per second, up to a maximum data read total of 2 MB per second. The producer writes to the shard in FIFO order. A consumer pulls from the shard and delivers to the destination in the same order as received. In this one hotel and one destination example, the consumer read capacity would be 500 records per second.
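
The per-shard limits above can be turned into stream capacity for any shard count. This is a rough sketch that counts 1 MB as 1,000 KB so that the results line up with the 250 and 500 records per second quoted in the text; the function and constant names are illustrative.

```python
RECORD_KB = 4                       # average record size from the text
WRITE_KB_PER_SHARD = 1000           # 1 MB per second, counted as 1,000 KB
READ_KB_PER_SHARD = 2000            # 2 MB per second, shared by standard consumers
MAX_WRITE_RECORDS_PER_SHARD = 1000  # per-shard record limit for writes


def stream_capacity(shards: int) -> dict:
    """Return write and read capacity for a stream with the given shard count."""
    # Writes are capped by whichever limit is hit first: records or bytes.
    write_rps = min(MAX_WRITE_RECORDS_PER_SHARD, WRITE_KB_PER_SHARD // RECORD_KB)
    read_rps = READ_KB_PER_SHARD // RECORD_KB
    return {
        "write_records_per_second": shards * write_rps,
        "write_mb_per_second": shards * 1,
        "read_records_per_second": shards * read_rps,
        "read_mb_per_second": shards * 2,
    }


for shard_count in (1, 2, 4, 78):
    print(shard_count, stream_capacity(shard_count))
```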

Figure 4: Records passing through the architecture

Messaging with 1,000 hotels and one destination

Next, we will maintain a single destination and scale the number of hotels from one hotel to 1,000 hotels. This scale increases the producer inputs from one million messages to one billion messages a month. This volume expects an input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second.

| Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
| --- | --- | --- | --- | --- | --- |
| 1,000 hotels, 1 destination (4-KB average record size) | 1 | 2 | 1 | 380 records per second, or 1.486 MB per second | 380 records per second, or 1.486 MB per second |
| Maximum stream limits | 1 | 2 | 1 | 500 (4-KB records) per second; 2 MB per second (per stream) | 1,000 (4-KB records) per second; 4 MB per second (per stream) |

The volume of incoming messages increased to 1.486 MB per second, requiring one additional shard. A shard can be split using the API to increase the overall throughput capacity. The write capacity is increased by distributing messages to the shards. We will use the HotelCode as a partition key, to maintain order of messages with respect to a given hotel. This will distribute the writes into the two shards. The messages for each HotelCode will be written into the same shard and maintain FIFO order. With writes spread across two shards by HotelCode, the stream write capacity doubles to 2 MB per second.
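
To illustrate why a partition key keeps each hotel's messages in one shard, the sketch below mimics how Kinesis Data Streams maps a partition key to a shard: the key is hashed with MD5 into a 128-bit integer, and each shard owns a range of those integers. It assumes the hash key ranges are split evenly between shards, which is not guaranteed after arbitrary splits and merges; `shard_for` is a hypothetical helper, not a Kinesis API.

```python
import hashlib
from collections import Counter


def shard_for(partition_key: str, shard_count: int) -> int:
    """Map a partition key to a shard index, assuming evenly split hash ranges."""
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_key = int.from_bytes(digest, "big")  # 128-bit integer hash key
    return hash_key * shard_count // 2**128


# The same HotelCode always maps to the same shard, which preserves its order.
print(shard_for("100", 2) == shard_for("100", 2))

# Distribution of 1,000 hypothetical hotel codes over two shards.
spread = Counter(shard_for(str(code), 2) for code in range(1, 1001))
print(dict(spread))
```

A heavily skewed count for one shard in output like this would be an early sign of the uneven distribution that the throughput-exceeded metrics are meant to catch.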

A single consumer can read the records from each shard at 2 MB per second per shard. The consumer will iterate through each shard, processing the records in FIFO order based on the HotelCode. It then sends messages to the destination in the same order they were sent from the producer.

Messaging with 1,000 hotels and five destinations

As we scale up further, we maintain 1,000 hotels and scale the delivery to five destinations. This increases the consumer reads from one billion messages to five billion messages a month. It has the same input of 4 TB of message data per month at a rate of 380 records per second, and 1.486 MB per second. The output is now about 19.5 TB of message data per month at a rate of 1,902.5 records per second, and 7.43 MB per second.

| Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
| --- | --- | --- | --- | --- | --- |
| 1,000 hotels, 5 destinations (4-KB average record size) | 1 | 4 | 5 | 380 records per second, or 1.486 MB per second | 1,902.5 records per second, or 7.43 MB per second |
| Maximum stream capacity | 1 | 4 | 5 | 1,000 (4-KB records) per second (per stream); 4 MB per second (per stream) | 2,000 (4-KB records) per second (per stream) with a standard consumer; 8 MB per second (per stream) |

To support this scale, we increase the number of consumers to match the destinations. A dedicated consumer for each destination allows the system to have committed capacity to each destination. If a destination fails or becomes slow, it will not impact other destination consumers.

Figure 5: Four Kinesis shards to support read throughput

We increase to four shards to support the additional read throughput required for the increased number of destination consumers. Each destination consumer iterates through the messages in each shard in FIFO order based on the HotelCode. It then sends the messages to the assigned destination, maintaining the hotel-specific ordering. As in the previous 1,000-to-1 design, the messages for each HotelCode are written into the same shard, maintaining their FIFO order.

The distribution of messages per HotelCode should be monitored with metrics such as WriteProvisionedThroughputExceeded and ReadProvisionedThroughputExceeded to ensure an even distribution between shards, because HotelCode is the partition key. If the distribution is uneven, it may require a dedicated shard for each HotelCode. In the following examples, it is assumed that there is an even distribution of messages.

Figure 6: Architecture showing 1,000 Hotels, 4 Shards, 5 Consumers, and 5 Destinations

Messaging with 1,000 hotels and 100 destinations

With 1,000 hotels and 100 destinations, the system processes 100 billion messages per month. It continues to have the same number of incoming messages, 4 TB of message data per month, at a rate of 380 records per second, and 1.486 MB per second. The number of destinations has increased from five to 100, resulting in 390 TB of outgoing message data per month, at a rate of 38,051 records per second, and 148.6 MB per second.

| Hotels/Destinations | Streams | Shards | Consumers (standard) | Input | Output |
| --- | --- | --- | --- | --- | --- |
| 1,000 hotels, 100 destinations (4-KB average record size) | 1 | 78 | 100 | 380 records per second, or 1.486 MB per second | 38,051 records per second, or 148.6 MB per second |
| Maximum stream capacity | 1 | 78 | 100 | 19,500 (4-KB records) per second (per stream); 78 MB per second (per stream) | 39,000 (4-KB records) per second (per stream) with a standard consumer; 156 MB per second (per stream) |

The number of shards increases from four to 78 to support the read throughput needed for 100 consumers, giving a read capacity of 156 MB per second.

Figure 7: Architecture with 1,000 hotels, 78 Kinesis shards, 100 consumers, and 100 destinations

Next, we will look at a different architecture that uses a different type of consumer, the enhanced fan-out consumer, which improves stream throughput and processing efficiency.

Lambda enhanced fan-out consumers

Enhanced fan-out consumers can increase the per-shard read throughput through event-based consumption, reduce latency with parallelization, and support error handling. With enhanced fan-out, each consumer gets a dedicated 2 MB per second of read throughput per shard, instead of sharing 2 MB per second per shard with every other consumer.

When using Lambda as an enhanced fan-out consumer, you can use the Event Source Mapping Parallelization Factor to process one shard with up to 10 concurrent Lambda invocations per consumer. Each parallelized invocation contains messages with the same partition key (HotelCode), so order is maintained for each hotel. An invocation finishes its batch before the next batch for the same partition key is processed.

Figure 8: Lambda fan-out consumers with parallel invocations, maintaining order

Consumers can gain performance improvements by setting a larger batch size for each invocation. When setting the Event Source Mapping batch size, the consumer can set the maximum number of records that the Lambda will retrieve from the stream at the time of invoking the function. The Event Source Mapping batch window can be used to adjust the time to wait to gather records for the batch before invoking the function.

The Lambda Event Source Mapping has a setting ReportBatchItemFailures that can be used to keep track of the last successfully processed record. When the next invocation of the Lambda starts the batch from the checkpoint, it starts with the failed record. This will occur until the maximum number of retries for the failed record occurs and it is expired. If this feature is enabled and a failure occurs, Lambda will prioritize checkpointing, over other set mechanisms, to minimize duplicate processing.
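
A handler that reports partial batch failures might look like the following sketch. The event shape (`Records[].kinesis.data` and `sequenceNumber`) and the `batchItemFailures` response follow the Lambda contract for Kinesis event sources; the `deliver` function and its failure rule are hypothetical stand-ins for a real HTTP delivery step.

```python
import base64


def deliver(payload: bytes) -> None:
    """Hypothetical delivery step; raises when the destination rejects a message."""
    if payload == b"bad":
        raise ValueError("destination rejected the message")


def handler(event, context=None):
    """Process records in order and report the first failed record to Lambda."""
    for record in event["Records"]:
        payload = base64.b64decode(record["kinesis"]["data"])
        try:
            deliver(payload)
        except Exception:
            # Stop at the first failure so later records are not delivered out of order.
            return {"batchItemFailures": [
                {"itemIdentifier": record["kinesis"]["sequenceNumber"]}
            ]}
    return {"batchItemFailures": []}


def make_record(sequence_number: str, payload: bytes) -> dict:
    """Build a minimal Kinesis-style record for local testing."""
    return {"kinesis": {"sequenceNumber": sequence_number,
                        "data": base64.b64encode(payload).decode("ascii")}}


example_event = {"Records": [make_record("1", b"ok"),
                             make_record("2", b"bad"),
                             make_record("3", b"ok")]}
print(handler(example_event))
```

Returning only the first failed sequence number lets Lambda retry from that record onward, which keeps the per-hotel ordering intact.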

Lambda has built-in support for sending records that are too old or have exhausted their retries to an on-failure destination, which can be an Amazon Simple Queue Service (SQS) queue acting as a DLQ or an SNS topic. The Lambda consumer can be configured with maximum record retries and maximum record age, so repeated failures are sent to a DLQ and handled with a separate Lambda function.
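
The settings discussed in this section map onto parameters of the Lambda `CreateEventSourceMapping` API. The sketch below only assembles the parameters; the ARNs, function name, and numeric values are placeholders chosen for illustration, not recommendations from the original post.

```python
def mapping_settings(consumer_arn: str, function_name: str, dlq_arn: str) -> dict:
    """Assemble event source mapping parameters for an enhanced fan-out consumer."""
    return {
        "EventSourceArn": consumer_arn,       # ARN of the registered stream consumer
        "FunctionName": function_name,
        "StartingPosition": "LATEST",
        "BatchSize": 100,                     # maximum records per invocation
        "MaximumBatchingWindowInSeconds": 1,  # time to gather records for a batch
        "ParallelizationFactor": 10,          # concurrent batches per shard
        "MaximumRetryAttempts": 3,            # retries before a record is given up
        "MaximumRecordAgeInSeconds": 3600,    # discard records older than this
        "FunctionResponseTypes": ["ReportBatchItemFailures"],
        "DestinationConfig": {"OnFailure": {"Destination": dlq_arn}},
    }


settings = mapping_settings(
    # Placeholder ARNs and names for illustration only.
    "arn:aws:kinesis:us-east-1:123456789012:stream/hotel-stream/consumer/dest-1:1",
    "deliver-to-destination-1",
    "arn:aws:sqs:us-east-1:123456789012:hotel-dlq",
)
print(sorted(settings))
```

With boto3 installed and credentials configured, these settings could be applied with `boto3.client("lambda").create_event_source_mapping(**settings)`.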

Figure 9: Lambda fan-out consumers with parallel invocations, and error handling

Messaging with Lambda fan-out consumers at scale

In the 1,000 hotel and 100 destination scenario, we will scale by shard and stream. Enhanced fan-out has a hard quota of 20 fan-out consumers per stream. With one consumer per destination, 100 fan-out consumers will need five streams. The producer will write to one stream, which has a consumer that writes to the five streams that our destination consumers are reading from.
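
The stream count follows directly from the consumer quota. As a small sketch, assuming one fan-out consumer per destination and using made-up stream and destination names:

```python
import math

MAX_FAN_OUT_CONSUMERS_PER_STREAM = 20  # quota cited in the text


def assign_destinations(destinations: int) -> dict:
    """Spread destination consumers over the fewest streams the quota allows."""
    streams = math.ceil(destinations / MAX_FAN_OUT_CONSUMERS_PER_STREAM)
    plan = {f"stream-{i + 1}": [] for i in range(streams)}  # hypothetical names
    for d in range(destinations):
        # Round-robin keeps every stream at or below the quota.
        plan[f"stream-{d % streams + 1}"].append(f"destination-{d + 1}")
    return plan


plan = assign_destinations(100)
print({name: len(members) for name, members in plan.items()})
```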

| Hotels/Destinations | Streams | Shards | Consumers (fan-out) | Input | Output |
| --- | --- | --- | --- | --- | --- |
| 1,000 hotels, 100 destinations (4-KB average record size) | 5 | 10 (2 each stream) | 100 | 380 records per second, or 74.3 MB per second | 38,051 records per second, or 148.6 MB per second |
| Maximum stream capacity | 5 | 10 (2 each stream) | 100 | 500 (4-KB records) per second per stream; 2 MB per second per stream | 40,000 (4-KB records) per second per stream; 200,000 (4-KB records) per second with 5 streams; 40 MB per second per stream; 200 MB per second with 5 streams |

Figure 10: Architecture 1,000 hotels, 100 destinations, multiple streams, and fan-out consumers

This architecture increases throughput to 200 MB per second, with only 12 shards, compared to 78 shards with standard consumers.

Conclusion

In this blog post, we explained how to use Kinesis Data Streams to process low latency ordered messages at scale with OpenTravel data. We reviewed the aspects of efficient processing and message consumption, scaling considerations, and error handling scenarios.  We explored multiple architectures, one dimension of scale at a time, to demonstrate several considerations for scaling the OpenTravel messaging system.

Building well-architected serverless applications: Optimizing application performance – part 2

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-well-architected-serverless-applications-optimizing-application-performance-part-2/

This series of blog posts uses the AWS Well-Architected Tool with the Serverless Lens to help customers build and operate applications using best practices. In each post, I address the serverless-specific questions identified by the Serverless Lens along with the recommended best practices. See the introduction post for a table of contents and explanation of the example application.

PERF 1. Optimizing your serverless application’s performance

This post continues part 1 of this performance question. Previously, I cover measuring and optimizing function startup time. I explain cold and warm starts and how to reuse the Lambda execution environment to improve performance. I show a number of ways to analyze and optimize the initialization startup time. I explain how only importing necessary libraries and dependencies increases application performance.

Good practice: Design your function to take advantage of concurrency via asynchronous and stream-based invocations

AWS Lambda functions can be invoked synchronously and asynchronously.

Favor asynchronous over synchronous request-response processing.

Consider using asynchronous event processing rather than synchronous request-response processing. You can use asynchronous processing to aggregate queues, streams, or events for more efficient processing time per invocation. This reduces wait times and latency from requesting apps and functions.

When you invoke a Lambda function with a synchronous invocation, you wait for the function to process the event and return a response.

Synchronous invocation

As synchronous processing involves a request-response pattern, the client caller also needs to wait for a response from a downstream service. If the downstream service then needs to call another service, you end up chaining calls that can impact service reliability, in addition to response times. For example, this POST /order request must wait for the response to the POST /invoice request before responding to the client caller.

Example synchronous processing

The more services you integrate, the longer the response time, and you can no longer sustain complex workflows using synchronous transactions.

Asynchronous processing allows you to decouple the request-response using events without waiting for a response from the function code. This allows you to perform background processing without requiring the client to wait for a response, improving client performance. You pass the event to an internal Lambda queue for processing and Lambda handles the rest. An external process, separate from the function, manages polling and retries. Using this asynchronous approach can also make it easier to handle unpredictable traffic with significant volumes.

Asynchronous invocation

For example, the client makes a POST /order request to the order service. The order service accepts the request and returns that it has been received, without waiting for the invoice service. The order service then makes an asynchronous POST /invoice request to the invoice service, which can then process independently of the order service. If the client must receive data from the invoice service, it can handle this separately via a GET /invoice request.

Example asynchronous processing
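
The order and invoice flow described above can be sketched in plain Python. Here an in-process queue and a worker thread stand in for the managed queue and the invoice service, which is an assumption made purely to keep the example self-contained; the function names and status codes are illustrative.

```python
import queue
import threading

invoice_queue = queue.Queue()  # stands in for a managed queue between services
invoices = {}                  # stands in for the invoice service's data store


def invoice_worker() -> None:
    """Process invoice jobs independently of the order service."""
    while True:
        order = invoice_queue.get()
        invoices[order["order_id"]] = {"amount": order["amount"], "status": "issued"}
        invoice_queue.task_done()


def post_order(order_id: str, amount: float) -> dict:
    """Accept the order and return immediately, without waiting for the invoice."""
    invoice_queue.put({"order_id": order_id, "amount": amount})
    return {"status": 202, "order_id": order_id}


def get_invoice(order_id: str) -> dict:
    """Let the client fetch the invoice separately once it exists."""
    invoice = invoices.get(order_id)
    return {"status": 200, "invoice": invoice} if invoice else {"status": 404}


threading.Thread(target=invoice_worker, daemon=True).start()
response = post_order("order-1", 99.0)
invoice_queue.join()  # only for the demo: wait until the worker has caught up
print(response, get_invoice("order-1"))
```

The client receives the acknowledgement before the invoice exists, so the response time of the order request no longer depends on the invoice service.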

You can configure Lambda to send records of asynchronous invocations to another destination service. This helps you to troubleshoot your invocations. You can also send messages or events that can’t be processed correctly into a dedicated Amazon Simple Queue Service (SQS) dead-letter queue for investigation.

You can add triggers to a function to process data automatically. For more information on which processing model Lambda uses for triggers, see “Using AWS Lambda with other services”.

Asynchronous workflows handle a variety of use cases including data ingestion, ETL operations, and order/request fulfillment. In these use cases, data is processed as it arrives and is retrieved as it changes. For example asynchronous patterns, see “Serverless Data Processing” and “Serverless Event Submission with Status Updates”.

For more information on Lambda synchronous and asynchronous invocations, see the AWS re:Invent presentation “Optimizing your serverless applications”.

Tune batch size, batch window, and compress payloads for high throughput

When using Lambda to process records using Amazon Kinesis Data Streams or SQS, there are a number of tuning parameters to consider for performance.

You can configure a batch window to buffer messages or records for up to 5 minutes. You can limit the maximum number of records Lambda processes in one invocation by setting a batch size. Lambda invokes your function when the batch window expires or the batch size is reached, whichever comes first.
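
The interaction between batch size and batch window can be illustrated with a small simulation. This is a simplified model, not Lambda's implementation: it only checks the window when the next record arrives, whereas the service invokes the function as soon as the window expires.

```python
def batch_records(arrivals, batch_size, window_seconds):
    """Group (timestamp, record) pairs the way a size-or-window trigger would."""
    batches, current, window_start = [], [], None
    for timestamp, record in arrivals:
        if current and timestamp - window_start >= window_seconds:
            batches.append(current)  # window expired before the batch filled
            current = []
        if not current:
            window_start = timestamp  # first record opens a new window
        current.append(record)
        if len(current) == batch_size:
            batches.append(current)  # batch size reached first
            current = []
    if current:
        batches.append(current)  # flush whatever is left at the end
    return batches


arrivals = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e")]
print(batch_records(arrivals, batch_size=2, window_seconds=5))
print(batch_records(arrivals, batch_size=10, window_seconds=5))
```

A larger batch size with a short window favors throughput under load, while the window bounds how long a record waits when traffic is light.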

For high volume SQS standard queue throughput, Lambda can process up to 1,000 concurrent batches of records per second. For more information, see “Using AWS Lambda with Amazon SQS”.

For high volume Kinesis Data Streams throughput, there are a number of options. Configure the ParallelizationFactor setting to process one shard of a Kinesis Data Stream with more than one Lambda invocation simultaneously. Lambda can process up to 10 batches in each shard. For more information, see “New AWS Lambda scaling controls for Kinesis and DynamoDB event sources.” You can also add more shards to your data stream to increase the speed at which your function can process records. This increases the function concurrency at the expense of ordering per shard. For more details on using Kinesis and Lambda, see “Monitoring and troubleshooting serverless data analytics applications”.

Kinesis enhanced fan-out can maximize throughput by dedicating 2 MB/second of read throughput per shard to each consumer, instead of sharing 2 MB/second per shard across all consumers. For more information, see “Increasing stream processing performance with Enhanced Fan-Out and Lambda”.

Kinesis stream producers can also compress records. This is at the expense of additional CPU cycles for decompressing the records in your Lambda function code.
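
As a minimal sketch of this trade-off, the producer below compresses a payload with gzip and the Lambda handler decompresses it. The choice of gzip and the sample JSON payload are assumptions; the base64 decoding step reflects how Lambda delivers Kinesis record data to a function.

```python
import base64
import gzip


def compress_record(message: str) -> bytes:
    """Producer side: compress the payload before writing it to the stream."""
    return gzip.compress(message.encode("utf-8"))


def handler(event, context=None):
    """Consumer side: decode and decompress each record, spending extra CPU."""
    messages = []
    for record in event["Records"]:
        compressed = base64.b64decode(record["kinesis"]["data"])
        messages.append(gzip.decompress(compressed).decode("utf-8"))
    return messages


message = '{"orderId": 1, "status": "created"}' * 50  # hypothetical payload
data = compress_record(message)
event = {"Records": [{"kinesis": {"data": base64.b64encode(data).decode("ascii")}}]}
print(len(message.encode("utf-8")), len(data), handler(event) == [message])
```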

Required practice: Measure, evaluate, and select optimal capacity units

Capacity units are a unit of consumption for a service. They can include function memory size, number of stream shards, number of database reads/writes, request units, or type of API endpoint. Measure, evaluate and select capacity units to enable optimal configuration of performance, throughput, and cost.

Identify and implement optimal capacity units.

For Lambda functions, memory is the capacity unit for controlling the performance of a function. You can configure the amount of memory allocated to a Lambda function, between 128 MB and 10,240 MB. The amount of memory also determines the amount of virtual CPU available to a function. Adding more memory proportionally increases the amount of CPU, increasing the overall computational power available. If a function is CPU-, network- or memory-bound, then changing the memory setting can dramatically improve its performance.

Choosing the memory allocated to Lambda functions is an optimization process that balances performance (duration) and cost. You can manually run tests on functions by selecting different memory allocations and measuring the time taken to complete. Alternatively, use the AWS Lambda Power Tuning tool to automate the process.

The tool allows you to systematically test different memory size configurations and, depending on your performance strategy (cost, performance, or balanced), identifies the optimal memory size to use. For more information, see “Operating Lambda: Performance optimization – Part 2”.

AWS Lambda Power Tuning report
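
The manual approach can be sketched as a comparison of measured durations at different memory sizes. The durations below are made-up measurements, and the price per GB-second is an assumed value that should be checked against current Lambda pricing; the per-request charge is left out.

```python
PRICE_PER_GB_SECOND = 0.0000166667  # assumed price; verify against current pricing


def invocation_cost(memory_mb: int, duration_ms: float) -> float:
    """Compute cost per invocation from allocated memory and billed duration."""
    return (memory_mb / 1024) * (duration_ms / 1000) * PRICE_PER_GB_SECOND


# Hypothetical measurements: memory (MB) -> average duration (ms)
measurements = {128: 2400.0, 512: 560.0, 1024: 290.0, 2048: 270.0}

costs = {mem: invocation_cost(mem, ms) for mem, ms in measurements.items()}
cheapest = min(costs, key=costs.get)
fastest = min(measurements, key=measurements.get)

for mem in sorted(measurements):
    print(f"{mem:>5} MB: {measurements[mem]:>7.1f} ms, ${costs[mem]:.10f} per invocation")
print("cheapest:", cheapest, "MB; fastest:", fastest, "MB")
```

In this made-up data set the cheapest and fastest settings differ, which is the kind of trade-off the cost, performance, and balanced strategies are meant to resolve.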

Amazon DynamoDB manages table processing throughput using read and write capacity units. There are two different capacity modes, on-demand and provisioned.

On-demand capacity mode supports up to 40K read/write request units per second. This is recommended for unpredictable application traffic and new tables with unknown workloads. For higher and predictable throughputs, provisioned capacity mode along with DynamoDB auto scaling is recommended. For more information, see “Read/Write Capacity Mode”.

For high throughput Amazon Kinesis Data Streams with multiple consumers, consider using enhanced fan-out for dedicated 2 MB/second throughput per consumer. When possible, use Kinesis Producer Library and Kinesis Client Library for effective record aggregation and de-aggregation.

Amazon API Gateway supports multiple endpoint types. Edge-optimized APIs provide a fully managed Amazon CloudFront distribution. These are better for geographically distributed clients. API requests are routed to the nearest CloudFront Point of Presence (POP), which typically improves connection time.

Edge-optimized API Gateway deployment

Regional API endpoints are intended for clients in the same Region. This helps you to reduce request latency and allows you to add your own content delivery network if necessary.

Regional endpoint API Gateway deployment

Private API endpoints are API endpoints that can only be accessed from your Amazon Virtual Private Cloud (VPC) using an interface VPC endpoint. For more information, see “Creating a private API in Amazon API Gateway”.

For more information on endpoint types, see “Choose an endpoint type to set up for an API Gateway API”. For more general information on API Gateway, see the AWS re:Invent presentation “I didn’t know Amazon API Gateway could do that”.

AWS Step Functions has two workflow types, standard and express. Standard Workflows have exactly-once workflow execution and can run for up to one year. Express Workflows have at-least-once workflow execution and can run for up to five minutes. Consider the per-second rates you require for both execution start rate and the state transition rate. For more information, see “Standard vs. Express Workflows”.

Performance load testing is recommended at both sustained and burst rates to evaluate the effect of tuning capacity units. Use Amazon CloudWatch service dashboards to analyze key performance metrics including load testing results. I cover performance testing in more detail in “Regulating inbound request rates – part 1”.

For general serverless optimization information, see the AWS re:Invent presentation “Serverless at scale: Design patterns and optimizations”.

Conclusion

Evaluate and optimize your serverless application’s performance based on access patterns, scaling mechanisms, and native integrations. You can improve your overall experience and make more efficient use of the platform in terms of both value and resources.

This post continues from part 1 and looks at designing your function to take advantage of concurrency via asynchronous and stream-based invocations. I cover measuring, evaluating, and selecting optimal capacity units.

This well-architected question will continue in part 3 where I look at integrating with managed services directly over functions when possible. I cover optimizing access patterns and applying caching where applicable.

For more serverless learning resources, visit Serverless Land.

How MEDHOST’s cardiac risk prediction successfully leveraged AWS analytic services

Post Syndicated from Pandian Velayutham original https://aws.amazon.com/blogs/big-data/how-medhosts-cardiac-risk-prediction-successfully-leveraged-aws-analytic-services/

MEDHOST has been providing products and services to healthcare facilities of all types and sizes for over 35 years. Today, more than 1,000 healthcare facilities are partnering with MEDHOST and enhancing their patient care and operational excellence with its integrated clinical and financial EHR solutions. MEDHOST also offers a comprehensive Emergency Department Information System with business and reporting tools. Since 2013, MEDHOST’s cloud solutions have been utilizing Amazon Web Services (AWS) infrastructure, data source, and computing power to solve complex healthcare business cases.

MEDHOST can utilize the data available in the cloud to provide value-added solutions for hospitals solving complex problems, like predicting sepsis, cardiac risk, and length of stay (LOS) as well as reducing re-admission rates. This requires a solid foundation of data lake and elastic data pipeline to keep up with multi-terabyte data from thousands of hospitals. MEDHOST has invested a significant amount of time evaluating numerous vendors to determine the best solution for its data needs. Ultimately, MEDHOST designed and implemented machine learning/artificial intelligence capabilities by leveraging AWS Data Lab and an end-to-end data lake platform that enables a variety of use cases such as data warehousing for analytics and reporting.

Getting started

MEDHOST’s initial objectives in evaluating vendors were to:

  • Build a low-cost data lake solution to provide cardiac risk prediction for patients based on health records
  • Provide an analytical solution for hospital staff to improve operational efficiency
  • Implement a proof of concept to extend to other machine learning/artificial intelligence solutions

The AWS team proposed AWS Data Lab to architect, develop, and test a solution to meet these objectives. The collaborative relationship between AWS and MEDHOST, along with AWS’s continuous innovation, excellent support, and technical solution architects, helped MEDHOST select AWS over other vendors and products. AWS Data Lab’s well-structured engagement helped MEDHOST define clear, measurable success criteria that drove the implementation of the cardiac risk prediction and analytical solution platform. The MEDHOST team consisted of architects, builders, and subject matter experts (SMEs). By connecting MEDHOST experts directly to AWS technical experts, the MEDHOST team quickly gained an understanding of industry best practices and available services, which allowed the team to achieve most of the success criteria by the end of a four-day design session. MEDHOST is now in the process of moving this work from its lower to upper environment to make the solution available for its customers.

Solution

For this solution, MEDHOST and AWS built a layered pipeline consisting of ingestion, processing, storage, analytics, machine learning, and reinforcement components. The following diagram illustrates the Proof of Concept (POC) that was implemented during the four-day AWS Data Lab engagement.

Ingestion layer

The ingestion layer is responsible for moving data from hospital production databases to the landing zone of the pipeline.

The hospital data was stored in an Amazon RDS for PostgreSQL instance and moved to the landing zone of the data lake using AWS Database Migration Service (DMS). DMS made migrating databases to the cloud simple and secure. Using its ongoing replication feature, MEDHOST and AWS implemented change data capture (CDC) quickly and efficiently, so the MEDHOST team could spend more time focusing on the most interesting parts of the pipeline.
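To make the CDC idea concrete, here is a minimal sketch of how a downstream consumer might replay change records onto a snapshot. It assumes the change files are CSV with a leading operation column (I for insert, U for update, D for delete) followed by the primary key; the patient columns and the `apply_cdc` helper are hypothetical and are not taken from the MEDHOST pipeline.

```python
import csv
import io


def apply_cdc(snapshot, cdc_csv):
    """Replay CDC rows onto a snapshot keyed by primary key.

    Assumed row layout: Op, id, value columns...
    Op is I (insert), U (update), or D (delete).
    """
    result = dict(snapshot)  # copy so the caller's snapshot is untouched
    for row in csv.reader(io.StringIO(cdc_csv)):
        if not row:
            continue  # skip blank lines
        op, key, values = row[0], row[1], row[2:]
        if op in ("I", "U"):
            result[key] = values  # insert, or overwrite with the latest version
        elif op == "D":
            result.pop(key, None)  # delete; ignore keys that were never seen
        else:
            raise ValueError(f"unknown operation: {op!r}")
    return result


# Hypothetical patient rows: id -> [name, heart_rate]
snapshot = {"1": ["alice", "72"], "2": ["bob", "80"]}
changes = "U,1,alice,75\nD,2,bob,80\nI,3,carol,68\n"
current = apply_cdc(snapshot, changes)
```

Replaying changes in arrival order matters here: a later update to the same key must win over an earlier one.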

Processing layer

The processing layer was responsible for performing extract, transform, load (ETL) on the data to curate it for subsequent uses.

MEDHOST used AWS Glue within its data pipeline for crawling its data layers and performing ETL tasks. The hospital data copied from RDS to Amazon S3 was cleaned, curated, enriched, denormalized, and stored in Parquet format to act as the heart of the MEDHOST data lake and a single source of truth to serve any further data needs. During the four-day Data Lab, MEDHOST and AWS targeted two needs: powering MEDHOST’s data warehouse used for analytics and feeding training data to the machine learning prediction model. Data curation posed multiple challenges, and it is a critical task that requires an SME. AWS Glue’s serverless nature, along with the SME’s support during the Data Lab, made developing the required transformations cost efficient and uncomplicated. Scaling and cluster management were handled by the service, which allowed the developers to focus on cleaning data coming from homogeneous hospital sources and translating the business logic to code.

Storage layer

The storage layer provided low-cost, secure, and efficient storage infrastructure.

MEDHOST used Amazon S3 as a core component of its data lake. AWS DMS migration tasks saved data to S3 in .CSV format. Crawling data with AWS Glue made this landing zone data queryable and available for further processing. The initial AWS Glue ETL job stored the Parquet-formatted data in the data lake’s curated zone bucket. MEDHOST also used S3 to store the .CSV-formatted data set used to train, test, and validate its machine learning prediction model.

Analytics layer

The analytics layer gave the MEDHOST pipeline reporting and dashboarding capabilities.

The data in the curated zone bucket, populated by the processing layer, was partitioned and stored in Parquet format. This made querying with Amazon Athena or Amazon Redshift Spectrum fast and cost efficient.

From the Amazon Redshift cluster, MEDHOST created external tables that were used as staging tables for the MEDHOST data warehouse and implemented UPSERT logic to merge new data into its production tables. To showcase the reporting potential unlocked by the MEDHOST analytics layer, the team connected the Redshift cluster to Amazon QuickSight. Within minutes, MEDHOST was able to create interactive analytics dashboards with filtering and drill-down capabilities, such as a chart that showed the number of confirmed disease cases per US state.
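One common way to express UPSERT logic with a staging table is to delete the target rows that also appear in staging and then insert everything from staging, all inside a single transaction. The sketch below illustrates that pattern using Python's built-in SQLite module as a local stand-in for the warehouse. The table and column names are hypothetical, and the post does not state which merge strategy MEDHOST actually used.

```python
import sqlite3

# In-memory SQLite database standing in for the data warehouse (assumption).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE encounters (id INTEGER PRIMARY KEY, facility TEXT, los_days INTEGER);
    CREATE TABLE staging_encounters (id INTEGER, facility TEXT, los_days INTEGER);
    INSERT INTO encounters VALUES (1, 'A', 3), (2, 'B', 5);
    INSERT INTO staging_encounters VALUES (2, 'B', 6), (3, 'C', 2);
""")


def upsert_from_staging(conn):
    """Merge staging rows into the production table atomically."""
    with conn:  # commits on success, rolls back if either statement fails
        # Remove production rows that have a newer version in staging.
        conn.execute(
            "DELETE FROM encounters "
            "WHERE id IN (SELECT id FROM staging_encounters)"
        )
        # Insert both the updated rows and the brand-new rows.
        conn.execute(
            "INSERT INTO encounters "
            "SELECT id, facility, los_days FROM staging_encounters"
        )


upsert_from_staging(conn)
rows = conn.execute(
    "SELECT id, facility, los_days FROM encounters ORDER BY id"
).fetchall()
```

Running both statements in one transaction means readers never observe the table with the old rows deleted but the new rows not yet inserted.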

Machine learning layer

The machine learning layer used MEDHOST’s existing data sets to train its cardiac risk prediction model and make it accessible via an endpoint.

Before the Data Lab, the MEDHOST team was not intimately familiar with machine learning. AWS Data Lab architects helped MEDHOST quickly understand machine learning concepts and select a model appropriate for its use case. MEDHOST selected XGBoost as its model because cardiac risk prediction can be framed as a regression problem. MEDHOST’s well-architected data lake enabled it to quickly generate training, testing, and validation data sets using AWS Glue.
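As a rough illustration of producing the three data sets, the following sketch shuffles records with a fixed seed and slices them into training, validation, and test partitions. The 70/15/15 ratio, the seed, and the `split_dataset` helper are assumptions for illustration; the post does not say which proportions MEDHOST used, and the real split was done with AWS Glue.

```python
import random


def split_dataset(rows, train_pct=70, validation_pct=15, seed=42):
    """Shuffle rows reproducibly and split them into three partitions.

    Percentages are integers so the slice boundaries avoid float rounding.
    Whatever remains after train and validation becomes the test set.
    """
    shuffled = list(rows)  # copy so the input order is preserved
    random.Random(seed).shuffle(shuffled)
    n_train = len(shuffled) * train_pct // 100
    n_validation = len(shuffled) * validation_pct // 100
    train = shuffled[:n_train]
    validation = shuffled[n_train:n_train + n_validation]
    test = shuffled[n_train + n_validation:]
    return train, validation, test


records = list(range(100))  # stand-in for curated patient records
train_set, validation_set, test_set = split_dataset(records)
```

Fixing the seed keeps the partitions stable between runs, which makes model comparisons repeatable.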

Amazon SageMaker abstracted the underlying complexity of setting up infrastructure for machine learning. With a few clicks, MEDHOST started a Jupyter notebook and coded the components leading to fitting and deploying its machine learning prediction model. Finally, MEDHOST created the endpoint for the model and ran REST calls to validate the endpoint and trained model. As a result, MEDHOST achieved the goal of predicting cardiac risk. Additionally, with Amazon QuickSight’s SageMaker integration, AWS made it easy to use SageMaker models directly in visualizations. QuickSight can call the model’s endpoint, send the input data to it, and put the inference results into the existing QuickSight data sets. This capability made it easy to display the results of the models directly in the dashboards. Read more about QuickSight’s SageMaker integration here.

Reinforcement layer

Finally, the reinforcement layer guaranteed that the results of the MEDHOST model were captured and processed to improve performance of the model.

The MEDHOST team went beyond the original goal and created an inference microservice to interact with the endpoint for prediction. The microservice abstracted the machine learning endpoint behind a well-defined domain REST endpoint and added a standard security layer to the MEDHOST application.
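The abstraction described above can be sketched as a thin function that turns a domain request into the model's input format, calls the endpoint, and returns a domain-level response. In this sketch the endpoint call is injected as a plain callable so the example runs locally; the feature names, the CSV payload layout, and the `fake_endpoint` stand-in are all hypothetical and do not describe MEDHOST's actual service.

```python
from typing import Callable, Dict, Sequence

# Hypothetical feature names, in the column order the model is assumed to expect.
FEATURE_ORDER = ("age", "resting_bp", "cholesterol")


def to_csv_payload(features: Dict[str, float],
                   order: Sequence[str] = FEATURE_ORDER) -> str:
    """Serialize named features into one CSV row in a fixed column order."""
    return ",".join(str(features[name]) for name in order)


def predict_cardiac_risk(features: Dict[str, float],
                         invoke: Callable[[str], str]) -> dict:
    """Domain-facing prediction call that hides the endpoint's wire format.

    `invoke` stands in for the real endpoint client: it takes the CSV
    payload and returns the model's raw text response.
    """
    payload = to_csv_payload(features)
    score = float(invoke(payload))
    # Keep input and inference together so the record can be fed back
    # into the pipeline later.
    return {"input": payload, "risk_score": score}


def fake_endpoint(payload: str) -> str:
    """Local stand-in for the deployed model; always returns a fixed score."""
    return "0.25"


response = predict_cardiac_risk(
    {"age": 61, "resting_bp": 140, "cholesterol": 230}, fake_endpoint
)
```

Because callers only see `predict_cardiac_risk`, the model or its hosting can change without touching the client applications.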

When there is a real-time call from the facility, the inference microservice gets an inference from the SageMaker endpoint. Records containing input and inference data are fed back into the data pipeline. MEDHOST used Amazon Kinesis Data Streams to push records in real time. However, since retraining the machine learning model does not need to happen in real time, Amazon Kinesis Data Firehose enabled MEDHOST to micro-batch records and efficiently save them to the landing zone bucket so that the data could be reprocessed.
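The micro-batching idea can be illustrated with a small buffer that flushes when it holds enough records or when the oldest buffered record has waited long enough. This is a local sketch of the concept only, not Firehose's implementation; the `MicroBatcher` class, its thresholds, and the injected clock are assumptions made for the example.

```python
import time


class MicroBatcher:
    """Buffer records and hand them to a sink in batches.

    A batch is flushed when it reaches max_records, or when a record
    arrives and the oldest buffered record is at least max_age_seconds old.
    The age is only checked on add(); there is no background timer.
    """

    def __init__(self, sink, max_records=3, max_age_seconds=60.0,
                 clock=time.monotonic):
        self._sink = sink
        self._max_records = max_records
        self._max_age = max_age_seconds
        self._clock = clock
        self._buffer = []
        self._first_at = None

    def add(self, record):
        if not self._buffer:
            self._first_at = self._clock()  # start timing a new batch
        self._buffer.append(record)
        too_many = len(self._buffer) >= self._max_records
        too_old = self._clock() - self._first_at >= self._max_age
        if too_many or too_old:
            self.flush()

    def flush(self):
        if self._buffer:
            self._sink(list(self._buffer))  # hand over a copy of the batch
            self._buffer.clear()


# Demo with a fake clock so the behavior is deterministic.
now = [0.0]
batches = []
batcher = MicroBatcher(batches.append, max_records=3,
                       max_age_seconds=60.0, clock=lambda: now[0])
for i in range(4):
    batcher.add({"id": i})   # the first three records flush by count
now[0] = 61.0
batcher.add({"id": 4})       # the remaining records flush by age
```

Writing a few larger objects instead of many tiny ones is what makes the later reprocessing step cheaper.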

Conclusion

Collaborating with AWS Data Lab enabled MEDHOST to:

  • Store a single source of truth in a low-cost storage solution (data lake)
  • Complete a data pipeline for a low-cost data analytics solution
  • Create almost production-ready code for cardiac risk prediction

The MEDHOST team learned many concepts related to data analytics and machine learning within four days. AWS Data Lab truly helped MEDHOST deliver results in an accelerated manner.


About the Authors

Pandian Velayutham is the Director of Engineering at MEDHOST. His team is responsible for delivering cloud solutions, integration and interoperability, and business analytics solutions. MEDHOST utilizes a modern technology stack to provide innovative solutions to its customers. Pandian Velayutham is a technology evangelist and public cloud technology speaker.

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years at Alexa Information domain as a data engineer. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.