Tag Archives: AWS Glue

How NerdWallet uses AWS and Apache Hudi to build a serverless, real-time analytics platform

Post Syndicated from Kevin Chun original https://aws.amazon.com/blogs/big-data/how-nerdwallet-uses-aws-and-apache-hudi-to-build-a-serverless-real-time-analytics-platform/

This is a guest post by Kevin Chun, Staff Software Engineer in Core Engineering at NerdWallet.

NerdWallet’s mission is to provide clarity for all of life’s financial decisions. This covers a diverse set of topics: from choosing the right credit card, to managing your spending, to finding the best personal loan, to refinancing your mortgage. As a result, NerdWallet offers powerful capabilities that span across numerous domains, such as credit monitoring and alerting, dashboards for tracking net worth and cash flow, machine learning (ML)-driven recommendations, and many more for millions of users.

To build a cohesive and performant experience for our users, we need to be able to use large volumes of varying user data sourced by multiple independent teams. This requires a strong data culture along with a set of data infrastructure and self-serve tooling that enables creativity and collaboration.

In this post, we outline a use case that demonstrates how NerdWallet is scaling its data ecosystem by building a serverless pipeline that enables streaming data from across the company. We iterated on two different architectures. We explain the challenges we ran into with the initial design and the benefits we achieved by using Apache Hudi and additional AWS services in the second design.

Problem statement

NerdWallet captures a sizable amount of spending data. This data is used to build helpful dashboards and actionable insights for users. The data is stored in an Amazon Aurora cluster. Even though the Aurora cluster works well as an Online Transaction Processing (OLTP) engine, it’s not suitable for large, complex Online Analytical Processing (OLAP) queries. As a result, we can’t expose direct database access to analysts and data engineers. The data owners have to solve requests with new data derivations on read replicas. As the data volume and the diversity of data consumers and requests grow, this process gets more difficult to maintain. In addition, data scientists mostly require data files access from an object store like Amazon Simple Storage Service (Amazon S3).

We decided to explore alternatives where all consumers can independently fulfill their own data requests safely and scalably using open-standard tooling and protocols. Drawing inspiration from the data mesh paradigm, we designed a data lake based on Amazon S3 that decouples data producers from consumers while providing a self-serve, security-compliant, and scalable set of tooling that is easy to provision.

Initial design

The following diagram illustrates the architecture of the initial design.

The design included the following key components:

  1. We chose AWS Data Migration Service (AWS DMS) because it’s a managed service that facilitates the movement of data from various data stores such as relational and NoSQL databases into Amazon S3. AWS DMS allows one-time migration and ongoing replication with change data capture (CDC) to keep the source and target data stores in sync.
  2. We chose Amazon S3 as the foundation for our data lake because of its scalability, durability, and flexibility. You can seamlessly increase storage from gigabytes to petabytes, paying only for what you use. It’s designed to provide 11 9s of durability. It supports structured, semi-structured, and unstructured data, and has native integration with a broad portfolio of AWS services.
  3. AWS Glue is a fully managed data integration service. AWS Glue makes it easier to categorize, clean, transform, and reliably transfer data between different data stores.
  4. Amazon Athena is a serverless interactive query engine that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets, high concurrency, and complex queries.

This architecture works fine with small testing datasets. However, the team quickly ran into complications with the production datasets at scale.

Challenges

The team encountered the following challenges:

  • Long batch processing time and complexed transformation logic – A single run of the Spark batch job took 2–3 hours to complete, and we ended up getting a fairly large AWS bill when testing against billions of records. The core problem was that we had to reconstruct the latest state and rewrite the entire set of records per partition for every job run, even if the incremental changes were a single record of the partition. When we scaled that to thousands of unique transactions per second, we quickly saw the degradation in transformation performance.
  • Increased complexity with a large number of clients – This workload contained millions of clients, and one common query pattern was to filter by single client ID. There were numerous optimizations that we were forced to tack on, such as predicate pushdowns, tuning the Parquet file size, using a bucketed partition scheme, and more. As more data owners adopted this architecture, we would have to customize each of these optimizations for their data models and consumer query patterns.
  • Limited extendibility for real-time use cases – This batch extract, transform, and load (ETL) architecture wasn’t going to scale to handle hourly updates of thousands of records upserts per second. In addition, it would be challenging for the data platform team to keep up with the diverse real-time analytical needs. Incremental queries, time-travel queries, improved latency, and so on would require heavy investment over a long period of time. Improving on this issue would open up possibilities like near-real-time ML inference and event-based alerting.

With all these limitations of the initial design, we decided to go all-in on a real incremental processing framework.

Solution

The following diagram illustrates our updated design. To support real-time use cases, we added Amazon Kinesis Data Streams, AWS Lambda, Amazon Kinesis Data Firehose and Amazon Simple Notification Service (Amazon SNS) into the architecture.

The updated components are as follows:

  1. Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams. We set up a Kinesis data stream as a target for AWS DMS. The data stream collects the CDC logs.
  2. We use a Lambda function to transform the CDC records. We apply schema validation and data enrichment at the record level in the Lambda function. The transformed results are published to a second Kinesis data stream for the data lake consumption and an Amazon SNS topic so that changes can be fanned out to various downstream systems.
  3. Downstream systems can subscribe to the Amazon SNS topic and take real-time actions (within seconds) based on the CDC logs. This can support use cases like anomaly detection and event-based alerting.
  4. To solve the problem of long batch processing time, we use Apache Hudi file format to store the data and perform streaming ETL using AWS Glue streaming jobs. Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi integrates well with various AWS analytics services such as AWS Glue, Amazon EMR, and Athena, which makes it a straightforward extension of our previous architecture. While Apache Hudi solves the record-level update and delete challenges, AWS Glue streaming jobs convert the long-running batch transformations into low-latency micro-batch transformations. We use the AWS Glue Connector for Apache Hudi to import the Apache Hudi dependencies in the AWS Glue streaming job and write transformed data to Amazon S3 continuously. Hudi does all the heavy lifting of record-level upserts, while we simply configure the writer and transform the data into Hudi Copy-on-Write table type. With Hudi on AWS Glue streaming jobs, we reduce the data freshness latency for our core datasets from hours to under 15 minutes.
  5. To solve the partition challenges for high cardinality UUIDs, we use the bucketing technique. Bucketing groups data based on specific columns together within a single partition. These columns are known as bucket keys. When you group related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thereby improving query performance and reducing cost. Our existing queries are filtered on the user ID already, so we significantly improve the performance of our Athena usage without having to rewrite queries by using bucketed user IDs as the partition scheme. For example, the following code shows total spending per user in specific categories:
    SELECT ID, SUM(AMOUNT) SPENDING
    FROM "{{DATABASE}}"."{{TABLE}}"
    WHERE CATEGORY IN (
    'ENTERTAINMENT',
    'SOME_OTHER_CATEGORY')
    AND ID_BUCKET ='{{ID_BUCKET}}'
    GROUP BY ID;

  1. Our data scientist team can access the dataset and perform ML model training using Amazon SageMaker.
  2. We maintain a copy of the raw CDC logs in Amazon S3 via Amazon Kinesis Data Firehose.

Conclusion

In the end, we landed on a serverless stream processing architecture that can scale to thousands of writes per second within minutes of freshness on our data lakes. We’ve rolled out to our first high-volume team! At our current scale, the Hudi job is processing roughly 1.75 MiB per second per AWS Glue worker, which can automatically scale up and down (thanks to AWS Glue auto scaling). We’ve also observed an outstanding improvement of end-to-end freshness at less than 5 minutes due to Hudi’s incremental upserts vs. our first attempt.

With Hudi on Amazon S3, we’ve built a high-leverage foundation to personalize our users’ experiences. Teams that own data can now share their data across the organization with reliability and performance characteristics built into a cookie-cutter solution. This enables our data consumers to build more sophisticated signals to provide clarity for all of life’s financial decisions.

We hope that this post will inspire your organization to build a real-time analytics platform using serverless technologies to accelerate your business goals.


About the authors

Kevin Chun is a Staff Software Engineer in Core Engineering at NerdWallet. He builds data infrastructure and tooling to help NerdWallet provide clarity for all of life’s financial decisions.

Dylan Qu is a Specialist Solutions Architect focused on big data and analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Introducing AWS Glue Flex jobs: Cost savings on ETL workloads

Post Syndicated from Aniket Jiddigoudar original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-flex-jobs-cost-savings-on-etl-workloads/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores. Typically, these data integration jobs can have varying degrees of priority and time sensitivity. For example, non-urgent workloads such as pre-production, testing, and one-time data loads often don’t require fast job startup times or consistent runtimes via dedicated resources.

Today, we are pleased to announce the general availability of a new AWS Glue job run class called Flex. Flex allows you to optimize your costs on your non-urgent or non-time sensitive data integration workloads such as pre-production jobs, testing, and one-time data loads. With Flex, AWS Glue jobs run on spare compute capacity instead of dedicated hardware. The start and runtimes of jobs using Flex can vary because spare compute resources aren’t readily available and can be reclaimed during the run of a job

Regardless of the run option used, AWS Glue jobs have the same capabilities, including access to custom connectors, visual authoring interface, job scheduling, and Glue Auto Scaling. With the Flex execution option, customers can optimize the costs of their data integration workloads by configuring the execution option based on the workloads’ requirements, using standard execution option for time-sensitive workloads, and Flex for non-urgent workloads. The Flex execution class is available for AWS Glue 3.0 Spark jobs.

The Flex execution class is available for AWS Glue 3.0 Spark jobs.

In this post, we provide more details about AWS Glue Flex jobs and how to enable Flex capacity.

How do you use Flexible capacity?

The AWS Glue jobs API now supports an additional parameter called execution-class, which lets you choose STANDARD or FLEX when running the job. To use Flex, you simply set the parameter to FLEX.

To enable Flex via the AWS Glue Studio console, complete the following steps:

  1. On the AWS Glue Studio console, while authoring a job, navigate to the Job details tab
  2. Select Flex Execution.
  3. Set an appropriate value for the Job Timeout parameter (defaults to 120 minutes for Flex jobs).
  4. Save the job.
  5. After finalizing all other details, choose Run to run the job with Flex capacity.

On the Runs tab, you should be able to see FLEX listed under Execution class.

You can also enable Flex via the AWS Command Line Interface (AWS CLI).

You can set the --execution-class setting in the start-job-run API, which lets you run a particular AWS Glue job’s run with Flex capacity:

aws glue start-job-run --job-name my-job \
    --execution-class FLEX \
    --timeout 300 \

You can also set the --execution-class during the create-job API. This sets the default run class of all the runs of this job to FLEX:

aws glue create-job \
    --name flexCLI \
    --role AWSGlueServiceRoleDefault \
    --command "Name=glueetl,ScriptLocation=s3://mybucket/myfolder/" \
    --region us-east-2 \
    --execution-class FLEX \
    --worker-type G.1X \
    --number-of-workers 10 \
    --glue-version 3.0

The following are additional details about the relevant parameters:

  • –execution-class – The enum string that specifies if a job should be run as FLEX or STANDARD capacity. The default is STANDARD.
  • –timeout – Specifies the time (in minutes) the job will run before it’s moved into a TIMEOUT state.

When should you use Flexible capacity?

The Flex execution class is ideal for reducing the costs of time-insensitive workloads. For example:

  • Nightly ETL jobs, or jobs that run over weekends for processing workloads
  • One-time bulk data ingestion jobs
  • Jobs running in test environments or pre-production workloads
  • Time-insensitive workloads where it’s acceptable to have variable start and end times

In comparison, the standard execution class is ideal for time-sensitive workloads that require fast job startup and dedicated resources. In addition, jobs that have downstream dependencies are better served by the standard execution class.

What is the typical life-cycle of a Flexible capacity Job?

When a start-job-run API call is issued, with the execution-class set to FLEX, AWS Glue will begin to request compute resources. If no resources are available immediately upon issuing the API call, the job will move into a WAITING state. No billing occurs at this point.

As soon as the job is able to acquire compute resources, the job moves to a RUNNING state. At this point, even if all the computes requested aren’t available, the job begins running on whatever hardware is present. As more Flex capacity becomes available, AWS Glue adds it to the job, up to a maximum value specified by Number of workers.

At this point, billing begins. You’re charged only for the compute resources that are running at any given time, and only for the duration that they ran for.

While the job is running, if Flex capacity is reclaimed, AWS Glue continues running the job on the existing compute resources while it tries to meet the shortfall by requesting more resources. If capacity is reclaimed, billing for that capacity is halted as well. Billing for new capacity will start when it is provisioned again. If the job completes successfully, the job’s state moves to SUCCEEDED. If the job fails due to various user or system errors, the job’s state transitions to FAILED. If the job is unable to complete before the time specified by the --timeout parameter, whether due to a lack of compute capacity or due to issues with the AWS Glue job script, the job goes into a TIMEOUT state.

Flexible job runs rely on the availability of non-dedicated compute capacity in AWS, which in turn depends on several factors, such as the Region and Availability Zone, time of day, day of the week, and the number of DPUs required by a job.

A parameter of particular importance for Flex Jobs is the --timeout value. It’s possible for Flex jobs to take longer to run than standard jobs, especially if capacity is reclaimed while the job is running. As a result, selecting the right timeout value that’s appropriate for your workload is critical. Choose a timeout value such that the total cost of the Flex job run doesn’t exceed a standard job run. If the value is set too high, the job can wait for too long, trying to acquire capacity that isn’t available. If the value is set too low, the job times out, even if capacity is available and the job execution is proceeding correctly.

How are Flex capacity jobs billed?

Flex jobs are billed per worker at the Flex DPU-hour rates. This means that you’re billed only for the capacity that actually ran during the execution of the job, for the duration that it ran.

For example, if you ran an AWS Glue Flex job for 10 workers, and AWS Glue was only able to acquire 5 workers, you’re only billed for five workers, and only for the duration that those workers ran. If, during the job run, two out of those five workers are reclaimed, then billing for those two workers is stopped, while billing for the remaining three workers continues. If provisioning for the two reclaimed workers is successful during the job run, billing for those two will start again.

For more information on Flex pricing, refer to AWS Glue pricing.

Conclusion

This post discusses the new AWS Glue Flex job execution class, which allows you to optimize costs for non-time-sensitive ETL workloads and test environments.

You can start using Flex capacity for your existing and new workloads today. However, note that the Flex class is not supported for Python Shell jobs, AWS Glue streaming jobs, or AWS Glue ML jobs.

For more information on AWS Glue Flex jobs, refer to their latest documentation.

Special thanks to everyone who contributed to the launch: Parag Shah, Sampath Shreekantha, Yinzhi Xi and Jessica Cheng,


About the authors

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team.

Vaibhav Porwal is a Senior Software Development Engineer on the AWS Glue team.

Sriram Ramarathnam is a Software Development Manager on the AWS Glue team.

Best practices to optimize cost and performance for AWS Glue streaming ETL jobs

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/best-practices-to-optimize-cost-and-performance-for-aws-glue-streaming-etl-jobs/

AWS Glue streaming extract, transform, and load (ETL) jobs allow you to process and enrich vast amounts of incoming data from systems such as Amazon Kinesis Data Streams, Amazon Managed Streaming for Apache Kafka (Amazon MSK), or any other Apache Kafka cluster. It uses the Spark Structured Streaming framework to perform data processing in near-real time.

This post covers use cases where data needs to be efficiently processed, delivered, and possibly actioned in a limited amount of time. This can cover a wide range of cases, such as log processing and alarming, continuous data ingestion and enrichment, data validation, internet of things, machine learning (ML), and more.

We discuss the following topics:

  • Development tools that help you code faster using our newly launched AWS Glue Studio notebooks
  • How to monitor and tune your streaming jobs
  • Best practices for sizing and scaling your AWS Glue cluster, using our newly launched features like auto scaling and the small worker type G 0.25X

Development tools

AWS Glue Studio notebooks can speed up the development of your streaming job by allowing data engineers to work using an interactive notebook and test code changes to get quick feedback—from business logic coding to testing configuration changes—as part of tuning.

Before you run any code in the notebook (which would start the session), you need to set some important configurations.

The magic %streaming creates the session cluster using the same runtime as AWS Glue streaming jobs. This way, you interactively develop and test your code using the same runtime that you use later in the production job.

Additionally, configure Spark UI logs, which will be very useful for monitoring and tuning the job.

See the following configuration:

%streaming
%%configure
{
"--enable-spark-ui": "true",
"--spark-event-logs-path": "s3://your_bucket/sparkui/"
}

For additional configuration options such as version or number of workers, refer to Configuring AWS Glue interactive sessions for Jupyter and AWS Glue Studio notebooks.

To visualize the Spark UI logs, you need a Spark history server. If you don’t have one already, refer to Launching the Spark History Server for deployment instructions.

Structured Streaming is based on streaming DataFrames, which represent micro-batches of messages.
The following code is an example of creating a stream DataFrame using Amazon Kinesis as the source:

kinesis_options = {
  "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
  "startingPosition": "TRIM_HORIZON",
  "inferSchema": "true",
  "classification": "json"
}
kinesisDF = glueContext.create_data_frame_from_options(
   connection_type="kinesis",
   connection_options=kinesis_options
)

The AWS Glue API helps you create the DataFrame by doing schema detection and auto decompression, depending on the format. You can also build it yourself using the Spark API directly:

kinesisDF = spark.readStream.format("kinesis").options(**kinesis_options).load()

After your run any code cell, it triggers the startup of the session, and the application soon appears in the history server as an incomplete app (at the bottom of the page there is a link to display incomplete apps) named GlueReplApp, because it’s a session cluster. For a regular job, it’s listed with the job name given when it was created.

History server home page

From the notebook, you can take a sample of the streaming data. This can help development and give an indication of the type and size of the streaming messages, which might impact performance.

Monitor the cluster with Structured Streaming

The best way to monitor and tune your AWS Glue streaming job is using the Spark UI; it gives you the overall streaming job trends on the Structured Streaming tab and the details of each individual micro-batch processing job.

Overall view of the streaming job

On the Structured Streaming tab, you can see a summary of the streams running in the cluster, as in the following example.

Normally there is just one streaming query, representing a streaming ETL. If you start multiple in parallel, it’s good if you give it a recognizable name, calling queryName() if you use the writeStream API directly on the DataFrame.

After a good number of batches are complete (such as 10), enough for the averages to stabilize, you can use Avg Input/sec column to monitor how many events or messages the job is processing. This can be confusing because the column to the right, Avg Process/sec, is similar but often has a higher number. The difference is that this process time tells us how efficient our code is, whereas the average input tells us how many messages the cluster is reading and processing.

The important thing to note is that if the two values are similar, it means the job is working at maximum capacity. It’s making the best use of the hardware but it likely won’t be able to cope with an increase in volume without causing delays.

In the last column is the latest batch number. Because they’re numbered incrementally from zero, this tells us how many batches the query has processed so far.

When you choose the link in the “Run ID” column of a streaming query, you can review the details with graphs and histograms, as in the following example.

The first two rows correspond to the data that is used to calculate the averages shown on the summary page.

For Input Rate, each data point is calculated by dividing the number of events read for the batch by the time passed between the current batch start and the previous batch start. In a healthy system that is able to keep up, this is equal to the configured trigger interval (in the GlueContext.forEachBatch() API, this is set using the option windowSize).

Because it uses the current batch rows with the previous batch latency, this graph is often unstable in the first batches until the Batch Duration (the last line graph) stabilizes.

In this example, when it stabilizes, it gets completely flat. This means that either the influx of messages is constant or the job is hitting the limit per batch set (we discuss how to do this later in the post).

Be careful if you set a limit per batch that is constantly hit, you could be silently building a backlog, but everything could look good in the job metrics. To monitor this, have a metric of latency measuring the difference between the message timestamp when it gets created and the time it’s processed.

Process Rate is calculated by dividing the number of messages in a batch by the time it took to process that batch. For instance, if the batch contains 1,000 messages, and the trigger interval is 10 seconds but the batch only needed 5 seconds to process it, the process rate would be 1000/5 = 200 msg/sec. while the input rate for that batch (assuming the previous batch also ran within the interval) is 1000/10 = 100 msg/sec.

This metric is useful to measure how efficient our code processing the batch is, and therefore it can get higher than the input rate (this doesn’t mean it’s processing more messages, just using less time). As mentioned earlier, if both metrics get close, it means the batch duration is close to the interval and therefore additional traffic is likely to start causing batch trigger delays (because the previous batch is still running) and increase latency.

Later in this post, we show how auto scaling can help prevent this situation.

Input Rows shows the number of messages read for each batch, like input rate, but using volume instead of rate.

It’s important to note that if the batch processes the data multiple times (for example, writing to multiple destinations), the messages are counted multiple times. If the rates are greater than the expected, this could be the reason. In general, to avoid reading messages multiple times, you should cache the batch while processing it, which is the default when you use the GlueContext.forEachBatch() API.

The last two rows tell us how long it takes to process each batch and how is that time spent. It’s normal to see the first batches take much longer until the system warms up and stabilizes.
The important thing to look for is that the durations are roughly stable and well under the configured trigger interval. If that’s not the case, the next batch gets delayed and could start a compounding delay by building a backlog or increasing batch size (if the limit allows taking the extra messages pending).

In Operation Duration, the majority of time should be spent on addBatch (the mustard color), which is the actual work. The rest are fixed overhead, therefore the smaller the batch process, the more percentage of time that will take. This represents the trade-off between small batches with lower latency or bigger batches but more computing efficient.

Also, it’s normal for the first batch to spend significant time in the latestOffset (the brown bar), locating the point at which it needs to start processing when there is no checkpoint.

The following query statistics show another example.

In this case, the input has some variation (meaning it’s not hitting the batch limit). Also, the process rate is roughly the same as the input rate. This tells us the system is at max capacity and struggling to keep up. By comparing the input rows and input rate, we can guess that the interval configured is just 3 seconds and the batch duration is barely able to meet that latency.

Finally, in Operation Duration, you can observe that because the batches are so frequent, a significant amount of time (proportionally speaking) is spent saving the checkpoint (the dark green bar).

With this information, we can probably improve the stability of the job by increasing the trigger interval to 5 seconds or more. This way, it checkpoints less often and has more time to process data, which might be enough to get batch duration consistently under the interval. The trade-off is that the latency between when a message is published and when it’s processed is longer.

Monitor individual batch processing

On the Jobs tab, you can see how long each batch is taking and dig into the different steps the processing involves to understand how the time is spent. You can also check if there are tasks that succeed after retry. If this happens continuously, it can silently hurt performance.

For instance, the following screenshot shows the batches on the Jobs tab of the Spark UI of our streaming job.

Each batch is considered a job by Spark (don’t confuse the job ID with the batch number; they only match if there is no other action). The job group is the streaming query ID (this is important only when running multiple queries).

The streaming job in this example has a single stage with 100 partitions. Both batches processed them successfully, so the stage is marked as succeeded and all the tasks completed (100/100 in the progress bar).

However, there is a difference in the first batch: there were 20 task failures. You know all the failed tasks succeeded in the retries, otherwise the stage would have been marked as failed. For the stage to fail, the same task would have to fail four times (or as configured by spark.task.maxFailures).

If the stage fails, the batch fails as well and possibly the whole job; if the job was started by using GlueContext.forEachBatch(), it has a number of retries as per the batchMaxRetries parameter (three by default).

These failures are important because they have two effects:

  • They can silently cause delays in the batch processing, depending on how long it took to fail and retry.
  • They can cause records to be sent multiple times if the failure is in the last stage of the batch, depending on the type of output. If the output is files, in general it won’t cause duplicates. However, if the destination is Amazon DynamoDB, JDBC, Amazon OpenSearch Service, or another output that uses batching, it’s possible that some part of the output has already been sent. If you can’t tolerate any duplicates, the destination system should handle this (for example, being idempotent).

Choosing the description link takes you to the Stages tab for that job. Here you can dig into the failure: What is the exception? Is it always in the same executor? Does it succeed on the first retry or took multiple?

Ideally, you want to identify these failures and solve them. For example, maybe the destination system is throttling us because doesn’t have enough provisioned capacity, or a larger timeout is needed. Otherwise, you should at least monitor it and decide if it is systemic or sporadic.

Sizing and scaling

Defining how to split the data is a key element in any distributed system to run and scale efficiently. The design decisions on the messaging system will have a strong influence on how the streaming job will perform and scale, and thereby affect the job parallelism.

In the case of AWS Glue Streaming, this division of work is based on Apache Spark partitions, which define how to split the work so it can be processed in parallel. Each time the job reads a batch from the source, it divides the incoming data into Spark partitions.

For Apache Kafka, each topic partition becomes a Spark partition; similarly, for Kinesis, each stream shard becomes a Spark partition. To simplify, I’ll refer to this parallelism level as number of partitions, meaning Spark partitions that will be determined by the input Kafka partitions or Kinesis shards on a one-to-one basis.

The goal is to have enough parallelism and capacity to process each batch of data in less time than the configured batch interval and therefore be able to keep up. For instance, with a batch interval of 60 seconds, the job lets 60 seconds of data build up and then processes that data. If that work takes more than 60 seconds, the next batch waits until the previous batch is complete before starting a new batch with the data that has built up since the previous batch started.

It’s a good practice to limit the amount of data to process in a single batch, instead of just taking everything that has been added since the last one. This helps make the job more stable and predictable during peak times. It allows you to test that the job can handle volume of data without issues (for example, memory or throttling).

To do so, specify a limit when defining the source stream DataFrame:

  • For Kinesis, specify the limit using kinesis.executor.maxFetchRecordsPerShard, and revise this number if the number of shards changes substantially. You might need to increase kinesis.executor.maxFetchTimeInMs as well, in order to allow more time to read the batch and make sure it’s not truncated.
  • For Kafka, set maxOffsetsPerTrigger, which divides that allowance equally between the number of partitions.

The following is an example of setting this config for Kafka (for Kinesis, it’s equivalent but using Kinesis properties):

kafka_properties= {
  "kafka.bootstrap.servers": "bootstrapserver1:9092",
  "subscribe": "mytopic",
  "startingOffsets": "latest",
  "maxOffsetsPerTrigger": "5000000"
}
# Pass the properties as options when creating the DataFrame
spark.spark.readStream.format("kafka").options(**kafka_properties).load()

Initial benchmark

If the events can be processed individually (no interdependency such as grouping), you can get a rough estimation of how many messages a single Spark core can handle by running with a single partition source (one Kafka partition or one Kinesis shard stream) with data preloaded into it and run batches with a limit and the minimum interval (1 second). This simulates a stress test with no downtime between batches.

For these repeated tests, clear the checkpoint directory, use a different one (for example, make it dynamic using the timestamp in the path), or just disable the checkpointing (if using the Spark API directly), so you can reuse the same data.
Leave a few batches to run (at least 10) to give time for the system and the metrics to stabilize.

Start with a small limit (using the limit configuration properties explained in the previous section) and do multiple reruns, increasing the value. Record the batch duration for that limit and the throughput input rate (because it’s a stress test, the process rate should be similar).

In general, larger batches tend to be more efficient up to a point. This is because the fixed overhead taken for each to checkpoint, plan, and coordinate the nodes is more significant if the batches are smaller and therefore more frequent.

Then pick your reference initial settings based on the requirements:

  • If a goal SLA is required, use the largest batch size whose batch duration is less than half the latency SLA. This is because in the worst case, a message that is stored just after a batch is triggered has to wait at least the interval and then the processing time (which should be less than the interval). When the system is keeping up, the latency in this worst case would be close to twice the interval, so aim for the batch duration to be less than half the target latency.
  • In the case where the throughput is the priority over latency, just pick the batch size that provides a higher average process rate and define an interval that allows some buffer over the observed batch duration.

Now you have an idea of the number of messages per core our ETL can handle and the latency. These numbers are idealistic because the system won’t scale perfectly linearly when you add more partitions and nodes. You can use the messages per core obtained to divide the total number of messages per second to process and get the minimum number of Spark partitions needed (each core handles one partition in parallel).

With this number of estimated Spark cores, calculate the number of nodes needed depending on the type and version, as summarized in the following table.

AWS Glue Version Worker Type vCores Spark Cores per Worker
2 G 1X 4 8
2 G 2X 8 16
3 G 0.25X 2 2
3 G 1X 4 4
3 G 2X 8 8

Using the newer version 3 is preferable because it includes more optimizations and features like auto scaling (which we discuss later). Regarding size, unless the job has some operation that is heavy on memory, it’s preferable to use the smaller instances so there aren’t so many cores competing for memory, disk, and network shared resources.

Spark cores are equivalent to threads; therefore, you can have more (or less) than the actual cores available in the instance. This doesn’t mean that having more Spark cores is going to necessarily be faster if they’re not backed by physical cores, it just means you have more parallelism competing for the same CPU.

Sizing the cluster when you control the input message system

This is the ideal case because you can optimize the performance and the efficiency as needed.

With the benchmark information you just gathered, you can define your initial AWS Glue cluster size and configure Kafka or Kinesis with the number of partitions or topics estimated, plus some buffer. Test this baseline setup and adjust as needed until the job can comfortably meet the total volume and required latency.

For instance, if we have determined that we need 32 cores to be well within the latency requirement for the volume of data to process, then we can create an AWS Glue 3.0 cluster with 9 G.1X nodes (a driver and 8 workers with 4 cores = 32) which reads from a Kinesis data stream with 32 shards.

Imagine that the volume of data in that stream doubles and we want to keep the latency requirements. To do so, we double the number of workers (16 + 1 driver = 17) and the number of shards on the stream (now 64). Remember this is just a reference and needs to be validated; in practice you might need more or less nodes depending on the cluster size, if the destination system can keep up, complexity of transformations, or other parameters.

Sizing the cluster when you don’t control the message system configuration

In this case, your options for tuning are much more limited.

Check if a cluster with the same number of Spark cores as existing partitions (determined by the message system) is able to keep up with the expected volume of data and latency, plus some allowance for peak times.

If that’s not the case, adding more nodes alone won’t help. You need to repartition the incoming data inside AWS Glue. This operation adds an overhead to redistribute the data internally, but it’s the only way the job can scale out in this scenario.

Let’s illustrate with an example. Imagine we have a Kinesis data stream with one shard that we don’t control, and there isn’t enough volume to justify asking the owner to increase the shards. In the cluster, significant computing for each message is needed; for each message, it runs heuristics and other ML techniques to take action depending on the calculations. After running some benchmarks, the calculations can be done promptly for the expected volume of messages using 8 cores working in parallel. By default, because there is only one shard, only one core will process all the messages sequentially.

To solve this scenario, we can provision an AWS Glue 3.0 cluster with 3 G 1X nodes to have 8 worker cores available. In the code repartition, the batch distributes the messages randomly (as evenly as possible) between them:

def batch_function(data_frame, batch_id):
    # Repartition so the udf is called in parallel for each partition
    data_frame.repartition(8).foreach(process_event_udf)

glueContext.forEachBatch(frame=streaming_df, batch_function=batch_function)

If the messaging system resizes the number of partitions or shards, the job picks up this change on the next batch. You might need to adjust the cluster capacity accordingly with the new data volume.

The streaming job is able to process more partitions than Spark cores are available, but might cause inefficiencies because the additional partitions will be queued and won’t start being processed until others finish. This might result in many nodes being idle while the remaining partitions finish and the next batch can be triggered.

When the messages have processing interdependencies

If the messages to be processed depend on other messages (either in the same or previous batches), that’s likely to be a limiting factor on the scalability. In that case, it might help to analyze a batch (job in Spark UI) to see where the time is spent and if there are imbalances by checking the task duration percentiles on the Stages tab (you can also reach this page by choosing a stage on the Jobs tab).

Auto scaling

Up to now, you have seen sizing methods to handle a stable stream of data with the occasional peak.
However, for variable incoming volumes of data, this isn’t cost-effective because you need to size for the worst-case scenario or accept higher latency at peak times.

This is where AWS Glue Streaming 3.0 auto scaling comes in. You can enable it for the job and define the maximum number of workers you want to allow (for example, using the number you have determined needed for the peak times).

The runtime monitors the trend of time spent on batch processing and compares it with the configured interval. Based on that, it makes a decision to increase or decrease the number of workers as needed, being more aggressive as the batch times get near or go over the allowed interval time.

The following screenshot is an example of a streaming job with auto scaling enabled.

Splitting workloads

You have seen how to scale a single job by adding nodes and partitioning the data as needed, which is enough on most cases. As the cluster grows, there is still a single driver and the nodes have to wait for the others to complete the batch before they can take additional work. If it reaches a point that increasing the cluster size is no longer effective, you might want to consider splitting the workload between separate jobs.

In the case of Kinesis, you need to divide the data into multiple streams, but for Apache Kafka, you can divide a topic into multiple jobs by assigning partitions to each one. To do so, instead of the usual subscribe or subscribePattern where the topics are listed, use the property assign to assign using JSON a subset of the topic partitions that the job will handle (for example, {"topic1": [0,1,2]}). At the time of this writing, it’s not possible to specify a range, so you need to list all the partitions, for instance building that list dynamically in the code.

Sizing down

For low volumes of traffic, AWS Glue Streaming has a special type of small node: G 0.25X, which provides two cores and 4 GB RAM for a quarter of the cost of a DPU, so it’s very cost-effective. However, even with that frugal capacity, if you have many small streams, having a small cluster for each one is still not practical.

For such situations, there are currently a few options:

  • Configure the stream DataFrame to feed from multiple Kafka topics or Kinesis streams. Then in the DataFrame, use the columns topic and streamName, for Kafka and Kinesis sources respectively, to determine how to handle the data (for example, different transformations or destinations). Make sure the DataFrame is cached, so you don’t read the data multiple times.
  • If you have a mix of Kafka and Kinesis sources, you can define a DataFrame for each, join them, and process as needed using the columns mentioned in the previous point.
  • The preceding two cases require all the sources to have the same batch interval and links their processing (for example, a busier stream can delay a slower one). To have independent stream processing inside the same cluster, you can trigger the processing of separate stream’s DataFrames using separate threads. Each stream is monitored separately in the Spark UI, but you’re responsible for starting and managing those threads and handle errors.

Settings

In this post, we showed some config settings that impact performance. The following table summarizes the ones we discussed and other important config properties to use when creating the input stream DataFrame.

Property Applies to Remarks
maxOffsetsPerTrigger Kafka Limit of messages per batch. Divides the limit evenly among partitions.
kinesis.executor.maxFetchRecordsPerShard Kinesis Limit per each shard, therefore should be revised if the number of shards changes.
kinesis.executor.maxFetchTimeInMs Kinesis When increasing the batch size (either by increasing the batch interval or the previous property), the executor might need more time, allotted by this property.
startingOffsets Kafka Normally you want to read all the data available and therefore use earliest. However, if there is a big backlog, the system might take a long time to catch up and instead use latest to skip the history.
startingposition Kinesis Similar to startingOffsets, in this case the values to use are TRIM_HORIZON to backload and LATEST to start processing from now on.
includeHeaders Kafka Enable this flag if you need to merge and split multiple topics in the same job (see the previous section for details).
kinesis.executor.maxconnections Kinesis When writing to Kinesis, by default it uses a single connection. Increasing this might improve performance.
kinesis.client.avoidEmptyBatches Kinesis It’s best to set it to true to avoid wasting resources (for example, generating empty files) when there is no data (like the Kafka connector does). GlueContext.forEachBatch prevents empty batches by default.

Further optimizations

In general, it’s worth doing some compression on the messages to save on transfer time (at the expense of some CPU, depending on the compression type used).

If the producer compresses the messages individually, AWS Glue can detect it and decompress automatically in most cases, depending on the format and type. For more information, refer to Adding Streaming ETL Jobs in AWS Glue.

If using Kafka, you have the option to compress the topic. This way, the compression is more effective because it’s done in batches, end-to-end, and it’s transparent to the producer and consumer.

By default, the GlueContext.forEachBatch function caches the incoming data. This is helpful if the data needs to be sent to multiple sinks (for example, as Amazon S3 files and also to update a DynamoDB table) because otherwise the job would read the data multiple times from the source. But it can be detrimental to performance if the volume of data is big and there is only one output.

To disable this option, set persistDataFrame as false:

glueContext.forEachBatch(
    frame=myStreamDataFrame,
    batch_function=processBatch,
    options={
        "windowSize": "30 seconds",
        "checkpointLocation": myCheckpointPath,
        "persistDataFrame":  "false"
    }
)

In streaming jobs, it’s common to have to join streaming data with another DataFrame to do enrichment (for example, lookups). In that case, you want to avoid any shuffle if possible, because it splits stages and causes data to be moved between nodes.

When the DataFrame you’re joining to is relatively small to fit in memory, consider using a broadcast join. However, bear in mind it will be distributed to the nodes on every batch, so it might not be worth it if the batches are too small.

If you need to shuffle, consider enabling the Kryo serializer (if using custom serializable classes you need to register them first to use it).

As in any AWS Glue jobs, avoid using custom udf() if you can do the same with the provided API like Spark SQL. User-defined functions (UDFs) prevent the runtime engine from performing many optimizations (the UDF code is a black box for the engine) and in the case of Python, it forces the movement of data between processes.

Avoid generating too many small files (especially columnar like Parquet or ORC, which have overhead per file). To do so, it might be a good idea to coalesce the micro-batch DataFrame before writing the output. If you’re writing partitioned data to Amazon S3, repartition based on columns can significantly reduce the number of output files created.

Conclusion

In this post, you saw how to approach sizing and tuning an AWS Glue streaming job in different scenarios, including planning considerations, configuration, monitoring, tips, and pitfalls.

You can now use these techniques to monitor and improve your existing streaming jobs or use them when designing and building new ones.


About the author

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.

How Epos Now modernized their data platform by building an end-to-end data lake with the AWS Data Lab

Post Syndicated from Debadatta Mohapatra original https://aws.amazon.com/blogs/big-data/how-epos-now-modernized-their-data-platform-by-building-an-end-to-end-data-lake-with-the-aws-data-lab/

Epos Now provides point of sale and payment solutions to over 40,000 hospitality and retailers across 71 countries. Their mission is to help businesses of all sizes reach their full potential through the power of cloud technology, with solutions that are affordable, efficient, and accessible. Their solutions allow businesses to leverage actionable insights, manage their business from anywhere, and reach customers both in-store and online.

Epos Now currently provides real-time and near-real-time reports and dashboards to their merchants on top of their operational database (Microsoft SQL Server). With a growing customer base and new data needs, the team started to see some issues in the current platform.

First, they observed performance degradation for serving the reporting requirements from the same OLTP database with the current data model. A few metrics that needed to be delivered in real time (seconds after a transaction was complete) and a few metrics that needed to be reflected in the dashboard in near-real-time (minutes) took several attempts to load in the dashboard.

This started to cause operational issues for their merchants. The end consumers of reports couldn’t access the dashboard in a timely manner.

Cost and scalability also became a major problem because one single database instance was trying to serve many different use cases.

Epos Now needed a strategic solution to address these issues. Additionally, they didn’t have a dedicated data platform for doing machine learning and advanced analytics use cases, so they decided on two parallel strategies to resolve their data problems and better serve merchants:

  • The first was to rearchitect the near-real-time reporting feature by moving it to a dedicated Amazon Aurora PostgreSQL-Compatible Edition database, with a specific reporting data model to serve to end consumers. This will improve performance, uptime, and cost.
  • The second was to build out a new data platform for reporting, dashboards, and advanced analytics. This will enable use cases for internal data analysts and data scientists to experiment and create multiple data products, ultimately exposing these insights to end customers.

In this post, we discuss how Epos Now designed the overall solution with support from the AWS Data Lab. Having developed a strong strategic relationship with AWS over the last 3 years, Epos Now opted to take advantage of the AWS Data lab program to speed up the process of building a reliable, performant, and cost-effective data platform. The AWS Data Lab program offers accelerated, joint-engineering engagements between customers and AWS technical resources to create tangible deliverables that accelerate data and analytics modernization initiatives.

Working with an AWS Data Lab Architect, Epos Now commenced weekly cadence calls to come up with a high-level architecture. After the objective, success criteria, and stretch goals were clearly defined, the final step was to draft a detailed task list for the upcoming 3-day build phase.

Overview of solution

As part of the 3-day build exercise, Epos Now built the following solution with the ongoing support of their AWS Data Lab Architect.

Epos Now Arch Image

The platform consists of an end-to-end data pipeline with three main components:

  • Data lake – As a central source of truth
  • Data warehouse – For analytics and reporting needs
  • Fast access layer – To serve near-real-time reports to merchants

We chose three different storage solutions:

  • Amazon Simple Storage Service (Amazon S3) for raw data landing and a curated data layer to build the foundation of the data lake
  • Amazon Redshift to create a federated data warehouse with conformed dimensions and star schemas for consumption by Microsoft Power BI, running on AWS
  • Aurora PostgreSQL to store all the data for near-real-time reporting as a fast access layer

In the following sections, we go into each component and supporting services in more detail.

Data lake

The first component of the data pipeline involved ingesting the data from an Amazon Managed Streaming for Apache Kafka (Amazon MSK) topic using Amazon MSK Connect to land the data into an S3 bucket (landing zone). The Epos Now team used the Confluent Amazon S3 sink connector to sink the data to Amazon S3. To make the sink process more resilient, Epos Now added the required configuration for dead-letter queues to redirect the bad messages to another topic. The following code is a sample configuration for a dead-letter queue in Amazon MSK Connect:

Because Epos Now was ingesting from multiple data sources, they used Airbyte to transfer the data to a landing zone in batches. A subsequent AWS Glue job reads the data from the landing bucket , performs data transformation, and moves the data to a curated zone of Amazon S3 in optimal format and layout. This curated layer then became the source of truth for all other use cases. Then Epos Now used an AWS Glue crawler to update the AWS Glue Data Catalog. This was augmented by the use of Amazon Athena for doing data analysis. To optimize for cost, Epos Now defined an optimal data retention policy on different layers of the data lake to save money as well as keep the dataset relevant.

Data warehouse

After the data lake foundation was established, Epos Now used a subsequent AWS Glue job to load the data from the S3 curated layer to Amazon Redshift. We used Amazon Redshift to make the data queryable in both Amazon Redshift (internal tables) and Amazon Redshift Spectrum. The team then used dbt as an extract, load, and transform (ELT) engine to create the target data model and store it in target tables and views for internal business intelligence reporting. The Epos Now team wanted to use their SQL knowledge to do all ELT operations in Amazon Redshift, so they chose dbt to perform all the joins, aggregations, and other transformations after the data was loaded into the staging tables in Amazon Redshift. Epos Now is currently using Power BI for reporting, which was migrated to the AWS Cloud and connected to Amazon Redshift clusters running inside Epos Now’s VPC.

Fast access layer

To build the fast access layer to deliver the metrics to Epos Now’s retail and hospitality merchants in near-real time, we decided to create a separate pipeline. This required developing a microservice running a Kafka consumer job to subscribe to the same Kafka topic in an Amazon Elastic Kubernetes Service (Amazon EKS) cluster. The microservice received the messages, conducted the transformations, and wrote the data to a target data model hosted on Aurora PostgreSQL. This data was delivered to the UI layer through an API also hosted on Amazon EKS, exposed through Amazon API Gateway.

Outcome

The Epos Now team is currently building both the fast access layer and a centralized lakehouse architecture-based data platform on Amazon S3 and Amazon Redshift for advanced analytics use cases. The new data platform is best positioned to address scalability issues and support new use cases. The Epos Now team has also started offloading some of the real-time reporting requirements to the new target data model hosted in Aurora. The team has a clear strategy around the choice of different storage solutions for the right access patterns: Amazon S3 stores all the raw data, and Aurora hosts all the metrics to serve real-time and near-real-time reporting requirements. The Epos Now team will also enhance the overall solution by applying data retention policies in different layers of the data platform. This will address the platform cost without losing any historical datasets. The data model and structure (data partitioning, columnar file format) we designed greatly improved query performance and overall platform stability.

Conclusion

Epos Now revolutionized their data analytics capabilities, taking advantage of the breadth and depth of the AWS Cloud. They’re now able to serve insights to internal business users, and scale their data platform in a reliable, performant, and cost-effective manner.

The AWS Data Lab engagement enabled Epos Now to move from idea to proof of concept in 3 days using several previously unfamiliar AWS analytics services, including AWS Glue, Amazon MSK, Amazon Redshift, and Amazon API Gateway.

Epos Now is currently in the process of implementing the full data lake architecture, with a rollout to customers planned for late 2022. Once live, they will deliver on their strategic goal to provide real-time transactional data and put insights directly in the hands of their merchants.


About the Authors

Jason Downing is VP of Data and Insights at Epos Now. He is responsible for the Epos Now data platform and product direction. He specializes in product management across a range of industries, including POS systems, mobile money, payments, and eWallets.

Debadatta Mohapatra is an AWS Data Lab Architect. He has extensive experience across big data, data science, and IoT, across consulting and industrials. He is an advocate of cloud-native data platforms and the value they can drive for customers across industries.

Use SQL queries to define Amazon Redshift datasets in AWS Glue DataBrew

Post Syndicated from Suraj Shivananda original https://aws.amazon.com/blogs/big-data/use-sql-queries-to-define-amazon-redshift-datasets-in-aws-glue-databrew/

In the post Data preparation using Amazon Redshift with AWS Glue DataBrew, we saw how to create an AWS Glue DataBrew job using a JDBC connection for Amazon Redshift. In this post, we show you how to create a DataBrew profile job and a recipe job using an Amazon Redshift connection with custom SQL.

DataBrew is a visual data preparation tool that can help you simplify your extract, transform, and load (ETL) process. You can now define a dataset from Amazon Redshift by applying custom SQL statements. Applying a custom SQL statement to a large source table allows you to select, join, and filter the data before cleaning, normalizing, and transforming it in a DataBrew project. Filtering and joining the data from your data source and only bringing in the data you want to transform simplifies the ETL process.

In this post, we demonstrate how to use custom SQL queries to define your Amazon Redshift datasets in DataBrew.

Solution overview

To implement this solution, you complete the following high-level steps:

  1. Create an Amazon Redshift connection.
  2. Create your dataset and use SQL queries to define your Amazon Redshift source datasets.
  3. Create a DataBrew profile job to profile the source data.
  4. Create a DataBrew project and recipe job to transform the data and load it to Amazon Simple Storage Service (Amazon S3).

The following diagram illustrates the architecture for our solution.

Prerequisites

To use this solution, complete the following prerequisite steps:

  1. Have an AWS account.
  2. Create an Amazon Redshift cluster in a private subnet within a VPC as a security best practice.
  3. Because DataBrew commands require that the cluster has access to Amazon S3, make sure you create a gateway VPC endpoint to Amazon S3. The gateway endpoint provides reliable connectivity to Amazon S3 without requiring an internet gateway or NAT device from your VPC.
  4. Enable the enhanced VPC routing in the Amazon Redshift cluster. Enhanced VPC routing forces all Amazon Redshift commands to use the connectivity to the gateway VPC endpoint to Amazon S3 in the same AWS Region as your cluster.
  5. Create a database and tables, and load the sample data in the Amazon Redshift cluster.
  6. Prepare a SQL query to extract the source dataset. You use this SQL query later in this post to create an Amazon Redshift source dataset in DataBrew.
  7. Create an S3 bucket to store data from the profile and recipe jobs. The DataBrew connection temporarily stores intermediate data in Amazon S3.
  8. For our use case, we use a mock dataset. You can download the DDL and data files from GitHub.

Security best practices

Consider the following best practices in order to mitigate security threats:

  • Review the shared responsibility model when using DataBrew.
  • Restrict network access for inbound and outbound traffic to least privilege. Take advantage of the routing traffic within the VPC by using an Amazon S3 gateway endpoint and enhanced VPC routing in Amazon Redshift.
  • Enable the lifecycle policy in Amazon S3 to retain only necessary data, and delete unnecessary data.
  • Enable Amazon S3 versioning and cross-Region replication for critical datasets to protect against accidental deletes.
  • Enable server-side encryption using AWS KMS (SSE-KMS) or Amazon S3 (SSE-S3).
  • DataBrew uses Amazon CloudWatch for logging, so you should update your log retention period to retain logs for the appropriate length of time.

Create an Amazon Redshift connection

In this section, you create a connection in DataBrew to connect to your Amazon Redshift cluster.

  1. On the DataBrew console, choose Datasets in the navigation pane.
  2. On the Connections tab, choose Create connection.
  3. For Connection name, enter a name, such as order-db-connection.
  4. For Connection type, select Amazon Redshift.
  5. Under Connection access, provide the Amazon Redshift cluster name, database name, database user, and database password.
  6. Choose Create connection.

Create your dataset by applying a custom SQL statement to filter the source data

In this section, you create a Amazon Redshift connection, add your custom SQL statement, and validate it. You can also validate your SQL statement directly in your Amazon Redshift cluster by using the Amazon Redshift query editor v2. The purpose of validating the SQL statement is to help you avoid failure in loading your dataset into a project or job. Also, checking the query runtime ensures that it runs in under 3 minutes, avoiding timeouts during project loading. To analyze and improve query performance in Amazon Redshift, see Tuning query performance.

  1. On the DataBrew console, choose Datasets in the navigation pane.
  2. On the Datasets tab, choose Connect new dataset.
  3. For Dataset name, enter a name, such as order-data.
  4. In the left pane, choose Amazon Redshift under Database connections.
  5. Add your Amazon Redshift connection and select Enter custom SQL.
  6. Enter the SQL query and choose Validate SQL.
  7. Under Additional configurations, for Enter S3 destination, provide an S3 destination to temporarily store the intermediate results.
  8. Choose Create dataset.

Create a DataBrew profile job

In this section, you use the newly created Amazon Redshift dataset to create a profile job. Data profiling helps you understand your dataset and plan the data preparation steps needed in running your recipe jobs.

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. On the Profile jobs tab, choose Create job.
  3. For Job name, enter a name, such as order-data-profile-job.
  4. For Job type¸ select Create a profile job.
  5. Under Job input, choose Browse datasets and choose the dataset you created earlier (order-data).
  6. For Data sample, select Full dataset.
  7. Under Job output settings¸ for S3 location, enter the S3 bucket for the job output files.
  8. For Role name, choose an AWS Identity and Access Management (IAM) role with permission for DataBrew to connect to the data on your behalf. For more information, refer to Adding an IAM role with data resource permissions.
  9. Choose Create and run job.

Check the status of your profile job. A profile output file is created and stored in Amazon S3 upon completion. You can choose View data profile to see more information.

In addition to an output file, DataBrew also provides visualizations. On the Dataset profile overview tab, you can see data visualizations that can help you understand your data better. Next, you can see detailed statistics about your data on the Column statistics tab, illustrated with graphics and charts. You can define data quality rules on the Data quality rules tab, and then see the results from the data quality ruleset that applies to this dataset.

For example, in the following screenshot, the amount column has 2% missing values, as shown on the Column statistics tab. You can provide rules that avoid triggering a recipe job in case of an anomaly. You can also notify the source teams to handle or acknowledge the missing values. DataBrew users can also add steps in the recipe job to handle the anomalies and missing values.

Create a DataBrew project and recipe job

In this section, you start analyzing and transforming your Amazon Redshift dataset in a DataBrew project. The custom SQL statement runs in Amazon Redshift when the project is loaded. Databrew performs read-only access to your source data.

Create a project

To create your project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter a name, such as order-data-proj.
  4. Under Recipe details¸ choose Create new recipe and enter a recipe name, such as order-data-proj-recipe.
  5. For Select a dataset, select My datasets.
  6. Select the dataset you created earlier (order-data).
  7. Under Permissions, for Role name, choose your DataBrew role.
  8. Choose Create project.

DataBrew starts a session, constructs a DataFrame, extracts sample data, infers basic statistics, and displays the sample data in a grid view. You can add steps to build a transformation recipe. As of this writing, DataBrew offers over 350 transformations, with more on the way.

For our example use case, Company ABC has set a target to ship all orders within 7 days after the order date (internal SLA). They want a list of orders that didn’t meet the 7-day SLA for additional investigation. The following sample recipe contains steps to handle the missing values, filter the values by amount, change the date format, calculate the date difference, and filter the values by shipping days. The detailed steps are as follows:

  1. Fill missing values with 0 for the amount column.
  2. Filter values by amount greater than 0.
  3. Change the format of order_timestamp to align with ship_date.
  4. Create a new column called days_for_shipping using the dateTime function DATEDIFF to show the difference between order_timestamp and ship_date in days.
  5. Filter the values by days_for_shipping greater than 7.

Create a recipe job

To create your DataBrew recipe job, complete the following steps:

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. For Job name¸ enter a name, such as SHIPPING-SLA-MISS.
  4. Under Job output settings, configure your Amazon S3 output settings.
  5. For S3 location, enter the location of your output bucket.
  6. For Role name, choose the IAM role that contains permissions for DataBrew to connect on your behalf.
  7. Choose Create and run job.

You can check the status of your job on the Jobs page.

The output file is in Amazon S3 as specified, and your data transformation is now complete.

Clean up

To avoid incurring future charges, we recommend deleting the resources you created during this walkthrough.

Conclusion

In this post, we walked through applying custom SQL statements to an Amazon Redshift data source in your dataset, which you can use in profiling and transformation jobs. You can now focus on building your data transformation steps knowing that you’re working on only the needed data.

To learn more about the various supported data sources for DataBrew, see Connecting to data with AWS Glue DataBrew.


About the authors

Suraj Shivananda is a Solutions Architect at AWS. He has over a decade of experience in Software Engineering, Data and Analytics, DevOps specifically for data solutions, automating and optimizing cloud based solutions. He’s a trusted technical advisor and helps customers build Well Architected solutions on the AWS platform.

Marie Yap is a Principal Solutions Architect for Amazon Web Services based in Hawaii. In this role, she helps various organizations begin their journey to the cloud. She also specializes in analytics and modern data architectures.

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale, part 2: Using AWS Glue Studio Visual Editor

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-2-integrate-apache-hudi-delta-lake-apache-iceberg-dataset-at-scale-using-aws-glue-studio-visual-editor/

Transactional data lake technologies such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables is evolving rapidly, and gaining great popularity. These technologies simplified the data processing pipeline significantly, and they provided further useful capabilities like upserts, rolling back, and time travel queries.

In the first post of this series, we went through how to process Apache Hudi, Delta Lake, and Apache Iceberg datasets using AWS Glue connectors. AWS Glue simplifies reading and writing your data in those data lake formats, and building the data lakes on top of those technologies. Running the sample notebooks on AWS Glue Studio notebook, you could interactively develop and run your code, then immediately see the results. The notebooks let you explore how those technologies work when you have coding experience.

This second post focuses on other use cases for customers who prefer visual job authoring without writing custom code. Even without coding experience, you can easily build your transactional data lakes on AWS Glue Studio visual editor, and take advantage of those transactional data lake technologies. In addition, you can also use Amazon Athena to query the data stored using Hudi and Iceberg. This tutorial demonstrates how to read and write each format on AWS Glue Studio visual editor, and then how to query from Athena.

Process Apache Hudi, Delta Lake, Apache Iceberg dataset at scale

Prerequisites

The following are the instructions to read/write tables using each data lake format on AWS Glue Studio Visual Editor. You can use any of the marketplace connector or the custom connector based on your requirements.

To continue this tutorial, you must create the following AWS resources in advance:

Reads/writes using the connector on AWS Glue Studio Visual Editor

In this tutorial, you read and write each of the transaction data lake format data on the AWS Glue Studio Visual Editor. There are three main configurations: connection, connection options, and job parameters that you must configure per the data lake format. Note that no code is included in this tutorial. Let’s see how it works.

Apache Hudi writes

Complete following steps to write into Apache Hudi table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose hudi-0101-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  12. For Connection options, enter the following pairs of Key and Value (choose Add new option to enter a new pair).
    1. Key: path. Value: <Your S3 path for Hudi table location>
    2. Key: hoodie.table.name, Value: test
    3. Key: hoodie.datasource.write.storage.type, Value: COPY_ON_WRITE
    4. Key: hoodie.datasource.write.operation, Value: upsert
    5. Key: hoodie.datasource.write.recordkey.field, Value: location
    6. Key: hoodie.datasource.write.precombine.field, Value: date
    7. Key: hoodie.datasource.write.partitionpath.field, Value: iso_code
    8. Key: hoodie.datasource.write.hive_style_partitioning, Value: true
    9. Key: hoodie.datasource.hive_sync.enable, Value: true
    10. Key: hoodie.datasource.hive_sync.database, Value: hudi
    11. Key: hoodie.datasource.hive_sync.table, Value: test
    12. Key: hoodie.datasource.hive_sync.partition_fields, Value: iso_code
    13. Key: hoodie.datasource.hive_sync.partition_extractor_class, Value: org.apache.hudi.hive.MultiPartKeysValueExtractor
    14. Key: hoodie.datasource.hive_sync.use_jdbc, Value: false
    15. Key: hoodie.datasource.hive_sync.mode, Value: hms
  13. Under Job details, for IAM Role, choose your IAM role.
  14. Under Advanced properties, for Job parameters, choose Add new parameter.
  15. For Key, enter --conf.
  16. For Value, enter spark.serializer=org.apache.spark.serializer.KryoSerializer.
  17. Choose Save.
  18. Choose Run.

Apache Hudi reads

Complete following steps to read from the Apache Hudi table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose hudi-0101-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose hudi-0101-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for your Hudi table that you created in the previous section.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type. choose S3 location.
  15. For S3 Target Location enter your S3 path for output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Choose Save.
  18. Choose Run.

Delta Lake writes

Complete the following steps to write into the Delta Lake table using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Amazon S3.
  5. For Target, choose delta-100-byoc-connector.
  6. Choose Create.
  7. Under Visual, choose Data source – S3 bucket.
  8. Under Node properties, for S3 source type, choose S3 location.
  9. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  10. Choose Data target – Connector.
  11. Under Node properties, for Connection, choose your delta-100-byoc-connection.
  12. For Connection options, choose Add new option.
  13. For Key, enter path. For Value, enter your S3 path for Delta table location. Choose Add new option.
  14. For Key, enter partitionKeys. For Value, enter iso_code.
  15. Under Job details, for IAM Role, choose your IAM role.
  16. Under Advanced properties, for Job parameters, choose Add new parameter.
  17. For Key, enter --conf.
  18. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  19. Choose Save.
  20. Choose Run.

Delta Lake reads

Complete the following steps to read from the Delta Lake table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose delta-100-byoc-connector.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose delta-100-byoc-connection.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter your S3 path for Delta table that you created in the previous section. Choose Add new option.
  11. For Key, enter partitionKeys. For Value, enter iso_code.
  12. Choose Transform – ApplyMapping, and choose Remove.
  13. Choose Data target – S3 bucket.
  14. Under Data target properties, for Format, choose JSON.
  15. For S3 Target type, choose S3 location.
  16. For S3 Target Location enter your S3 path for output location.
  17. Under Job details, for IAM Role, choose your IAM role.
  18. Under Advanced properties, for Job parameters, choose Add new parameter.
  19. For Key, enter --conf.
  20. For Value, enter spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog.
  21. Choose Save.
  22. Choose Run.

Apache Iceberg writes

Complete the following steps to write into Apache Iceberg table using the connector:

  1. Open AWS Glue console.
  2. Choose Databases.
  3. Choose Add database.
  4. For database name, enter iceberg, and choose Create.
  5. Open AWS Glue Studio.
  6. Choose Jobs.
  7. Choose Visual with a source and target.
  8. For Source, choose Amazon S3.
  9. For Target, choose iceberg-0131-byoc-connector.
  10. Choose Create.
  11. Under Visual, choose Data source – S3 bucket.
  12. Under Node properties, for S3 source type, choose S3 location.
  13. For S3 URL, enter s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/json/.
  14. Choose Data target – Connector.
  15. Under Node properties, for Connection, choose iceberg-0131-byoc-connection.
  16. For Connection options, choose Add new option.
  17. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  18. Choose SQL under Transform to create a new AWS Glue Studio node.
  19. Under Node properties, for Node parents, choose ApplyMapping.
  20. Under Transform, for SQL alias, verify that myDataSource is entered.
  21. For SQL query, enter CREATE TABLE glue_catalog.iceberg.test AS SELECT * FROM myDataSource WHERE 1=2. This is to create a table definition with no records because the Iceberg target requires table definition before data ingestion.
  22. Under Job details, for IAM Role, choose your IAM role.
  23. Under Advanced properties, for Job parameters, choose Add new parameter.
  24. For Key, enter --conf.
  25. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl --conf park.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  26. Choose Save.
  27. Choose Run.

Apache Iceberg reads

Complete the following steps to read from Apache Iceberg table that you created in the previous section using the connector:

  1. Open AWS Glue Studio.
  2. Choose Jobs.
  3. Choose Visual with a source and target.
  4. For Source, choose Apache Iceberg Connector for AWS Glue 3.0.
  5. For Target, choose Amazon S3.
  6. Choose Create.
  7. Under Visual, choose Data source – Connection.
  8. Under Node properties, for Connection, choose your Iceberg connection name.
  9. For Connection options, choose Add new option.
  10. For Key, enter path. For Value, enter glue_catalog.iceberg.test.
  11. Choose Transform – ApplyMapping, and choose Remove.
  12. Choose Data target – S3 bucket.
  13. Under Data target properties, for Format, choose JSON.
  14. For S3 Target type, choose S3 location.
  15. For S3 Target Location enter your S3 path for the output location.
  16. Under Job details, for IAM Role, choose your IAM role.
  17. Under Advanced properties, for Job parameters, choose Add new parameter.
  18. For Key, enter --conf.
  19. For Value, enter the following value (replace the placeholder your_s3_bucket with your S3 bucket name): spark.sql.catalog.glue_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.glue_catalog.warehouse=s3://your_s3_bucket/iceberg/warehouse --conf spark.sql.catalog.glue_catalog.catalog-impl --conf park.sql.catalog.glue_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.glue_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.glue_catalog.lock.table=iceberg_lock --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
  20. Choose Save.
  21. Choose Run.

Query from Athena

The Hudi table and the iceberg tables created with the above instructions are also queryable from Athena.

  1. Open the Athena console.
  2. Run the following SQL to query the Hudi table:
    SELECT * FROM "hudi"."test" LIMIT 10

  3. Run the following SQL to query the Iceberg table:
    SELECT * FROM "iceberg"."test" LIMIT 10

If you want to query the Delta table from Athena, follow Presto, Trino, and Athena to Delta Lake integration using manifests.

Conclusion

This post summarized how to utilize Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue platform, as well as demonstrated how each format works with the AWS Glue Studio Visual Editor. You can start using those data lake formats easily in any of the AWS Glue DynamicFrames, Spark DataFrames, and Spark SQL on the AWS Glue jobs, the AWS Glue Studio notebooks, and the AWS Glue Studio visual editor.


About the Author

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-integrate-apache-hudi-delta-lake-apache-iceberg-datasets-at-scale-aws-glue-studio-notebook/

Cloud data lakes provides a scalable and low-cost data repository that enables customers to easily store data from a variety of data sources. Data scientists, business analysts, and line of business users leverage data lake to explore, refine, and analyze petabytes of data. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. Customers use AWS Glue to discover and extract data from a variety of data sources, enrich and cleanse the data before storing it in data lakes and data warehouses.

Over years, many table formats have emerged to support ACID transaction, governance, and catalog usecases. For example, formats such as Apache Hudi, Delta Lake, Apache Iceberg, and AWS Lake Formation governed tables, enabled customers to run ACID transactions on Amazon Simple Storage Service (Amazon S3). AWS Glue supports these table formats for batch and streaming workloads. This post focuses on Apache Hudi, Delta Lake, and Apache Iceberg, and summarizes how to use them in AWS Glue 3.0 jobs. If you’re interested in AWS Lake Formation governed tables, then visit Effective data lakes using AWS Lake Formation series.

Bring libraries for the data lake formats

Today, there are three available options for bringing libraries for the data lake formats on the AWS Glue job platform: Marketplace connectors, custom connectors (BYOL), and extra library dependencies.

Marketplace connectors

AWS Glue Connector Marketplace is the centralized repository for cataloging the available Glue connectors provided by multiple vendors. You can subscribe to more than 60 connectors offered in AWS Glue Connector Marketplace as of today. There are marketplace connectors available for Apache Hudi, Delta Lake, and Apache Iceberg. Furthermore, the marketplace connectors are hosted on Amazon Elastic Container Registry (Amazon ECR) repository, and downloaded to the Glue job system in runtime. When you prefer simple user experience by subscribing to the connectors and using them on your Glue ETL jobs, the marketplace connector is a good option.

Custom connectors as bring-your-own-connector (BYOC)

AWS Glue custom connector enables you to upload and register your own libraries located in Amazon S3 as Glue connectors. You have more control over the library versions, patches, and dependencies. Since it uses your S3 bucket, you can configure the S3 bucket policy to share the libraries only with specific users, you can configure private network access to download the libraries using VPC Endpoints, etc. When you prefer having more control over those configurations, the custom connector as BYOC is a good option.

Extra library dependencies

There is another option – to download the data lake format libraries, upload them to your S3 bucket, and add extra library dependencies to them. With this option, you can add libraries directly to the job without a connector and use them. In Glue job, you can configure in Dependent JARs path. In API, it’s the --extra-jars parameter. In Glue Studio notebook, you can configure in the %extra_jars magic. To download the relevant JAR files, see the library locations in the section Create a Custom connection (BYOC).

Create a Marketplace connection

To create a new marketplace connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.10.1

Complete the following steps to create a marketplace connection for Apache Hudi 0.10.1:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Hudi Connector for AWS Glue, and choose Apache Hudi Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.10.1.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Delta Lake 1.0.0

Complete the following steps to create a marketplace connection for Delta Lake 1.0.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Delta Lake Connector for AWS Glue, and choose Delta Lake Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 1.0.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter a name for your connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a marketplace connection for Apache Iceberg 0.12.0:

  1. Open AWS Glue Studio.
  2. Choose Connectors.
  3. Choose Go to AWS Marketplace.
  4. Search for Apache Iceberg Connector for AWS Glue, and choose Apache Iceberg Connector for AWS Glue.
  5. Choose Continue to Subscribe.
  6. Review the Terms and conditions, pricing, and other details, and choose the Accept Terms button to continue.
  7. Make sure that the subscription is complete and you see the Effective date populated next to the product, and then choose Continue to Configuration.
  8. For Delivery Method, choose Glue 3.0.
  9. For Software version, choose 0.12.0-2.
  10. Choose Continue to Launch.
  11. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio. You’re redirected to AWS Glue Studio.
  12. For Name, enter iceberg-0120-mp-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Create a Custom connection (BYOC)

You can create your own custom connectors from JAR files. In this section, you can see the exact JAR files that are used in the marketplace connectors. You can just use the files for your custom connectors for Apache Hudi, Delta Lake, and Apache Iceberg.

To create a new custom connection for Apache Hudi, Delta Lake, or Apache Iceberg, complete the following steps.

Apache Hudi 0.9.0

Complete following steps to create a custom connection for Apache Hudi 0.9.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3-bundle_2.12/0.9.0/hudi-spark3-bundle_2.12-0.9.0.jar
    2. https://repo1.maven.org/maven2/org/apache/hudi/hudi-utilities-bundle_2.12/0.9.0/hudi-utilities-bundle_2.12-0.9.0.jar
    3. https://repo1.maven.org/maven2/org/apache/parquet/parquet-avro/1.10.1/parquet-avro-1.10.1.jar
    4. https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.1/spark-avro_2.12-3.1.1.jar
    5. https://repo1.maven.org/maven2/org/apache/calcite/calcite-core/1.10.0/calcite-core-1.10.0.jar
    6. https://repo1.maven.org/maven2/org/datanucleus/datanucleus-core/4.1.17/datanucleus-core-4.1.17.jar
    7. https://repo1.maven.org/maven2/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-090-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-090-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-090-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Hudi 0.10.1

Complete the following steps to create a custom connection for Apache Hudi 0.9.0:

  1. Download following JAR files, and upload them to your S3 bucket.
    1. hudi-utilities-bundle_2.12-0.10.1.jar
    2. hudi-spark3.1.1-bundle_2.12-0.10.1.jar
    3. spark-avro_2.12-3.1.1.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter hudi-0101-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.hudi.
  9. Choose Create connector.
  10. Choose hudi-0101-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter hudi-0101-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Note that the above Hudi 0.10.1 installation on Glue 3.0 does not fully support Merge On Read (MoR) tables.

Delta Lake 1.0.0

Complete the following steps to create a custom connector for Delta Lake 1.0.0:

  1. Download the following JAR file, and upload it to your S3 bucket.
    1. https://repo1.maven.org/maven2/io/delta/delta-core_2.12/1.0.0/delta-core_2.12-1.0.0.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter a comma separated Amazon S3 path for the above JAR file.
  6. For Name, enter delta-100-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter org.apache.spark.sql.delta.sources.DeltaDataSource.
  9. Choose Create connector.
  10. Choose delta-100-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter delta-100-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.12.0

Complete the following steps to create a custom connection for Apache Iceberg 0.12.0:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. https://search.maven.org/remotecontent?filepath=org/apache/iceberg/iceberg-spark3-runtime/0.12.0/iceberg-spark3-runtime-0.12.0.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.15.40/bundle-2.15.40.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.15.40/url-connection-client-2.15.40.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0120-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0120-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0120-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Apache Iceberg 0.13.1

Complete the following steps to create a custom connection for Apache Iceberg 0.13.1:

  1. Download the following JAR files, and upload them to your S3 bucket.
    1. iceberg-spark-runtime-3.1_2.12-0.13.1.jar
    2. https://repo1.maven.org/maven2/software/amazon/awssdk/bundle/2.17.161/bundle-2.17.161.jar
    3. https://repo1.maven.org/maven2/software/amazon/awssdk/url-connection-client/2.17.161/url-connection-client-2.17.161.jar
  2. Open AWS Glue Studio.
  3. Choose Connectors.
  4. Choose Create custom connector.
  5. For Connector S3 URL, enter comma separated Amazon S3 paths for the above JAR files.
  6. For Name, enter iceberg-0131-byoc-connector.
  7. For Connector Type, choose Spark.
  8. For Class name, enter iceberg.
  9. Choose Create connector.
  10. Choose iceberg-0131-byoc-connector.
  11. Choose Create connection.
  12. For Name, enter iceberg-0131-byoc-connection.
  13. Optionally, choose a VPC, subnet, and security group.
  14. Choose Create connection.

Prerequisites

To continue this tutorial, you must create the following AWS resources in advance:

  • AWS Identity and Access Management (IAM) role for your ETL job or notebook as instructed in Set up IAM permissions for AWS Glue Studio. Note that AmazonEC2ContainerRegistryReadOnly or equivalent permissions are needed when you use the marketplace connectors.
  • Amazon S3 bucket for storing data.
  • Glue connection (one of the marketplace connector or the custom connector corresponding to the data lake format).

Reads/writes using the connector on AWS Glue Studio Notebook

The following are the instructions to read/write tables using each data lake format on AWS Glue Studio Notebook. As a prerequisite, make sure that you have created a connector and a connection for the connector using the information above.
The example notebooks are hosted on AWS Glue Samples GitHub repository. You can find 7 notebooks available. In the following instructions, we will use one notebook per data lake format.

Apache Hudi

To read/write Apache Hudi tables in the AWS Glue Studio notebook, complete the following:

  1. Download hudi_dataframe.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Hudi connection name, and run the cell:
    %connections hudi-0101-byoc-connection (Alternatively you can use your connection name created from the marketplace connector).
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Hudi table with sample data using catalog sync to create a new Hudi table with sample data.
  12. Run the cells in the section Read from Hudi table to verify the new Hudi table. There are five records in this table.
  13. Run the cells in the section Upsert records into Hudi table to see how upsert works on Hudi. This code inserts one new record, and updates the one existing record. You can verify that there is a new record product_id=00006, and the existing record product_id=00001’s price has been updated from 250 to 400.
  14. Run the cells in the section Delete a Record. You can verify that the existing record product_id=00001 has been deleted.
  15. Run the cells in the section Point in time query. You can verify that you’re seeing the previous version of the table where the upsert and delete operations haven’t been applied yet.
  16. Run the cells in the section Incremental Query. You can verify that you’re seeing only the recent commit about product_id=00006.

On this notebook, you could complete the basic Spark DataFrame operations on Hudi tables.

Delta Lake

To read/write Delta Lake tables in the AWS Glue Studio notebook, complete following:

  1. Download delta_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook, and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Delta connection name, and run the cell:
    %connections delta-100-byoc-connection
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Delta table with sample data to create a new Delta table with sample data.
  12. Run the cells in the section Create a Delta Lake table.
  13. Run the cells in the section Read from Delta Lake table to verify the new Delta table. There are five records in this table.
  14. Run the cells in the section Insert records. The query inserts two new records: record_id=00006, and record_id=00007.
  15. Run the cells in the section Update records. The query updates the price of the existing records record_id=00007, and record_id=00007 from 500 to 300.
  16. Run the cells in the section Upsert records. to see how upsert works on Delta. This code inserts one new record, and updates the one existing record. You can verify that there is a new record product_id=00008, and the existing record product_id=00001’s price has been updated from 250 to 400.
  17. Run the cells in the section Alter DeltaLake table. The queries add one new column, and update the values in the column.
  18. Run the cells in the section Delete records. You can verify that the record product_id=00006 because it’s product_name is Pen.
  19. Run the cells in the section View History to describe the history of operations that was triggered against the target Delta table.

On this notebook, you could complete the basic Spark SQL operations on Delta tables.

Apache Iceberg

To read/write Apache Iceberg tables in the AWS Glue Studio notebook, complete the following:

  1. Download iceberg_sql.ipynb.
  2. Open AWS Glue Studio.
  3. Choose Jobs.
  4. Choose Jupyter notebook and then choose Upload and edit an existing notebook. From Choose file, select your ipynb file and choose Open, then choose Create.
  5. On the Notebook setup page, for Job name, enter your job name.
  6. For IAM role, select your IAM role. Choose Create job. After a short time period, the Jupyter notebook editor appears.
  7. In the first cell, replace the placeholder with your Delta connection name, and run the cell:
    %connections iceberg-0131-byoc-connection (Alternatively you can use your connection name created from the marketplace connector).
  8. In the second cell, replace the S3 bucket name placeholder with your S3 bucket name, and run the cell.
  9. Run the cells in the section Initialize SparkSession.
  10. Run the cells in the section Clean up existing resources.
  11. Run the cells in the section Create Iceberg table with sample data to create a new Iceberg table with sample data.
  12. Run the cells in the section Read from Iceberg table.
  13. Run the cells in the section Upsert records into Iceberg table.
  14. Run the cells in the section Delete records.
  15. Run the cells in the section View History and Snapshots.

On this notebook, you could complete the basic Spark SQL operations on Iceberg tables.

Conclusion

This post summarized how to utilize Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue platform, as well as demonstrate how each format works with a Glue Studio notebook. You can start using those data lake formats easily in Spark DataFrames and Spark SQL on the Glue jobs or the Glue Studio notebooks.

This post focused on interactive coding and querying on notebooks. The upcoming part 2 will focus on the experience using AWS Glue Studio Visual Editor and Glue DynamicFrames for customers who prefer visual authoring without the need to write code.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with AWS. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Monjumi Sarma is a Data Lab Solutions Architect at AWS. She helps customers architect data analytics solutions, which gives them an accelerated path towards modernization initiatives.

Data warehouse and business intelligence technology consolidation using AWS

Post Syndicated from Bappaditya Datta original https://aws.amazon.com/blogs/architecture/data-warehouse-and-business-intelligence-technology-consolidation-using-aws/

Organizations have been using data warehouse and business intelligence (DWBI) workloads to support business decision making for many years. These workloads are brought to the Amazon Web Services (AWS) platform to utilize the benefit of AWS cloud. However, these workloads are built using multiple vendor tools and technologies, and the customer faces the burden of administrative overhead.

This post provides architectural guidance to consolidate multiple DWBI technologies to AWS Managed Services to help reduce the administrative overhead, bring operational ease, and business efficiency. Two scenarios are explored:

  1. Upstream transactional databases are already on AWS
  2. Upstream transactional databases are present at on-premise datacenter

Challenges faced by an organization

Organizations are engaged in managing multiple DWBI technologies due to acquisitions, mergers, and the lift-and-shift of workloads. These workloads use extract, transform, and load (ETL) tools to read relational data from upstream transactional databases, process it, and store it in a data warehouse. Thereafter, these workloads use business intelligence tools to generate valuable insight and present it to users in form of reports and dashboards.

These DWBI technologies are generally installed and maintained on their own server. Figure 1 demonstrates the increased the administrative overhead for the organization but also creates challenges in maintaining the team’s overall knowledge.

DWBI workload with multiple tools

Figure 1. DWBI workload with multiple tools

Therefore, organizations are looking to consolidate technology usage and continue supporting important business functions.

Scenario 1

As we know, three major functions of DWBI workstream are:

  • ETL data using a tool
  • Store/manage the data in a data warehouse
  • Generate information from the data using business intelligence

Each of these functions can be performed efficiently using an AWS service. For example, AWS Glue can be used for ETL, Amazon Redshift for data warehouse, and Amazon QuickSight for business intelligence.

With the use of mentioned AWS services, organizations will be able to consolidate their DWBI technology usage. Organizations also will be able to quickly adapt to these services, as their engineering team can more easily use their DWBI knowledge with these services. For example, using SQL knowledge in AWS Glue jobs with SprakSQL, in Amazon Redshift queries, and in Amazon QuickSight dashboards.

Figure 2 demonstrates the redesigned the architecture of Figure 1 using AWS services. In this architecture, ETL functions are consolidated in AWS Glue. An AWS Glue crawler is used to auto-catalogue the source and target table metadata; then, AWS Glue ETL jobs use these catalogues to read data from source and write to target (data warehouse). AWS Glue jobs also apply necessary transformations (such as join, filter, and aggregate) to the data before writing. Additionally, an AWS Glue trigger is used to schedule the job executions. Alternatively, AWS Managed Workflows for Apache Airflow can be used to schedule jobs.

Consolidated workload with source on AWS

Figure 2. Consolidated workload with source on AWS

Similarly, data warehousing function is consolidated with Amazon Redshift. Amazon Redshift is used to store and organize enriched data and also enforce appropriate data access control for both workloads and users.

Lastly, business intelligence functions are consolidated using Amazon QuickSight. It used to create necessary dashboards that source data from Amazon Redshift and apply complex business logic to produce necessary charts and graphs needed for business insights. It is also used to implement necessary access restrictions to dashboards and data.

Scenario 2

In situation where source databases are in on-premises datacenter, the overall solution will be similar to Scenario 1, with an additional step to move the data continually from on-premise database to an Amazon Simple Storage Service (Amazon S3) bucket. The data movement can be efficiently handled by AWS Database Migration Service (AWS DMS).

To make the source database accessible to AWS DMS, a connection needs to established between the AWS cloud and on-premise network. Based on performance and throughput needs, the organization can choose either AWS Direct Connect service or AWS Site-to-Site VPN service to securely move the data. For the purpose of this discussion, we are considering AWS Direct Connect.

In Figure 3, AWS DMS task is used to perform a full-load followed by change data capture to continuously move the data to an S3 bucket. In this scenario, AWS Glue is used to catalogue and read the data from S3 bucket. The remaining portion of the dataflow is the same as the one mentioned in Scenario 1.

Consolidated workload with source at datacenter

Figure 3. Consolidated workload with source at datacenter

Scaling

Both of the updated architectures provide necessary scaling:

  • Auto scaling feature can be used to scale-up or -down AWS Glue ETL job resources
  • Concurrency scaling feature can be used to support virtually unlimited concurrent users and queries in Amazon Redshift
  • Amazon QuickSight resources (web server, Amazon QuickSight engine, and SPICE) are auto scaled by design

Security, monitoring, and auditing

Also, the updated architectures provide necessary security by using access control, data encryption at-rest and in transit, monitoring, and auditing.

Additionally, both Amazon Redshift and Amazon QuickSight provides their own authentication and access controls. Therefore, a user can be a local user or a federated one. With the help of these authentications, an organization will be able to control access to data in Amazon Redshift and also access to the dashboard in Amazon QuickSight.

Conclusion

In this blog post, we discussed how AWS Glue, Amazon Redshift, and Amazon QuickSight can be used to consolidate DWBI technologies. We also have discussed how an architecture can help an organization build a scalable, secure workload with auto scaling, access control, log monitoring and activity auditing.

Ready to get started?

How William Hill migrated NoSQL workloads at scale to Amazon Keyspaces

Post Syndicated from Kunal Gautam original https://aws.amazon.com/blogs/big-data/how-william-hill-migrated-nosql-workloads-at-scale-to-amazon-keyspaces/

Social gaming and online sports betting are competitive environments. The game must be able to handle large volumes of unpredictable traffic while simultaneously promising zero downtime. In this domain, user retention is no longer just desirable, it’s critical. William Hill is a global online gambling company based in London, England, and it is the founding member of the UK Betting and Gaming Council. They share the mission to champion the betting and gaming industry and set world-class standards to make sure of an enjoyable, fair, and safe betting and gambling experience for all of their customers. In sports betting, William Hill is an industry-leading brand, awarded with prestigious industry titles like the IGA Awards Sports Betting Operator of the year in 2019, 2020, and 2022, and the SBC Awards Racing Sportsbook of the Year in 2019. William Hill has been acquired by Caesars Entertainment, Inc (NASDAQ: CZR) in April 2021, and it’s the largest casino-entertainment company in the US and one of the world’s most diversified casino-entertainment providers. At the heart of William Hill gaming platform is a NoSQL database that maintains 100% uptime, scales in real-time to handle millions of users or more, and provides users with a responsive and personalized experience across all of their devices.

In this post, we’ll discuss how William Hill moved their workload from Apache Cassandra to Amazon Keyspaces (for Apache Cassandra) with zero downtime using AWS Glue ETL.

William Hill was facing challenges regarding scalability, cluster instability, high operational costs, and manual patching and server maintenance. They were looking for a NoSQL solution which was scalable, highly-available, and completely managed. This let them focus on providing better user experience rather than maintaining infrastructure. William Hill Limited decided to move forward with Amazon Keyspaces, since it can run Apache Cassandra workloads on AWS using the same Cassandra application code and developer tools used today, without the need to provision, patch, manage servers, install, maintain, or operate software.

Solution overview

William Hill Limited wanted to migrate their existing Apache Cassandra workloads to Amazon Keyspaces with a replication lag of minutes, with minimum migration costs and development efforts. Therefore, AWS Glue ETL was leveraged to deliver the desired outcome.

AWS Glue is a serverless data integration service that provides multiple benefits for migration:

  • No infrastructure to maintain; allocates the necessary computing power and runs multiple migration jobs simultaneously.
  • All-in-one pricing model that includes infrastructure and is 55% cheaper than other cloud data integration options.
  • No lock in with the service; possible to develop data migration pipelines in open-source Apache Spark (Spark SQL, PySpark, and Scala).
  • Migration pipeline can be scaled fearlessly with Amazon Keyspaces and AWS Glue.
  • Built-in pipeline monitoring to make sure of in-migration continuity.
  • AWS Glue ETL jobs make it possible to perform bulk data extraction from Apache Cassandra and ingest to Amazon Keyspaces.

In this post, we’ll take you through William Hill’s journey of building the migration pipeline from scratch to migrate the Apache Cassandra workload to Amazon Keyspaces by leveraging AWS Glue ETL with DataStax Spark Cassandra connector.

For the purpose of this post, let’s look at a typical Cassandra Network setup on AWS and the mechanism used to establish the connection with AWS Glue ETL. The migration solution described also works for Apache Cassandra hosted on on-premises clusters.

Architecture overview

The architecture demonstrates the migration environment that requires Amazon Keyspaces, AWS Glue, Amazon Simple Storage Service (Amazon S3), and the Apache Cassandra cluster. To avoid a high CPU utilization/saturation on the Apache Cassandra cluster during the migration process, you might want to deploy another Cassandra datacenter to isolate your production from the migration workload to make the migration process seamless for your customers.

Amazon S3 has been used for staging while migrating data from Apache Cassandra to Amazon Keyspaces to make sure that the IO load on Cassandra serving live traffic on production is minimized, in case the data upload to Amazon Keyspaces fails and a retry must be done.

Prerequisites

The Apache Cassandra cluster is hosted on Amazon Elastic Compute Cloud (Amazon EC2) instances, spread across three availability zones, and hosted in private subnets. AWS Glue ETL is hosted on Amazon Virtual Private Cloud (Amazon VPC) and thus needs a AWS Glue Studio custom Connectors and Connections to be setup to communicate with the Apache Cassandra nodes hosted on the private subnets in the customer VPC. Thereby, this enables the connection to the Cassandra cluster hosted in the VPC. The DataStax Spark Cassandra Connector must be downloaded and saved onto an Amazon S3 bucket: s3://$MIGRATION_BUCKET/jars/spark-cassandra-connector-assembly_2.12-3.2.0.jar.

Let’s create an AWS Glue Studio custom connector named cassandra_connection and its corresponding connection named conn-cassandra-custom for AWS region us-east-1.

For the connector created, create an AWS Glue Studio connection and populate it with network information VPC, and a Subnet allowing for AWS Glue ETL to establish a connection with Apache Casandra.

  • Name: conn-cassandra-custom
  • Network Options

Let’s begin by creating a keyspace and table in Amazon Keyspaces using Amazon Keyspaces Console or CQLSH, and then create a target keyspace named target_keyspace and a target table named target_table.

CREATE KEYSPACE target_keyspace WITH replication = {'class': 'SingleRegionStrategy'};

CREATE TABLE target_keyspace.target_table (
    userid      uuid,
    level       text,
    gameid      int,
    description text,
    nickname    text,
    zip         text,
    email       text,
    updatetime  text,
    PRIMARY KEY (userid, level, gameid)
) WITH default_time_to_live = 0 AND CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PROVISIONED',
		'write_capacity_units':76388,
		'read_capacity_units':3612
	}
} AND CLUSTERING ORDER BY (level ASC, gameid ASC);

After the table has been created, switch the table to on-demand mode to pre-warm the table and avoid AWS Glue ETL job throttling failures. The following script will update the throughput mode.

ALTER TABLE target_keyspace.target_table 
WITH CUSTOM_PROPERTIES = {
	'capacity_mode':{
		'throughput_mode':'PAY_PER_REQUEST'
	}
} 

Let’s go ahead and create two Amazon S3 buckets to support the migration process. The first bucket (s3://your-spark-cassandra-connector-bucket-name)should store the spark Cassandra connector assembly jar file, Cassandra, and Keyspaces configuration YAML files.

The second bucket (s3://your-migration-stage-bucket-name) will be used to store intermediate parquet files to identify the delta between the Cassandra cluster and the Amazon Keyspaces table to track changes between subsequent executions of the AWS Glue ETL jobs.

In the following KeyspacesConnector.conf, set your contact points to connect to Amazon Keyspaces, and replace the username and the password to the AWS credentials.

Using the RateLimitingRequestThrottler we can make sure that requests don’t exceed the configured Keyspaces capacity. The G1.X DPU creates one executor per worker. The RateLimitingRequestThrottler in this example is set for 1000 requests per second. With this configuration, and G.1X DPU, you’ll achieve 1000 request per AWS Glue worker. Adjust the max-requests-per-second accordingly to fit your workload. Increase the number of workers to scale throughput to a table.

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["cassandra.us-east-1.amazonaws.com:9142"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "us-east-1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
    advanced.throttler = {
       class = RateLimitingRequestThrottler
       max-requests-per-second = 1000
       max-queue-size = 50000
       drain-interval = 1 millisecond
    }
    advanced.ssl-engine-factory {
      class = DefaultSslEngineFactory
      hostname-validation = false
    }
    advanced.connection.pool.local.size = 1
}

Similarly, create a CassandraConnector.conf file, set the contact points to connect to the Cassandra cluster, and replace the username and the password respectively.

datastax-java-driver {
  basic.request.consistency = "LOCAL_QUORUM"
  basic.contact-points = ["127.0.0.1:9042"]
   advanced.reconnect-on-init = true
   basic.load-balancing-policy {
        local-datacenter = "datacenter1"
    }
    advanced.auth-provider = {
       class = PlainTextAuthProvider
       username = "user-at-sample"
       password = "S@MPLE=PASSWORD="
    }
}

Build AWS Glue ETL migration pipeline with Amazon Keyspaces

To build reliable, consistent delta upload Glue ETL pipeline, let’s decouple the migration process into two AWS Glue ETLs.

  • CassandraToS3 Glue ETL: Read data from the Apache Cassandra cluster and transfer the migration workload to Amazon S3 in the Apache Parquet format. To identify incremental changes in the Cassandra tables, the job stores separate parquet files with primary keys with an updated timestamp.
  • S3toKeyspaces Glue ETL: Uploads the migration workload from Amazon S3 to Amazon Keyspaces. During the first run, the ETL uploads the complete data set from Amazon S3 to Amazon Keyspaces, and for the subsequent run calculates the incremental changes by comparing the updated timestamp across two subsequent runs and calculating the incremental difference. The job also takes care of inserting new records, updating existing records, and deleting records based on the incremental difference.

In this example, we’ll use Scala to write the AWS Glue ETL, but you can also use PySpark.

Let’s go ahead and create an AWS Glue ETL job named CassandraToS3 with the following job parameters:

aws glue create-job \
    --name "CassandraToS3" \
    --role "GlueKeyspacesMigration" \
    --description "Offload data from the Cassandra to S3" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --connections "conn-cassandra-custom" \
    --command "Name=glueetl,ScriptLocation=s3://$MIGRATION_BUCKET/scripts/CassandraToS3.scala" \
    --max-retries 0 \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"source_keyspace",
        "--TABLE_NAME":"source_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/CassandraConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=CassandraConnector.conf",
        "--class":"GlueApp"
    }'

The CassandraToS3 Glue ETL job reads data from the Apache Cassandra table source_keyspace.source_table and writes it to the S3 bucket in the Apache Parquet format. The job rotates the parquet files to help identify delta changes in the data between consecutive job executions. To identify inserts, updates, and deletes, you must know primary key and columns write times (updated timestamp) in the Cassandra cluster up front. Our primary key consists of several columns userid, level, gameid, and a write time column updatetime. If you have multiple updated columns, then you must use more than one write time columns with an aggregation function. For example, for email and updatetime, take the maximum value between write times for email and updatetime.

The following AWS Glue spark code offloads data to Amazon S3 using the spark-cassandra-connector. The script takes four parameters KEYSPACE_NAME, KEYSPACE_TABLE, S3_URI_CURRENT_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.

To upload the data from Amazon S3 to Amazon Keyspaces, you must create a S3toKeyspaces Glue ETL job using the Glue spark code to read the parquet files from the Amazon S3 bucket created as an output of CassandraToS3 Glue job and identify inserts, updates, deletes, and execute requests against the target table in Amazon Keyspaces. The code sample provided takes four parameters: KEYSPACE_NAME, KEYSPACE_TABLE, S3_URI_CURRENT_CHANGE, S3_URI_CURRENT_CHANGE, and S3_URI_NEW_CHANGE.

Let’s go ahead and create our second AWS Glue ETL job S3toKeyspaces with the following job parameters:

aws glue create-job \
    --name "S3toKeyspaces" \
    --role "GlueKeyspacesMigration" \
    --description "Push data to Amazon Keyspaces" \
    --glue-version "3.0" \
    --number-of-workers 2 \
    --worker-type "G.1X" \
    --command "Name=glueetl,ScriptLocation=s3://amazon-keyspaces-backups/scripts/S3toKeyspaces.scala" \
    --default-arguments '{
        "--job-language":"scala",
        "--KEYSPACE_NAME":"target_keyspace",
        "--TABLE_NAME":"target_table",
        "--S3_URI_FULL_CHANGE":"s3://$MIGRATION_BUCKET/full-dataset/",
        "--S3_URI_CURRENT_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/current/",
        "--S3_URI_NEW_CHANGE":"s3://$MIGRATION_BUCKET/incremental-dataset/new/",
        "--extra-files":"s3://$MIGRATION_BUCKET/conf/KeyspacesConnector.conf",
        "--conf":"spark.cassandra.connection.config.profile.path=KeyspacesConnector.conf",
        "--class":"GlueApp"
    }'

Job scheduling

The final step is to configure AWS Glue Triggers or Amazon EventBridge depending on your scheduling needs to trigger S3toKeyspaces Glue ETL when the job CassandraToS3 has succeeded. If you want to run the CassandraToS3 based on the schedule and configure the schedule option, then the following example showcases how to schedule cassandraToS3 to run every 15 minutes.

Job tuning

There are Spark settings recommended to begin with Amazon Keyspaces, which can then be increased later as appropriate for your workload.

  • Use a Spark partition size (groups multiple Cassandra rows) smaller than 8 MBs to avoid replaying large Spark tasks during a task failure.
  • Use a low concurrent number of writes per DPU with a large number of retries. Add the following options to the job parameters: --conf spark.cassandra.query.retry.count=500 --conf spark.cassandra.output.concurrent.writes=3.
  • Set spark.task.maxFailures to a bounded value. For example, you can start from 32 and increase as needed. This option can help you increase a number of tasks reties during a table pre-warm stage. Add the following option to the job parameters: --conf spark.task.maxFailures=32
  • Another recommendation is to turn off batching to improve random access patterns. Add the following options to the job parameters:
    spark.cassandra.output.batch.size.rows=1
    spark.cassandra.output.batch.grouping.key=none spark.cassandra.output.batch.grouping.buffer.size=100
  • Randomize your workload. Amazon Keyspaces partitions data using partition keys. Although Amazon Keyspaces has built-in logic to help load balance requests for the same partition key, loading the data is faster and more efficient if you randomize the order because you can take advantage of the built-in load balancing of writing to different partitions. To spread the writes across the partitions evenly, you must randomize the data in the dataframe. You might use a rand function to shuffle rows in the dataframe.

Summary

William Hill was able to migrate their workload from Apache Cassandra to Amazon Keyspaces at scale using AWS Glue, without the needs to make any changes on their application tech stack. The adoption of Amazon Keyspaces has provided them with the headroom to focus on their Application and customer experience, as with Amazon Keyspaces there’s no need to manage servers, get performance at scale, highly-scalable, and secure solution with the ability to handle the sudden spike in demand.

In this post, you saw how to use AWS Glue to migrate the Cassandra workload to Amazon Keyspaces, and simultaneously keep your Cassandra source databases completely functional during the migration process. When your applications are ready, you can choose to cut over your applications to Amazon Keyspaces with minimal replication lag in sub minutes between the Cassandra cluster and Amazon Keyspaces. You can also use a similar pipeline to replicate the data back to the Cassandra cluster from Amazon Keyspaces to maintain data consistency, if needed. Here you can find the documents and code to help accelerate your migration to Amazon Keyspaces.


About the Authors

Nikolai Kolesnikov is a Senior Data Architect and helps AWS Professional Services customers build highly-scalable applications using Amazon Keyspaces. He also leads Amazon Keyspaces ProServe customer engagements.

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having experience in building his own Startup and working along with enterprises, he brings a unique perspective to get people, business and technology work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enables them to build scalable data and advance analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys Marathons, Tech Meetups and Meditation retreats.

Analyzing Amazon SES event data with AWS Analytics Services

Post Syndicated from Oscar Mendoza original https://aws.amazon.com/blogs/messaging-and-targeting/analyzing-amazon-ses-event-data-with-aws-analytics-services/

In this post, we will walk through using AWS Services, such as, Amazon Kinesis Firehose, Amazon Athena and Amazon QuickSight to monitor Amazon SES email sending events with the granularity and level of detail required to get insights from your customers engage with the emails you send.

Nowadays, email Marketers rely on internal applications to create their campaigns or any communications requirements, such us newsletters or promotional content. From those activities, they need to collect as much information as possible to analyze and improve their pipeline to get better interaction with the customers. Data such us bounces, rejections, success reception, delivery delays, complaints or open rate can be a powerful tool to understand the customers. Usually applications work with high-level data points without detailed logging or granular information that could help improve even better the effectiveness of their campaigns.

Amazon Simple Email Service (SES) is a smart tool for companies that wants a cost-effective, flexible, and scalable email service solution to easily integrate with their own products. Amazon SES provides methods to control your sending activity with built-in integration with Amazon CloudWatch Metrics and also provides a mechanism to collect the email sending events data.

In this post, we propose an architecture and step-by-step guide to track your email sending activities at a granular level, where you can configure several types of email sending events, including sends, deliveries, opens, clicks, bounces, complaints, rejections, rendering failures, and delivery delays. We will use the configuration set feature of Amazon SES to send detailed logging to our analytics services to store, query and create dashboards for a detailed view.

Overview of solution

This architecture uses Amazon SES built-in features and AWS analytics services to provide a quick and cost-effective solution to address your mail tracking requirements. The following services will be implemented or configured:

The following diagram shows the architecture of the solution:

Serverless Architecture to Analyze Amazon SES events

Figure 1. Serverless Architecture to Analyze Amazon SES events

The flow of the events starts when a customer uses Amazon SES to send an email. Each of those send events will be capture by the configuration set feature and forward the events to a Kinesis Firehose delivery stream to buffer and store those events on an Amazon S3 bucket.

After storing the events, it will be required to create a database and table schema and store it on AWS Glue Data Catalog in order for Amazon Athena to be able to properly query those events on S3. Finally, we will use Amazon QuickSight to create interactive dashboard to search and visualize all your sending activity with an email level of detailed.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Walkthrough

Step 1: Use AWS CloudFormation to deploy some additional prerequisites

You can get started with our sample AWS CloudFormation template that includes some prerequisites. This template creates an Amazon S3 Bucket, an IAM role needed to access from Amazon SES to Amazon Kinesis Data Firehose.

To download the CloudFormation template, run one of the following commands, depending on your operating system:

In Windows:

curl https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml -o SES-Blog-PreRequisites.yml

In MacOS

wget https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml

To deploy the template, use the following AWS CLI command:

aws cloudformation deploy --template-file ./SES-Blog-PreRequisites.yml --stack-name ses-dashboard-prerequisites --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you see the IAM Service role and the Delivery Stream on the stack Outputs tab. You are going to use these resources in the following steps.

IAM Service role and Delivery Stream created by CloudFormation template

Figure 2. CloudFormation template outputs

Step 2: Creating a configuration set in SES and setting the default configuration set for a verified identity

SES can track the number of send, delivery, open, click, bounce, and complaint events for each email you send. You can use event publishing to send information about these events to other AWS service. In this case we are going to send the events to Kinesis Firehose. To do this, a configuration set is required.

To create a configuration set, complete the following steps:

  1. On the AWS Console, choose the Amazon Simple Email Service.
  2. Choose Configuration sets.
  3. Click on Create set.

    Create a configuration set in Amazon SES

    Figure 3. Amazon SES Create Configuration Set

  4. Set a Configuration set name.
  5. Leave the other configurations by default.

    Write a name for your configuration set

    Figure 4. Configuration Set Name

  6. Once the configuration set is created, select Event destinations

    Configuration set created successfully

    Figure 5. Configuration set created successfully

  7. Click on Add destination
  8. Select the event types you would like to analyze and then click on next.

    Sending Events to analyze

    Figure 6. Sending Events to analyze

  9. Select Amazon Kinesis Data Firehose as the destination, choose the delivery stream and the IAM role created previously, click on next and in the review page, click on Add destination.

    Destination for Amazon SES sending events

    Figure 7. Destination for Amazon SES sending events

  10. Once you have created the configuration set and added the event destination, you can define the Default configuration set for the verified identity (domain or email address). In the SES console, choose Verified identities.

    Amazon SES Verified Identity

    Figure 8 Amazon SES Verified Identity

  11. Choose the verified identity from which you want to collect events and select Configuration set. Click on Edit.

    Edit Configuration Set for Verified Identity

    Figure 9. Edit Configuration Set for Verified Identity

  12. Click on the checkbox Assign a default configuration set and choose the configuration set created previously.

    Assign default configuration set

    Figure 10. Assign default configuration set

  13. Once you have completed the previous steps, your events will be sent to Amazon S3. Due to the buffer’s configuration on the Kinesis Delivery Stream, the data will be loaded every 5 minutes or every 5 MiB to Amazon S3. You can check the structure created on the bucket and see json logs with SES events data.

    Amazon S3 bucket structure

    Figure 11. Amazon S3 bucket structure

Step 3: Using Amazon Athena to query the SES event logs

Amazon SES publishes email sending event records to Amazon Kinesis Data Firehose in JSON format. The top-level JSON object contains an eventType string, a mail object, and either a Bounce, Complaint, Delivery, Send, Reject, Open, Click, Rendering Failure, or DeliveryDelay object, depending on the type of event.

  1. In order to simplify the analysis of email sending events, create the sesmaster table by running the following script in Amazon Athena. Don’t forget to change the location in the following script with your own bucket containing the data of email sending events.
    CREATE EXTERNAL TABLE sesmaster (
    eventType string,
    complaint struct < arrivaldate: string,
    complainedrecipients: array < struct < emailaddress: string >>,
    complaintfeedbacktype: string,
    feedbackid: string,
    `timestamp`: string,
    useragent: string >,
    bounce struct < bouncedrecipients: array < struct < action: string,
    diagnosticcode: string,
    emailaddress: string,
    status: string >>,
    bouncesubtype: string,
    bouncetype: string,
    feedbackid: string,
    reportingmta: string,
    `timestamp`: string >,
    mail struct < timestamp: string,
    source: string,
    sourcearn: string,
    sendingaccountid: string,
    messageid: string,
    destination: string,
    headerstruncated: boolean,
    headers: array < struct < name: string,
    value: string >>,
    commonheaders: struct < `from`: array < string >,
    to: array < string >,
    messageid: string,
    subject: string >,
    tags: struct < ses_source_tls_version: string,
    ses_operation: string,
    ses_configurationset: string,
    ses_source_ip: string,
    ses_outgoing_ip: string,
    ses_from_domain: string,
    ses_caller_identity: string >>,
    send string,
    delivery struct < processingtimemillis: int,
    recipients: array < string >,
    reportingmta: string,
    smtpresponse: string,
    `timestamp`: string >,
    open struct < ipaddress: string,
    `timestamp`: string,
    userAgent: string >,
    reject struct < reason: string >,
    click struct < ipAddress: string,
    `timestamp`: string,
    userAgent: string,
    link: string >
    )
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    "mapping.ses_caller_identity" = "ses:caller-identity",
    "mapping.ses_configurationset" = "ses:configuration-set",
    "mapping.ses_from_domain" = "ses:from-domain",
    "mapping.ses_operation" = "ses:opeation",
    "mapping.ses_outgoing_ip" = "ses:outgoing-ip",
    "mapping.ses_source_ip" = "ses:source-ip",
    "mapping.ses_source_tls_version" = "ses:source-tls-version"
    )
    LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/'
    

    The sesmaster table uses the org.openx.data.jsonserde.JsonSerDe SerDe library to deserialize the JSON data.

    We have leveraged the support for JSON arrays and maps and the support for nested data structures. Those features ease the process of preparation and visualization of data.

    In the sesmaster table, the following mappings were applied to avoid errors due to name of JSON fields containing colons.

    • “mapping.ses_configurationset”=”ses:configuration-set”
    • “mapping.ses_source_ip”=”ses:source-ip”
    • “mapping.ses_from_domain”=”ses:from-domain”
    • “mapping.ses_caller_identity”=”ses:caller-identity” “mapping.ses_outgoing_ip”=”ses:outgoing-ip”
  2. Once the sesmaster table is ready, it is a good strategy to create curated views of its data. The first view called vwSESMaster contains all the records of email sending events and all the fields which are unique on each event. Create the vwSESMaster view by running the following script in Amazon Athena.
    CREATE OR REPLACE VIEW vwSESMaster AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , delivery.processingtimemillis as deliveryprocessingtimemillis
    , delivery.reportingmta as deliveryreportingmta
    , delivery.smtpresponse as deliverysmtpresponse
    , delivery.timestamp as deliverytimestamp
    , delivery.recipients[1] as deliveryrecipient
    , open.ipaddress as openipaddress
    , open.timestamp as opentimestamp
    , open.userAgent as openuseragent
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , click.ipAddress as clickipaddress
    , click.timestamp as clicktimestamp
    , click.userAgent as clickuseragent
    , click.link as clicklink
    , complaint.timestamp as complainttimestamp
    , complaint.userAgent as complaintuseragent
    , complaint.complaintFeedbackType as complaintcomplaintfeedbacktype
    , complaint.arrivalDate as complaintarrivaldate
    , reject.reason as rejectreason
    FROM
    sesmaster

    The sesmaster table contains some fields which are represented by nested arrays, so it is necessary to flatten them into multiples rows. Following you can see the event types and the fields which need to be flatten.

    • Event type SEND: field mail.commonHeaders
    • Event type BOUNCE: field bounce.bouncedrecipients
    • Event type COMPLAINT: field complaint.complainedrecipients

    To flatten those arrays into multiple rows, we used the CROSS JOIN in conjunction with the UNNEST operator using the following strategy for all the three events:

    • Create a temporal view with the mail.messageID and the field to be flattened.
    • Create another temporal view with the array flattened into multiple rows.
    • Create the final view joining the sesmaster table with the second temporal view by event type and mail.messageID.

    To create those views, follow the next steps.

  3. Run the following scripts in Amazon Athena to flat the mail.commonHeaders array in the SEND event type
    CREATE OR REPLACE VIEW vwSendMailTmpSendTo AS 
    SELECT
    mail.messageId as messageid
    , mail.commonHeaders.to as recipients
    FROM
    sesmaster
    WHERE 
    eventtype='Send'
    
    CREATE OR REPLACE VIEW vwsendmailrecipients AS 
    SELECT
    messageid
    , recipient
    FROM
    ("vwSendMailTmpSendTo"
    CROSS JOIN UNNEST(recipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwSentMails AS
    SELECT 
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , dest.recipient as mailto
    FROM
    sesmaster as sm
    ,vwsendmailrecipients as dest
    WHERE
    sm.eventtype = 'Send'
    and sm.mail.messageid = dest.messageid
  4. Run the following scripts in Amazon Athena to flat the bounce.bouncedrecipients array in the BOUNCE event type
    CREATE OR REPLACE VIEW vwbouncemailtmprecipients AS 
    SELECT
    mail.messageId as messageid
    , bounce.bouncedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Bounce')
    
    CREATE OR REPLACE VIEW vwbouncemailrecipients AS 
    SELECT
    messageid
    , recipient.action
    , recipient.diagnosticcode
    , recipient.emailaddress
    FROM
    (vwbouncemailtmprecipients
    CROSS JOIN UNNEST(bouncedrecipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwBouncedMails AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , bd.action as bounceaction
    , bd.diagnosticcode as bouncediagnosticcode
    , bd.emailaddress as bounceemailaddress
    FROM
    sesmaster as sm
    ,vwbouncemailrecipients as bd
    WHERE
    sm.eventtype = 'Bounce'
    and sm.mail.messageid = bd.messageid
    
  5. Run the following scripts in Amazon Athena to flat the complaint.complainedrecipients array in the COMPLAINT event type
    CREATE OR REPLACE VIEW vwcomplainttmprecipients AS 
    SELECT
    mail.messageId as messageid
    , complaint.complainedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Complaint')
    
    CREATE OR REPLACE VIEW vwcomplainedrecipients AS 
    SELECT
    messageid
    , recipient.emailaddress
    FROM
    (vwcomplainttmprecipients 
    CROSS JOIN UNNEST(complainedrecipients) t (recipient))
    

    At the end we have one table and four views which can be used in Amazon QuickSight to analyze email sending events:

    • Table sesmaster
    • View vwSESMaster
    • View vwSentMails
    • View vwBouncedMails
    • View vwComplainedemails

Step 4: Analyze and visualize data with Amazon QuickSight

 In this blog post, we use Amazon QuickSight to analyze and to visualize email sending events from the sesmaster and the four curated views created previously. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first need to select our table and our views to create new data sources in Athena and then we use these data sources to populate the visualization. We are creating just an example of visualization. Feel free to create your own visualization based on your information needs.

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

  1. On the Amazon QuickSight home page, choose Datasets from the menu on the left side, then choose New dataset from the upper-right corner, set and pick Athena as data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

    Create New Athena Data Source

    Figure 12. Create New Athena Data Source

  2. In the following dialog box, select the Catalog and the Database containing your sesmaster and curated views. Let’s select the sesmaster table in order to create some basic Key Performance Indicators. Select the table sesmaster and click on the Select

    Select Sesmaster Table

    Figure 13. Select Sesmaster Table

  3. Our sesmaster table now is a data source for Amazon QuickSight and we can turn to visualizing the data.

    QuickSight Visualize Data

    Figure 14. QuickSight Visualize Data

  4. You can see the list fields on the left. The canvas on the right is still empty. Before we populate it with data, let’s select Key Performance Indicator from the available visual types.

    QuickSight Visual Types

    Figure 15. QuickSight Visual Types

  5. To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the field send onto the value well and use count as aggregation.

    Add Send field to visualization

    Figure 16. Add Send field to visualization

  6. Add another visual from the left-upper side and select Key Performance Indicator as visual type.
    Add a new visual

    Figure 17. Add a new visual

    Key Performance Indicator Visual Type

    Figure 18. Key Performance Indicator Visual Type

  7. Put the field Delivery onto the value well and use count as aggregation.

    Add Delivery Field to visualization

    Figure 19. Add Delivery Field to visualization

  8. Repeat the same procedure, (steps 1 to 4) to count the number of Open, Click, Bounce, Complaint and Reject Events. At the end, you should see something similar to the following visualization. After resizing and rearranging the visuals, you should get an analysis like the shown in the image below.

    Preview of Key Performance Indicators

    Figure 20. Preview of Key Performance Indicators

  9. Let´s add another dataset by clicking the pencil on the right of the current Dataset.

    Add a New Dataset

    Figure 21. Add a New Dataset

  10. On the following dialog box, select Add Dataset.

    Add a New Dataset

    Figure 22. Add a New Dataset

  11. Select the view called vwsesmaster and click Select.
    Add vwsesmaster dataset

    Figure 23. Add vwsesmaster dataset

    Now you can see all the available fields of the vwsesmaster view.

    New fields from vwsesmaster dataset

    Figure 24. New fields from vwsesmaster dataset

  12. Let’s create a new visual and select the Table visual type.

    QuickSight Visual Types

    Figure 25. QuickSight Visual Types

  13. Drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the fields eventtype, mailmessageid, and mailsubject onto the Group By well, but you can add as many fields as you need.

    Add eventtype, mailmessageid and mailsubject fields

    Figure 26. Add eventtype, mailmessageid and mailsubject fields

  14. Now let’s create a filter for this visual in order to filter by type of event. Be sure you select the table and then click on Filter on the left menu.

    Add a Filter

    Figure 27. Add a Filter

  15. Click on Create One and select the field eventtype on the popup window. Now select the eventtype filter to see the following options.

    Create eventtype filter

    Figure 28. Create eventtype filter

  16. Click on the dots on the right of the eventtype filter and select Add to Sheet.

    Add filter to sheet

    Figure 29. Add filter to sheet

  17. Leave all the default values, scroll down and select Apply

    Apply filters with default values

    Figure 30. Apply filters with default values

  18. Now you can filter the vwsesmaster view by eventtype.

    Filter vwsesmasterview by eventtype

    Figure 31. Filter vwsesmasterview by eventtype

  19. You can continue customizing your visualization with all the available data in the sesmaster table, the vwsesmaster view and even add more datasets to include data from the vwSentMails, vwBouncedMails, and vwComplainedemails views. Below, you can see some other visualizations created from those views.
    Final visualization 1

    Figure 32. Final visualization 1

    Final visualization 2

    Figure 33. Final visualization 2

    Final visualization 3

    Figure 34. Final visualization 3

Clean up

To avoid ongoing charges, clean up the resources you created as part of this post:

  1. Delete the visualizations created in Amazon Quicksight.
  2. Unsubscribe from Amazon QuickSight if you are not using it for other projects.
  3. Delete the views and tables created in Amazon Athena.
  4. Delete the Amazon SES configuration set.
  5. Delete the Amazon SES events stored in S3.
  6. Delete the CloudFormation stack in order to delete the Amazon Kinesis Delivery Stream.

Conclusion

In this blog we showed how you can use AWS native services and features to quickly create an email tracking solution based on Amazon SES events to have a more detailed view on your sending activities. This solution uses a full serverless architecture without having to manage the underlying infrastructure and giving you the flexibility to use the solution for small, medium or intense Amazon SES usage, without having to take care of any servers.

We showed you some samples of dashboards and analysis that can be built for most of customers requirements, but of course you can evolve this solution and customize it according to your needs, adding or removing charts, filters or events to the dashboard. Please refer to the following documentation for the available Amazon SES Events, their structure and also how to create analysis and dashboards on Amazon QuickSight:

From a performance and cost efficiency perspective there are still several configurations that can be done to improve the solution, for example using a columnar file formant like parquet, compressing with snappy or setting your S3 partition strategy according to your email sending usage. Another improvement could be importing data into SPICE to read data in Amazon Quicksight. Using SPICE results in the data being loaded from Athena only once, until it is either manually refreshed or automatically refreshed using a schedule.

You can use this walkthrough to configure your first SES dashboard and start visualizing events detail. You can adjust the services described in this blog according to your company requirements.

About the authors

Oscar Mendoza AWS Solutions Architect Oscar Mendoza is a Solutions Architect at AWS based in Bogotá, Colombia. Oscar works with our customers to provide guidance in architectural best practices and to build Well Architected solutions on the AWS platform. He enjoys spending time with his family and his dog and playing music.
Luis Eduardo Torres AWS Solutions Architect Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.
Santiago Benavidez AWS Solutions Architect Santiago Benavídez is a Solutions Architect at AWS based in Buenos Aires, Argentina, with more than 13 years of experience in IT, currently helping DNB/ISV customers to achieve their business goals using the breadth and depth of AWS services, designing highly available, resilient and cost-effective architectures.

Migrate from Snowflake to Amazon Redshift using AWS Glue Python shell

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/migrate-from-snowflake-to-amazon-redshift-using-aws-glue-python-shell/

As the most widely used cloud data warehouse, Amazon Redshift makes it simple and cost-effective to analyze your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to analyze exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics without having to manage the data warehouse infrastructure. It natively integrates with other AWS services, facilitating the process of building enterprise-grade analytics applications in a manner that is not only cost-effective, but also avoids point solutions.

We are continuously innovating and releasing new features of Amazon Redshift, enabling the implementation of a wide range of data use cases and meeting requirements with performance and scale. For example, Amazon Redshift Serverless allows you to run and scale analytics workloads without having to provision and manage data warehouse clusters. Other features that help power analytics at scale with Amazon Redshift include automatic concurrency scaling for read and write queries, automatic workload management (WLM) for concurrency scaling, automatic table optimization, the new RA3 instances with managed storage to scale cloud data warehouses and reduce costs, cross-Region data sharing, data exchange, and the SUPER data type to store semi-structured data or documents as values. For the latest feature releases for Amazon Redshift, see Amazon Redshift What’s New. In addition to improving performance and scale, you can also gain up to three times better price performance with Amazon Redshift than other cloud data warehouses.

To take advantage of the performance, security, and scale of Amazon Redshift, customers are looking to migrate their data from their existing cloud warehouse in a way that is both cost optimized and performant. This post describes how to migrate a large volume of data from Snowflake to Amazon Redshift using AWS Glue Python shell in a manner that meets both these goals.

AWS Glue is serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, allowing you to analyze your data in minutes instead of weeks or months. AWS Glue supports the ability to use a Python shell job to run Python scripts as a shell, enabling you to author ETL processes in a familiar language. In addition, AWS Glue allows you to manage ETL jobs using AWS Glue workflows, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), and AWS Step Functions, automating and facilitating the orchestration of ETL steps.

Solution overview

The following architecture shows how an AWS Glue Python shell job migrates the data from Snowflake to Amazon Redshift in this solution.

Architecture

The solution is comprised of two stages:

  • Extract – The first part of the solution extracts data from Snowflake into an Amazon Simple Storage Service (Amazon S3) data lake
  • Load – The second part of the solution reads the data from the same S3 bucket and loads it into Amazon Redshift

For both stages, we connect the AWS Glue Python shell jobs to Snowflake and Amazon Redshift using database connectors for Python. The first AWS Glue Python shell job reads a SQL file from an S3 bucket to run the relevant COPY commands on the Snowflake database using Snowflake compute capacity and parallelism to migrate the data to Amazon S3. When this is complete, the second AWS Glue Python shell job reads another SQL file, and runs the corresponding COPY commands on the Amazon Redshift database using Redshift compute capacity and parallelism to load the data from the same S3 bucket.

Both jobs are orchestrated using AWS Glue workflows, as shown in the following screenshot. The workflow pushes data processing logic down to the respective data warehouses by running COPY commands on the databases themselves, minimizing the processing capacity required by AWS Glue to just the resources needed to run the Python scripts. The COPY commands load data in parallel both to and from Amazon S3, providing one of the fastest and most scalable mechanisms to transfer data from Snowflake to Amazon Redshift.

Because all heavy lifting around data processing is pushed down to the data warehouses, this solution is designed to provide a cost-optimized and highly performant mechanism to migrate a large volume of data from Snowflake to Amazon Redshift with ease.

Glue Workflow

The entire solution is packaged in an AWS CloudFormation template for simplicity of deployment and automatic provisioning of most of the required resources and permissions.

The high-level steps to implement the solution are as follows:

  1. Generate the Snowflake SQL file.
  2. Deploy the CloudFormation template to provision the required resources and permissions.
  3. Provide Snowflake access to newly created S3 bucket.
  4. Run the AWS Glue workflow to migrate the data.

Prerequisites

Before you get started, you can optionally build the latest version of the Snowflake Connector for Python package locally and generate the wheel (.whl) package. For instructions, refer to How to build.

If you don’t provide the latest version of the package, the CloudFormation template uses a pre-built .whl file that may not be on the most current version of Snowflake Connector for Python.

By default, the CloudFormation template migrates data from all tables in the TPCH_SF1 schema of the SNOWFLAKE_SAMPLE_DATA database, which is a sample dataset provided by Snowflake when an account is created. The following stored procedure is used to dynamically generate the Snowflake COPY commands required to migrate the dataset to Amazon S3. It accepts the database name, schema name, and stage name as the parameters.

CREATE OR REPLACE PROCEDURE generate_copy(db_name VARCHAR, schema_name VARCHAR, stage_name VARCHAR)
   returns varchar not null
   language javascript
   as
   $$
var return_value = "";
var sql_query = "select table_catalog, table_schema, lower(table_name) as table_name from " + DB_NAME + ".information_schema.tables where table_schema = '" + SCHEMA_NAME + "'" ;
   var sql_statement = snowflake.createStatement(
          {
          sqlText: sql_query
          }
       );
/* Creates result set */
var result_scan = sql_statement.execute();
while (result_scan.next())  {
       return_value += "\n";
       return_value += "COPY INTO @"
       return_value += STAGE_NAME
       return_value += "/"
       return_value += result_scan.getColumnValue(3);
       return_value += "/"
       return_value += "\n";
       return_value += "FROM ";
       return_value += result_scan.getColumnValue(1);
       return_value += "." + result_scan.getColumnValue(2);
       return_value += "." + result_scan.getColumnValue(3);
       return_value += "\n";
       return_value += "FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = '|' COMPRESSION = GZIP)";
       return_value += "\n";
       return_value += "OVERWRITE = TRUE;"
       return_value += "\n";
       }
return return_value;
$$
;

Deploy the required resources and permissions using AWS CloudFormation

You can use the provided CloudFormation template to deploy this solution. This template automatically provisions an Amazon Redshift cluster with your desired configuration in a private subnet, maintaining a high standard of security.

  1. Sign in to the AWS Management Console, preferably as admin user.
  2. Select your desired Region, preferably the same Region where your Snowflake instance is provisioned.
  3. Choose Launch Stack:
  4. Choose Next.
  5. For Stack name, enter a meaningful name for the stack, for example, blog-resources.

The Parameters section is divided into two subsections: Source Snowflake Infrastructure and Target Redshift Configuration.

  1. For Snowflake Unload SQL Script, it defaults to S3 location (URI) of a SQL file which migrates the sample data in the TPCH_SF1 schema of the SNOWFLAKE_SAMPLE_DATA database.
  2. For Data S3 Bucket, enter a prefix for the name of the S3 bucket that is automatically provisioned to stage the Snowflake data, for example, sf-migrated-data.
  3. For Snowflake Driver, if applicable, enter the S3 location (URI) of the .whl package built earlier as a prerequisite. By default, it uses a pre-built .whl file.
  4. For Snowflake Account Name, enter your Snowflake account name.

You can use the following query in Snowflake to return your Snowflake account name:

SELECT CURRENT_ACCOUNT();
  1. For Snowflake Username, enter your user name to connect to the Snowflake account.
  2. For Snowflake Password, enter the password for the preceding user.
  3. For Snowflake Warehouse Name, enter the warehouse name for running the SQL queries.

Make sure the aforementioned user has access to the warehouse.

  1. For Snowflake Database Name, enter the database name. The default is SNOWFLAKE_SAMPLE_DATA.
  2. For Snowflake Schema Name, enter schema name. The default is TPCH_SF1.

CFN Param Snowflake

  1. For VPC CIDR Block, enter the desired CIDR block of Redshift cluster. The default is 10.0.0.0/16.
  2. For Subnet 1 CIDR Block, enter the CIDR block of the first subnet. The default is 10.0.0.0/24.
  3. For Subnet 2 CIDR Block, enter the CIDR block of the first subnet. The default is 10.0.1.0/24.
  4. For Redshift Load SQL Script, it defaults to S3 location (URI) of a SQL file which migrates the sample data in S3 to Redshift.

The following database view in Redshift is used to dynamically generate Redshift COPY commands required to migrate the dataset from Amazon S3. It accepts the schema name as the filter criteria.

CREATE OR REPLACE VIEW v_generate_copy
AS
SELECT
    schemaname ,
    tablename  ,
    seq        ,
    ddl
FROM
    (
        SELECT
            table_id   ,
            schemaname ,
            tablename  ,
            seq        ,
            ddl
        FROM
            (
                --COPY TABLE
                SELECT
                    c.oid::bigint  as table_id   ,
                    n.nspname      AS schemaname ,
                    c.relname      AS tablename  ,
                    0              AS seq        ,
                    'COPY ' + n.nspname + '.' + c.relname + ' FROM ' AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r'
                --COPY TABLE continued                
                UNION                
                SELECT
                    c.oid::bigint as table_id   ,
                    n.nspname     AS schemaname ,
                    c.relname     AS tablename  ,
                    2             AS seq        ,
                    '''${' + '2}' + c.relname + '/'' iam_role ''${' + '1}'' gzip delimiter ''|'' EMPTYASNULL REGION ''us-east-1''' AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r'
                --END SEMICOLON                
                UNION                
                SELECT
                    c.oid::bigint as table_id  ,
                    n.nspname     AS schemaname,
                    c.relname     AS tablename ,
                    600000005     AS seq       ,
                    ';'           AS ddl
                FROM
                    pg_namespace AS n
                INNER JOIN
                    pg_class AS c
                ON
                    n.oid = c.relnamespace
                WHERE
                    c.relkind = 'r' 
             )
        ORDER BY
            table_id  ,
            schemaname,
            tablename ,
            seq 
    );

SELECT ddl
FROM v_generate_copy
WHERE schemaname = 'tpch_sf1';
  1. For Redshift Database Name, enter your desired database name, for example, dev.
  2. For Number of Redshift Nodes, enter the desired compute nodes, for example, 2.
  3. For Redshift Node Type, choose the desired node type, for example, ra3.4xlarge.
  4. For Redshift Password, enter your desired password with the following constraints: it must be 8–64 characters in length, and contain at least one uppercase letter, one lowercase letter, and one number.
  5. For Redshift Port, enter the Amazon Redshift port number to connect to. The default port is 5439.

CFN Param Redshift 1 CFN Param Redshift 2

  1. Choose Next.
  2. Review and choose Create stack.

It takes around 5 minutes for the template to finish creating all resources and permissions. Most of the resources have the prefix of the stack name you specified for easy identification of the resources later. For more details on the deployed resources, see the appendix at the end of this post.

Create an IAM role and external Amazon S3 stage for Snowflake access to the data S3 bucket

In order for Snowflake to access the TargetDataS3Bucket created earlier by CloudFormation template, you must create an AWS Identity and Access Management (IAM) role and external Amazon S3 stage for Snowflake access to the S3 bucket. For instructions, refer to Configuring Secure Access to Amazon S3.

When you create an external stage in Snowflake, use the value for TargetDataS3Bucket on the Outputs tab of your deployed CloudFormation stack for the Amazon S3 URL of your stage.

CF Output

Make sure to name the external stage unload_to_s3 if you’re migrating the sample data using the default scripts provided in the CloudFormation template.

Convert Snowflake tables to Amazon Redshift

You can simply run the following DDL statements to create TPCH_SF1 schema objects in Amazon Redshift. You can also use AWS Schema Conversion Tool (AWS SCT) to convert Snowflake custom objects to Amazon Redshift. For instructions on converting your schema, refer to Accelerate Snowflake to Amazon Redshift migration using AWS Schema Conversion Tool.

CREATE SCHEMA TPCH_SF1;
SET SEARCH_PATH to TPCH_SF1;
CREATE TABLE customer (
  c_custkey int8 not null ,
  c_name varchar(25) not null,
  c_address varchar(40) not null,
  c_nationkey int4 not null,
  c_phone char(15) not null,
  c_acctbal numeric(12,2) not null,
  c_mktsegment char(10) not null,
  c_comment varchar(117) not null,
  Primary Key(C_CUSTKEY)
) ;

CREATE TABLE lineitem (
  l_orderkey int8 not null ,
  l_partkey int8 not null,
  l_suppkey int4 not null,
  l_linenumber int4 not null,
  l_quantity numeric(12,2) not null,
  l_extendedprice numeric(12,2) not null,
  l_discount numeric(12,2) not null,
  l_tax numeric(12,2) not null,
  l_returnflag char(1) not null,
  l_linestatus char(1) not null,
  l_shipdate date not null ,
  l_commitdate date not null,
  l_receiptdate date not null,
  l_shipinstruct char(25) not null,
  l_shipmode char(10) not null,
  l_comment varchar(44) not null,
  Primary Key(L_ORDERKEY, L_LINENUMBER)
)  ;

CREATE TABLE nation (
  n_nationkey int4 not null,
  n_name char(25) not null ,
  n_regionkey int4 not null,
  n_comment varchar(152) not null,
  Primary Key(N_NATIONKEY)                                
) ;

CREATE TABLE orders (
  o_orderkey int8 not null,
  o_custkey int8 not null,
  o_orderstatus char(1) not null,
  o_totalprice numeric(12,2) not null,
  o_orderdate date not null,
  o_orderpriority char(15) not null,
  o_clerk char(15) not null,
  o_shippriority int4 not null,
  o_comment varchar(79) not null,
  Primary Key(O_ORDERKEY)
) ;

CREATE TABLE part (
  p_partkey int8 not null ,
  p_name varchar(55) not null,
  p_mfgr char(25) not null,
  p_brand char(10) not null,
  p_type varchar(25) not null,
  p_size int4 not null,
  p_container char(10) not null,
  p_retailprice numeric(12,2) not null,
  p_comment varchar(23) not null,
  PRIMARY KEY (P_PARTKEY)
) ;

CREATE TABLE partsupp (
  ps_partkey int8 not null,
  ps_suppkey int4 not null,
  ps_availqty int4 not null,
  ps_supplycost numeric(12,2) not null,
  ps_comment varchar(199) not null,
  Primary Key(PS_PARTKEY, PS_SUPPKEY)
) ;

CREATE TABLE region (
  r_regionkey int4 not null,
  r_name char(25) not null ,
  r_comment varchar(152) not null,
  Primary Key(R_REGIONKEY)                             
) ;

CREATE TABLE supplier (
  s_suppkey int4 not null,
  s_name char(25) not null,
  s_address varchar(40) not null,
  s_nationkey int4 not null,
  s_phone char(15) not null,
  s_acctbal numeric(12,2) not null,
  s_comment varchar(101) not null,
  Primary Key(S_SUPPKEY)
);

Run an AWS Glue workflow for data migration

When you’re ready to start the data migration, complete the following steps:

  1. On the AWS Glue console, choose Workflows in the navigation pane.
  2. Select the workflow to run (<stack name>snowflake-to-redshift-migration).
  3. On the Actions menu, choose Run.Glue Workflow Run
  4. To check the status of the workflow, choose the workflow and on the History tab, select the Run ID and choose View run details.
    Glue Workflow Status
  5. When the workflow is complete, navigate to the Amazon Redshift console and launch the Amazon Redshift query editor v2 to verify the successful migration of data.
  6. Run the following query in Amazon Redshift to get row counts of all tables migrated from Snowflake to Amazon Redshift. Make sure to adjust the table_schema value accordingly if you’re not migrating the sample data.
SELECT tab.table_schema,
       tab.table_name,
       nvl(tinf.tbl_rows,0) tbl_rows,
       nvl(tinf.size,0) size
FROM svv_tables tab
LEFT JOIN svv_table_info tinf 
          on tab.table_schema = tinf.schema 
          and tab.table_name = tinf.”table”
WHERE tab.table_type = 'BASE TABLE'
      and tab.table_schema in ('tpch_sf1')
ORDER BY tbl_rows;

Redshift Editor

  1. Run the following query in Snowflake to compare and validate the data:
USE DATABASE snowflake_sample_data;
SELECT  TABLE_CATALOG,
        TABLE_SCHEMA,
        TABLE_NAME,
        ROW_COUNT,
        BYTES AS SIZE,
        COMMENT
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = 'TPCH_SF1'
ORDER BY ROW_COUNT;

Snowflake Editor

Clean up

To avoid incurring future charges, delete the resources you created as part of the CloudFormation stack by navigating to the AWS CloudFormation console, selecting the stack blog-resources, and choosing Delete.

Conclusion

In this post, we discussed how to perform an efficient, fast, and cost-effective migration from Snowflake to Amazon Redshift. Migrations from one data warehouse environment to another can typically be very time-consuming and resource-intensive; this solution uses the power of cloud-based compute by pushing down the processing to the respective warehouses. Orchestrating this migration with the AWS Glue Python shell provides additional cost optimization.

With this solution, you can facilitate your migration from Snowflake to Amazon Redshift. If you’re interested in further exploring the potential of using Amazon Redshift, please reach out to your AWS Account Team for a proof of concept.

Appendix: Resources deployed by AWS CloudFormation

The CloudFormation stack deploys the following resources in your AWS account:

  • Networking resourcesAmazon Virtual Private Cloud (Amazon VPC), subnets, ACL, and security group.
  • Amazon S3 bucket – This is referenced as TargetDataS3Bucket on the Outputs tab of the CloudFormation stack. This bucket holds the data being migrated from Snowflake to Amazon Redshift.
  • AWS Secrets Manager secrets – Two secrets in AWS Secrets Manager store credentials for Snowflake and Amazon Redshift.
  • VPC endpoints – The two VPC endpoints are deployed to establish a private connection from VPC resources like AWS Glue to services that run outside of the VPC, such as Secrets Manager and Amazon S3.
  • IAM roles – IAM roles for AWS Glue, Lambda, and Amazon Redshift. If the CloudFormation template is to be deployed in a production environment, you need to adjust the IAM policies so they’re not as permissive as presented in this post (which were set for simplicity and demonstration). Particularly, AWS Glue and Amazon Redshift don’t require all the actions granted in the *FullAccess policies, which would be considered overly permissive.
  • Amazon Redshift cluster – An Amazon Redshift cluster is created in a private subnet, which isn’t publicly accessible.
  • AWS Glue connection – The connection for Amazon Redshift makes sure that the AWS Glue job runs within the same VPC as Amazon Redshift. This also ensures that AWS Glue can access the Amazon Redshift cluster in a private subnet.
  • AWS Glue jobs – Two AWS Glue Python shell jobs are created:
    • <stack name>-glue-snowflake-unload – The first job runs the SQL scripts in Snowflake to copy data from the source database to Amazon S3. The Python script is available in S3. The Snowflake job accepts two parameters:
      • SQLSCRIPT – The Amazon S3 location of the SQL script to run in Snowflake to migrate data to Amazon S3. This is referenced as the Snowflake Unload SQL Script parameter in the input section of the CloudFormation template.
      • SECRET – The Secrets Manager ARN that stores Snowflake connection details.
    • <stack name>-glue-redshift-load – The second job runs another SQL script in Amazon Redshift to copy data from Amazon S3 to the target Amazon Redshift database. The Python script link is available in S3. The Amazon Redshift job accepts three parameters:
      • SQLSCRIPT – The Amazon S3 location of the SQL script to run in Amazon Redshift to migrate data from Amazon S3. If you provide custom SQL script to migrate the Snowflake data to Amazon S3 (as mentioned in the prerequisites), the file location is referenced as LoadFileLocation on the Outputs tab of the CloudFormation stack.
      • SECRET – The Secrets Manager ARN that stores Amazon Redshift connection details.
      • PARAMS – This includes any additional parameters required for the SQL script, including the Amazon Redshift IAM role used in the COPY commands and the S3 bucket staging the Snowflake data. Multiple parameter values can be provided separated by a comma.
  • AWS Glue workflow – The orchestration of Snowflake and Amazon Redshift AWS Glue Python shell jobs is managed via an AWS Glue workflow. The workflow <stack name>snowflake-to-redshift-migration runs later for actual migration of data.

About the Authors

Raks KhareRaks Khare is an Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers architect data analytics solutions at scale on the AWS platform.

Julia BeckJulia Beck is an Analytics Specialist Solutions Architect at AWS. She supports customers in validating analytics solutions by architecting proof of concept workloads designed to meet their specific needs.

Orchestrating AWS Glue crawlers using AWS Step Functions

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/orchestrating-aws-glue-crawlers-using-aws-step-functions/

This blog post is written by Justin Callison, General Manager, AWS Workflow.

Organizations generate terabytes of data every day in a variety of semistructured formats. AWS Glue and Amazon Athena can give you a simpler and more cost-effective way to analyze this data with no infrastructure to manage. AWS Glue crawlers identify the schema of your data and manage the metadata required to analyze the data in place, without the need to transform this data and load into a data warehouse.

The timing of when your crawlers run and complete is important. You must ensure the crawler runs after your data has updated and before you query it with Athena or analyze with an AWS Glue job. If not, your analysis may experience errors or return incomplete results.

In this blog, you learn how to use AWS Step Functions, a low-code visual workflow service that integrates with over 220 AWS services. The service orchestrates your crawlers to control when they start, confirm completion, and combine them into end-to-end, serverless data processing workflows.

Using Step Functions to orchestrate multiple AWS Glue crawlers, provides a number of benefits when compared to implementing a solution directly with code. Firstly, the workflow provides an instant visual understanding of the application, and any errors that might occur during execution. Step Functions’ ability to run nested workflows inside a Map state helps to decouple and reuse application components with native array iteration. Finally, the Step Functions wait state lets the workflow periodically poll the status of the crawl job, without incurring additional cost for idol wait time.

Deploying the example

With this example, you create three datasets in Amazon S3, then use Step Functions to orchestrate AWS Glue crawlers to analyze the datasets and make them available to query using Athena.

You deploy the example with AWS CloudFormation using the following steps:

  1. Download the template.yaml file from here.
  2. Log in to the AWS Management Console and go to AWS CloudFormation.
  3. Navigate to Stacks -> Create stack and select With new resources (standard).
  4. Select Template is ready and Upload a template file, then Choose File and select the template.yaml file that you downloaded in Step 1 and choose Next.
  5. Enter a stack name, such as glue-stepfunctions-demo, and choose Next.
  6. Choose Next, check the acknowledgement boxes in the Capabilities and transforms section, then choose Create stack.
  7. After deployment, the status updates to CREATE_COMPLETE.

Create your datasets

Navigate to Step Functions in the AWS Management Console and select the create-dataset state machine from the list. This state machine uses Express Workflows and the Parallel state to build three datasets concurrently in S3. The first two datasets include information by user and location respectively and include files per day over the 5-year period from 2016 to 2020. The third dataset is a simpler, all-time summary of data by location.

To create the datasets, you choose Start execution from the toolbar for the create-dataset state machine, then choose Start execution again in the dialog box. This runs the state machine and creates the datasets in S3.

Navigate to the S3 console and view the glue-demo-databucket created for this example. In this bucket, in a folder named data, there are three subfolders, each containing a dataset.

The all-time-location-summaries folder contains a set of JSON files, one for each location.

The daily-user-summaries and daily-location-summaries contain a folder structure with nested folders for each year, month, and date. In addition to making this data easier to navigate via the console, this folder structure provides hints to AWS Glue that it can use to partition this dataset and make it more efficient to query.

Crawling

You now use AWS Glue crawlers to analyze these datasets and make them available to query. Navigate to the AWS Glue console, select Crawlers to see the list of Crawlers that you created when you deployed this example. Select the daily-user-summaries crawler to view details and note that they have tags assigned to indicate metadata such as the datatype of the data and whether the dataset is-partitioned.

Now, return to the Step Functions console and view the run-crawlers-with-tags state machine. This state machine uses AWS SDK service integrations to get a list of all crawlers matching the tag criteria you enter. It then uses the map state and the optimized service integration for Step Functions to execute the run-crawler state machine for each of the matching crawlers concurrently. The run-crawler state machine starts each crawler and monitors status until the crawler completes. Once each of the individual crawlers have completed, the run-crawlers-with-tags state machine also completes.

To initiate the crawlers:

  1. Choose Start execution from the top of the page when viewing the run-crawlers-with-tags state machine
  2. Provide the following as Input
    {"tags": {"datatype": "json"}}
  3. Choose Start execution.

After 2-3 minutes, the execution finishes with a Succeeded status once all three crawlers have completed. During this time, you can navigate to the run-crawler state machine to view the individual, nested executions per crawler or to the AWS Glue console to see the status of the crawlers.

Querying the data using Amazon Athena

Now, navigate to the Athena console where you can see the database and tables created by your crawlers. Note that AWS Glue recognized the partitioning scheme and included fields for year, month, and date in addition to user and usage fields for the data contained in the JSON files.

If you have not used Athena in this account before, you see a message instructing you to set a query result location. Choose View settings -> Manage -> Browse S3 and select the athena-results bucket that you created when you deployed the example. Choose Save then return to the Editor to continue.

You can now run queries such as the following, to calculate the total usage for all users over 5 years.

SELECT SUM(usage) all_time_usage FROM “daily_user_summaries”

You can also add filters, as shown in the following example, which limit results to those from 2016.

SELECT SUM(usage) all_time_usage FROM “daily_user_summaries” WHERE year = ‘2016’

Note this second query scanned only 17% as much data (133 KB vs 797 KB) and completed faster. This is because Athena used the partitioning information to avoid querying the full dataset. While the differences in this example are small, for real-world datasets with terabytes of data, your cost and latency savings from partitioning data can be substantial.

The disadvantage of a partitioning scheme is that new folders are not included in query results until you add new partitions. Re-running your crawler identifies and adds the new partitions and using Step Functions to orchestrate these crawlers makes that task simpler.

Extending the example

You can use these example state machines as they are in your AWS accounts to manage your existing crawlers. You can use Amazon S3 event notifications with Amazon EventBridge to trigger crawlers based on data changes. With the Optimized service integration for Amazon Athena, you can extend your workflows to execute queries against these crawled datasets. And you can use these examples to integrate crawler execution into your end-to-end data processing workflows, creating reliable, auditable workflows from ingestion through to analysis.

Conclusion

In this blog post, you learn how to use Step Functions to orchestrate AWS Glue crawlers. You deploy an example that generates three datasets, then uses Step Functions to start and coordinate crawler runs that analyze this data and make it available to query using Athena.

To learn more about Step Functions, visit Serverless Land.

Accelerate Amazon DynamoDB data access in AWS Glue jobs using the new AWS Glue DynamoDB Export connector

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/accelerate-amazon-dynamodb-data-access-in-aws-glue-jobs-using-the-new-aws-glue-dynamodb-elt-connector/

Modern data architectures encourage the integration of data lakes, data warehouses, and purpose-built data stores, enabling unified governance and easy data movement. With a modern data architecture on AWS, you can store data in a data lake and use a ring of purpose-built data services around the lake, allowing you to make decisions with speed and agility.

To achieve a modern data architecture, AWS Glue is the key service that integrates data over a data lake, data warehouse, and purpose-built data stores. AWS Glue simplifies data movement like inside-out, outside-in, or around the perimeter. A powerful purpose-built data store is Amazon DynamoDB, which is widely used by hundreds of thousands of companies, including Amazon.com. It’s common to move data from DynamoDB to a data lake built on top of Amazon Simple Storage Service (Amazon S3). Many customers move data from DynamoDB to Amazon S3 using AWS Glue extract, transform, and load (ETL) jobs.

Today, we’re pleased to announce the general availability of a new AWS Glue DynamoDB export connector. It’s built on top of the DynamoDB table export feature. It’s a scalable and cost-efficient way to read large DynamoDB table data in AWS Glue ETL jobs. This post describes the benefit of this new export connector and its use cases.

The following are typical use cases to read from DynamoDB tables using AWS Glue ETL jobs:

  • Move the data from DynamoDB tables to different data stores
  • Integrate the data with other services and applications
  • Retain historical snapshots for auditing
  • Build an S3 data lake from the DynamoDB data and analyze the data from various services, such as Amazon Athena, Amazon Redshift, and Amazon SageMaker

The new AWS Glue DynamoDB export connector

The old version of the AWS Glue DynamoDB connector reads DynamoDB tables through the DynamoDB Scan API. Instead, the new AWS Glue DynamoDB export connector reads DynamoDB data from the snapshot, which is exported from DynamoDB tables. This approach has following benefits:

  • It doesn’t consume read capacity units of the source DynamoDB tables
  • The read performance is consistent for large DynamoDB tables

Especially for large DynamoDB tables more than 100 GB, this new connector is significantly faster than the traditional connector.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance.

How to use the new connector on AWS Glue Studio Visual Editor

AWS Glue Studio Visual Editor is a graphical interface that makes it easy to create, run, and monitor AWS Glue ETL jobs in AWS Glue. The new DynamoDB export connector is available on AWS Glue Studio Visual Editor. You can choose Amazon DynamoDB as the source.

After you choose Create, you see the visual Directed Acyclic Graph (DAG). Here, you can choose your DynamoDB table that exists in this account or Region. This allows you to select DynamoDB tables (with PITR enabled) directly as a source in AWS Glue Studio. This provides a one-click export from any of your DynamoDB tables to Amazon S3. You can also easily add any data sources and targets or transformations to the DAG. For example, it allows you to join two different DynamoDB tables and export the result to Amazon S3, as shown in the following screenshot.

The following two connection options are automatically added. This location is used to store temporary data during the DynamoDB export phase. You can set S3 bucket lifecycle policies to expire temporary data.

  • dynamodb.s3.bucket – The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – The S3 prefix to store temporary data during DynamoDB export

How to use the new connector on the job script code

You can use the new export connector when you create an AWS Glue DynamicFrame in the job script code by configuring the following connection options:

  • dynamodb.export – (Required) You need to set this to ddb or s3
  • dynamodb.tableArn – (Required) Your source DynamoDB table ARN
  • dynamodb.unnestDDBJson – (Optional) If set to true, performs an unnest transformation of the DynamoDB JSON structure that is present in exports. The default value is false.
  • dynamodb.s3.bucket – (Optional) The S3 bucket to store temporary data during DynamoDB export
  • dynamodb.s3.prefix – (Optional) The S3 prefix to store temporary data during DynamoDB export

The following is the sample Python code to create a DynamicFrame using the new export connector:

dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "test_source",
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "bucket name",
        "dynamodb.s3.prefix": "bucket prefix"
    }
)

The new export connector doesn’t require configurations related to AWS Glue job parallelism, unlike the old connector. Now you no longer need to change the configuration when you scale out the AWS Glue job. It also doesn’t require any configuration regarding DynamoDB table read/write capacity and its capacity mode (on demand or provisioned).

DynamoDB table schema handling

By default, the new export connector reads data in DynamoDB JSON structure that is present in exports. The following is an example schema of the frame using the Amazon Customer Review Dataset:

root
|-- Item: struct (nullable = true)
| |-- product_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- total_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_title: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- star_rating: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- customer_id: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- marketplace: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- helpful_votes: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- review_headline: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- review_date: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- vine: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- review_body: struct (nullable = true)
| | |-- S: string (nullable = true)
| | |-- NULL: boolean (nullable = true)
| |-- verified_purchase: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- product_category: struct (nullable = true)
| | |-- S: string (nullable = true)
| |-- year: struct (nullable = true)
| | |-- N: string (nullable = true)
| |-- product_parent: struct (nullable = true)
| | |-- S: string (nullable = true)

To read DynamoDB item columns without handling nested data, you can set dynamodb.unnestDDBJson to True. The following is an example of the schema of the same data where dynamodb.unnestDDBJson is set to True:

root
|-- product_id: string (nullable = true)
|-- review_id: string (nullable = true)
|-- total_votes: string (nullable = true)
|-- product_title: string (nullable = true)
|-- star_rating: string (nullable = true)
|-- customer_id: string (nullable = true)
|-- marketplace: string (nullable = true)
|-- helpful_votes: string (nullable = true)
|-- review_headline: string (nullable = true)
|-- review_date: string (nullable = true)
|-- vine: string (nullable = true)
|-- review_body: string (nullable = true)
|-- verified_purchase: string (nullable = true)
|-- product_category: string (nullable = true)
|-- year: string (nullable = true)
|-- product_parent: string (nullable = true)

Data freshness

Data freshness is the measure of staleness of the data from the live tables in the original source. In the new export connecor, the option dynamodb.export impacts data freshness.

When dynamodb.export is set to ddb, the AWS Glue job invokes a new export and then reads the export placed in an S3 bucket into DynamicFrame. It reads exports of the live table, so data can be fresh. On the other hand, when dynamodb.export is set to s3, the AWS Glue job skips invoking a new export and directly reads an export already placed in an S3 bucket. It reads exports of the past table, so data can be stale, but you can reduce overhead to trigger the exports.

The following table explains the data freshness and pros and cons of each option.

.. dynamodb.export Config Data Freshness Data Source Pros Cons
New export connector s3 Stale Export of the past table
  • RCU is not consumed
  • Can skip triggering exports
  • Data can be stale
New export connector ddb Fresh Export of the live table
  • Data can be fresh
  • RCU is not consumed
  • Overhead to trigger exports and wait for completion
Old connector N/A Most fresh Scan of the live tables
  • Data can be fresh
  • Read capacity unit (RCU) is consumed

Performance

The following benchmark shows the performance improvements between the old version of the AWS Glue DynamoDB connector and the new export connector. The comparison uses the DynamoDB tables storing the TPC-DS benchmark dataset with different scales from 10 MB to 2 TB. The sample Spark job reads from the DynamoDB table and calculates the count of the items. All the Spark jobs are run on AWS Glue 3.0, G.2X, 60 workers.

The following chart compares AWS Glue job duration between the old connector and the new export connector. For small DynamoDB tables, the old connector is faster. For large tables more than 80 GB, the new export connector is faster. In other words, the DynamoDB export connector is recommended for jobs that take the old connector more than 5–10 minutes to run. Also, the chart shows that the duration of the new export connector increases slowly as data size increases, although the duration of the old connector increases rapidly as data size increases. This means that the new export connector is suitable especially for larger tables.

The following chart compares dollar cost between the old connector and the new export connector. It contains the AWS Glue DPU hour cost summed with the cost for reading data from DynamoDB. For the old connector, we include the read request cost. For the new export connector, we include the cost in the DynamoDB data export to Amazon S3. Both are calculated in DynamoDB on-demand capacity mode.

With AWS Glue Auto Scaling

AWS Glue Auto Scaling is a new feature to automatically resize computing resources for better performance at lower cost. You can take advantage of AWS Glue Auto Scaling with the new DynamoDB export connector.

As the following chart shows, with AWS Glue Auto Scaling, the duration of the new export connector is shorter than the old connector when the size of the source DynamoDB table is 100 GB or more. It shows a similar trend without AWS Glue Auto Scaling.

You get the cost benefits as only Spark driver is active for most of the time duration during the DynamoDB export (which is nearly 30% of the total job duration time with the old scan-based connector).

Conclusion

AWS Glue is a key service to integrate with multiple data stores. At AWS, we keep improving the performance and cost-efficiency of our services. In this post, we announced the availability of the new AWS Glue DynamoDB export connector. With this new connector, you can easily integrate your large data on DynamoDB tables with different data stores. It helps you read the large tables faster from AWS Glue jobs at lower cost.

The new AWS Glue DynamoDB export connector is now generally available in all supported Glue Regions. Let’s start using the new AWS Glue DynamoDB export connector today! We are looking forward to your feedback and stories on how you utilize the connector for your needs.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts that help customers build data lakes on the cloud.

Neil Gupta is a Software Development Engineer on the AWS Glue team. He enjoys tackling big data problems and learning more about distributed systems.

Andrew Kim is a Software Development Engineer on the AWS Glue team. His passion is to build scalable and effective solutions to challenging problems and working with distributed systems.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizing Apache Spark for performance and reliability.

Use the AWS Glue connector to read and write Apache Iceberg tables with ACID transactions and perform time travel

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/use-the-aws-glue-connector-to-read-and-write-apache-iceberg-tables-with-acid-transactions-and-perform-time-travel/

Nowadays, many customers have built their data lakes as the core of their data analytic systems. In a typical use case of data lakes, many concurrent queries run to retrieve consistent snapshots of business insights by aggregating query results. A large volume of data constantly comes from different data sources into the data lakes. There is also a common demand to reflect the changes occurring in the data sources into the data lakes. This means that not only inserts but also updates and deletes need to be replicated into the data lakes.

Apache Iceberg provides the capability of ACID transactions on your data lakes, which allows concurrent queries to add or delete records isolated from any existing queries with read-consistency for queries. Iceberg is an open table format designed for large analytic workloads on huge datasets. You can perform ACID transactions against your data lakes by using simple SQL expressions. It also enables time travel, rollback, hidden partitioning, and schema evolution changes, such as adding, dropping, renaming, updating, and reordering columns.

AWS Glue is one of the key elements to building data lakes. It extracts data from multiple sources and ingests your data to your data lake built on Amazon Simple Storage Service (Amazon S3) using both batch and streaming jobs. To expand the accessibility of your AWS Glue extract, transform, and load (ETL) jobs to Iceberg, AWS Glue provides an Apache Iceberg connector. The connector allows you to build Iceberg tables on your data lakes and run Iceberg operations such as ACID transactions, time travel, rollbacks, and so on from your AWS Glue ETL jobs.

In this post, we give an overview of how to set up the Iceberg connector for AWS Glue and configure the relevant resources to use Iceberg with AWS Glue jobs. We also demonstrate how to run typical Iceberg operations on AWS Glue interactive sessions with an example use case.

Apache Iceberg connector for AWS Glue

With the Apache Iceberg connector for AWS Glue, you can take advantage of the following Iceberg capabilities:

  • Basic operations on Iceberg tables – This includes creating Iceberg tables in the AWS Glue Data Catalog and inserting, updating, and deleting records with ACID transactions in the Iceberg tables
  • Inserting and updating records – You can run UPSERT (update and insert) queries for your Iceberg table
  • Time travel on Iceberg tables – You can read a specific version of an Iceberg table from table snapshots that Iceberg manages
  • Rollback of table versions – You can revert an Iceberg table back to a specific version of the table

Iceberg offers additional useful capabilities such as hidden partitioning; schema evolution with add, drop, update, and rename support; automatic data compaction; and more. For more details about Iceberg, refer to the Apache Iceberg documentation.

Next, we demonstrate how the Apache Iceberg connector for AWS Glue works for each Iceberg capability based on an example use case.

Overview of example customer scenario

Let’s assume that an ecommerce company sells products on their online platform. Customers can buy products and write reviews to each product. Customers can add, update, or delete their reviews at any time. The customer reviews are an important source for analyzing customer sentiment and business trends.

In this scenario, we have the following teams in our organization:

  • Data engineering team – Responsible for building and managing data platforms.
  • Data analyst team – Responsible for analyzing customer reviews and creating business reports. This team queries the reviews daily, creates a business intelligence (BI) report, and shares it with sales team.
  • Customer support team – Responsible for replying to customer inquiries. This team queries the reviews when they get inquiries about the reviews.

Our solution has the following requirements:

  • Query scalability is important because the website is huge.
  • Individual customer reviews can be added, updated, and deleted.
  • The data analyst team needs to use both notebooks and ad hoc queries for their analysis.
  • The customer support team sometimes needs to view the history of the customer reviews.
  • Customer reviews can always be added, updated, and deleted, even while one of the teams is querying the reviews for analysis. This means that any result in a query isn’t affected by uncommitted customer review write operations.
  • Any changes in customer reviews that are made by the organization’s various teams need to be reflected in BI reports and query results.

In this post, we build a data lake of customer review data on top of Amazon S3. To meet these requirements, we introduce Apache Iceberg to enable adding, updating, and deleting records; ACID transactions; and time travel queries. We also use an AWS Glue Studio notebook to integrate and query the data at scale. First, we set up the connector so we can create an AWS Glue connection for Iceberg.

Set up the Apache Iceberg connector and create the Iceberg connection

We first set up Apache Iceberg connector for AWS Glue to use Apache Iceberg with AWS Glue jobs. Particularly, in this section, we set up the Apache Iceberg connector for AWS Glue and create an AWS Glue job with the connector. Complete the following steps:

  1. Navigate to the Apache Iceberg connector for AWS Glue page in AWS Marketplace.
  2. Choose Continue to Subscribe.

  1. Review the information under Terms and Conditions, and choose Accept Terms to continue.

  1. When the subscription is complete, choose Continue to Configuration.

  1. For Fulfillment option, choose Glue 3.0. (1.0 and 2.0 are also available options.)
  2. For Software version, choose the latest software version.

As of this writing, 0.12.0-2 is the latest version of the Apache Iceberg connector for AWS Glue.

  1. Choose Continue to Launch.

  1. Choose Usage instructions.
  2. Choose Activate the Glue connector from AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  1. For Name, enter a name for your connection (for example, iceberg-connection).

  1. Choose Create connection and activate connector.

A message appears that the connection was successfully added, and the connection is now visible on the AWS Glue Studio console.

Configure resources and permissions

We use a provided AWS CloudFormation template to set up Iceberg configuration for AWS Glue. AWS CloudFormation creates the following resources:

  • An S3 bucket to store an Iceberg configuration file and actual data
  • An AWS Lambda function to generate an Iceberg configuration file based on parameters provided by a user for the CloudFormation template, and to clean up the resources created through this post
  • AWS Identity and Access Management (IAM) roles and policies with necessary permissions
  • An AWS Glue database in the Data Catalog to register Iceberg tables

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

Launch Button

  1. For DynamoDBTableName, enter a name for an Amazon DynamoDB table that is created automatically when AWS Glue creates an Iceberg table.

This table is used for an AWS Glue job to obtain a commit lock to avoid concurrently modifying records in Iceberg tables. For more details about commit locking, refer to DynamoDB for Commit Locking. Note that you shouldn’t specify the name of an existing table.

  1. For IcebergDatabaseName, enter a name for the AWS Glue database that is created in the Data Catalog and used for registering Iceberg tables.
  2. Choose Next.

  1. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  2. Choose Create stack.

Start an AWS Glue Studio notebook to use Apache Iceberg

After you launch the CloudFormation stack, you create an AWS Glue Studio notebook to perform Iceberg operations. Complete the following steps:

  1. Download the Jupyter notebook file.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Under Create job, select Jupyter Notebook.

  1. Select Upload and edit an existing notebook and upload iceberg-with-glue.ipynb.

  1. Choose Create.
  2. For Job name, enter a name.
  3. For IAM role, choose IcebergConnectorGlueJobRole, which was created via the CloudFormation template.
  4. Choose Start notebook job.

The process takes a few minutes to complete, after which you can see an AWS Glue Studio notebook view.

  1. Choose Save to save the notebook.

Set up the Iceberg configuration

To set up the Iceberg configuration, complete the following steps:

  1. Run the following cells with multiple options (magics). Note that you set your connection name for the %connections magic in the cell.

For more information, refer to Configuring AWS Glue Interactive Sessions for Jupyter and AWS Glue Studio notebooks.

A message Session <session-id> has been created appears when your AWS Glue Studio notebook is ready.

In the last cell in this section, you load your Iceberg configuration, which you specified when launching the CloudFormation stack. The Iceberg configuration includes a warehouse path for Iceberg actual data, a DynamoDB table name for commit locking, a database name for your Iceberg tables, and more.

To load the configuration, set the S3 bucket name that was created via the CloudFormation stack.

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose the stack you created.
  3. On the Outputs tab, copy the S3 bucket name.

  1. Set the S3 name as the S3_BUCKET parameter in your notebook.

  1. Run the cell and load the Iceberg configuration that you set.

Initialize the job with Iceberg configurations

We continue to run cells to initiate a SparkSession in this section.

  1. Set an Iceberg warehouse path and a DynamoDB table name for Iceberg commit locking from the user_config parameter.
  2. Initialize a SparkSession by setting the Iceberg configurations.
  3. With the SparkSession object, create SparkContext and GlueContext objects.

The following screenshot shows the relevant section in the notebook.

We provide the details of each parameter that you configure for the SparkSession in the appendix of this post.

For this post, we demonstrate setting the Spark configuration for Iceberg. You can also set the configuration as AWS Glue job parameters. For more information, refer to the Usage Information section in the Iceberg connector product page.

Use case walkthrough

To walk through our use case, we use two tables; acr_iceberg and acr_iceberg_report. The table acr_iceberg contains the customer review data. The table acr_iceberg_report contains BI analysis results based on the customer review data. All changes to acr_iceberg also impact acr_iceberg_report. The table acr_iceberg_report needs to be updated daily, right before sharing business reports with stakeholders.

To demonstrate this use case, we walk through the following typical steps:

  1. A data engineering team registers the acr_iceberg and acr_iceberg_report tables in the Glue Data Catalog.
  2. Customers (ecommerce users) add reviews to products in the Industrial_Supplies category. These reviews are added to the Iceberg table.
  3. A customer requests to update their reviews. We simulate updating the customer review in the acr_iceberg table.
  4. We reflect the customer’s request of the updated review in acr_iceberg into acr_iceberg_report.
  5. We revert the customer’s request of the updated review for the customer review table acr_iceberg, and reflect the reversion in acr_iceberg_report.

1. Create Iceberg tables of customer reviews and BI reports

In this step, the data engineering team creates the acr_iceberg Iceberg table for customer reviews data (based on the Amazon Customer Reviews Dataset), and the team creates the acr_iceberg_report Iceberg table for BI reports.

Create the acr_iceberg table for customer reviews

The following code initially extracts the Amazon customer reviews, which are stored in a public S3 bucket. Then it creates an Iceberg table of the customer reviews and loads these reviews into your specified S3 bucket (created via CloudFormation stack). Note that the script loads partial datasets to avoid taking a lot of time to load the data.

# Loading the dataset and creating an Iceberg table. This will take about 3-5 minutes.
spark.read \
    .option('basePath', INPUT_BASE_PATH) \
    .parquet(*INPUT_CATEGORIES) \
    .writeTo(f'{CATALOG}.{DATABASE}.{TABLE}') \
    .tableProperty('format-version', '2') \
    .create()

Regarding the tableProperty parameter, we specify format version 2 to make the table version compatible with Amazon Athena. For more information about Athena support for Iceberg tables, refer to Considerations and limitations. To learn more about the difference between Iceberg table versions 1 and 2, refer to Appendix E: Format version changes.

Let’s run the following cells. Running the second cell takes around 3–5 minutes.

After you run the cells, the acr_iceberg table is available in your specified database in the Glue Data Catalog.

You can also see the actual data and metadata of the Iceberg table in the S3 bucket that is created through the CloudFormation stack. Iceberg creates the table and writes actual data and relevant metadata that includes table schema, table version information, and so on. See the following objects in your S3 bucket:

$ aws s3 ls 's3://your-bucket/data/' --recursive
YYYY-MM-dd hh:mm:ss   83616660 data/iceberg_blog_default.db/acr_iceberg/data/00000-44-c2983230-c43a-4f4a-9b89-1f7c13e59645-00001.parquet
YYYY-MM-dd hh:mm:ss   83247771 
...
YYYY-MM-dd hh:mm:ss       5134 data/iceberg_blog_default.db/acr_iceberg/metadata/00000-bc5d3ea2-280f-4e28-a71f-4c2b749ed637.metadata.json
YYYY-MM-dd hh:mm:ss     116950 data/iceberg_blog_default.db/acr_iceberg/metadata/411308cd-1f4d-4535-9444-f6b56a56697f-m0.avro
YYYY-MM-dd hh:mm:ss       3821 data/iceberg_blog_default.db/acr_iceberg/metadata/snap-6122957686233868728-1-411308cd-1f4d-4535-9444-f6b56a56697f.avro

The job tries to create a DynamoDB table, which you specified in the CloudFormation stack (in the following screenshot, its name is myGlueLockTable), if it doesn’t exist already. As we discussed earlier, the DynamoDB table is used for commit locking for Iceberg tables.

Create the acr_iceberg_report Iceberg table for BI reports

The data engineer team also creates the acr_iceberg_report table for BI reports in the Glue Data Catalog. This table initially has the following records.

comment_count avg_star product_category
1240 4.20729367860598 Camera
95 4.80167540490342 Industrial_Supplies
663 3.80123467540571 PC

To create the table, run the following cell.

The two Iceberg tables have been created. Let’s check the acr_iceberg table records by running a query.

Determine the average star rating for each product category by querying the Iceberg table

You can see the Iceberg table records by using a SELECT statement. In this section, we query the acr_iceberg table to simulate seeing a current BI report data by running an ad hoc query.

Run the following cell in the notebook to get the aggregated number of customer comments and mean star rating for each product_category.

The cell output has the following results.

Another way to query Iceberg tables is using Amazon Athena (when you use the Athena with Iceberg tables, you need to set up the Iceberg environment) or Amazon EMR.

2. Add customer reviews in the Iceberg table

In this section, customers add comments for some products in the Industrial Supplies product category, and we add these comments to the acr_iceberg table. To demonstrate this scenario, we create a Spark DataFrame based on the following new customer reviews and then add them to the table with an INSERT statement.

marketplace customer_id review_id product_id product_
parent
product_
title
star_
rating
helpful_
votes
total_
votes
vine verified_
purchase
review_
headline
review_
body
review_
date
year product_
category
US 12345689 ISB35E4556F144 I00EDBY7X8 989172340 plastic containers 5 0 0 N Y Five Stars Great product! 2022-02-01 2022 Industrial_
Supplies
US 78901234 IS4392CD4C3C4 I00D7JFOPC 952000001 battery tester 3 0 0 N Y nice one, but
it broke
some days later
nope 2022-02-01 2022 Industrial_
Supplies
US 12345123 IS97B103F8B24C I002LHA74O 818426953 spray bottle 2 1 1 N N Two Stars the bottle isn’t
as big as pictured.
2022-02-01 2022 Industrial_
Supplies
US 23000093 ISAB4268D46F3X I00ARPLCGY 562945918 3d printer 5 3 3 N Y Super great very useful 2022-02-01 2022 Industrial_
Supplies
US 89874312 ISAB4268137V2Y I80ARDQCY 564669018 circuit board 4 0 0 Y Y Great, but
a little bit expensive
you should buy this,
but note the price
2022-02-01 2022 Industrial_
Supplies

Run the following cells in the notebook to insert the customer comments to the Iceberg table. The process takes about 1 minute.

Run the next cell to see an addition to the product category Industrial_Supplies with 5 under comment_count.

3. Update a customer review in the Iceberg table

In the previous section, we added new customer reviews to the acr_iceberg Iceberg table. In this section, a customer requests an update of their review. Specifically, customer 78901234 requests the following update of the review ID IS4392CD4C3C4.

  • change star_rating from 3 to 5
  • update the review_headline from nice one, but it broke some days later to very good

We update the customer comment by using an UPDATE query by running the following cell.

We can review the updated record by running the next cell as follows.

Also, when you run this cell for the reporting table, you can see the updated avg_star column value for the Industrial_Supplies product category. Specifically, the avg_star value has been updated from 3.8 to 4.2 as a result of the star_rating changing from 3 to 5:

4. Reflect changes in the customer reviews table in the BI report table with a MERGE INTO query

In this section, we reflect the changes in the acr_iceberg table into the BI report table acr_iceberg_report. To do so, we run the MERGE INTO query and combine the two tables based on the condition of the product_category column in each table. This query works as follows:

  • When the product_category column in each table is the same, the query returns the sum of each column record
  • When the column in each table is not the same, the query just inserts a new record

This MERGE INTO operation is also referred to as an UPSERT (update and insert).

Run the following cell to reflect the update of customer reviews in the acr_iceberg table into the acr_iceberg_report BI table.

After the MERGE INTO query is complete, you can see the updated acr_iceberg_report table by running the following cell.

The MERGE INTO query performed the following changes:

  • In the Camera, Industrial_Supplies, and PC product categories, each comment_count is the sum between the initial value of the acr_iceberg_report table and the aggregated table value. For example, in the Industrial_Supplies product category row, the comment_count 100 is calculated by 95 (in the initial version of acr_iceberg_report) + 5 (in the aggregated report table).
  • In addition to comment_count, the avg_star in the Camera, Industrial_Supplies, or PC product category row is also computed by averaging between each avg_star value in acr_iceberg_report and in the aggregated table.
  • In other product categories, each comment_count and avg_star is the same as each value in the aggregated table, which means that each value in the aggregated table is inserted into the acr_iceberg_report table.

5. Roll back the Iceberg tables and reflect changes in the BI report table

In this section, the customer who requested the update of the review now requests to revert the updated review.

Iceberg stores versioning tables through the operations for Iceberg tables. We can see the information of each version of table by inspecting tables, and we can also time travel or roll back tables to an old table version.

To complete the customer request to revert the updated review, we need to revert the table version of acr_iceberg to the earlier version when we first added the reviews. Additionally, we need to update the acr_iceberg_report table to reflect the rollback of the acr_iceberg table version. Specifically, we need to perform the following three steps to complete these operations:

  1. Check the history of table changes of acr_iceberg and acr_iceberg_report to get each table snapshot.
  2. Roll back acr_iceberg to the version when first we inserted records, and also roll back the acr_iceberg_report table to the initial version to reflect the customer review update.
  3. Merge the acr_iceberg table with the acr_iceberg_report table again.

Get the metadata of each report table

As a first step, we check table versions by inspecting the table. Run the following cells.

Now you can see the following table versions in acr_iceberg and acr_iceberg_report:

  • acr_iceberg has three versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The second oldest one is the record insertion, which shows the append operation
    • The latest one is the update, which shows the overwrite operation
  • acr_iceberg_report has two versions:
    • The oldest one is the initial version of this table, which shows the append operation
    • The other one is from the MERGE INTO query in the previous section, which shows the overwrite operation

As shown in the following screenshot, we roll back to the acr_iceberg table version, inserting records based on the customer revert request. We also roll back to the acr_iceberg_report table version in the initial version to discard the MERGE INTO operation in the previous section.

Roll back the acr_iceberg and acr_iceberg_report tables

Based on your snapshot IDs, you can roll back each table version:

  • For acr_iceberg, use the second-oldest snapshot_id (in this example, 5440744662350048750) and replace <Type snapshot_id in ace_iceberg table> in the following cell with this snapshot_id.
  • For acr_iceberg_report table, use the initial snapshot_id (in this example, 7958428388396549892) and replace <Type snaphost_id in ace_iceberg_report table> in the following cell with this snapshot_id.

After you specify the snapshot_id for each rollback query, run the following cells.

When this step is complete, you can see the previous and current snapshot IDs of each table.

Each Iceberg table has been reverted to the specific version now.

Reflect changes in acr_iceberg into acr_iceberg_report again

We reflect the acr_iceberg table reversion into the current acr_iceberg_report table. To complete this, run the following cell.

After you rerun the MERGE INTO query, run the following cell to see the new table records. When we compare the table records, we observe that the avg_star value in Industrial_Supplies is lower than the value of the previous table avg_star.

You were able to reflect a customer’s request of reverting their updated review on the BI report table. Specifically, you can get the updated avg_star record in the Industrial_Supplies product category.

Clean up

To clean up all resources that you created, delete the CloudFormation stack.

Conclusion

In this post, we walked through using the Apache Iceberg connector with AWS Glue ETL jobs. We created an Iceberg table built on Amazon S3, and ran queries such as reading the Iceberg table data, inserting a record, merging two tables, and time travel.

The operations for the Iceberg table that we demonstrated in this post aren’t all of the operations Iceberg supports. Refer to the Apache Iceberg documentation for information about more operations.

Appendix: Spark configurations to use Apache Iceberg on AWS Glue

As we mentioned earlier, the notebook sets up a Spark configuration to integrate Iceberg with AWS Glue. The following table shows what each parameter defines.

Spark configuration key Value Description
spark.sql.catalog.{CATALOG} org.apache.iceberg.spark.SparkCatalog Specifies a Spark catalog interface that communicates with Iceberg tables.
spark.sql.catalog.{CATALOG}.warehouse {WAREHOUSE_PATH} A warehouse path for jobs to write iceberg metadata and actual data.
spark.sql.catalog.{CATALOG}.catalog-impl org.apache.iceberg.aws.
glue.GlueCatalog
The implementation of the Spark catalog class to communicate between Iceberg tables and the AWS Glue Data Catalog.
spark.sql.catalog.{CATALOG}.io-impl org.apache.iceberg.aws.s3.S3FileIO Used for Iceberg to communicate with Amazon S3.
spark.sql.catalog.{CATALOG}.lock-impl org.apache.iceberg.aws.glue.
DynamoLockManager
Used for Iceberg to manage table locks.
spark.sql.catalog.{CATALOG}.lock.table {DYNAMODB_TABLE} A DynamoDB table name to store table locks.
spark.sql.extensions org.apache.icerberg.spark.extensions.
IcebergSparkSessionExtensions
The implementation that enables Spark to run Iceberg-specific SQL commands.
spark.sql.session.timeZone UTC Sets the time zone of the Spark environment to UTC for further Iceberg time travel queries. The epoch time is in the UTC time zone.

About the Author

Tomohiro Tanaka is a Cloud Support Engineer at Amazon Web Services. He builds Glue connectors such as Apache Iceberg connector and TPC-DS connector. He’s passionate about helping customers build data lakes using ETL workloads. In his free time, he also enjoys coffee breaks with his colleagues and making coffee at home.

Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as Amazon Athena for interactive queries, Amazon EMR for Apache Spark applications, Amazon SageMaker for machine learning, and Amazon QuickSight for data visualization.

Apache Iceberg is an open-source table format for data stored in data lakes. It is optimized for data access patterns in Amazon Simple Storage Service (Amazon S3) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to process safe table schema updates as the table data evolves
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volume changes without relying on physical directories
  • Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)
  • Provide versioned tables and support time travel queries to query historical data and verify changes between updates
  • Roll back tables to prior versions to return tables to a known good state in case of any issues

In 2021, AWS teams contributed the Apache Iceberg integration with the AWS Glue Data Catalog to open source, which enables you to use open-source compute engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg and Amazon EMR added support of Iceberg starting with version 6.5.0.

In this post, we show you how to use Amazon EMR Spark to create an Iceberg table, load sample books review data, and use Athena to query, perform schema evolution, row-level update and delete, and time travel, all coordinated through the AWS Glue Data Catalog.

Solution overview

We use the Amazon Customer Reviews public dataset as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.

Architecture that shows the flow from Amazon EMR loading data into Amazon S3, and queried by Amazon Athena through AWS Glue Data Catalog.

To set up and test this solution, we complete the following high-level steps:

  1. Create an S3 bucket.
  2. Create an EMR cluster.
  3. Create an EMR notebook.
  4. Configure a Spark session.
  5. Load data into the Iceberg table.
  6. Query the data in Athena.
  7. Perform a row-level update in Athena.
  8. Perform a schema evolution in Athena.
  9. Perform time travel in Athena.
  10. Consume Iceberg data across Amazon EMR and Athena.

Prerequisites

To follow along with this walkthrough, you must have the following:

  • An AWS Account with a role that has sufficient access to provision the required resources.

Create an S3 bucket

To create an S3 bucket that holds your Iceberg data, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name (for this post, we enter aws-lake-house-iceberg-blog-demo).

Because S3 bucket names are globally unique, choose a different name when you create your bucket.

  1. For AWS Region, choose your preferred Region (for this post, we use us-east-1).

Create a new Amazon S3 bucket. Choose us-east-1 as region

  1. Complete the remaining steps to create your bucket.
  2. If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.

Create an EMR cluster

Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose your Amazon EMR release version.

Iceberg requires release 6.5.0 and above.

  1. Select JupyterEnterpriseGateway and Spark as the software to install.
  2. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":true}}].
  3. Leave other settings at their default and choose Next.

Choose Amazon EMR release 6.6.0 and JupyterEnterpriseGateway and Spark. Enter configuration information.

  1. You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.
  2. Choose Next.
  3. For Cluster name, enter Iceberg Spark Cluster.
  4. Leave the remaining settings unchanged and choose Next.

Provide Iceberg Spark Cluster as the Cluster name

  1. You can configure security settings such as adding an EC2 key pair to access your EMR cluster locally. In this demo, we use the default setting.
  2. Choose Create cluster.

You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from Starting to Waiting.

Create an EMR notebook

When the cluster is active and in the Waiting state, we’re ready to run Spark programs in the cluster. For this demo, we use an EMR notebook to run Spark commands.

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. For Notebook name, enter a name (for this post, we enter iceberg-spark-notebook).
  4. For Cluster, select Choose an existing cluster and choose Iceberg Spark Cluster.
  5. For AWS service role, choose Create a new role to create EMR_Notebook_DefaultRole or choose a different role to access resources in the notebook.
  6. Choose Create notebook.

Create an Amazon EMR notebook. Use EMR_Notebooks_DefaultRole

You’re redirected to the notebook detail page.

  1. Choose Open in JupyterLab next to your notebook.
  2. Choose to create a new notebook.
  3. Under Notebook, choose Spark.

Choose Spark from the options provided in the Launcher

Configure a Spark session

In your notebook, run the following code:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://<your-iceberg-blog-demo-bucket>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path s3://<your-iceberg-blog-demo-bucket>
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)

Load data into the Iceberg table

In our Spark session, run the following commands to load data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews related to books
val book_reviews_location = "s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet"
val book_reviews = spark.read.parquet(book_reviews_location)

// write book reviews data to an Iceberg v2 table
book_reviews.writeTo("demo.reviews.book_reviews").tableProperty("format-version", "2").createOrReplace()

Iceberg format v2 is needed to support row-level updates and deletes. See Format Versioning for more details.

It may take up to 15 minutes for the commands to complete. When it’s complete, you should be able to see the table on the AWS Glue console, under the reviews database, with the table_type property shown as ICEBERG.

Shows the table properties for book_reviews table

The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using Spark SQL, Athena SQL, or Iceberg Java and Python SDKs.

Query in Athena

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure to use the S3 bucket you created earlier to store the query results.

The table book_reviews is available for querying. Run the following query:

SELECT * FROM reviews.book_reviews LIMIT 5;

The following screenshot shows the first five records from the table being displayed.

Amazon Athena query the first 5 rows and show the results

Perform a row-level update in Athena

In the next few steps, let’s focus on a record in the table with review ID RZDVOUQG1GBG7. Currently, it has no total votes when we run the following query:

SELECT total_votes FROM reviews.book_reviews 
WHERE review_id = 'RZDVOUQG1GBG7'

Query total_votes for a particular review which shows a value of 0

Let’s update the total_votes value to 2 using the following query:

UPDATE reviews.book_reviews
SET total_votes = 2
WHERE review_id = 'RZDVOUQG1GBG7'

Update query to set the total_votes for the previous review_id to 2

After your update command runs successfully, run the below query and note the updated result showing a total of two votes:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Athena enforces ACID transaction guarantee for all the write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.

Concurrent updates causes a failure. This shows the TRANSACTION_CONFLICT error during this scenario.

Delete queries work in a similar way; see DELETE for more details.

Perform a schema evolution in Athena

Suppose the review suddenly goes viral and gets 10 billion votes:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Based on the AWS Glue table information, the total_votes is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.

Updating to a very large value greater than maximum allowed integer value results in an error

Iceberg supports most schema evolution features as metadata-only operations, which don’t require a table rewrite. This includes add, drop, rename, reorder column, and promote column types. To solve this issue, you can change the integer column total_votes to a BIGINT type by running the following DDL:

ALTER TABLE reviews.book_reviews
CHANGE COLUMN total_votes total_votes BIGINT;

You can now update the value successfully:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Querying the record now gives us the expected result in BIGINT:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Perform time travel in Athena

In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel to a time that is after when the first version was committed:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Query an earlier snapshot using time travel feature

Consume Iceberg data across Amazon EMR and Athena

One of the most important features of a data lake is for different systems to seamlessly work together through the Iceberg open-source protocol. After all the operations are performed in Athena, let’s go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.

First, run the same Spark SQL and see if you get the same result for the review used in the example:

val select_votes = """SELECT total_votes FROM demo.reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'"""

spark.sql(select_votes).show()

Spark shows 10 billion total votes for the review.

Shows the latest value of total_votes when querying using the Amazon EMR notebook

Check the transaction history of the operation in Athena through Spark Iceberg’s history system table:

val select_history = "SELECT * FROM demo.reviews.book_reviews.history"

spark.sql(select_history).show()

This shows three transactions corresponding to the two updates you ran in Athena.

Shows snapshots corresponding to the two updates you ran in Athena

Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run an expire_snapshots procedure to remove old snapshots, and free up storage space in Amazon S3:

import java.util.Calendar
import java.text.SimpleDateFormat

val now = Calendar.getInstance().getTime()
val form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val now_formatted = form.format(now.getTime())
val procedure = s"""CALL demo.system.expire_snapshots(
  table => 'reviews.book_reviews',
  older_than => TIMESTAMP '$now_formatted',
  retain_last => 1)"""

spark.sql(procedure)

Note that, after running this procedure, time travel can no longer be performed against expired snapshots.

Examine the history system table again and notice that it shows you only the most recent snapshot.

Running the following query in Athena results in an error “No table snapshot found before timestamp…” as older snapshots were deleted, and you are no longer able to time travel to the older snapshot:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following code in your notebook to drop the AWS Glue table and database:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.book_reviews") 
// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Select the notebook iceberg-spark-notebook and choose Delete.
  3. Choose Clusters in the navigation pane.
  4. Select the cluster Iceberg Spark Cluster and choose Terminate.
  5. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.

With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as SageMaker Athena integration for machine learning, or QuickSight Athena integration for dashboard and reporting. AWS Glue also offers the Iceberg connector, which you can use to author and run Iceberg data pipelines.

In addition, Iceberg supports a variety of other open-source compute engines that you can choose from. For example, you can use Apache Flink on Amazon EMR for streaming and change data capture (CDC) use cases. The strong transaction guarantee and efficient row-level update, delete, time travel, and schema evolution experience offered by Iceberg offers a sound foundation and infinite possibilities for users to unlock the power of big data.


About the Authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Mohit Mehta is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Jared Keating is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.

Implement a CDC-based UPSERT in a data lake using Apache Iceberg and AWS Glue

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/implement-a-cdc-based-upsert-in-a-data-lake-using-apache-iceberg-and-aws-glue/

As the implementation of data lakes and modern data architecture increases, customers’ expectations around its features also increase, which include ACID transaction, UPSERT, time travel, schema evolution, auto compaction, and many more. By default, Amazon Simple Storage Service (Amazon S3) objects are immutable, which means you can’t update records in your data lake because it supports append-only transactions. But there are use cases where you might be receiving incremental updates with change data capture (CDC) from your source systems, and you might need to update existing data in Amazon S3 to have a golden copy. Previously, you had to overwrite the complete S3 object or folders, but with the evolution of frameworks such as Apache Hudi, Apache Iceberg, Delta Lake, and governed tables in AWS Lake Formation, you can get database-like UPSERT features in Amazon S3.

Apache Hudi integration is already supported with AWS analytics services, and recently AWS Glue, Amazon EMR, and Amazon Athena announced support for Apache Iceberg. Apache Iceberg is an open table format originally developed at Netflix, which got open-sourced as an Apache project in 2018 and graduated from incubator mid-2020. It’s designed to support ACID transactions and UPSERT on petabyte-scale data lakes, and is getting popular because of its flexible SQL syntax for CDC-based MERGE, full schema evolution, and hidden partitioning features.

In this post, we walk you through a solution to implement CDC-based UPSERT or MERGE in an S3 data lake using Apache Iceberg and AWS Glue.

Configure Apache Iceberg with AWS Glue

You can integrate Apache Iceberg JARs into AWS Glue through its AWS Marketplace connector. The connector supports AWS Glue versions 1.0, 2.0, and 3.0, and is free to use. Configuring this connector is as easy as clicking few buttons on the user interface.

The following steps guide you through the setup process:

  1. Navigate to the AWS Marketplace connector page.
  2. Choose Continue to Subscribe and then Accept Terms.
  3. Choose Continue to Configuration.
  4. Choose the AWS Glue version and software version.
  5. Choose Continue to Launch.
  6. Choose Usage Instruction, which opens a page that has a link to activate the connector.
  7. Create a connection by providing a name and choosing Create connection and activate connector.

You can confirm your new connection on the AWS Glue Studio Connectors page.

To use this connector, when you create an AWS Glue job, make sure you add this connector to your job. Later in the implementation steps, when you create an AWS Glue job, we show how to use the connector you just configured.

Solution overview

Let’s assume you have a relational database that has product inventory data, and you want to move it into an S3 data lake on a continuous basis, so that your downstream applications or consumers can use it for analytics. After your initial data movement to Amazon S3, you’re supposed to receive incremental updates from the source database as CSV files using AWS DMS or equivalent tools, where each record has an additional column to represent an insert, update, or delete operation. While processing the incremental CDC data, one of the primary requirements you have is merging the CDC data in the data lake and providing the capability to query previous versions of the data.

To solve this use case, we present the following simple architecture that integrates Amazon S3 for the data lake, AWS Glue with the Apache Iceberg connector for ETL (extract, transform, and load), and Athena for querying the data using standard SQL. Athena helps in querying the latest product inventory data from the Iceberg table’s latest snapshot, and Iceberg’s time travel feature helps in identifying a product’s price at any previous date.

The following diagram illustrates the solution architecture.

The solution workflow consists of the following steps:

  • Data ingestion:
    • Steps 1.1 and 1.2 use AWS Database Migration Service (AWS DMS), which connects to the source database and moves incremental data (CDC) to Amazon S3 in CSV format.
    • Steps 1.3 and 1.4 consist of the AWS Glue PySpark job, which reads incremental data from the S3 input bucket, performs deduplication of the records, and then invokes Apache Iceberg’s MERGE statements to merge the data with the target UPSERT S3 bucket.
  • Data access:
    • Steps 2.1 and 2.2 represent Athena integration to query data from the Iceberg table using standard SQL and validate the time travel feature of Iceberg.
  • Data Catalog:
    • The AWS Glue Data Catalog is treated as a centralized catalog, which is used by AWS Glue and Athena. An AWS Glue crawler is integrated on top of S3 buckets to automatically detect the schema.

We have referenced AWS DMS as part of the architecture, but while showcasing the solution steps, we assume that the AWS DMS output is already available in Amazon S3, and focus on processing the data using AWS Glue and Apache Iceberg.

To demo the implementation steps, we use sample product inventory data that has the following attributes:

  • op – Represents the operation on the source record. This shows values I to represent insert operations, U to represent updates, and D to represent deletes. You need to make sure this attribute is included in your CDC incremental data before it gets written to Amazon S3. AWS DMS enables you to include this attribute, but if you’re using other mechanisms to move data, make sure you capture this attribute, so that your ETL logic can take appropriate action while merging it.
  • product_id – This is the primary key column in the source database’s products table.
  • category – This column represents the product’s category, such as Electronics or Cosmetics.
  • product_name – This is the name of the product.
  • quantity_available – This is the quantity available in the inventory for a product. When we showcase the incremental data for UPSERT or MERGE, we reduce the quantity available for the product to showcase the functionality.
  • last_update_time – This is the time when the product record was updated at the source database.

If you’re using AWS DMS to move data from your relational database to Amazon S3, then by default AWS DMS includes the op attribute for incremental CDC data, but it’s not included by default for the initial load. If you’re using CSV as your target file format, you can include IncludeOpForFullLoad as true in your S3 target endpoint setting of AWS DMS to have the op attribute included in your initial full load file. To learn more about the Amazon S3 settings in AWS DMS, refer to S3Settings.

To implement the solution, we create AWS resources such as an S3 bucket and an AWS Glue job, and integrate the Iceberg code for processing. Before we run the AWS Glue job, we have to upload the sample CSV files to the input bucket and process it with AWS Glue PySpark code for the output.

Prerequisites

Before getting started on the implementation, make sure you have the required permissions to perform the following in your AWS account:

  • Create AWS Identity and Access Management (IAM) roles as needed
  • Read or write to an S3 bucket
  • Create and run AWS Glue crawlers and jobs
  • Manage a database, table, and workgroups, and run queries in Athena

For this post, we use the us-east-1 Region, but you can integrate it in your preferred Region if the AWS services included in the architecture are available in that Region.

Now let’s dive into the implementation steps.

Create an S3 bucket for input and output

To create an S3 bucket, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. Specify the bucket name as glue-iceberg-demo, and leave the remaining fields as default.
    S3 bucket names are globally unique. While implementing the solution, you may get an error saying the bucket name already exists. Make sure to provide a unique name and use the same name while implementing the rest of the implementation steps. Formatting the bucket name as <Bucket-Name>-${AWS_ACCOUNT_ID}-${AWS_REGION_CODE} might help you get a unique name.
  4. Choose Create bucket.
  5. On the bucket details page, choose Create folder.
  6. Create two subfolders: raw-csv-input and iceberg-output.
  7. Upload the LOAD00000001.csv file into the raw-csv-input folder of the bucket.

The following screenshot provides a sample of the input dataset.

Create input and output tables using Athena

To create input and output Iceberg tables in the AWS Glue Data Catalog, open the Athena console and run the following queries in sequence:

-- Create database for the demo
CREATE DATABASE iceberg_demo;
-- Create external table in input CSV files. Replace the S3 path with your bucket name
CREATE EXTERNAL TABLE iceberg_demo.raw_csv_input(
  op string, 
  product_id bigint, 
  category string, 
  product_name string, 
  quantity_available bigint, 
  last_update_time string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://glue-iceberg-demo/raw-csv-input/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',', 
  'typeOfData'='file');
-- Create output Iceberg table with partitioning. Replace the S3 bucket name with your bucket name
CREATE TABLE iceberg_demo.iceberg_output (
  product_id bigint,
  category string,
  product_name string,
  quantity_available bigint,
  last_update_time timestamp) 
PARTITIONED BY (category, bucket(16,product_id)) 
LOCATION 's3://glue-iceberg-demo/iceberg-output/' 
TBLPROPERTIES (
  'table_type'='ICEBERG',
  'format'='parquet',
  'write_target_data_file_size_bytes'='536870912' 
)
-- Validate the input data
SELECT * FROM iceberg_demo.raw_csv_input;

Alternatively, you can integrate an AWS Glue crawler on top of the input to create the table. Next, let’s create the AWS Glue PySpark job to process the input data.

Create the AWS Glue job

Complete the following steps to create an AWS Glue job:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Choose Create job.
  3. Select Spark script editor.
  4. For Options, select Create a new script with boilerplate code.
  5. Choose Create.
  6. Replace the script with the following script:
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    from pyspark.sql.functions import *
    from awsglue.dynamicframe import DynamicFrame
    
    from pyspark.sql.window import Window
    from pyspark.sql.functions import rank, max
    
    from pyspark.conf import SparkConf
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME', 'iceberg_job_catalog_warehouse'])
    conf = SparkConf()
    
    ## Please make sure to pass runtime argument --iceberg_job_catalog_warehouse with value as the S3 path 
    conf.set("spark.sql.catalog.job_catalog.warehouse", args['iceberg_job_catalog_warehouse'])
    conf.set("spark.sql.catalog.job_catalog", "org.apache.iceberg.spark.SparkCatalog")
    conf.set("spark.sql.catalog.job_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    conf.set("spark.sql.catalog.job_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    conf.set("spark.sql.iceberg.handle-timestamp-without-timezone","true")
    
    sc = SparkContext(conf=conf)
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args["JOB_NAME"], args)
    
    ## Read Input Table
    IncrementalInputDyF = glueContext.create_dynamic_frame.from_catalog(database = "iceberg_demo", table_name = "raw_csv_input", transformation_ctx = "IncrementalInputDyF")
    IncrementalInputDF = IncrementalInputDyF.toDF()
    
    if not IncrementalInputDF.rdd.isEmpty():
        ## Apply De-duplication logic on input data, to pickup latest record based on timestamp and operation 
        IDWindowDF = Window.partitionBy(IncrementalInputDF.product_id).orderBy(IncrementalInputDF.last_update_time).rangeBetween(-sys.maxsize, sys.maxsize)
                      
        # Add new columns to capture first and last OP value and what is the latest timestamp
        inputDFWithTS= IncrementalInputDF.withColumn("max_op_date",max(IncrementalInputDF.last_update_time).over(IDWindowDF))
        
        # Filter out new records that are inserted, then select latest record from existing records and merge both to get deduplicated output 
        NewInsertsDF = inputDFWithTS.filter("last_update_time=max_op_date").filter("op='I'")
        UpdateDeleteDf = inputDFWithTS.filter("last_update_time=max_op_date").filter("op IN ('U','D')")
        finalInputDF = NewInsertsDF.unionAll(UpdateDeleteDf)
    
        # Register the deduplicated input as temporary table to use in Iceberg Spark SQL statements
        finalInputDF.createOrReplaceTempView("incremental_input_data")
        finalInputDF.show()
        
        ## Perform merge operation on incremental input data with MERGE INTO. This section of the code uses Spark SQL to showcase the expressive SQL approach of Iceberg to perform a Merge operation
        IcebergMergeOutputDF = spark.sql("""
        MERGE INTO job_catalog.iceberg_demo.iceberg_output t
        USING (SELECT op, product_id, category, product_name, quantity_available, to_timestamp(last_update_time) as last_update_time FROM incremental_input_data) s
        ON t.product_id = s.product_id
        WHEN MATCHED AND s.op = 'D' THEN DELETE
        WHEN MATCHED THEN UPDATE SET t.quantity_available = s.quantity_available, t.last_update_time = s.last_update_time 
        WHEN NOT MATCHED THEN INSERT (product_id, category, product_name, quantity_available, last_update_time) VALUES (s.product_id, s.category, s.product_name, s.quantity_available, s.last_update_time)
        """)
    
        job.commit()

  7. On the Job details tab, specify the job name.
  8. For IAM Role, assign an IAM role that has the required permissions to run an AWS Glue job and read and write to the S3 bucket.
  9. For Glue version, choose Glue 3.0.
  10. For Language, choose Python 3.
  11. Make sure Job bookmark has default value of Enable.
  12. Under Connections, choose the Iceberg connector.
  13. Under Job parameters, specify Key as --iceberg_job_catalog_warehouse and Value as your S3 path (e.g. s3://<bucket-name>/<iceberg-warehouse-path>).
  14. Choose Save and then Run, which should write the input data to the Iceberg table with a MERGE statement.

Because the target table is empty in the first run, the Iceberg MERGE statement runs an INSERT statement for all records.

Query the Iceberg table using Athena

After you have successfully run the AWS Glue job, you can validate the output in Athena with the following SQL query:

SELECT * FROM iceberg_demo.iceberg_output limit 10;

The output of the query should match the input, with one difference: The Iceberg output table doesn’t have the op column.

Upload incremental (CDC) data for further processing

After we process the initial full load file, let’s upload the following two incremental files, which include insert, update, and delete records for a few products.

The following is a snapshot of first incremental file (20220302-1134010000.csv).

The following is a snapshot of the second incremental file (20220302-1135010000.csv), which shows that record 102 has another update transaction before the next ETL job processing.

After you upload both incremental files, you should see them in the S3 bucket.

Run the AWS Glue job again to process incremental files

Because we enabled bookmarks on the AWS Glue job, the next job picks up only the two new incremental files and performs a merge operation on the Iceberg table.

To run the job again, complete the following steps:

  • On the AWS Glue console, choose Jobs in the navigation pane.
  • Select the job and choose Run.

As explained earlier, the PySpark script is expected to deduplicate the input data before merging to the target Iceberg table, which means it only picks up the latest record of the 102 product.

For this post, we run the job manually, but you can configure your AWS Glue jobs to run as part of an AWS Glue workflow or via AWS Step Functions (for more information, see Manage AWS Glue Jobs with Step Functions).

Query the Iceberg table using Athena, after incremental data processing

After incremental data processing is complete, you can run the same SELECT statement again and validate that the quantity value is updated for record 102 and product record 103 is deleted.

The following screenshot shows the output.

Query the previous version of data with Iceberg’s time travel feature

You can run the following SQL query in Athena that uses the AS OF TIME statement of Iceberg to query the previous version of the data:

-SELECT * FROM iceberg_demo.iceberg_output FOR SYSTEM_TIME AS OF TIMESTAMP '2022-03-23 18:56:00'

The following screenshot shows the output. As you can see, the quantity value of product ID 102 is 30, which was available during the initial load.

Note that you have to change the AS OF TIMESTAMP value based on your runtime.

This concludes the implementation steps.

Considerations

The following are a few considerations you should keep in mind while integrating Apache Iceberg with AWS Glue:

  • Athena support for Iceberg became generally available recently, so make sure you review the considerations and limitations of using this feature.
  • AWS Glue provides DynamicFrame APIs to read from different source systems and write to different targets. For this post, we integrated Spark DataFrame instead of AWS Glue DynamicFrame because Iceberg’s MERGE statements aren’t supported with AWS Glue DynamicFrame APIs.
    To learn more about AWS integration, refer to Iceberg AWS Integrations.

Conclusion

This post explains how you can use the Apache Iceberg framework with AWS Glue to implement UPSERT on an S3 data lake. It provides an overview of Apache Iceberg, its features and integration approaches, and explains how you can implement it through a step-by-step guide.

I hope this gives you a great starting point for using Apache Iceberg with AWS analytics services and that you can build on top of it to implement your solution.

Appendix: AWS Glue DynamicFrame sample code to interact with Iceberg tables

  • The following code sample demonstrates how you can integrate the DynamicFrame method to read from an Iceberg table:
IcebergDyF = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="marketplace.spark",
        connection_options={
            "path": "job_catalog.iceberg_demo.iceberg_output",
            "connectionName": "Iceberg Connector for Glue 3.0",
        },
        transformation_ctx="IcebergDyF",
    )
)

## Optionally, convert to Spark DataFrame if you plan to leverage Iceberg’s SQL based MERGE statements
InputIcebergDF = IcebergDyF.toDF()
  • The following sample code shows how you can integrate the DynamicFrame method to write to an Iceberg table for append-only mode:
## Use the following 2 lines to convert Spark DataFrame to DynamicFrame, if you plan to leverage DynamicFrame API to write to final target
from awsglue.dynamicframe import DynamicFrame 
finalDyF = DynamicFrame.fromDF(InputIcebergDF,glueContext,"finalDyF")

WriteIceberg = glueContext.write_dynamic_frame.from_options(
    frame= finalDyF,
    connection_type="marketplace.spark",
    connection_options={
        "path": "job_catalog.iceberg_demo.iceberg_output",
        "connectionName": "Iceberg Connector for Glue 3.0",
    },
    format="parquet",
    transformation_ctx="WriteIcebergDyF",
)

About the Author

Sakti Mishra is a Principal Data Lab Solution Architect at AWS, where he helps customers modernize their data architecture and help define end to end data strategy including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

How GE Proficy Manufacturing Data Cloud replatformed to improve TCO, data SLA, and performance

Post Syndicated from Jyothin Madari original https://aws.amazon.com/blogs/big-data/how-ge-proficy-manufacturing-data-cloud-replatformed-to-improve-tco-data-sla-and-performance/

This is post is co-authored by Jyothin Madari, Madhusudhan Muppagowni and Ayush Srivastava from GE.

GE Proficy Manufacturing Data Cloud (MDC), part of the GE Digital’s Manufacturing Execution Systems (MES) suite of solutions, allows GED’s customers to increase the derived value easily and quickly from the MES by reliably bringing enterprise-wide manufacturing data into the cloud and transforming it into a structured dataset for advanced analytics and deeper insights into the manufacturing processes.

In this post, we share how MDC modernized the hybrid cloud strategy by replatforming. This solution improved scalability, their data availability Service Level Agreement (SLA), and performance.

Challenge

MDC v1 was built on Predix services using industrial use case-optimized Predix services such as Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR). MDC evolved in both features and the underlying platform over the past year with a goal to improve TCO, data SLA, and performance. MDC’s customer base grew and the number of sites from customers grew to over 100 in the past couple of years. The increased number of sites needed more compute and storage capacity. This increased infrastructure and operational cost significantly, while introducing increased data latency and lowering the data freshness interval from the cloud.

How we started

MDC evaluated several vendors for their storage and compute capabilities using various measurements: security, performance, scalability, ease of management and operation, reduction of overall cost and increase in ROI, partnership, and migration help (technology assistance). The MDC team saw opportunities to improve the product by using native AWS services such as Amazon Redshift, AWS Glue, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), which made the product more performant and scalable while reducing operation costs and making it future-ready for advanced analytics and new customer use cases.

The GE Digital team, comprised of domain experts, developers, and QA, worked shoulder to shoulder with the AWS ProServe team, comprised of Solution Architects, Data Architects, and Big Data Experts, in determining the key architectural changes required and solutions to implementation challenges.

Overview of solution

The following diagram illustrates the high-level architecture of the solution.

This is a broad overview, and the specifics of networking and security between components are out of scope for this post.

The solution includes the following main steps and components:

  1. CDC and log collector – Compressed CSV data is collected from over 100 Manufacturing Data Sources Proficy Plant Applications and sinked into an Amazon Simple Storage Service (Amazon S3) bucket.
  2. S3 raw bucket – Our data lands in Amazon S3 without any transformation, but appropriately partitioned (tenant, site, date, and so on) for the ease of future processing.
  3. AWS Lambda – When the file lands in the S3 raw bucket, it triggers an S3 event notification, which invokes AWS Lambda. Lambda extracts metadata (bucket name, key name, date, and so on) from the event and saves it in Amazon DynamoDB.
  4. AWS Glue – Our goal is now to take CSV files, with varying schemas, and convert them into Apache Parquet format. An AWS Glue extract, transform, and load (ETL) job reads a list of files to be processed from the DynamoDB table and fetches them from the S3 raw bucket. We have preconfigured unified AVRO schemas in the AWS Glue Schema Registry for schema conversion. Converted data lands in the S3 raw Parquet bucket.
  5. S3 raw Parquet bucket – Data in this bucket is still raw and unmodified; only the format was changed. This intermediary storage is required due to schema and column order mismatch in CSV files.
  6. Amazon Redshift – The majority of transformations and data enrichment happens in this step. Amazon Redshift Spectrum consumes data from the S3 raw Parquet bucket and external PostgreSQL dimension tables (through a federated query). Transformations are performed via stored procedures, where we encapsulate logic for data transformation, data validation, and business-specific logic. The Amazon Redshift cluster is configured with concurrency scaling, auto workload management (WLM) with caching, and the latest RA3 instance types.
  7. MDC API – These custom-built, web-based, REST API microservices talk on the backend with Amazon Redshift and expose data to external users, business intelligence (BI) tools, and partners.
  8. Amazon Redshift data export and archival – On a scheduled basis, Amazon Redshift exports (UNLOAD command) contextualized and business-defined aggregated data. Exports are landed in the S3 bucket as Apache Parquet files.
  9. S3 Parquet export bucket – This bucket stores the exported data (hundreds of TBs) used by external users who need to run extensive, heavy analytics and AI or machine learning (ML) with various tools (such as Amazon EMR, Amazon Athena, Apache Spark, and Dremio).
  10. End-users – External users consume data from the API. The main use case here is reporting and visual analytics.
  11. Amazon MWAA – The orchestrator of the solution, Amazon MWAA is used for scheduling Amazon Redshift stored procedures, AWS Glue ETL jobs, and Amazon Redshift exports at regular intervals with error handling and retries built in.

Bringing it all together

MDC replaced both Predix Columnar Store (Cassandra) and Predix Insights (Amazon EMR) with Amazon Redshift for both storage of the MDC data models and compute (ELT). Amazon MWAA is used to schedule the workloads that do the bulk of the ELT. Lambda, AWS Glue, and DynamoDB are used to normalize the schema differences between sites. It was important not to disrupt MDC customers while replatforming. To achieve this, MDC used a phased approach to migrate the data models to Amazon Redshift. They used federated queries to query existing PostgreSQL for dimensional data, which facilitated having some of the data models in Amazon Redshift, while the others were in Cassandra with no interruption to MDC customers. Redshift Spectrum facilitated querying the raw data in Amazon S3 directly both for ETL and data validation.

75% of the MDC team along with the AWS ProServe team and AWS Solution Architects collaborated with the GE Digital Security Team and Platform Team to implement the architecture with AWS native services. It took approximately 9 months to implement, secure, and performance tune the architecture and migrate data models in three phases. Each phase has gone through a GE Digital internal security review. Amazon Redshift Auto WLM, short query acceleration, and tuning the sort keys to optimize querying patterns improved the Proficy MDC API performance. Because the unload of the data from Amazon Redshift was fast, Proficy MDC is now able to export the data much more frequently to our end customers.

Conclusion

With replatforming, Proficy MDC was able to improve ETL performance by approximately 75%. Data latency and freshness improved by approximately 87%. The solution reduced TCO of the platform by approximately 50%. Proficy MDC was also able reduce the infrastructure and operational cost. Improved performance and reduced latency has allowed us to speed up the next steps in our journey to modernize the enterprise data architecture and hybrid cloud data platform.


About the Authors

Jyothin Madari leads the Manufacturing Data Cloud (MDC) engineering team; part of the manufacturing suite of products at GE Digital. He has 18 years of experience, 4 of which is with GE Digital. Most recently he has been working on data migration projects with an aim to reduce costs and improve performance. He is an AWS Certified Cloud Practitioner, a keen learner and loves solving interesting problems. Connect with him on LinkedIn.

Madhusudhan (Madhu) Muppagowni is a Technical Architect and Principal Software Developer based in Silicon Valley, Bay Area, California.  He is passionate about Software Development and Architecture. He thrives on producing Well-Architected and Secure SaaS Products, Data Pipelines that can make a real impact.  He loves outdoors and an avid hiker and backpacker. Connect with him on LinkedIn.

Ayush Srivastava is a Senior Staff Engineer and Technical Anchor based in Hyderabad, India. He is passionate about Software Development and Architecture. He has Demonstrated track record of successfully technical anchoring small to large Secure SaaS Products, Data Pipelines from start to finish. He loves exploring different places and he says “I’m in love with cities I have never been to and people I have never met.” Connect with him on LinkedIn.

Karen Grygoryan is Data Architect with AWS ProServe. Connect with him on LinkedIn.

Gnanasekaran Kailasam is a Data Architect at AWS. He has worked with building data warehouses and big data solutions for over 16 years. He loves to learn new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS. Connect with him on LinkedIn.

Optimize Federated Query Performance using EXPLAIN and EXPLAIN ANALYZE in Amazon Athena

Post Syndicated from Nishchai JM original https://aws.amazon.com/blogs/big-data/optimize-federated-query-performance-using-explain-and-explain-analyze-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. In 2019, Athena added support for federated queries to run SQL queries across data stored in relational, non-relational, object, and custom data sources.

In 2021, Athena added support for the EXPLAIN statement, which can help you understand and improve the efficiency of your queries. The EXPLAIN statement provides a detailed breakdown of a query’s run plan. You can analyze the plan to identify and reduce query complexity and improve its runtime. You can also use EXPLAIN to validate SQL syntax prior to running the query. Doing so helps prevent errors that would have occurred while running the query.

Athena also added EXPLAIN ANALYZE, which displays the computational cost of your queries alongside their run plans. Administrators can benefit from using EXPLAIN ANALYZE because it provides a scanned data count, which helps you reduce financial impact due to user queries and apply optimizations for better cost control.

In this post, we demonstrate how to use and interpret EXPLAIN and EXPLAIN ANALYZE statements to improve Athena query performance when querying multiple data sources.

Solution overview

To demonstrate using EXPLAIN and EXPLAIN ANALYZE statements, we use the following services and resources:

Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in your AWS account. The table metadata lets the Athena query engine know how to find, read, and process the data that you want to query. We use Athena data source connectors to connect to data sources external to Amazon S3.

Prerequisites

To deploy the CloudFormation template, you must have the following:

Provision resources with AWS CloudFormation

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

  1. Follow the prompts on the AWS CloudFormation console to create the stack.
  2. Note the key-value pairs on the stack’s Outputs tab.

You use these values when configuring the Athena data source connectors.

The CloudFormation template creates the following resources:

  • S3 buckets to store data and act as temporary spill buckets for Lambda
  • AWS Glue Data Catalog tables for the data in the S3 buckets
  • A DynamoDB table and Amazon RDS for MySQL tables, which are used to join multiple tables from different sources
  • A VPC, subnets, and endpoints, which are needed for Amazon RDS for MySQL and DynamoDB

The following figure shows the high-level data model for the data load.

Create the DynamoDB data source connector

To create the DynamoDB connector for Athena, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select Amazon DynamoDB.
  4. Choose Next.

  1. For Data source name, enter DDB.

  1. For Lambda function, choose Create Lambda function.

This opens a new tab in your browser.

  1. For Application name, enter AthenaDynamoDBConnector.
  2. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  3. For AthenaCatalogName, enter dynamodb-lambda-func.
  4. Leave the remaining values at their defaults.
  5. Select I acknowledge that this app creates custom IAM roles and resource policies.
  6. Choose Deploy.

You’re returned to the Connect data sources section on the Athena console.

  1. Choose the refresh icon next to Lambda function.
  2. Choose the Lambda function you just created (dynamodb-lambda-func).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. If you haven’t already set up the Athena query results location, choose View settings on the Athena query editor page.

  1. Choose Manage.
  2. For Location of query result, browse to the S3 bucket specified for the Athena spill bucket in the CloudFormation template.
  3. Add Athena-query to the S3 path.
  4. Choose Save.

  1. In the Athena query editor, for Data source, choose DDB.
  2. For Database, choose default.

You can now explore the schema for the sportseventinfo table; the data is the same in DynamoDB.

  1. Choose the options icon for the sportseventinfo table and choose Preview Table.

Create the Amazon RDS for MySQL data source connector

Now let’s create the connector for Amazon RDS for MySQL.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL.
  4. Choose Next.

  1. For Data source name, enter MySQL.

  1. For Lambda function, choose Create Lambda function.

  1. For Application name, enter AthenaMySQLConnector.
  2. For SecretNamePrefix, enter AthenaMySQLFederation.
  3. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  4. For DefaultConnectionString, enter the value from the CloudFormation stack for MySQLConnection.
  5. For LambdaFunctionName, enter mysql-lambda-func.
  6. For SecurityGroupIds, enter the value from the CloudFormation stack for RDSSecurityGroup.
  7. For SubnetIds, enter the value from the CloudFormation stack for RDSSubnets.
  8. Select I acknowledge that this app creates custom IAM roles and resource policies.
  9. Choose Deploy.

  1. On the Lambda console, open the function you created (mysql-lambda-func).
  2. On the Configuration tab, under Environment variables, choose Edit.

  1. Choose Add environment variable.
  2. Enter a new key-value pair:
    • For Key, enter MYSQL_connection_string.
    • For Value, enter the value from the CloudFormation stack for MySQLConnection.
  3. Choose Save.

  1. Return to the Connect data sources section on the Athena console.
  2. Choose the refresh icon next to Lambda function.
  3. Choose the Lambda function you created (mysql-lamdba-function).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. In the Athena query editor, for Data Source, choose MYSQL.
  4. For Database, choose sportsdata.

  1. Choose the options icon by the tables and choose Preview Table to examine the data and schema.

In the following sections, we demonstrate different ways to optimize our queries.

Optimal join order using EXPLAIN plan

A join is a basic SQL operation to query data on multiple tables using relations on matching columns. Join operations affect how much data is read from a table, how much data is transferred to the intermediate stages through networks, and how much memory is needed to build up a hash table to facilitate a join.

If you have multiple join operations and these join tables aren’t in the correct order, you may experience performance issues. To demonstrate this, we use the following tables from difference sources and join them in a certain order. Then we observe the query runtime and improve performance by using the EXPLAIN feature from Athena, which provides some suggestions for optimizing the query.

The CloudFormation template you ran earlier loaded data into the following services:

AWS Storage Table Name Number of Rows
Amazon DynamoDB sportseventinfo 657
Amazon S3 person 7,025,585
Amazon S3 ticketinfo 2,488

Let’s construct a query to find all those who participated in the event by type of tickets. The query runtime with the following join took approximately 7 mins to complete:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

Now let’s use EXPLAIN on the query to see its run plan. We use the same query as before, but add explain (TYPE DISTRIBUTED):

EXPLAIN (TYPE DISTRIBUTED)
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output

Notice the cross-join in Fragment 1. The joins are converted to a Cartesian product for each table, where every record in a table is compared to every record in another table. Therefore, this query takes a significant amount of time to complete.

To optimize our query, we can rewrite it by reordering the joining tables as sportseventinfo first, ticketinfo second, and person last. The reason for this is because the WHERE clause, which is being converted to a JOIN ON clause during the query plan stage, doesn’t have the join relationship between the person table and sportseventinfo table. Therefore, the query plan generator converted the join type to cross-joins (a Cartesian product), which less efficient. Reordering the tables aligns the WHERE clause to the INNER JOIN type, which satisfies the JOIN ON clause and runtime is reduced from 7 minutes to 10 seconds.

The code for our optimized query is as follows:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following is the EXPLAIN output of our query after reordering the join clause:

EXPLAIN (TYPE DISTRIBUTED) 
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

The cross-join changed to INNER JOIN with join on columns (eventid, id, ticketholder_id), which results in the query running faster. Joins between the ticketinfo and person tables converted to the PARTITION distribution type, where both left and right tables are hash-partitioned across all worker nodes due to the size of the person table. The join between the sportseventinfo table and ticketinfo are converted to the REPLICATED distribution type, where one table is hash-partitioned across all worker nodes and the other table is replicated to all worker nodes to perform the join operation.

For more information about how to analyze these results, refer to Understanding Athena EXPLAIN statement results.

As a best practice, we recommend having a JOIN statement along with an ON clause, as shown in the following code:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"AwsDataCatalog"."athenablog"."person" p 
JOIN "AwsDataCatalog"."athenablog"."ticketinfo" t ON t.ticketholder_id = p.id 
JOIN "ddb"."default"."sportseventinfo" e ON t.sporting_event_id = cast(e.eventid as double)

Also as a best practice when you join two tables, specify the larger table on the left side of join and the smaller table on the right side of the join. Athena distributes the table on the right to worker nodes, and then streams the table on the left to do the join. If the table on the right is smaller, then less memory is used and the query runs faster.

In the following sections, we present examples of how to optimize pushdowns for filter predicates and projection filter operations for the Athena data source using EXPLAIN ANALYZE.

Pushdown optimization for the Athena connector for Amazon RDS for MySQL

A pushdown is an optimization to improve the performance of a SQL query by moving its processing as close to the data as possible. Pushdowns can drastically reduce SQL statement processing time by filtering data before transferring it over the network and filtering data before loading it into memory. The Athena connector for Amazon RDS for MySQL supports pushdowns for filter predicates and projection pushdowns.

The following table summarizes the services and tables we use to demonstrate a pushdown using Aurora MySQL.

Table Name Number of Rows Size in KB
player_partitioned 5,157 318.86
sport_team_partitioned 62 5.32

We use the following query as an example of a filtering predicate and projection filter:

SELECT full_name,
name 
FROM "sportsdata"."player_partitioned" a 
JOIN "sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

This query selects the players and their team based on their ID. It serves as an example of both filter operations in the WHERE clause and projection because it selects only two columns.

We use EXPLAIN ANALYZE to get the cost for the running this query:

EXPLAIN ANALYZE 
SELECT full_name,
name 
FROM "MYSQL"."sportsdata"."player_partitioned" a 
JOIN "MYSQL"."sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

The following screenshot shows the output in Fragment 2 for the table player_partitioned, in which we observe that the connector has a successful pushdown filter on the source side, so it tries to scan only one record out of the 5,157 records in the table. The output also shows that the query scan has only two columns (full_name as the projection column and sport_team_id and the join column), and uses SELECT and JOIN, which indicates the projection pushdown is successful. This helps reduce the data scan when using Athena data source connectors.

Now let’s look at the conditions in which a filter predicate pushdown doesn’t work with Athena connectors.

LIKE statement in filter predicates

We start with the following example query to demonstrate using the LIKE statement in filter predicates:

SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

We then add EXPLAIN ANALYZE:

EXPLAIN ANALYZE 
SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

The EXPLAIN ANALYZE output shows that the query performs the table scan (scanning the table player_partitioned, which contains 5,157 records) for all the records even though the WHERE clause only has 30 records matching the condition %Aar%. Therefore, the data scan shows the complete table size even with the WHERE clause.

We can optimize the same query by selecting only the required columns:

EXPLAIN ANALYZE 
SELECT sport_team_id,
full_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

From the EXPLAIN ANALYZE output, we can observe that the connector supports the projection filter pushdown, because we select only two columns. This brought the data scan size down to half of the table size.

OR statement in filter predicates

We start with the following query to demonstrate using the OR statement in filter predicates:

SELECT id,
first_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

We use EXPLAIN ANALYZE with the preceding query as follows:

EXPLAIN ANALYZE 
SELECT * 
FROM 
"MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

Similar to the LIKE statement, the following output shows that query scanned the table instead of pushing down to only the records that matched the WHERE clause. This query outputs only 16 records, but the data scan indicates a complete scan.

Pushdown optimization for the Athena connector for DynamoDB

For our example using the DynamoDB connector, we use the following data:

Table Number of Rows Size in KB
sportseventinfo 657 85.75

Let’s test the filter predicate and project filter operation for our DynamoDB table using the following query. This query tries to get all the events and sports for a given location. We use EXPLAIN ANALYZE for the query as follows:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE Location = 'Chase Field'

The output of EXPLAIN ANALYZE shows that the filter predicate retrieved only 21 records, and the project filter selected only two columns to push down to the source. Therefore, the data scan for this query is less than the table size.

Now let’s see where filter predicate pushdown doesn’t work. In the WHERE clause, if you apply the TRIM() function to the Location column and then filter, predicate pushdown optimization doesn’t apply, but we still see the projection filter optimization, which does apply. See the following code:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE trim(Location) = 'Chase Field'

The output of EXPLAIN ANALYZE for this query shows that the query scans all the rows but is still limited to only two columns, which shows that the filter predicate doesn’t work when the TRIM function is applied.

We’ve seen from the preceding examples that the Athena data source connector for Amazon RDS for MySQL and DynamoDB do support filter predicates and projection predicates for pushdown optimization, but we also saw that operations such as LIKE, OR, and TRIM when used in the filter predicate don’t support pushdowns to the source. Therefore, if you encounter unexplained charges in your federated Athena query, we recommend using EXPLAIN ANALYZE with the query and determine whether your Athena connector supports the pushdown operation or not.

Please note that running EXPLAIN ANALYZE incurs cost because it scans the data.

Conclusion

In this post, we showcased how to use EXPLAIN and EXPLAIN ANALYZE to analyze Athena SQL queries for data sources on AWS S3 and Athena federated SQL query for data source like DynamoDB and Amazon RDS for MySQL. You can use this as an example to optimize queries which would also result in cost savings.


About the Authors

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web services. He specializes in building Big-data applications and help customer to modernize their applications on Cloud. He thinks Data is new oil and spends most of his time in deriving insights out of the Data.

Varad Ram is Senior Solutions Architect in Amazon Web Services. He likes to help customers adopt to cloud technologies and is particularly interested in artificial intelligence. He believes deep learning will power future technology growth. In his spare time, he like to be outdoor with his daughter and son.

Simplify and optimize Python package management for AWS Glue PySpark jobs with AWS CodeArtifact

Post Syndicated from Ashok Padmanabhan original https://aws.amazon.com/blogs/big-data/simplify-and-optimize-python-package-management-for-aws-glue-pyspark-jobs-with-aws-codeartifact/

Data engineers use various Python packages to meet their data processing requirements while building data pipelines with AWS Glue PySpark Jobs. Languages like Python and Scala are commonly used in data pipeline development. Developers can take advantage of their open-source packages or even customize their own to make it easier and faster to perform use cases, such as data manipulation and analysis. However, managing standardized packages can be cumbersome with multiple teams using different versions of packages, installing non-approved packages, and causing duplicate development effort due to the lack of visibility of what is available at the enterprise level. This can be especially challenging in large enterprises with multiple data engineering teams.

ETL Developers have requirements to use additional packages for their AWS Glue ETL jobs. With security being job zero for customers, many will restrict egress traffic from their VPC to the public internet, and they need a way to manage the packages used by applications including their data processing pipelines.

Our proposed solution will enable you with network egress restrictions to manage packages centrally with AWS CodeArtifact and use their favorite libraries in their AWS Glue ETL PySpark code. In this post, we’ll describe how CodeArtifact can be used for managing packages and modules for AWS Glue ETL jobs, and we’ll demo a solution using Glue PySpark jobs that run within VPC Subnets that have no internet access.

Solution overview

The solution uses CodeArtifact as a tool to make it easier for organizations of any size to securely store, publish, and share software packages used in their ETL with AWS Glue. VPC Endpoints will be enabled for CodeArtifact and Glue to enable private link connections. AWS Step Functions makes it easy to coordinate the orchestration of components used in the data processing pipeline. Native integrations with both CodeArtifact and AWS Glue enable the workflow to both authenticate the request to CodeArtifact and start the AWS Glue ETL job.

The following architecture shows an implementation of a solution using AWS Glue, CodeArtifact, and Step Functions to use additional Python modules without egress internet access. The solution is deployed using AWS Cloud Development Kit (AWS CDK), an open-source software development framework to define your cloud application resources using familiar programming languages.

Solution Architecture for the blog post

Fig 1: Architecture Diagram for the Solution

To illustrate how to set up this architecture, we’ll walk you through the following steps:

  1. Deploying an AWS CDK stack to provision the following AWS Resources
    1. CodeArtifact
    2. An AWS Glue job
    3. Step Functions workflow
    4. Amazon Simple Storage Service (Amazon S3) bucket
    5. A VPC with a private Subnet and VPC Endpoints to Amazon S3 and CodeArtifact
  2. Validate the Deployment.
  3. Run a Sample Workflow – This workflow will run an AWS Glue PySpark job that uses a custom Python library, and an upgraded version of boto3.
  4. Cleaning up your resources.

Prerequisites

Make sure that you complete the following steps as prerequisites:

The solution

Launching your AWS CDK Stack

Step 1: Using your device’s command line, check out our Git repository to a local directory on your device:

git clone https://github.com/aws-samples/python-lib-management-without-internet-for-aws-glue-in-private-subnets.git

Step 2: Change directories to the new directory Amazon S3 script location:

cd python-lib-management-without-internet-for-aws-glue-in-private-subnets/scripts/s3

Step 3: Download the following CSV, which contains New York City Taxi and Limousine Commission (TLC) Trip weekly trips. This will serve as the input source for the AWS Glue Job:

aws s3 cp s3://nyc-tlc/misc/FOIL_weekly_trips_apps.csv .

Step 4: Change the directories to the path where the app.py file is located (in reference to the previous step, execute the following step):

cd ../..

Step 5: Create a virtual environment:

macOS/Linux:
python3 -m venv .env

Windows:
python -m venv .env

Step 6: Activate the virtual environment after the init process completes and the virtual environment is created:

macOS/Linux:
source .env/bin/activate

Windows:
.env\Scripts\activate.bat

Step 7: Install the required dependencies:

pip3 install -r requirements.txt

Step 8: Make sure that your AWS profile is setup along with the region that you want to deploy as mentioned in the prerequisite. Synthesize the templates. AWS CDK apps use code to define the infrastructure, and when run they produce or “synthesize” a CloudFormation template for each stack defined in the application:

cdk synthesize

Step 9: BootStrap the cdk app using the following command:

cdk bootstrap aws://<AWS_ACCOUNTID>/<AWS_REGION>

Replace the place holder AWS_ACCOUNTID and AWS_REGION with your AWS account ID and the region to be deployed.

This step provisions the initial resources, including an Amazon S3 bucket for storing files and IAM roles that grant permissions needed to perform deployments.

Step 10: Deploy the solution. By default, some actions that could potentially make security changes require approval. In this deployment, you’re creating an IAM role. The following command overrides the approval prompts, but if you would like to manually accept the prompts, then omit the --require-approval never flag:

cdk deploy "*" --require-approval never

While the AWS CDK deploys the CloudFormation stacks, you can follow the deployment progress in your terminal:

AWS CDK Deployment progress in terminal

Fig 2: AWS CDK Deployment progress in terminal

Once the deployment is successful, you’ll see the successful status as follows:

AWS CDK Deployment completion success

Fig 3: AWS CDK Deployment completion success

Step 11: Log in to the AWS Console, go to CloudFormation, and see the output of the ApplicationStack stack:

AWS CloudFormation stack output

Fig 4: AWS CloudFormation stack output

Note the values of the DomainName and RepositoryName variables. We’ll use them in the next step to upload our artifacts

Step 12: We will upload a custom library into the repo that we created. This will be used by our Glue ETL job.

  • Install twine using pip:
python3 -m pip install twine

The custom python package glueutils-0.2.0.tar.gz can be found under this folder of the cloned repo:

cd scripts/custom_glue_library
  • Configure twine with the login command (additional details here ). Refer to step 11 for the DomainName and RepositoryName from the CloudFormation output:
aws codeartifact login --tool twine --domain <DomainName> --domain-owner <AWS_ACCOUNTID> --repository <RepositoryName>
  • Publish Python package assets:
twine upload --repository codeartifact glueutils-0.2.0.tar.gz
Python package publishing using twine

Fig 5: Python package publishing using twine

Validate the Deployment

The AWS CDK stack will deploy the following AWS resources:

  1. Amazon Virtual Private Cloud (Amazon VPC)
    1. One Private Subnet
  2. AWS CodeArtifact
    1. CodeArtifact Repository
    2. CodeArtifact Domain
    3. CodeArtifact Upstream Repository
  3. AWS Glue
    1. AWS Glue Job
    2. AWS Glue Database
    3. AWS Glue Connection
  4. AWS Step Function
  5. Amazon S3 Bucket for AWS CDK and also for storing scripts and CSV file
  6. IAM Roles and Policies
  7. Amazon Elastic Compute Cloud (Amazon EC2) Security Group

Step 1: Browse to the AWS account and region via the AWS Console to which the resources are deployed.

Step 2: Browse the Subnet page (https://<region> .console.aws.amazon.com/vpc/home?region=<region> #subnets:) (*Replace region with actual AWS Region to which your resources are deployed)

Step 3: Select the Subnet with name as ApplicationStack/enterprise-repo-vpc/Enterprise-Repo-Private-Subnet1

Step 4: Select the Route Table and validate that there are no Internet Gateway or NAT Gateway for routes to Internet, and that it’s similar to the following image:

Route table validation

Fig 6: Route table validation

Step 5: Navigate to the CodeArtifact console and review the repositories created. The enterprise-repo is your local repository, and pypi-store is the upstream repository connected to the PyPI, providing artifacts from pypi.org.

AWS CodeArifact repositories created

Fig 7: AWS CodeArifact repositories created

Step 6: Navigate to enterprise-repo and search for glueutils. This is the custom python package that we published.

AWS CodeArifact custom python package published

Fig 8: AWS CodeArifact custom python package published

Step 7: Navigate to Step Functions Console and review the enterprise-repo-step-function as follows:

AWS Step Functions workflow

Fig 9: AWS Step Functions workflow

The diagram shows how the Step Functions workflow will orchestrate the pattern.

  1. The first step CodeArtifactGetAuthorizationToken calls the getAuthorizationToken API to generate a temporary authorization token for accessing repositories in the domain (this token is valid for 15 mins.).
  2. The next step GenerateCodeArtifactURL takes the authorization token from the response and generates the CodeArtifact URL.
  3. Then, this will move into the GlueStartJobRun state, which makes a synchronous API call to run the AWS Glue job.

Step 8: Navigate to the AWS Glue Console and select the Jobs tab, then select enterprise-repo-glue-job.

The AWS Glue job is created with the following script and AWS Glue Connection enterprise-repo-glue-connection. The AWS Glue connection is a Data Catalog object that enables the job to connect to sources and APIs from within the VPC. The network type connection runs the job from within the private subnet to make requests to Amazon S3 and CodeArtifact over the VPC endpoint connection. This enables the job to run without any traffic through the internet.

Note the connections section in the AWS Glue PySpark Job, which makes the Glue job run on the private subnet in the VPC provisioned.

AWS Glue network connections

Fig 10: AWS Glue network connections

The job takes an Amazon S3 bucket, Glue Database, Python Job Installer Option, and Additional Python Modules as job parameters. The parameters --additional-python-modules and --python-modules-installer-option are passed to install the selected Python module from a PyPI repository hosted in AWS CodeArtifact.

The script itself first reads the Amazon S3 input path of the taxi data in the CSV format. A light transformation to sum the total trips by year, week, and app is performed. Then the output is written to an Amazon S3 path as parquet . A partitioned table in the AWS Glue Data Catalog will either be created or updated if it already exists .

You can find the Glue PySpark script here.

Run a sample workflow

The following steps will demonstrate how to run a sample workflow:

Step 1: Navigate to the Step Functions Console and select the enterprise-repo-step-function.

Step 2: Select Start execution and input the following: We’re including the glueutils and latest boto3 libraries as part of the job run. It is always recommended to pin your python dependencies to avoid any breaking change due to a future version of dependency . In the below example, the latest available version of boto3, and the 0.2.0 version of glueutils will be installed. To pin it to a specific release you may add  boto3==1.24.2   (Current latest release at the time of publishing this post).

{"pythonmodules": "boto3,glueutils==0.2.0"}

Step 3: Select Start execution and wait until Execution Status is Succeeded. This may take a few minutes.

Step 4: Navigate to the CodeArtifact Console to review the enterprise-repo repository. You’ll see the cached PyPi packages and all of their dependencies pulled down from PyPi.

Step 5: In the Glue Console under the Runs section of the enterprise-glue-job, you’ll see the parameters passed:

Fig 11 : AWS Glue job execution history

Fig 11 : AWS Glue job execution history

Note the --index-url which was passed as a parameter to the glue ETL job. The token is valid only for 15 minutes.

Step 6: Navigate to the Amazon CloudWatch Console and go to the /aws/glue-jobs log group to verify that the packages were installed from the local repo.

You will see that the 2 package names passed as parameters are installed with the corresponding versions.

Fig 12 : Amazon CloudWatch logs details for the Glue job

Fig 12 : Amazon CloudWatch logs details for the Glue job

Step 7: Navigate to the Amazon Athena console and select Query Editor.

Step 8: Run the following query to validate the output of the AWS Glue job:

SELECT year, app, SUM(total_trips) as sum_of_total_trips 
FROM 
"codeartifactblog_glue_db"."taxidataparquet" 
GROUP BY year, app;

Clean up

Make sure that you clean up all of the other AWS resources that you created in the AWS CDK Stack deployment. You can delete these resources via the AWS CDK Destroy command as follows or the CloudFormation console.

To destroy the resources using AWS CDK, follow these steps:

  1. Follow Steps 1-6 from the ‘Launching your CDK Stack’ section.
  2. Destroy the app by executing the following command:
    cdk destroy

Conclusion

In this post, we demonstrated how CodeArtifact can be used for managing Python packages and modules for AWS Glue jobs that run within VPC Subnets that have no internet access. We also demonstrated how the versions of existing packages can be updated (i.e., boto3) and a custom Python library (glueutils) that is developed locally is also managed through CodeArtifact.

This post enables you to use your favorite Python packages with AWS Glue ETL PySpark jobs by modifying the input to the AWS StepFunctions workflow (Step 2 in the Run a Sample workflow section).


About the Authors

Bret Pontillo is a Data & ML Engineer with AWS Professional Services. He works closely with enterprise customers building data lakes and analytical applications on the AWS platform. In his free time, Bret enjoys traveling, watching sports, and trying new restaurants.

Gaurav Gundal is a DevOps consultant with AWS Professional Services, helping customers build solutions on the customer platform. When not building, designing, or developing solutions, Gaurav spends time with his family, plays guitar, and enjoys traveling to different places.

Ashok Padmanabhan is a Sr. IOT Data Architect with AWS Professional Services, helping customers build data and analytics platform and solutions. When not helping customers build and design data lakes, Ashok enjoys spending time at the beach near his home in Florida.

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web—they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This coupled with multiple fulfillment options like buy online and pick up at store, ship from store, or ship from distribution centers, which increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • When a customer is browsing products, the real-time availability of the item must be displayed
  • Customers interacting with Alexa to know the status of the order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores it in a DynamoDB table.

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora MySQL is fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.

Prerequisites

Complete the following prerequisite steps:

  1. Create AWS account if you don’t have done already.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

CREATE TABLE INVENTORY (
    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25)	 ,
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)
);

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting) sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type¸ choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes, to avoid generating huge amount of data, please ensure to delete the stack after testing the blog.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.