Tag Archives: Analytics

Build a real-time analytics solution with Apache Pinot on AWS

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/build-a-real-time-analytics-solution-with-apache-pinot-on-aws/

Online Analytical Processing (OLAP) is crucial in modern data-driven apps, acting as an abstraction layer connecting raw data to users for efficient analysis. It organizes data into user-friendly structures, aligning with shared business definitions, ensuring users can analyze data with ease despite changes. OLAP combines data from various data sources and aggregates and groups them as business terms and KPIs. In essence, it’s the foundation for user-centric data analysis in modern apps, because it’s the layer that translates technical assets into business-friendly terms that enable users to extract actionable insights from data.

Real-time OLAP

Traditionally, OLAP datastores were designed for batch processing to serve internal business reports. The scope of data analytics has grown, and more user personas are now seeking to extract insights themselves. These users often prefer to have direct access to the data and the ability to analyze it independently, without relying solely on scheduled updates or reports provided at fixed intervals. This has led to the emergence of real-time OLAP solutions, which are particularly relevant in the following use cases:

  • User-facing analytics – Incorporating analytics into products or applications that consumers use to gain insights, sometimes referred to as data products.
  • Business metrics – Providing KPIs, scorecards, and business-relevant benchmarks.
  • Anomaly detection – Identifying outliers or unusual behavior patterns.
  • Internal dashboards – Providing analytics that are relevant to stakeholders across the organization for internal use.
  • Queries – Offering subsets of data to users based on their roles and security levels, allowing them to manipulate data according to their specific requirements.

Overview of Apache Pinot

Building these capabilities in real time means that real-time OLAP solutions have stricter SLAs and larger scalability requirements than traditional OLAP datastores. Accordingly, a purpose-built solution is needed to address these new requirements.

Apache Pinot is an open source real-time distributed OLAP datastore designed to meet these requirements, including low latency (tens of milliseconds), high concurrency (hundreds of thousands of queries per second), near real-time data freshness, and handling petabyte-scale data volumes. It ingests data from both streaming and batch sources and organizes it into logical tables distributed across multiple nodes in a Pinot cluster, ensuring scalability.

Pinot provides functionality similar to other modern big data frameworks, supporting SQL queries, upserts, complex joins, and various indexing options.

Pinot has been tested at very large scale in large enterprises, serving over 70 LinkedIn data products, handling over 120,000 Queries Per Second (QPS), ingesting over 1.5 million events per second, and analyzing over 10,000 business metrics across over 50,000 dimensions. A notable use case is the user-facing Uber Eats Restaurant Manager dashboard, serving over 500,000 users with instant insights into restaurant performance.

Pinot clusters are designed for high availability, horizontal scalability, and live configuration changes without impacting performance. To that end, Pinot is architected as a distributed datastore to enable all of the above requirements, and utilizes similar architectural constructs as Apache Kafka and Apache Hadoop in its design.

Solution overview

In this post, we provide a step-by-step guide showing how you can build a real-time OLAP datastore on Amazon Web Services (AWS) using Apache Pinot on Amazon Elastic Compute Cloud (Amazon EC2) and do near real-time visualization using Tableau. You can also use Apache Pinot for batch processing use cases, but in this post we focus on a near real-time analytics use case.

You can use Amazon Managed Service for Apache Flink.

Blog post architecture

The objective in the preceding figure is to ingest streaming data into Pinot, where it can perform aggregations, update current data models, and serve OLAP queries in real time to consuming users and applications, which in this case is a user-facing Tableau dashboard.

The data flows as follows:

  • Data is ingested from a real-time source, such as clickstream data from a website. For the purposes of this post, we will use the Amazon Kinesis Data Generator to simulate the production of events.
  • Events are captured in a streaming storage platform such as Amazon Kinesis Data Streams (KDS) or Amazon Managed Streaming for Apache Kafka (MSK) for downstream consumption.
  • The events are then ingested into the real-time server within Apache Pinot, which is used to process data coming from streaming sources, such as MSK and KDS. Apache Pinot consists of logical tables, which are partitioned into segments. Due to the time sensitive nature of streaming, events are directly written into memory as consuming segments, which can be thought of as parts of an active table that are continuously ingesting new data. Consuming segments are available for query processing immediately, thereby enabling low latency and high data freshness.
  • After the segments reach a threshold in terms of time or number of rows, they are moved into Amazon Simple Storage Service (Amazon S3), which serves as deep storage for the Apache Pinot cluster. Deep storage is the permanent location for segment files. Segments used for batch processing are also stored there.
  • In parallel, the Pinot controller tracks the metadata of the cluster and performs actions required to keep the cluster in an ideal state. Its primary function is to orchestrate cluster resources as well as manage connections between resources within the cluster and data sources outside of it. Under the hood, the controller uses Apache Helix to manage cluster state, failover, distribution, and scalability, and Apache ZooKeeper to handle distributed coordination functions such as leader election, locks, queue management, and state tracking.
  • To enable the distributed aspect of the Pinot architecture, the broker accepts queries from the clients and forwards them to servers and collects the results and sends them back. The broker manages and optimizes the queries, distributes them across the servers, combines the results, and returns the result set. The broker sends the request to the right segments on the right servers, optimizes segment pruning, and splits the queries across servers appropriately. The results of each query are then merged and sent back to the requesting client.
  • The results of the queries are updated in real time in the Tableau dashboard.
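The broker's scatter-gather role described in the data flow can be illustrated with a small, self-contained sketch. This is not Pinot code: the segment layout, the sample rows, and the names `SERVERS`, `server_partial_sum`, and `broker_query` are all hypothetical, and a real broker also handles routing, segment pruning, and failures.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical layout: each server hosts a few segments, and each segment is a
# list of (campaign, price) rows. Real Pinot segments are columnar files.
SERVERS: Dict[str, List[List[Tuple[str, int]]]] = {
    "server-1": [[("BlackFriday", 20), ("NONE", 15)], [("10Percent", 30)]],
    "server-2": [[("BlackFriday", 50)], [("NONE", 5), ("NONE", 10)]],
}


def server_partial_sum(segments: List[List[Tuple[str, int]]]) -> Dict[str, int]:
    """Each server aggregates only the segments it hosts."""
    partial: Dict[str, int] = defaultdict(int)
    for segment in segments:
        for campaign, price in segment:
            partial[campaign] += price
    return dict(partial)


def broker_query(servers: Dict[str, List[List[Tuple[str, int]]]]) -> Dict[str, int]:
    """Scatter the aggregation to every server, then merge the partial results."""
    merged: Dict[str, int] = defaultdict(int)
    for segments in servers.values():
        for campaign, subtotal in server_partial_sum(segments).items():
            merged[campaign] += subtotal
    return dict(merged)


result = broker_query(SERVERS)
print(result)
```

The point of the sketch is that each server returns a small partial aggregate rather than raw rows, so the merge step on the broker stays cheap even when the segments are large.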

To ensure high availability, the solution deploys application load balancers for the brokers and servers. We can access the Apache Pinot UI using the controller load balancer and use it to run queries and monitor the Apache Pinot cluster.

Let’s start to deploy this solution and perform near real-time visualizations using Apache Pinot and Tableau.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the Apache Pinot solution using the AWS CDK

The AWS CDK is an open source project that you can use to define your cloud infrastructure using familiar programming languages. It uses high-level constructs to represent AWS components to simplify the build process. In this post, we use TypeScript and Python to define the cloud infrastructure.

  1. First, bootstrap the AWS CDK. This sets up the resources required by the AWS CDK to deploy into the AWS account. This step is only required if you haven’t used the AWS CDK in the deployment account and Region. The format for the bootstrap command is cdk bootstrap aws://<account-id>/<aws-region>.

In the following example, I’m running a bootstrap command for a fictitious AWS account with ID 123456789000 in the us-east-1 (N. Virginia) Region:

cdk bootstrap aws://123456789000/us-east-1

Bootstrap command

  1. Next, clone the GitHub repository and install all the dependencies from package.json by running the following commands from the root of the cloned repository.
    git clone https://github.com/aws-samples/near-realtime-apache-pinot-workshop
    
    cd near-realtime-apache-pinot-workshop
    
    npm i

  2. Deploy the AWS CDK stack to create the AWS Cloud infrastructure by running the following command and enter y when prompted. Enter the IP address that you want to use to access the Apache Pinot controller and broker in /32 subnet mask format.
    cdk deploy --parameters IpAddress="<YOUR-IP-ADDRESS-IN-/32-SUBNET-MASK-FORMAT>"

Deployment of the AWS CDK stack takes approximately 10–12 minutes. You should see a stack deployment message that will display the creation of AWS objects, followed by the deployment time, the Stack ARN, and the total time, similar to the following screenshot:

CDK deployment screenshot
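The IpAddress parameter passed to cdk deploy must be a single-host CIDR block. As a minimal sketch, the following Python helper validates an IPv4 address and appends the /32 suffix. The helper name `to_slash32` and the sample address 203.0.113.7 (from a documentation-only range) are assumptions for illustration and are not part of the workshop repository.

```python
import ipaddress


def to_slash32(ip: str) -> str:
    """Return the IPv4 address as a single-host CIDR block.

    ipaddress.IPv4Address raises a ValueError subclass on malformed input,
    so a typo is caught before it reaches the deploy command.
    """
    address = ipaddress.IPv4Address(ip.strip())
    return f"{address}/32"


cidr = to_slash32("203.0.113.7")
print(f'cdk deploy --parameters IpAddress="{cidr}"')
```

Validating the value up front is cheaper than discovering a malformed parameter partway through a deployment that takes several minutes.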

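  1. Now, you can get the Apache Pinot controller Application Load Balancer (ALB) DNS name from the CloudFormation console. Choose Stacks, select the ApachePinotSolutionStack, and then choose Outputs and copy the value for ControllerDNSUrl.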
  2. Launch a browser session and paste the DNS name to see the Apache Pinot controller—it should look like the following screenshot, where you will see:
    • Number of controllers, brokers, servers, minions, tenants, and tables
    • List of tenants
    • List of controllers
    • List of brokers

Pinot management console
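If you prefer to read the ControllerDNSUrl output programmatically instead of copying it from the console, a sketch along the following lines may help. It assumes the stack is named ApachePinotSolutionStack and that boto3 and AWS credentials are available for the live call. The helper names `stack_outputs` and `fetch_outputs` and the sample response values are hypothetical; only the parsing helper is exercised here.

```python
from typing import Any, Dict


def stack_outputs(describe_stacks_response: Dict[str, Any]) -> Dict[str, str]:
    """Flatten the Outputs list of the first stack into {OutputKey: OutputValue}."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


def fetch_outputs(stack_name: str = "ApachePinotSolutionStack") -> Dict[str, str]:
    """Call CloudFormation for the stack outputs (needs boto3 and credentials)."""
    import boto3  # imported lazily so the parsing helper works without boto3

    client = boto3.client("cloudformation")
    return stack_outputs(client.describe_stacks(StackName=stack_name))


# Made-up response in the shape returned by describe_stacks.
sample = {
    "Stacks": [
        {
            "Outputs": [
                {"OutputKey": "ControllerDNSUrl", "OutputValue": "controller.example.com"},
                {"OutputKey": "BrokerDNSUrl", "OutputValue": "broker.example.com"},
            ]
        }
    ]
}
outputs = stack_outputs(sample)
print(outputs["ControllerDNSUrl"])
```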

Near real-time visualization using Tableau

Now that we have provisioned all AWS Cloud resources, we will stream some sample web transactions to a Kinesis data stream and visualize the data in near real time from Tableau Desktop.

You can follow these steps to open the Tableau workbook and visualize the data:

  1. Download the Tableau workbook to your local machine and open the workbook from Tableau Desktop.
  2. Get the DNS name of the Apache Pinot broker’s Application Load Balancer from the CloudFormation console. Choose Stacks, select the ApachePinotSolutionStack, and then choose Outputs and copy the value for BrokerDNSUrl.
  3. Choose Edit connection and enter the URL in the following format:
    jdbc:pinot://<Apache-Pinot-Controller-DNS-Name>?brokers=<Apache-Pinot-Broker-DNS-Name>

  4. Enter admin for both the username and password.
  5. Access the KDG tool by following the instructions. Use the record template that follows to send sample web transaction data to the Kinesis data stream called pinot-stream by choosing Send data, as shown in the following screenshot. After sending a handful of records, choose Stop sending data to Kinesis.
    {
        "userID": "{{random.number({"min":1, "max":100})}}",
        "productName": "{{commerce.productName}}",
        "color": "{{commerce.color}}",
        "department": "{{commerce.department}}",
        "product": "{{commerce.product}}",
        "campaign": "{{random.arrayElement(["BlackFriday","10Percent","NONE"])}}",
        "price": {{random.number({"min":10, "max":150})}},
        "creationTimestamp": "{{date.now("YYYY-MM-DD hh:mm:ss")}}"
    }

Kinesis Data Generator configuration
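To see what a record produced by this template looks like, the following sketch builds one equivalent record in Python. It is an approximation rather than a KDG replacement: the product fields are fixed placeholders instead of generated names, the function name `sample_record` is hypothetical, and the timestamp format assumes the template's hh token means a 12-hour clock, as in Moment.js-style formats.

```python
import json
import random
from datetime import datetime

CAMPAIGNS = ["BlackFriday", "10Percent", "NONE"]


def sample_record(rng: random.Random, now: datetime) -> dict:
    """Build one record with the same fields and value ranges as the template."""
    return {
        "userID": str(rng.randint(1, 100)),      # quoted in the template, so a string
        "productName": "Sample Product",         # placeholder value
        "color": "blue",                         # placeholder value
        "department": "Tools",                   # placeholder value
        "product": "Chair",                      # placeholder value
        "campaign": rng.choice(CAMPAIGNS),
        "price": rng.randint(10, 150),           # unquoted in the template, so a number
        "creationTimestamp": now.strftime("%Y-%m-%d %I:%M:%S"),
    }


record = sample_record(random.Random(42), datetime(2024, 1, 2, 15, 4, 5))
print(json.dumps(record))
```

Note that userID is a string while price is a number, which mirrors the quoting in the template and matters when you define column types on the receiving side.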

You should be able to see the web transactions data in Tableau Desktop as shown in the following screenshot.
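The connection URL entered in Tableau combines the controller and broker DNS names. As a small sketch, the following helper assembles it from the two stack output values, accepting them with or without a scheme prefix. The function names `host_of` and `pinot_jdbc_url` and the example.com host names are hypothetical.

```python
from urllib.parse import urlparse


def host_of(dns_url: str) -> str:
    """Accept a bare DNS name or a URL with a scheme and return host[:port]."""
    parsed = urlparse(dns_url if "://" in dns_url else f"//{dns_url}")
    return parsed.netloc


def pinot_jdbc_url(controller_dns: str, broker_dns: str) -> str:
    """Format the URL as jdbc:pinot://<controller>?brokers=<broker>."""
    return f"jdbc:pinot://{host_of(controller_dns)}?brokers={host_of(broker_dns)}"


jdbc_url = pinot_jdbc_url("http://controller.example.com/", "broker.example.com")
print(jdbc_url)
```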

Clean up

To clean up the AWS resources you created:

  1. Disable termination protection on the following EC2 instances by going to the Amazon EC2 console and choosing Instances from the navigation pane. For each instance, choose Actions, Instance Settings, and then Change termination protection, and clear the Termination protection checkbox.
    • ApachePinotSolutionStack/bastionHost
    • ApachePinotSolutionStack/zookeeperNode1
    • ApachePinotSolutionStack/zookeeperNode2
    • ApachePinotSolutionStack/zookeeperNode3
  2. Run the following command from the cloned GitHub repo and enter y when prompted.
    cdk destroy

Scaling the solution to production

The example in this post uses minimal resources to demonstrate functionality. Taking this to production requires a higher level of scalability. The solution provides autoscaling policies for independently scaling brokers and servers in and out, allowing the Apache Pinot cluster to scale based on CPU requirements.

When autoscaling is initiated, the solution invokes an AWS Lambda function to run the logic needed to add or remove brokers and servers in Apache Pinot.

In Apache Pinot, tables are tagged with an identifier that’s used for routing queries to the appropriate servers. When creating a table, you can specify a table name and optionally tag it. This is useful when you want to route queries to specific servers or build a multi-tenant Apache Pinot cluster. However, tagging adds additional considerations when removing brokers or servers. You need to make sure that neither has any active tables or tags associated with it. And when adding new components, you need to rebalance the segments so that the new brokers and servers are used.

Therefore, when scaling is needed in the solution, the autoscaling policy will invoke a Lambda function that either rebalances the segments of the tables when you add a new broker or server, or removes any tags associated with the broker or server you remove from the cluster.
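The decision the Lambda function makes can be sketched as a small planning function. This is not the code shipped with the solution: the event names, the function name `plan_scaling_actions`, the instance and table names, and the request paths are all assumptions. The paths are modeled on the Pinot controller REST API and should be checked against your controller's API documentation before use. The sketch only returns the planned calls and does not send them.

```python
from typing import List, Tuple

Action = Tuple[str, str]  # (HTTP method, controller path); nothing is sent here


def plan_scaling_actions(event: str, instance: str, tables: List[str]) -> List[Action]:
    """Return the controller calls to make for a scaling event.

    scale_out: rebalance every table so the new node receives segments.
    scale_in:  clear the tags on the node being removed.
    The paths are illustrative assumptions, not verified endpoints.
    """
    if event == "scale_out":
        return [("POST", f"/tables/{table}/rebalance") for table in tables]
    if event == "scale_in":
        return [("PUT", f"/instances/{instance}/updateTags?tags=")]
    raise ValueError(f"unknown scaling event: {event}")


print(plan_scaling_actions("scale_out", "Server_new", ["web_transactions"]))
print(plan_scaling_actions("scale_in", "Server_old", ["web_transactions"]))
```

Separating the plan from its execution makes the branching easy to test without a running cluster, which is useful because a mistake on the scale-in path can leave tables pointing at a node that no longer exists.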

Summary

Just like you would commonly use a distributed NoSQL datastore to serve a mobile application that requires low latency, high concurrency, high data freshness, high data volume, and high throughput, a distributed real-time OLAP datastore like Apache Pinot is purpose-built for achieving the same requirements for the analytics workload within your user-facing application. In this post, we walked you through how to deploy a scalable Apache Pinot-based near real-time user-facing analytics solution on AWS. If you have any questions or suggestions, write to us in the comments section.


About the authors

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers, helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink.

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily partners with airlines, manufacturers, and retail organizations to support them to achieve their business objectives with well-architected data platforms.

Introducing data products in Amazon DataZone: Simplify discovery and subscription with business use case based grouping

Post Syndicated from Jason Hines original https://aws.amazon.com/blogs/big-data/introducing-data-products-in-amazon-datazone-simplify-discovery-and-subscription-with-business-use-case-based-grouping/

We are excited to announce a new feature in Amazon DataZone that allows data producers to group data assets into well-defined, self-contained packages (data products) tailored for specific business use cases. For example, a marketing analysis data product can bundle various data assets such as marketing campaign data, pipeline data, and customer data. This simplifies the process for data consumers to find datasets, understand their context through shared metadata, and access comprehensive datasets for specific use cases through a single workflow. With the grouping capabilities of data products, data producers can manage and control access to the underlying data assets with just a few steps.

Customers often face challenges in locating and accessing the fragmented data they need, expending time and resources in the process. With Amazon DataZone, they can use data products to enhance data cataloging and subscription processes, aligning these more closely with business objectives while eliminating redundancy in handling individual assets.

In this post, we highlight the key benefits of data products, outline their essential features and workflows, and demonstrate how customers can use these features for easier publishing, discovery, and subscription.

Key benefits of data products

Customers use Amazon DataZone to create data meshes and adopt a culture that emphasizes data as a product. Amazon DataZone facilitates the publication of data assets from diverse sources that are enriched with their business context. It is crucial to organize assets into cohesive units with relational context to maximize the potential of data as a product and drive business use cases.

Amazon DataZone now offers the capability to group data assets with shared metadata into cohesive, business use case based data products, enhancing both the publishing and subscription processes. Data products provide three core benefits that help customers address their business challenges:

  • Simplified discovery – Data consumers can quickly identify interconnected data assets by searching for and finding them as a single unit. This reduces the time and effort required to find all relevant information and lowers the risk of missing important data.
  • Unified access model – Data products simplify access to data with a single request by implementing a unified access model. This eliminates the need for multiple permissions, speeding up the initiation of data analysis.
  • Reduced administrative overhead – By cataloging assets as data product units, data producers reduce administrative overhead by enabling metadata and access control management at the product level rather than individually. This makes access governance and data utilization more efficient, ensuring alignment with business goals and easy accessibility for its intended use. Data governance teams can monitor consumption rates for these data products, providing valuable insights into data literacy maturity.

For example, one of our customers, Natera, uses Amazon DataZone to create tailored datasets for their specific needs. Mirko Buholzer, VP of software engineering at Natera, says

“At Natera, our mission to revolutionize precision medicine depends on managing and leveraging our vast clinical and genomic data. With the Amazon DataZone data products feature, we can create tailored datasets for specific uses like reproductive health, oncology, or organ transplantation. This streamlines data discovery and access for our researchers and data scientists, enabling quick analysis of relevant data. Additionally, it will help physicians and patients gain deeper insights in combination with our clinical tests, ultimately improving patient outcomes.”

With data products, Amazon DataZone now supports business use case based grouping, enhancing data publishing, discovery, and subscription. This feature enables the following capabilities, as shown in the following image:

  • Data product creation and publishing – Producers can create data products by selecting assets from their project’s inventory, setting up shared metadata, and publishing these products to make them discoverable to consumers.
  • Data discovery and subscription – Consumers can search for and subscribe to data product units. Subscription requests are sent within a single workflow to producers for approval. Subscription approval processes, such as approve, reject, and revoke, ensure that access is managed securely. Once approved, access grants for the individual assets within the data product are automatically managed by the system.
  • Data product lifecycle management – Producers have control over the lifecycle of data products, including the ability to edit them and remove them from the catalog. When a producer edits product metadata or adds or removes assets from a data product, they republish it as a new version, and subscriptions are updated without any reapproval.

Solution overview

To demonstrate these capabilities and workflows, consider a use case where a product marketing team wants to drive a campaign on product adoption. To be successful, they need access to sales data, customer data, and review data of similar products. The sales data engineer, acting as the data producer, owns this data and understands the common requests from customers to access these different data assets for sales-related analysis. The data producer’s objective is to group these assets so consumers, such as the product marketing team, can find them together and seamlessly subscribe to perform analysis.

The following high-level implementation steps show how to achieve this use case with data products in Amazon DataZone and are detailed in the following sections.

  1. Data publisher creates and publishes data product
    1. Create data product – The data publisher (the project contributor for the producing project) provides a name and description and adds assets to the data product.
    2. Curate data product – The data publisher adds a readme, glossaries, and metadata forms to the data product.
    3. Publish data product – The data publisher publishes the data product to make it discoverable to consumers.
  2. Data consumer discovers and subscribes to data product
    1. Search data product – The data consumer (the project member of the consuming project) looks for the desired data product in the catalog.
    2. Request subscription – The data consumer submits a request to access the data product.
    3. Data owner approves subscription request – The data owner reviews and approves the subscription request.
    4. Review access approval and grant – The system manages access grants for the underlying assets.
    5. Query subscribed data – The data consumer receives approval and can now access and query the data assets within the subscribed data product.
  3. Data owner maintains lifecycle of data product
    1. Revise data product – The data owner (the project owner for the producing project) updates the data product as needed.
    2. Unpublish data product – The data owner removes the data product from the catalog if necessary.
    3. Delete data product – The data owner permanently deletes the data product if it is no longer needed.
    4. Revoke subscription – The data owner manages subscriptions and revokes access if required.

Prerequisites

To follow along with this post, ensure the publisher of the product sales data asset has ingested individual data assets into Amazon DataZone. In our use case, a data engineer in sales owns the following AWS Glue tables: customers, order_items, orders, products, reviews, and shipments. The data engineer has added a data source to bring these six data assets into the sales producer project inventory, ingesting the metadata in Amazon DataZone. For instructions on ingesting metadata for AWS Glue tables, refer to Create and run an Amazon DataZone data source for the AWS Glue Data Catalog. For Amazon Redshift, see Create and run an Amazon DataZone data source for Amazon Redshift.

On the producer side, a sales product project has been created with a data lake environment. A data source was created to ingest the technical metadata from the AWS Glue salesdb database, which contains the six AWS Glue tables mentioned previously. On the consumer side, a marketing consumer project with a data lake environment has been established.

Data publisher creates and publishes data product

Sign in to the Amazon DataZone data portal as a data publisher in the sales producer project. You can now create a data product to group inventory assets relevant to the sales analysis use case. Use the following steps to create and publish a data product, as shown in the following screenshot.

  1. Select DATA in the top ribbon of the Sales Product Project
  2. Select Inventory data in the navigation pane
  3. Choose DATA PRODUCTS to create a data product

Create data product

Follow these steps to create a data product:

  1. Choose Create new data product. Under Details, in the name field, enter “Sales Data Product.” In the description, enter “A data product containing the following 6 assets: Product, Shipments, Order Items, Orders, Customers, and Reviews,” as shown in the following screenshot.
  2. Select Choose assets to add the data assets. Select CHOOSE on the right side next to each of the six data assets. Be sure to go to the second page to select the sixth asset. After all are selected, choose the blue CHOOSE button at the bottom of the page, as shown in the following screenshot. Then choose Create to create the data product.

Curate data product

You can curate the sales data product by adding a readme, glossary term, and metadata forms to provide business context to the data product, as shown in the following screenshot.

  1. Choose Add terms under GLOSSARY TERMS. Select a glossary term that you have added to your glossary, for example, Sales. Refer to Create, edit, or delete a business glossary for how to create a business glossary.
  2. Choose Add metadata form to add a form such as a business owner. Refer to Create, edit, or delete metadata forms for how to create a metadata form. In this example, we added Ownership as a metadata form.

Publish data product

Follow these steps to publish a data product.

  1. Once all the necessary business metadata has been added, choose Publish to publish the data product to the business catalog, as shown in the following screenshot.
  2. In the pop-up, choose Publish data product.

The six data assets in the data product will also be published but will only be discoverable through the data product unless published individually. Consumers cannot subscribe to the individual data assets unless they are published and made discoverable in the catalog separately.

Data consumer discovers and subscribes to data product

Now, as the marketing user, inside of the marketing project, you can find and subscribe to the sales data product.

Search data product

Sign in to the Amazon DataZone data portal as a marketing user in the marketing consumer project. In the search bar, enter “sales” or any other metadata that you added to the sales data product.

Once you find the appropriate data product, select it. You can view the metadata added and see which data assets are included in the data product by selecting the DATA ASSETS tab, as shown in the following screenshot.

Request subscription

Choose Subscribe to bring up the Subscribe to Sales Data Product modal. Make sure the project is your consumer project, for example, Marketing Consumer Project. In Reason for request, enter “Running a marketing campaign for the latest sales play.” Choose SUBSCRIBE.

The request will be routed to the sales producer project for approval.

Data owner approves subscription request

Sign in to Amazon DataZone as the project owner for the sales producer project to approve the request. You will see an alert in the task notification bar. Choose the notification icon on the top right to see the notifications, then choose Subscription Request Created, as shown in the following screenshot.

You can also view incoming subscription requests by choosing DATA in the blue ribbon at the top. Then choose Incoming requests in the navigation pane, REQUESTED under Incoming requests, and then View request, as shown in the following screenshot.

On the Subscription request pop-up, you will see who requested access to the Sales Data Product, from which project, the requested date and time, and their reason for requesting it. You can enter a Decision comment and then choose APPROVE.

Review access approval and grant

The marketing consumer is now approved to access the six assets included in the sales data product. Sign in to Amazon DataZone as a marketing user in the marketing consumer project. A new event will appear, showing that the SUBSCRIPTION REQUEST APPROVED has been completed.

You can view this in two different ways. Choose the notification icon on the top right and then EVENTS under Notifications, as shown in the first following screenshot. Alternatively, select DATA in the blue ribbon bar, then Subscribed data, and then Data products, as shown in the second following screenshot.

Choose the Sales Data Product and then Data assets. Amazon DataZone will automatically add the six data assets to the AWS Glue tables that the marketing consumer can use. Wait until you see that all six assets have been added to one environment, as shown in the following screenshot, before proceeding.

Query subscribed data

Once you complete the previous step, return to the main page of the marketing consumer project by choosing Marketing Consumer Project in the top left pull-down project selector, then choose OVERVIEW. The data can now be consumed through the Amazon Athena deep link on the right side. Choose Query data to open Athena, as shown in the following screenshot. In the Open Amazon Athena window, choose Open Amazon Athena.

A new window will open where the marketing consumer has been federated into the role that Amazon DataZone uses for granting permissions to the marketing consumer project data lake environment. The workgroup defaults to the appropriate workgroup that Amazon DataZone manages. Make sure that the Database under Data is the sub_db for the marketing consumer data lake environment. There will be six tables listed that correspond to the original six data assets added to the sales data product. Run your query. In this case, we used a query that looked for the top five best-selling products, as shown in the following code snippet and screenshot.

SELECT p.product_name, SUM(oi.quantity) AS total_quantity
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
GROUP BY p.product_name
ORDER BY total_quantity DESC
LIMIT 5;
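To check the query logic locally before running it in Athena, you can try it against a few made-up rows in an in-memory SQLite database, as in the following sketch. The table definitions, the order_id column, and the sample data are assumptions for illustration. Only the product_id, product_name, and quantity columns come from the query itself.

```python
import sqlite3

QUERY = """
SELECT p.product_name, SUM(oi.quantity) AS total_quantity
FROM order_items oi
JOIN products p ON oi.product_id = p.product_id
GROUP BY p.product_name
ORDER BY total_quantity DESC
LIMIT 5;
"""

conn = sqlite3.connect(":memory:")
# Made-up schema and rows; the real Glue tables have more columns.
conn.executescript(
    """
    CREATE TABLE products (product_id INTEGER PRIMARY KEY, product_name TEXT);
    CREATE TABLE order_items (order_id INTEGER, product_id INTEGER, quantity INTEGER);
    INSERT INTO products VALUES (1, 'Desk'), (2, 'Chair'), (3, 'Lamp');
    INSERT INTO order_items VALUES (100, 1, 2), (100, 2, 4), (101, 2, 1), (102, 3, 3);
    """
)
rows = conn.execute(QUERY).fetchall()
print(rows)
conn.close()
```

SQLite and Athena differ in many details, so this only confirms that the join and the grouping behave as intended. It says nothing about performance or Athena-specific syntax.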

Data owner maintains lifecycle of data product

Follow these steps to maintain the lifecycle of the data product.

Revise data product

The data owner updates the data product, which includes editing metadata and adding or removing assets as needed. For detailed instructions, refer to Republish data products.

The sales data engineer has been tasked with removing one of the assets, the reviews table, from the sales data product.

  1. Open the SALES PRODUCER PROJECT by selecting it from the top project selector.
  2. Select DATA in the top ribbon.
  3. Select Published data in the navigation pane.
  4. Choose DATA PRODUCTS on the right side.
  5. Choose Sales Data Product.

The following screenshot shows these steps.

Once in the data product, the data engineer can add and remove metadata or assets. To change any of the assets in the data product, follow these steps, as shown in the following screenshot.

  1. Select ASSETS in Sales Data Product.
  2. Select any of the assets. For this example, we remove the Reviews asset.
  3. Select the three dots on the right side.
  4. Select Remove asset.
  5. A pop-up will appear confirming that you want to remove the asset. Choose Remove. The Reviews asset will now have a status of Removing asset: This asset is still available to subscribers.
  6. Republish the data product to remove access to this asset from all subscribers. Choose REPUBLISH and REPUBLISH DATA PRODUCT in the pop-up.
  7. To confirm the asset has been removed, sign in to the marketing project as the consumer. Open the Amazon Athena deep link on the OVERVIEW page. After selecting the sub_db associated with the marketing consumer data lake environment, only five tables are visible because the Reviews table was removed from the data product, as shown in the following screenshot.

The consumer doesn’t have to take any action after a data product has been republished. If the data engineer had changed any of the business metadata, such as by adding a metadata form, updating the readme, or adding glossary terms and republishing, the consumer would see those changes reflected when viewing the data product under the subscribed data.

Unpublish data product

The data owner removes the data product from the catalog, making it no longer discoverable to the organization. You can choose to retain existing subscription access for the underlying assets. For detailed instructions, refer to Unpublish data product.

Delete data product

The data owner permanently deletes the data product if it is no longer needed. Before deletion, you need to revoke all subscriptions. This action will not delete the underlying data assets. For detailed instructions, refer to Delete Data Product.

Revoke subscription

The data owner manages subscriptions and may revoke a subscription after it has been approved. For detailed instructions, refer to Revoke subscription.

Cleanup

To ensure no additional charges are incurred after testing, be sure to delete the Amazon DataZone domain. Refer to Delete domains for the process.

Conclusion

Data products are crucial for improving decision-making accuracy and speed in modern businesses. Beyond making raw data available, they offer strategic packaging, curation, and discoverability. Data products help customers address the difficulty of locating and accessing fragmented data, which reduces the time and resources needed to perform this important task.

Amazon DataZone already facilitates data cataloging from various sources. Building on this capability, this new feature streamlines data utilization by bundling data into purpose-built data products aligned with business goals. As a result, customers can unlock the full potential of their data.

The feature is supported in all the AWS commercial Regions where Amazon DataZone is currently available. To get started, check out Working with data products.


About the authors

Jason Hines is a Senior Solutions Architect at AWS, specializing in serving global customers in the Healthcare and Life Sciences industries. With over 25 years of experience, he has worked with numerous Fortune 100 companies across multiple verticals, bringing a wealth of knowledge and expertise to his role. Outside of work, Jason has a passion for an active lifestyle. He enjoys various outdoor activities such as hiking, scuba diving, and exploring nature. Maintaining a healthy work-life balance is essential to him.

Ramesh H Singh is a Senior Product Manager Technical (External Services) at AWS in Seattle, Washington, currently with the Amazon DataZone team. He is passionate about building high-performance ML/AI and analytics products that enable enterprise customers to achieve their critical goals using cutting-edge technology. Connect with him on LinkedIn.

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Improve Apache Kafka scalability and resiliency using Amazon MSK tiered storage

Post Syndicated from Sai Maddali original https://aws.amazon.com/blogs/big-data/improve-apache-kafka-scalability-and-resiliency-using-amazon-msk-tiered-storage/

Since the launch of tiered storage for Amazon Managed Streaming for Apache Kafka (Amazon MSK), customers have embraced this feature for its ability to optimize storage costs and improve performance. In previous posts, we explored the inner workings of Kafka, maximized the potential of Amazon MSK, and delved into the intricacies of Amazon MSK tiered storage. In this post, we deep dive into how tiered storage helps with faster broker recovery and quicker partition migrations, facilitating faster load balancing and broker scaling.

Apache Kafka availability

Apache Kafka is a distributed log service designed to provide high availability and fault tolerance. At its core, Kafka employs several mechanisms to provide reliable data delivery and resilience against failures:

  • Kafka replication – Kafka organizes data into topics, which are further divided into partitions. Each partition is replicated across multiple brokers, with one broker acting as the leader and the others as followers. If the leader broker fails, one of the follower brokers is automatically elected as the new leader, providing continuous data availability. The replication factor determines the number of replicas for each partition. Kafka maintains a list of in-sync replicas (ISRs) for each partition, which are the replicas that are up to date with the leader.
  • Producer acknowledgments – Kafka producers can specify the required acknowledgment level for write operations. This makes sure the data is durably persisted on the configured number of replicas before the producer receives an acknowledgment, reducing the risk of data loss.
  • Consumer group rebalancing – Kafka consumers are organized into consumer groups, where each consumer in the group is responsible for consuming a subset of the partitions. If a consumer fails, the partitions it was consuming are automatically reassigned to the remaining consumers in the group, providing continuous data consumption.
  • ZooKeeper or KRaft for cluster coordination – Kafka relies on Apache ZooKeeper or KRaft for cluster coordination and metadata management. It maintains information about brokers, topics, partitions, and consumer offsets, enabling Kafka to recover from failures and maintain a consistent state across the cluster.
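
The consumer group rebalancing described above can be illustrated with a small simulation. This is a simplified round-robin sketch, not Kafka's actual assignor implementation; the consumer names and partition count are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Spread partitions round-robin over the live consumers of a group."""
    if not consumers:
        raise ValueError("a consumer group needs at least one live consumer")
    members = sorted(consumers)
    assignment = {member: [] for member in members}
    for index, partition in enumerate(sorted(partitions)):
        assignment[members[index % len(members)]].append(partition)
    return assignment


partitions = list(range(6))

# Three healthy consumers share six partitions.
before_failure = assign_partitions(partitions, ["c1", "c2", "c3"])

# Consumer c2 fails; its partitions are reassigned to the survivors.
after_failure = assign_partitions(partitions, ["c1", "c3"])

print(before_failure)
print(after_failure)
```

Every partition still has an owner after the failure, which is the property that keeps consumption continuous.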

Kafka’s storage architecture and its impact on availability and resiliency

Although Kafka provides robust fault-tolerance mechanisms, in the traditional Kafka architecture, brokers store data locally on their attached storage volumes. This tight coupling of storage and compute resources can lead to several issues, impacting availability and resiliency of the cluster:

  • Slow broker recovery – When a broker fails, the recovery process involves transferring data from the remaining replicas to the new broker. This data transfer can be slow, especially for large data volumes, leading to prolonged periods of reduced availability and increased recovery times.
  • Inefficient load balancing – Load balancing in Kafka involves moving partitions between brokers to distribute the load evenly. However, this process can be resource-intensive and time-consuming, because it requires transferring large amounts of data between brokers.
  • Scaling limitations – Scaling a Kafka cluster traditionally involves adding new brokers and rebalancing partitions across the expanded set of brokers. This process can be disruptive and time-consuming, especially for large clusters with high data volumes.

How Amazon MSK tiered storage improves availability and resiliency

Amazon MSK offers tiered storage, a feature that allows configuring local and remote tiers. This greatly decouples compute and storage resources and thereby addresses the aforementioned challenges, improving availability and resiliency of Kafka clusters. You can benefit from the following:

  • Faster broker recovery – With tiered storage, data automatically moves from the faster Amazon Elastic Block Store (Amazon EBS) volumes to the more cost-effective storage tier over time. New messages are initially written to Amazon EBS for fast performance. Based on your local data retention policy, Amazon MSK transparently transitions that data to tiered storage. This frees up space on the EBS volumes for new messages. When a broker fails and recovers, whether because of a node or volume failure, the catch-up is faster because the broker only needs to catch up on the data stored on the local tier from the leader.
  • Efficient load balancing – Load balancing in Amazon MSK with tiered storage is more efficient because there is less data to move when reassigning partitions. This process is faster and less resource-intensive, enabling more frequent and seamless load balancing operations.
  • Faster scaling – Scaling an MSK cluster with tiered storage is a seamless process. New brokers can be added to the cluster without a large data transfer or a lengthy partition rebalancing. The new brokers can start serving traffic much faster, because the catch-up process takes less time, improving the overall cluster throughput and reducing downtime during scaling operations.

As shown in the following figure, MSK brokers and EBS volumes are tightly coupled. On a cluster deployed across three Availability Zones, when you create a topic with a replication factor of three, Amazon MSK spreads the three replicas across all three Availability Zones, and the EBS volumes attached to those brokers store all the topic data. If you need to move a partition from one broker to another, Amazon MSK needs to move all the segments (both active and closed) from the existing broker to the new broker.

However, when you enable tiered storage for that topic, Amazon MSK transparently moves all closed segments for a topic from EBS volumes to tiered storage. That storage provides the built-in capability for durability and high availability with virtually unlimited storage capacity. With closed segments moved to tiered storage and only active segments on the local volume, your local storage footprint remains minimal regardless of topic size. If you need to move the partition to a new broker, very little data has to move between the brokers. The following figure illustrates this updated configuration.
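
A rough back-of-the-envelope model makes the difference concrete. The sketch below assumes that, with tiered storage, the local tier holds roughly what was ingested during the local retention window; the partition size and ingest rate are hypothetical numbers, and real clusters also depend on segment size and roll timing.

```python
GIB = 1024 ** 3
MIB = 1024 ** 2


def bytes_to_copy(partition_bytes, ingest_bytes_per_sec, local_retention_sec, tiered):
    """Estimate how much data a partition move has to copy between brokers."""
    if not tiered:
        # Without tiered storage, every segment lives on the broker's EBS volume.
        return partition_bytes
    # With tiered storage, only data still inside the local retention window is local.
    local_bytes = ingest_bytes_per_sec * local_retention_sec
    return min(partition_bytes, local_bytes)


# Hypothetical partition: 100 GiB retained, 1 MiB/s ingest, 1 hour local retention.
without_tiering = bytes_to_copy(100 * GIB, 1 * MIB, 3600, tiered=False)
with_tiering = bytes_to_copy(100 * GIB, 1 * MIB, 3600, tiered=True)

print(f"without tiered storage: {without_tiering / GIB:.1f} GiB")
print(f"with tiered storage:    {with_tiering / GIB:.1f} GiB")
```

Under these assumptions, the amount copied depends on ingest rate and local retention rather than on the total size of the topic.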

Amazon MSK tiered storage addresses the challenges posed by Kafka’s traditional storage architecture, enabling faster broker recovery, efficient load balancing, and seamless scaling, thereby improving availability and resiliency of your cluster. To learn more about the core components of Amazon MSK tiered storage, refer to Deep dive on Amazon MSK tiered storage.

A real-world test

We hope that you now understand how Amazon MSK tiered storage can improve your Kafka resiliency and availability. To test it, we created a three-node cluster with the new m7g instance type. We created a topic with a replication factor of three and without using tiered storage. Using the Kafka performance tool, we ingested 300 GB of data into the topic. Next, we added three new brokers to the cluster. Because Amazon MSK doesn’t automatically move partitions to these three new brokers, they will remain idle until we rebalance the partitions across all six brokers.

Let’s consider a scenario where we need to move all the partitions from the existing three brokers to the three new brokers. We used the kafka-reassign-partitions tool to move the partitions from the existing three brokers to the newly added three brokers. During this partition movement operation, we observed that the CPU usage was high, even though we weren’t performing any other operations on the cluster. This indicates that the high CPU usage was due to the data replication to the new brokers. As shown in the following metrics, the partition movement operation from broker 1 to broker 2 took approximately 75 minutes to complete.
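
The kafka-reassign-partitions tool takes its plan as a JSON file. The following sketch builds such a plan by mapping each existing broker to a new one. The topic name, broker IDs, and current replica layout are hypothetical; in practice you would read the current layout from the cluster.

```python
import json


def build_reassignment(topic, current_replicas, broker_mapping):
    """Build a reassignment plan that swaps old broker IDs for new ones.

    current_replicas maps partition number -> list of broker IDs.
    broker_mapping maps old broker ID -> new broker ID.
    """
    partitions = []
    for partition, replicas in sorted(current_replicas.items()):
        partitions.append(
            {
                "topic": topic,
                "partition": partition,
                "replicas": [broker_mapping.get(broker, broker) for broker in replicas],
            }
        )
    return {"version": 1, "partitions": partitions}


# Hypothetical layout: three partitions replicated on brokers 1, 2, and 3.
current = {0: [1, 2, 3], 1: [2, 3, 1], 2: [3, 1, 2]}

# Move everything to the newly added brokers 4, 5, and 6.
plan = build_reassignment("test-topic", current, {1: 4, 2: 5, 3: 6})
plan_json = json.dumps(plan, indent=2)
print(plan_json)
```

You would save the printed JSON to a file and pass it to the tool with the --reassignment-json-file and --execute options.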

Additionally, during this period, CPU utilization was elevated.

After completing the test, we enabled tiered storage on the topic with local.retention.ms=3600000 (1 hour) and retention.ms=31536000000 (365 days). We continuously monitored the RemoteCopyBytesPerSec metric to determine when the data migration to tiered storage was complete. After 6 hours, we observed zero activity on the RemoteCopyBytesPerSec metric, indicating that all closed segments had been successfully moved to tiered storage. For instructions to enable tiered storage on an existing topic, refer to Enabling and disabling tiered storage on an existing topic.
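
Retention settings in milliseconds are easy to misread. This small sketch converts the two values used in the test into readable durations and checks that the local window does not exceed the overall retention; the variable names are illustrative only.

```python
from datetime import timedelta

# Values from the test setup, in milliseconds.
topic_config = {
    "local.retention.ms": 3_600_000,
    "retention.ms": 31_536_000_000,
}

local_retention = timedelta(milliseconds=topic_config["local.retention.ms"])
total_retention = timedelta(milliseconds=topic_config["retention.ms"])

# The local tier keeps only the most recent slice of the overall retention period.
if local_retention > total_retention:
    raise ValueError("local.retention.ms should not exceed retention.ms")

# Time a record spends only in the remote tier before it expires.
remote_only_window = total_retention - local_retention

print("local tier:", local_retention)
print("total retention:", total_retention)
print("remote-only window:", remote_only_window)
```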

We then performed the same test again, moving partitions to three empty brokers. This time, the partition movement operation was completed in just under 15 minutes, with no noticeable CPU usage, as shown in the following metrics. This is because, with tiered storage enabled, all the data has already been moved to the tiered storage, and we only have the active segment in the EBS volume. The partition movement operation is only moving the small active segment, which is why it takes less time and minimal CPU to complete the operation.

Conclusion

In this post, we explored how Amazon MSK tiered storage can significantly improve the scalability and resilience of Kafka. By automatically moving older data to the cost-effective tiered storage, Amazon MSK reduces the amount of data that needs to be managed on the local EBS volumes. This dramatically improves the speed and efficiency of critical Kafka operations like broker recovery, leader election, and partition reassignment. As demonstrated in the test scenario, enabling tiered storage reduced the time taken to move partitions between brokers from 75 minutes to just under 15 minutes, with minimal CPU impact. This enhanced the responsiveness and self-healing ability of the Kafka cluster, which is crucial for maintaining reliable, high-performance operations, even as data volumes continue to grow.

If you’re running Kafka and facing challenges with scalability or resilience, we highly recommend using Amazon MSK with the tiered storage feature. By taking advantage of this powerful capability, you can unlock the true scalability of Kafka and make sure your mission-critical applications can keep pace with ever-increasing data demands.

To get started, refer to Enabling and disabling tiered storage on an existing topic. Additionally, check out Automated deployment template of Cruise Control for Amazon MSK for effortlessly rebalancing your workload.


About the Authors

Sai Maddali is a Senior Manager Product Management at AWS who leads the product team for Amazon MSK. He is passionate about understanding customer needs and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

Nagarjuna Koduru is a Principal Engineer at AWS, currently working on Amazon Managed Streaming for Apache Kafka (Amazon MSK). He led the teams that built the MSK Serverless and MSK tiered storage products. He previously led the team in Amazon JustWalkOut (JWO) that is responsible for real-time tracking of shopper locations in the store. He played a pivotal role in scaling the stateful stream processing infrastructure to support larger store formats and reducing the overall cost of the system. He has a keen interest in stream processing, messaging, and distributed storage infrastructure.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Synchronize data lakes with CDC-based UPSERT using open table format, AWS Glue, and Amazon MSK

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/synchronize-data-lakes-with-cdc-based-upsert-using-open-table-format-aws-glue-and-amazon-msk/

In the current industry landscape, data lakes have become a cornerstone of modern data architecture, serving as repositories for vast amounts of structured and unstructured data. Change data capture (CDC) refers to the process of identifying and capturing changes made to data in a database and then delivering those changes in a downstream system. Capturing every change from transactions in a source database and moving them to the target keeps the systems synchronized, and helps with analytics use cases and zero-downtime database migrations.

However, efficiently managing and synchronizing data within these lakes presents a significant challenge. Maintaining data consistency and integrity across distributed data lakes is crucial for decision-making and analytics. Inaccurate or outdated data can lead to flawed insights and business decisions. Businesses require synchronized data to gain actionable insights and respond swiftly to changing market conditions. Scalability is a critical concern for data lakes, because they need to accommodate growing volumes of data without compromising performance or incurring exorbitant costs.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. We use MSK Connect, an AWS managed service to deploy and run Kafka Connect, to build an end-to-end CDC application. The application uses the Debezium MySQL connector to capture insert, update, and delete records from MySQL and the Confluent Amazon Simple Storage Service (Amazon S3) sink connector to write them to Amazon S3 as raw data that other downstream applications can consume for further use cases. To process batch data effectively, we use AWS Glue, a serverless data integration service that uses the Spark framework to process the data from Amazon S3 and copy it to the open table format layer. An open table format manages large collections of files as tables and supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. We chose Delta Lake as an example open table format, but you can achieve the same results using Apache Iceberg or Apache Hudi.

The post illustrates the construction of a comprehensive CDC system, enabling the processing of CDC data sourced from Amazon Relational Database Service (Amazon RDS) for MySQL. Initially, we create a raw data lake of all modified records in the database in near real time using Amazon MSK and write them to Amazon S3 as raw data. This raw data can then be used to build a data warehouse or even a special type of data storage that’s optimized for analytics, such as a Delta Lake on S3. Later, we use an AWS Glue extract, transform, and load (ETL) job for batch processing of CDC data from the S3 raw data lake. A key advantage of this setup is that you have complete control over the entire process, from capturing the changes in your database to transforming the data for your specific needs. This flexibility allows you to adapt the system to different use cases.

This is achieved through integration with MSK Connect using the Debezium MySQL connector, followed by writing data to Amazon S3 facilitated by the Confluent S3 Sink Connector. Subsequently, the data is processed from S3 using an AWS Glue ETL job, and then stored in the data lake layer. Finally, the Delta Lake table is queried using Amazon Athena.

Note: If you require real-time data processing of the CDC data, you can bypass the batch approach and use an AWS Glue streaming job instead. This job would directly connect to the Kafka topic in MSK, grabbing the data as soon as changes occur. It can then process and transform the data as needed, creating a Delta Lake on Amazon S3 that reflects the latest updates according to your business needs. This approach ensures you have the most up-to-date data available for real-time analytics.

Solution overview

The following diagram illustrates the architecture that you implement through this blog post. Each number represents a major component of the solution.

The workflow consists of the following:

  1. Near real-time data capture from MySQL and streaming to Amazon S3
    1. The process starts with data originating from Amazon RDS for MySQL.
    2. A Debezium connector is used to capture changes to the data in the RDS instance in near real time. Debezium is a distributed platform that converts information from your existing databases into event streams, enabling applications to detect and immediately respond to row-level changes in the databases. Debezium is built on top of Apache Kafka and provides a set of Kafka Connect compatible connectors.
    3. The captured data changes are then streamed to an Amazon MSK topic. MSK is a managed service that simplifies running Apache Kafka on AWS.
    4. The processed data stream (topic) is streamed from MSK to Amazon S3 in JSON format. The Confluent S3 Sink Connector allows near real-time data transfer from an MSK cluster to an S3 bucket.
  2. Batch processing the CDC raw data and writing it into the data lake
    1. Set up an AWS Glue ETL job to process the raw CDC data.
    2. This job reads bookmarked data from an S3 raw bucket and writes into the data lake in open table format (Delta Lake). The job also creates the Delta Lake table in AWS Glue Data Catalog.
    3. Delta Lake is an open-source storage layer built on top of existing data lakes. It adds functionalities like ACID transactions and versioning to improve data reliability and manageability.
  3. Analyze the data using serverless interactive query service
    1. Athena, a serverless interactive query service, can be used to query the Delta Lake table created in Glue Data Catalog. This allows for interactive data analysis without managing infrastructure.
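
The UPSERT step in this workflow boils down to replaying change events in order against a keyed table. The sketch below models that with a plain dictionary instead of a Delta Lake table. It assumes Debezium's default event envelope with op, before, after, and ts_ms fields; the sample rows are invented, and the actual AWS Glue job in this post may flatten or transform the events differently.

```python
def apply_cdc_events(table, events, key="CUST_ID"):
    """Replay Debezium-style change events, oldest first, onto a keyed table."""
    for event in sorted(events, key=lambda e: e["ts_ms"]):
        op = event["op"]
        if op in ("c", "r", "u"):
            # Create, snapshot read, and update all carry the new row in "after".
            row = event["after"]
            table[row[key]] = row
        elif op == "d":
            # Deletes carry the removed row in "before".
            table.pop(event["before"][key], None)
        else:
            raise ValueError(f"unexpected op code: {op!r}")
    return table


# Invented events, deliberately out of order to show the sort by ts_ms.
events = [
    {"op": "c", "ts_ms": 1, "before": None, "after": {"CUST_ID": 1, "NAME": "Ann"}},
    {"op": "u", "ts_ms": 3, "before": {"CUST_ID": 1, "NAME": "Ann"},
     "after": {"CUST_ID": 1, "NAME": "Anna"}},
    {"op": "c", "ts_ms": 2, "before": None, "after": {"CUST_ID": 2, "NAME": "Bob"}},
    {"op": "d", "ts_ms": 4, "before": {"CUST_ID": 2, "NAME": "Bob"}, "after": None},
]

snapshot = apply_cdc_events({}, events)
print(snapshot)
```

An open table format performs the equivalent merge transactionally on files in Amazon S3, which is what lets the data lake stay in sync with the source table.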

For this post, we create the solution resources in the us-east-1 AWS Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. The advantage of using two templates is that you can decouple the creation of the CDC pipeline resources from the AWS Glue processing resources, which is useful if your use case requires only one of them.

  1. vpc-msk-mskconnect-rds-client.yaml – This template sets up the CDC pipeline resources such as a virtual private cloud (VPC), subnet, security group, AWS Identity and Access Management (IAM) roles, NAT, internet gateway, Amazon Elastic Compute Cloud (Amazon EC2) client, Amazon MSK, MSK Connect, Amazon RDS, and S3 buckets.
  2. gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, and ETL job.

Configure MSK and MSK Connect

To start, you’ll configure Amazon MSK and MSK Connect using the Debezium connector to capture incremental changes in the table and write them to Amazon S3 using an S3 sink connector. The vpc-msk-mskconnect-rds-client.yaml stack creates a VPC, private and public subnets, security groups, S3 buckets, an Amazon MSK cluster, an EC2 instance with a Kafka client, an RDS database, and MSK connectors with their worker configurations.

  1. Launch the stack vpc-msk-mskconnect-rds-client using the CloudFormation template.
  2. Provide the parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | An environment name that is prefixed to resource names. | msk-delta-cdc-pipeline |
| DatabasePassword | Database admin account password. | S3cretPwd99 |
| InstanceType | MSK client EC2 instance type. | t2.micro |
| LatestAmiId | Latest AMI ID of Amazon Linux 2023 for EC2 instance. You can use the default value. | /aws/service/ami-amazon-linux-latest/al2023-ami-kernel-6.1-x86_64 |
| VpcCIDR | IP range (CIDR notation) for this VPC. | 10.192.0.0/16 |
| PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone. | 10.192.10.0/24 |
| PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone. | 10.192.11.0/24 |
| PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone. | 10.192.20.0/24 |
| PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone. | 10.192.21.0/24 |
| PrivateSubnet3CIDR | IP range (CIDR notation) for the private subnet in the third Availability Zone. | 10.192.22.0/24 |

  3. The stack creation process can take approximately one hour to complete. Check the Outputs tab for the stack after the stack is created.
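
If you change the sample CIDR ranges, it can help to check them before launching a stack that takes about an hour to create. The following sketch uses Python's standard ipaddress module to confirm that every subnet sits inside the VPC range and that no two subnets overlap. The helper name find_cidr_problems is made up for this example; the values are the sample values from the parameter list.

```python
import ipaddress


def find_cidr_problems(vpc_cidr, subnet_cidrs):
    """Return a list of human-readable problems; an empty list means the plan is consistent."""
    vpc = ipaddress.ip_network(vpc_cidr)
    subnets = {name: ipaddress.ip_network(cidr) for name, cidr in subnet_cidrs.items()}
    problems = []
    names = sorted(subnets)
    for name in names:
        if not subnets[name].subnet_of(vpc):
            problems.append(f"{name} ({subnets[name]}) is outside the VPC range {vpc}")
    for index, first in enumerate(names):
        for second in names[index + 1:]:
            if subnets[first].overlaps(subnets[second]):
                problems.append(f"{first} overlaps {second}")
    return problems


sample_subnets = {
    "PublicSubnet1CIDR": "10.192.10.0/24",
    "PublicSubnet2CIDR": "10.192.11.0/24",
    "PrivateSubnet1CIDR": "10.192.20.0/24",
    "PrivateSubnet2CIDR": "10.192.21.0/24",
    "PrivateSubnet3CIDR": "10.192.22.0/24",
}

problems = find_cidr_problems("10.192.0.0/16", sample_subnets)
print(problems or "CIDR plan looks consistent")
```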

Next, you set up the AWS Glue data processing resources such as the AWS Glue database, table, and ETL job.

Implement UPSERT on an S3 data lake with Delta Lake using AWS Glue

The gluejob-setup.yaml CloudFormation template creates a database, IAM role, and AWS Glue ETL job. Retrieve the values for S3BucketNameForOutput and S3BucketNameForScript from the vpc-msk-mskconnect-rds-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup.
  2. Provide parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | Environment name that is prefixed to resource names. | gluejob-setup |
| GlueDataBaseName | Name of the Data Catalog database. | glue_cdc_blog_db |
| GlueTableName | Name of the Data Catalog table. | blog_cdc_tbl |
| S3BucketForGlueScript | Bucket name for the AWS Glue ETL script. Use the S3 bucket name from the previous stack. | aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| GlueWorkerType | Worker type for the AWS Glue job. | G.1X |
| NumberOfWorkers | Number of workers in the AWS Glue job. | 3 |
| S3BucketForOutput | Bucket name for writing data from the AWS Glue job. | aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| S3ConnectorTargetBucketname | Bucket name where the Amazon MSK S3 sink connector writes the data from the Kafka topic. | msk-lab-${AWS::AccountId}-target-bucket |

  3. The stack creation process can take approximately 2 minutes to complete. Check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created an AWS Glue database and AWS Glue job. For further clarity, you can examine the AWS Glue database and job generated using the CloudFormation template.

After successfully creating the CloudFormation stack, you can proceed with processing data using the AWS Glue ETL job.

Run the AWS Glue ETL job

To process the data created in the S3 bucket from Amazon MSK using the AWS Glue ETL job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue ETL job from the GlueJobName key. In the following screenshot, the name is GlueCDCJob-glue-delta-cdc.

  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue ETL job named GlueCDCJob-glue-delta-cdc.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job. On the Runs tab, confirm that the job ran without failure.

  7. Retrieve the OutputBucketName from the gluejob-setup template output.
  8. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

Note: We have enabled AWS Glue job bookmarks, which make sure that each job run processes only the data that is new since the previous run.
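
The effect of a job bookmark can be pictured as remembering which input has already been handled. The sketch below models only that observable behavior with a set of seen object keys; it is not how AWS Glue stores bookmark state internally, and the object keys are hypothetical.

```python
def run_job(available_keys, bookmark):
    """Return the keys processed in this run and the updated bookmark."""
    new_keys = sorted(set(available_keys) - bookmark)
    return new_keys, bookmark | set(new_keys)


bookmark = set()

# First run: the initial full load is processed.
first_run, bookmark = run_job(["cdc/part-0.json", "cdc/part-1.json"], bookmark)

# Second run: one new CDC file has arrived, and only that file is processed.
second_run, bookmark = run_job(
    ["cdc/part-0.json", "cdc/part-1.json", "cdc/part-2.json"], bookmark
)

# Third run: nothing new, so nothing is reprocessed.
third_run, bookmark = run_job(
    ["cdc/part-0.json", "cdc/part-1.json", "cdc/part-2.json"], bookmark
)

print(first_run, second_run, third_run)
```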

Query the Delta Lake table using Athena

After the AWS Glue ETL job has successfully created the Delta Lake table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database glue_cdc_blog_db created using the gluejob-setup stack.
  4. To validate the data, run the following query to preview the data and find the total count.
SELECT * FROM "glue_cdc_blog_db"."blog_cdc_tbl" ORDER BY cust_id DESC LIMIT 40;
SELECT COUNT(*) FROM "glue_cdc_blog_db"."blog_cdc_tbl";

The following screenshot shows the output of our example query.

Upload incremental (CDC) data for further processing

After we process the initial full load, let’s insert, update, and delete records in MySQL. These changes are captured by the Debezium MySQL connector and written to Amazon S3 by the Confluent S3 sink connector.

  1. On the Amazon EC2 console, go to the EC2 instance named KafkaClientInstance that you created using the CloudFormation template.

  2. Sign in to the EC2 instance using SSM. Select KafkaClientInstance and then choose Connect.

  3. Run the following commands to connect to the RDS database so that you can insert data into the table. Use the database password from the CloudFormation stack parameter tab.
sudo su - ec2-user
RDS_AURORA_ENDPOINT=$(aws rds describe-db-instances --region us-east-1 | jq -r '.DBInstances[] | select(.DBName == "salesdb") | .Endpoint.Address')
mysql -f -u master -h "$RDS_AURORA_ENDPOINT" --password
  4. Now perform the insert into the CUSTOMER table.
use salesdb;
INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887');
INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888');
INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889');

  5. Run the AWS Glue job again to update the Delta Lake table with new records.
  6. Use the Athena console to validate the data.
  7. Perform the insert, update, and delete in the CUSTOMER table.
    UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888;
    UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889;
    DELETE FROM CUSTOMER where CUST_ID = 8887;
    INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000');

  8. Run the AWS Glue job again to update the Delta Lake table with the insert, update, and delete records.
  9. Use the Athena console to verify the inserted, updated, and deleted records in the Delta Lake table.
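
To know what to look for in Athena, you can replay the statements from this section locally. The sketch below runs them against an in-memory SQLite table. The three-column schema is an assumption inferred from the INSERT and UPDATE statements, not the actual salesdb definition.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Assumed schema, inferred from the statements in the walkthrough.
conn.execute("CREATE TABLE CUSTOMER (CUST_ID INTEGER PRIMARY KEY, NAME TEXT, MKTSEGMENT TEXT)")

statements = [
    # First batch: inserts.
    "INSERT into CUSTOMER values(8887,'Customer Name 8887','Market segment 8887')",
    "INSERT into CUSTOMER values(8888,'Customer Name 8888','Market segment 8888')",
    "INSERT into CUSTOMER values(8889,'Customer Name 8889','Market segment 8889')",
    # Second batch: updates, a delete, and one more insert.
    "UPDATE CUSTOMER SET NAME='Customer Name update 8888',MKTSEGMENT='Market segment update 8888' where CUST_ID = 8888",
    "UPDATE CUSTOMER SET NAME='Customer Name update 8889',MKTSEGMENT='Market segment update 8889' where CUST_ID = 8889",
    "DELETE FROM CUSTOMER where CUST_ID = 8887",
    "INSERT into CUSTOMER values(9000,'Customer Name 9000','Market segment 9000')",
]
for statement in statements:
    conn.execute(statement)

expected_rows = conn.execute(
    "SELECT CUST_ID, NAME, MKTSEGMENT FROM CUSTOMER ORDER BY CUST_ID"
).fetchall()
for row in expected_rows:
    print(row)
```

After the final AWS Glue job run, the Delta Lake table should show customers 8888 and 8889 with their updated values, customer 9000 as a new row, and no row for customer 8887.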

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-msk-mskconnect-rds-client.

Conclusion

Organizations continually seek high-performance, cost-effective, and scalable analytical solutions to extract value from their operational data sources in near real time. The analytical platform must be capable of receiving updates to operational data as they happen. Traditional data lake solutions often struggle with managing changes in source data, but the Delta Lake framework addresses this challenge. This post illustrates the process of constructing an end-to-end change data capture (CDC) application using Amazon MSK, MSK Connect, AWS Glue, and native Delta Lake tables, alongside guidance on querying Delta Lake tables from Amazon Athena. This architectural pattern can be adapted to other data sources employing various Kafka connectors, enabling the creation of data lakes supporting UPSERT operations using AWS Glue and native Delta Lake tables. For further insights, see the MSK Connect examples.


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specializing in AWS Glue and Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS, specializing in AWS Glue. With a decade of experience, he excels in aiding customers with their big data workloads, focusing on data processing and analytics. He is committed to helping customers overcome ETL challenges and develop scalable data processing and analytics pipelines on AWS. In his free time, he likes to watch movies and spend time with his family.

Plan your advertising campaigns with Amazon Marketing Cloud on AWS Clean Rooms, now generally available

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/plan-your-advertising-campaigns-with-amazon-marketing-cloud-on-aws-clean-rooms-now-generally-available/

Today, we are announcing the general availability of Amazon Marketing Cloud (AMC) on AWS Clean Rooms to help advertisers use their first-party signals to collaborate with Amazon Ads unique signals. With this collaboration, advertisers can generate differentiated insights, discover new audiences, and enable advertising campaign planning, activation, and measurement use cases, all without having to move their underlying signals outside of their AWS account. With AMC on AWS Clean Rooms, customers can easily prepare their data, match and create audiences, use custom insights to activate more relevant advertising campaigns with Amazon Ads, and measure return on ad spend. All of this can be accomplished from the most secure cloud computing environment available today.

Advertisers continually strive to reach new audiences and deliver relevant marketing campaigns to better engage their customers. Yet the advertising and marketing landscape is undergoing a fundamental shift with signal loss and fragmentation. As such, advertisers and their partners need to collaborate using signals that are stored across many applications to personalize their advertising campaigns. However, to work with one another to gather insights, companies typically need to share a copy of their signals with their partners, which is often not aligned with their data governance, security and privacy, IT, and legal teams’ policies. As a result, many businesses miss opportunities to fully maximize the value of their first-party signals and improve planning, activation, and measurement outcomes for their campaigns.

AMC on AWS Clean Rooms makes it easier and more scalable for advertisers to use their first-party signals with Amazon Ads, including collaborating across event-level signals and modeling unique audiences to help improve media planning, activation, and outcomes without having to move underlying signals outside their cloud environment.

AMC on AWS Clean Rooms prerequisites (environment setup)
To get started with AMC on AWS Clean Rooms, the advertiser needs an AWS account and a dataset that contains user population and event-level data stored in open data formats (CSV, Parquet, or Iceberg) in an Amazon Simple Storage Service (Amazon S3) bucket. The next step is to send an email to the Amazon Ads team to request the creation of an AMC instance. Once an instance has been created, the Amazon Ads team will create an AWS Clean Rooms collaboration and invite the advertiser to join the collaboration.

How it works
1. Join an AWS Clean Rooms collaboration and create an ID namespace.
2. Configure and associate tables to an AMC collaboration.
3. Run an ID mapping workflow to create and populate the ID mapping table.
4. Run a query in AMC.

Walkthrough

1. Join an AWS Clean Rooms collaboration and create an ID namespace.
The advertiser accepts the collaboration invite by creating a membership in their AWS account. Once in the collaboration, the advertiser opens the AWS Clean Rooms console and selects the AWS Entity Resolution ID namespace generated when the collaboration was created, which starts the process of using their data for matching and collaboration in AWS Clean Rooms. Next, the advertiser specifies the AWS Glue table and the associated schema mapping, and chooses an S3 bucket in the same AWS Region as the collaboration for temporarily storing the data while it is processed. Lastly, the advertiser provides permissions to read the data input from AWS Glue and write to Amazon S3 on their behalf.

In the AirportLink collaboration shown in the following screenshot, the advertiser (member AirportLink2) accepts a collaboration invite sent by member AirportLink1.


2. Configure and associate tables to an AMC collaboration.
After joining the collaboration, the advertiser will create configured tables on their purchase data, add a custom analysis rule, and associate the configured table with the collaboration.



Within the collaboration, the advertiser will set up a collaboration analysis rule to control which party can receive the result of a query run on the associated table.


3. Run an ID mapping workflow to create and populate the ID mapping table.
Now that the ID namespace is associated with the collaboration, the Amazon Ads team will create an ID mapping table in the AWS Clean Rooms console. This step requires both the advertiser (source) and the Amazon Ads team (target) to associate their ID namespace resources to the collaboration. Amazon Ads will provide the methods of mapping and configuration, add the details for querying to name the ID mapping table, and provide permission for AWS Clean Rooms to execute and track the ID mapping workflow job on their behalf. Finally, the Amazon Ads team will select Create and Populate to start the mapping workflow and generate an ID mapping table that captures a common user cohort, who were matched on the rules provided in Step 2.

4. Run a query in AMC.
Advertisers can either use templates or write a SQL query to run for analysis and get query results for further insights. They can run the SQL query in the following ways:

  • Run a SQL query with AMC data and the advertiser’s data that return the results to the advertiser’s S3 bucket using aggregate analysis. An example query is “How many of the customers who are registered for my email list saw the ads I’m running on Amazon?”
  • Run a SQL query to create an audience on the advertiser’s data or overlap with AMC signals that returns results to the S3 bucket of Amazon Ads. An example query is to generate an audience to target in an ad campaign.
  • Run an AWS Clean Rooms ML lookalike modeling job where Amazon Ads contributes the configured model and the advertiser contributes a seed audience. The resulting segment (list of user ad IDs) is sent to Amazon Ads.
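
The first option, an aggregate analysis, can be pictured with a small local sketch. The snippet below uses Python's built-in sqlite3 module as a stand-in for the collaboration and answers the example question by counting email subscribers who also appear in ad impression data. The table names (advertiser_email_list, amc_ad_impressions), the user_id column, and the sample rows are all hypothetical; in a real collaboration the join would go through the ID mapping table and would be subject to the configured analysis rules.

```python
import sqlite3

# Hypothetical stand-in data; real AMC and advertiser tables will differ.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE advertiser_email_list (user_id TEXT PRIMARY KEY);
    CREATE TABLE amc_ad_impressions (user_id TEXT, campaign TEXT);
    INSERT INTO advertiser_email_list VALUES ('u1'), ('u2'), ('u3'), ('u4');
    INSERT INTO amc_ad_impressions VALUES
        ('u1', 'spring'), ('u1', 'spring'), ('u3', 'spring'), ('u9', 'spring');
""")

# Aggregate-only output: a distinct count of matched users, not row-level IDs.
OVERLAP_QUERY = """
    SELECT COUNT(DISTINCT e.user_id) AS subscribers_who_saw_ads
    FROM advertiser_email_list AS e
    JOIN amc_ad_impressions AS i ON i.user_id = e.user_id
"""

subscribers_who_saw_ads = conn.execute(OVERLAP_QUERY).fetchone()[0]
print(subscribers_who_saw_ads)
```

Counting distinct users rather than impressions keeps a subscriber who saw the ad several times from being counted more than once.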


After running the query, the advertiser can create an audience using a rule-based audience or a similar audience by navigating to the Audience tab in AMC. The output of the audience query will be sent directly to Amazon Demand Side Platform (DSP). The following table shows the options available to you when creating the audience:

If you want to                      Then
Use pre-built audience templates    Select Create with instructional query from the dropdown list
Create custom audience queries      Select Create new query from the dropdown list

When creating a new query, the advertiser will configure various options such as name, description, and date adjustments. Additionally, the advertiser can choose from the two following audience types:

  • Rule-based audience – Create an audience based on the audience query.
  • Similar audience – Create machine learning (ML)-based audiences from the seed audience outputs of the audience query.

Now available
AMC on AWS Clean Rooms is available in the US East (N. Virginia) Region. Be sure to check the full Region list for future updates. Learn more about AMC on AWS Clean Rooms in the AWS documentation.

Give it a try by emailing the Amazon Ads team to get started and send feedback to the AWS re:Post for AWS Clean Rooms or through your usual AWS Support contacts.

Veliswa

AWS and Multicloud: Existing capabilities & continued enhancements

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/aws-and-multicloud-existing-capabilities-continued-enhancements/

When I speak to large-scale AWS customers about their challenges and concerns, the conversation often turns to the topic of multicloud. Whether by intent or by accident, these customers sometimes choose to make use of services from more than one cloud provider, sometimes in conjunction with applications or services that are still hosted on-premises. In some cases they made early, bottom-up choices at the team and division level, choosing cloud offerings from multiple vendors in the absence of a top-down mandate. In others, they acquired or merged with another organization and discovered a similar multi-vendor situation.

Regardless of the path, these customers tell me that they want to simplify and centralize their oversight and management of this diverse portfolio of cloud and on-premises resources. It is sometimes the case that the “multi” situation is time-bound, with a plan in place to ultimately consolidate operations in one place. It is also sometimes the case that the customer plans to retain their diverse portfolio.

AWS and multicloud
Our goal with AWS is to make you successful no matter what architectural choices you have made. In this post I want to outline our approach, share some capabilities that our customers have been using over the years, and provide you with an update on some of the more recent service announcements and content that we have created to give you guidance that will help you to succeed.

Our approach is to extend existing AWS operational and management capabilities to work in multicloud and hybrid environments. Because we extend existing capabilities, your investment in training, development, scripting, and runbooks is preserved, and actually becomes even more worthwhile since it applies to your other (non-AWS) resources. For example, you can use the same service (AWS Systems Manager) to patch and update Amazon Elastic Compute Cloud (Amazon EC2) instances, servers running on-premises, and servers provided by other cloud providers. Similarly, you can use Amazon CloudWatch to monitor applications, compute resources, and other cloud resources in all of those environments. These are two examples of how we are putting our approach into practice for you.

The AWS Solutions for Hybrid and Multicloud page contains additional examples of our extension-based approach to adding new capabilities, along with some success stories from customers who have put the capabilities to use, including Phillips 66 and Deutsche Börse.

Whether you choose to operate entirely on AWS or in multicloud and hybrid environments, one of the primary reasons to adopt AWS is the broad choice of services we offer, enabling you to innovate, build, deploy, and monitor your workloads. Just as we recently launched free data transfer out to the internet (DTO) when you want to move outside of AWS, we are committed to helping you be successful regardless of your approach.

Now that I have explained our approach and highlighted some of the principal multicloud service offerings, let’s take a look at a few of the newest multicloud and hybrid capabilities.

Multicloud launches
Since the beginning of 2023 we have launched eighteen new multicloud capabilities to existing AWS services, including 15 for data & analytics, 1 for security, and 2 for identity. Many of these launches add to the existing multicloud capabilities of the respective services:

AWS DataSync – This service transfers data between storage services. In addition to existing support for Google Cloud Storage, Azure Files, and Azure Blob Storage, we added support for five additional cloud service providers and storage services including Oracle Cloud Storage and DigitalOcean Spaces (full list). To learn more about this service, read What is AWS DataSync. To get started, I create a source location:

AWS Glue – This data integration service helps you to discover, prepare, and integrate all of your data at any scale. You can use it to connect to more than 80 different data sources, including cloud databases and analytics services. In October 2023, we introduced additional new connectors that allow you to move data bidirectionally between Amazon Simple Storage Service (Amazon S3), and either Azure Blob Storage or Azure Data Lake Storage (full list). We also launched six database connectors for AWS Glue for Apache Spark, including Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB (full list). To learn more about AWS Glue, read What is AWS Glue. I create a visual job flow to get started:

Amazon Athena – This serverless analytics service lets you use interactive SQL queries to analyze petabyte-scale data where it lives (more than 25 external data sources, including other cloud data stores), without copying or transforming it. Last year we added a new data source connector that allows you to query data in Google Cloud Storage. To learn more about Amazon Athena, read What is Amazon Athena.

Amazon AppFlow – You can take advantage of data and analytics in Google BigQuery using a connector available in Amazon AppFlow. To get started with Amazon AppFlow I create a flow and configure a data source:

Amazon Security Lake – This service helps you to achieve a more complete, organization-wide view of your security posture. It centralizes security data from your AWS environments, SaaS providers, on-premises environments, and cloud sources (Azure and GCP) into a purpose-built data lake. It became generally available last year, and now supports collection and analysis of security data from sources that support the Open Cybersecurity Schema Framework (OCSF) standard—more than 80 sources (full list).

AWS Secrets Manager – This service centrally manages secrets such as database credentials and API keys. Secrets are securely encrypted and can be centrally audited, with support for replication to support disaster recovery and multi-region applications. Last year we announced that you can Use AWS Secrets Manager to store and manage secrets in on-premises or multicloud workloads. To learn more, read What is AWS Secrets Manager.

AWS Identity and Access Management (IAM) – AWS IAM Identity Center now supports automated user provisioning from Google Workspace. The integration helps administrators simplify AWS access management across multiple accounts while maintaining familiar Google Workspace experiences for end users as they sign in.

Amazon CloudWatch – This service lets you query, visualize, and alarm on metrics of all sorts: application, AWS, on-premises, and multicloud. At re:Invent 2023 we added even more support for consolidation of hybrid, multicloud, and on-premises metrics. This new feature allows you to select and configure connectors that pull data from Amazon Managed Service for Prometheus, generic Prometheus, Amazon OpenSearch Service, Amazon RDS for MySQL, Amazon RDS for PostgreSQL, CSV files stored in Amazon Simple Storage Service (Amazon S3), and Microsoft Azure Monitor.

Multicloud content and guidance
Now that you know about some of our latest multicloud launches, let’s take a look at some of the blog posts and other content that my colleagues have created.

First, some blog posts:

Next, some of the most popular multicloud videos from AWS re:Invent 2023:

And finally, be sure to bookmark the AWS Solutions for Hybrid and Multicloud page.

We’re here to help
If you are running in a multicloud environment and are ready to simplify and centralize, be sure to reach out to your AWS Account Manager (AM) or Technical Account Manager (TAM). Both will be happy to help!

Jeff;

Get started with the new Amazon DataZone enhancements for Amazon Redshift

Post Syndicated from Carmen Manzulli original https://aws.amazon.com/blogs/big-data/get-started-with-the-new-amazon-datazone-enhancements-for-amazon-redshift/

In today’s data-driven landscape, organizations are seeking ways to streamline their data management processes and unlock the full potential of their data assets, while controlling access and enforcing governance. That’s why we introduced Amazon DataZone.

Amazon DataZone is a powerful data management service that empowers data engineers, data scientists, product managers, analysts, and business users to seamlessly catalog, discover, analyze, and govern data across organizational boundaries, AWS accounts, data lakes, and data warehouses.

On March 21, 2024, Amazon DataZone introduced several exciting enhancements to its Amazon Redshift integration that simplify the process of publishing and subscribing to data warehouse assets like tables and views, while enabling Amazon Redshift customers to take advantage of the data management and governance capabilities of Amazon DataZone.

These updates improve the experience for both data users and administrators.

Data producers and consumers can now quickly create data warehouse environments using preconfigured credentials and connection parameters provided by their Amazon DataZone administrators.

Additionally, these enhancements grant administrators greater control over who can access and use the resources within their AWS accounts and Redshift clusters, and for what purpose.

As an administrator, you can now create parameter sets on top of DefaultDataWarehouseBlueprint by providing parameters such as cluster, database, and an AWS secret. You can use these parameter sets to create environment profiles and authorize Amazon DataZone projects to use these environment profiles for creating environments.

In turn, data producers and data consumers can now select an environment profile to create environments without having to provide the parameters themselves, saving time and reducing the risk of issues.

In this post, we explain how you can use these enhancements to the Amazon Redshift integration to publish your Redshift tables to the Amazon DataZone data catalog, and enable users across the organization to discover and access them in a self-service fashion. We present a sample end-to-end customer workflow that covers the core functionalities of Amazon DataZone, and include a step-by-step guide of how you can implement this workflow.

The same workflow is available as a video demonstration on the official Amazon DataZone YouTube channel.

Solution overview

To get started with the new Amazon Redshift integration enhancements, consider the following scenario:

  • A sales team acts as the data producer, owning and publishing product sales data (a single table in a Redshift cluster called catalog_sales)
  • A marketing team acts as the data consumer, needing access to the sales data in order to analyze it and build product adoption campaigns

At a high level, the steps we walk you through in the following sections include tasks for the Amazon DataZone administrator, Sales team, and Marketing team.

Prerequisites

For the workflow described in this post, we assume a single AWS account, a single AWS Region, and a single AWS Identity and Access Management (IAM) user, who will act as Amazon DataZone administrator, Sales team (producer), and Marketing team (consumer).

To follow along, you need an AWS account. If you don’t have an account, you can create one.

In addition, you must have the following resources configured in your account:

  • An Amazon DataZone domain with admin, sales, and marketing projects
  • A Redshift namespace and workgroup

If you don’t have these resources already configured, you can create them by deploying an AWS CloudFormation stack:

  1. Choose Launch Stack to deploy the provided CloudFormation template.
  2. For AdminUserPassword, enter a password, and take note of this password to use in later steps.
  3. Leave the remaining settings as default.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources, then choose Submit.
  5. When the stack deployment is complete, on the Amazon DataZone console, choose View domains in the navigation pane to see the newly created Amazon DataZone domain.
  6. On the Amazon Redshift Serverless console, in the navigation pane, choose Workgroup configuration to see the newly created resource.

Make sure you’re logged in with the same role that you used to deploy the CloudFormation stack, and verify that you’re in the same Region.

As a final prerequisite, you need to create a catalog_sales table in the default Redshift database (dev).

  1. On the Amazon Redshift Serverless console, select your workgroup and choose Query data to open the Amazon Redshift query editor.
  2. In the query editor, choose your workgroup and select Database user name and password as the type of connection, then provide your admin database user name and password.
  3. Use the following query to create the catalog_sales table, which the Sales team will publish in the workflow:
    -- Create a small sample table; the first SELECT defines the column names
    CREATE TABLE catalog_sales AS
    SELECT 146776932 AS order_number, 23 AS quantity, 23.4 AS wholesale_cost, 45.0 AS list_price, 43.0 AS sales_price, 2.0 AS discount, 12 AS ship_mode_sk, 13 AS warehouse_sk, 23 AS item_sk, 34 AS catalog_page_sk, 232 AS ship_customer_sk, 4556 AS bill_customer_sk
    UNION ALL SELECT 46776931, 24, 24.4, 46, 44, 1, 14, 15, 24, 35, 222, 4551
    UNION ALL SELECT 46777394, 42, 43.4, 60, 50, 10, 30, 20, 27, 43, 241, 4565
    UNION ALL SELECT 46777831, 33, 40.4, 51, 46, 15, 16, 26, 33, 40, 234, 4563
    UNION ALL SELECT 46779160, 29, 26.4, 50, 61, 8, 31, 15, 36, 40, 242, 4562
    UNION ALL SELECT 46778595, 43, 28.4, 49, 47, 7, 28, 22, 27, 43, 224, 4555
    UNION ALL SELECT 46779482, 34, 33.4, 64, 44, 10, 17, 27, 43, 52, 222, 4556
    UNION ALL SELECT 46779650, 39, 37.4, 51, 62, 13, 31, 25, 31, 52, 224, 4551
    UNION ALL SELECT 46780524, 33, 40.4, 60, 53, 18, 32, 31, 31, 39, 232, 4563
    UNION ALL SELECT 46780634, 39, 35.4, 46, 44, 16, 33, 19, 31, 52, 242, 4557
    UNION ALL SELECT 46781887, 24, 30.4, 54, 62, 13, 18, 29, 24, 52, 223, 4561;

Now you’re ready to get started with the new Amazon Redshift integration enhancements.

Amazon DataZone administrator tasks

As the Amazon DataZone administrator, you perform the following tasks:

  1. Configure the DefaultDataWarehouseBlueprint.
    • Authorize the Amazon DataZone admin project to use the blueprint to create environment profiles.
    • Create a parameter set on top of DefaultDataWarehouseBlueprint by providing parameters such as cluster, database, and AWS secret.
  2. Set up environment profiles for the Sales and Marketing teams.

Configure the DefaultDataWarehouseBlueprint

Amazon DataZone blueprints define what AWS tools and services are provisioned to be used within an Amazon DataZone environment. Enabling the data warehouse blueprint will allow data consumers and data producers to use Amazon Redshift and the Query Editor for data sharing, accessing, and consuming.

  1. On the Amazon DataZone console, choose View domains in the navigation pane.
  2. Choose your Amazon DataZone domain.
  3. Choose Default Data Warehouse.

If you used the CloudFormation template, the blueprint is already enabled.

Part of the new Amazon Redshift experience involves the Managing projects and Parameter sets tabs. The Managing projects tab lists the projects that are allowed to create environment profiles using the data warehouse blueprint. By default, this is set to all projects. For our purpose, let’s authorize only the admin project.

  1. On the Managing projects tab, choose Edit.

  2. Select Restrict to only managing projects and choose the AdminPRJ project.
  3. Choose Save changes.

With this enhancement, the administrator can control which projects can use default blueprints in their account to create environment profiles.

The Parameter sets tab lists the parameter sets that you can create on top of DefaultDataWarehouseBlueprint by providing parameters such as the Redshift cluster or Redshift Serverless workgroup name, the database name, and the credentials that allow Amazon DataZone to connect to your cluster or workgroup. You can also create AWS secrets on the Amazon DataZone console. Before these enhancements, AWS secrets had to be managed separately using AWS Secrets Manager, making sure to include the proper tags (key-value) for Amazon Redshift Serverless.

For our scenario, we need to create a parameter set to connect a Redshift Serverless workgroup containing sales data.

  1. On the Parameter sets tab, choose Create parameter set.
  2. Enter a name and optional description for the parameter set.
  3. Choose the Region containing the resource you want to connect to (for example, our workgroup is in us-east-1).
  4. In the Environment parameters section, select Amazon Redshift Serverless.

If you already have an AWS secret with credentials to your Redshift Serverless workgroup, you can provide the existing AWS secret ARN. In this case, the secret must be tagged with the following (key-value): AmazonDataZoneDomain: <Amazon DataZone domain ID>.
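
If you prefer to prepare such a secret programmatically, the tagging requirement could look like the following sketch. It only builds the arguments for the Secrets Manager CreateSecret operation (Name, SecretString, Tags) and does not call AWS. The secret name, credentials, and domain ID are hypothetical placeholders, and the username/password payload shape is an assumption rather than something this post specifies.

```python
import json


def build_secret_request(secret_name, username, password, domain_id):
    """Build keyword arguments for the Secrets Manager CreateSecret operation."""
    return {
        "Name": secret_name,
        # Assumed payload shape: a JSON document holding the database credentials.
        "SecretString": json.dumps({"username": username, "password": password}),
        # Tag that lets Amazon DataZone use an existing secret for this domain.
        "Tags": [{"Key": "AmazonDataZoneDomain", "Value": domain_id}],
    }


request = build_secret_request(
    secret_name="sales-redshift-secret",  # hypothetical secret name
    username="admin",                     # hypothetical credentials
    password="example-password",
    domain_id="dzd_exampledomainid",      # hypothetical domain ID
)
print(request["Tags"])

# With boto3 installed and AWS credentials configured, the request could be sent as:
#   import boto3
#   boto3.client("secretsmanager").create_secret(**request)
```

Keeping the request construction separate from the API call makes it easy to check the tag before anything is created in your account.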

  1. Because we don’t have an existing AWS secret, we create a new one by choosing Create new AWS Secret.
  2. In the pop-up, enter a secret name and your Amazon Redshift credentials, then choose Create new AWS Secret.

Amazon DataZone creates a new secret using Secrets Manager and makes sure the secret is tagged with the domain in which you’re creating the parameter set.

  1. Enter the Redshift Serverless workgroup name and database name to complete the parameters list. If you used the provided CloudFormation template, use sales-workgroup for the workgroup name and dev for the database name.
  2. Choose Create parameter set.

You can see the parameter set created for your Redshift environment and the blueprint enabled with a single managing project configured.

Set up environment profiles for the Sales and Marketing teams

Environment profiles are predefined templates that encapsulate technical details required to create an environment, such as the AWS account, Region, and resources and tools to be added to projects. The next Amazon DataZone administrator task consists of setting up environment profiles, based on the default enabled blueprint, for the Sales and Marketing teams.

This task will be performed from the admin project in the Amazon DataZone data portal, so let’s follow the data portal URL and start creating an environment profile for the Sales team to publish their data.

  1. On the details page of your Amazon DataZone domain, in the Summary section, choose the link for your data portal URL.

When you open the data portal for the first time, you’re prompted to create a project. If you used the provided CloudFormation template, the projects are already created.

  1. Choose the AdminPRJ project.
  2. On the Environments page, choose Create environment profile.
  3. Enter a name (for example, SalesEnvProfile) and optional description (for example, Sales DWH Environment Profile) for the new environment profile.
  4. For Owner, choose AdminPRJ.
  5. For Blueprint, select the DefaultDataWarehouse blueprint (you’ll only see blueprints where the admin project is listed as a managing project).
  6. Choose the current enabled account and the parameter set you previously created.

You will then see the prepopulated values for Redshift Serverless. Under Authorized projects, you can pick the projects that are allowed to use this environment profile to create an environment. By default, this is set to All projects.

  1. Select Authorized projects only.
  2. Choose Add projects and choose the SalesPRJ project.
  3. Configure the publishing permissions for this environment profile. Because the Sales team is our data producer, we select Publish from any schema.
  4. Choose Create environment profile.

Next, you create a second environment profile for the Marketing team to consume data. To do this, repeat steps similar to those you followed for the Sales team.

  1. Choose the AdminPRJ project.
  2. On the Environments page, choose Create environment profile.
  3. Enter a name (for example, MarketingEnvProfile) and optional description (for example, Marketing DWH Environment Profile).
  4. For Owner, choose AdminPRJ.
  5. For Blueprint, select the DefaultDataWarehouse blueprint.
  6. Select the parameter set you created earlier.
  7. This time, keep All projects as the default (alternatively, you could select Authorized projects only and add MarketingPRJ).
  8. Configure the publishing permissions for this environment profile. Because the Marketing team is our data consumer, we select Don’t allow publishing.
  9. Choose Create environment profile.

With these two environment profiles in place, the Sales and Marketing teams can work independently in their own projects, creating the environments (resources and tools) they need with less configuration and less risk of errors, and publishing and consuming data securely and efficiently within these environments.

To recap, the new enhancements offer the following features:

  • When creating an environment profile, you can choose to provide your own Amazon Redshift parameters or use one of the parameter sets from the blueprint configuration. If you choose to use the parameter set created in the blueprint configuration, the AWS secret only requires the AmazonDataZoneDomain tag (the AmazonDataZoneProject tag is only required if you choose to provide your own parameter sets in the environment profile).
  • In the environment profile, you can specify a list of authorized projects, so that only authorized projects can use this environment profile to create data warehouse environments.
  • You can also specify which data authorized projects are allowed to publish. You can choose one of the following options: Publish from any schema, Publish from the default environment schema, and Don’t allow publishing.
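
To make these rules concrete, here is a small illustrative model of the two environment profiles from this walkthrough. It is not the Amazon DataZone API: the class names, method names, and the default_schema argument are hypothetical, and the sketch assumes that an empty set of authorized projects stands for All projects.

```python
from dataclasses import dataclass
from enum import Enum


class PublishingPermission(Enum):
    ANY_SCHEMA = "Publish from any schema"
    DEFAULT_SCHEMA = "Publish from the default environment schema"
    NONE = "Don't allow publishing"


@dataclass(frozen=True)
class EnvironmentProfile:
    name: str
    workgroup: str
    database: str
    publishing: PublishingPermission
    authorized_projects: frozenset = frozenset()  # empty means "All projects"

    def can_create_environment(self, project: str) -> bool:
        # Only authorized projects may create environments from this profile.
        return not self.authorized_projects or project in self.authorized_projects

    def can_publish(self, schema: str, default_schema: str) -> bool:
        # Publishing depends on the option chosen by the administrator.
        if self.publishing is PublishingPermission.ANY_SCHEMA:
            return True
        if self.publishing is PublishingPermission.DEFAULT_SCHEMA:
            return schema == default_schema
        return False


sales_profile = EnvironmentProfile(
    name="SalesEnvProfile", workgroup="sales-workgroup", database="dev",
    publishing=PublishingPermission.ANY_SCHEMA,
    authorized_projects=frozenset({"SalesPRJ"}),
)
marketing_profile = EnvironmentProfile(
    name="MarketingEnvProfile", workgroup="sales-workgroup", database="dev",
    publishing=PublishingPermission.NONE,
)

print(sales_profile.can_create_environment("MarketingPRJ"))
print(marketing_profile.can_publish("public", default_schema="public"))
```

In this model, the Sales profile is restricted to SalesPRJ and may publish from any schema, while the Marketing profile is open to all projects but cannot publish.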

These enhancements grant administrators more control over Amazon DataZone resources and projects and facilitate the common activities of all roles involved.

Sales team tasks

As a data producer, the Sales team performs the following tasks:

  1. Create a sales environment.
  2. Create a data source.
  3. Publish sales data to the Amazon DataZone data catalog.

Create a sales environment

Now that you have an environment profile, you need to create an environment in order to work with data and analytics tools in this project.

  1. Choose the SalesPRJ project.
  2. On the Environments page, choose Create environment.
  3. Enter a name (for example, SalesDwhEnv) and optional description (for example, Environment DWH for Sales) for the new environment.
  4. For Environment profile, choose SalesEnvProfile.

Data producers can now select an environment profile to create environments, without the need to provide their own Amazon Redshift parameters. The AWS secret, Region, workgroup, and database are ported over to the environment from the environment profile, streamlining and simplifying the experience for Amazon DataZone users.

  1. Review your data warehouse parameters to confirm everything is correct.
  2. Choose Create environment.

The environment will be automatically provisioned by Amazon DataZone with the preconfigured credentials and connection parameters, allowing the Sales team to publish Amazon Redshift tables seamlessly.

Create a data source

Now, let’s create a new data source for our sales data.

  1. Choose the SalesPRJ project.
  2. On the Data page, choose Create data source.
  3. Enter a name (for example, SalesDataSource) and optional description.
  4. For Data source type, select Amazon Redshift.
  5. For Environment, choose SalesDwhEnv.
  6. For Redshift credentials, you can use the same credentials you provided during environment creation, because you’re still using the same Redshift Serverless workgroup.
  7. Under Data Selection, enter the schema name where your data is located (for example, public) and then specify a table selection criterion (for example, *).

Here, the * indicates that this data source will bring into Amazon DataZone all the technical metadata from the database tables of your schema (in this case, a single table called catalog_sales).
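
As a rough local illustration of how a wildcard criterion selects tables, the sketch below applies glob-style matching to a list of table names. Treating the criterion as a glob pattern is an assumption made for this example; the exact matching rules that Amazon DataZone applies may differ. Any table name other than catalog_sales is hypothetical.

```python
from fnmatch import fnmatchcase


def select_tables(table_names, criterion):
    """Return the table names that match a glob-style selection criterion."""
    return [name for name in table_names if fnmatchcase(name, criterion)]


# In this post's scenario, the public schema holds a single table.
print(select_tables(["catalog_sales"], "*"))

# A narrower criterion applied to a hypothetical schema with more tables.
print(select_tables(["catalog_sales", "catalog_returns", "web_sales"], "catalog_*"))
```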

  1. Choose Next.

On the next page, automated metadata generation is enabled. This means that Amazon DataZone will automatically generate the business names of the table and columns for that asset. 

  1. Leave the settings as default and choose Next.
  2. For Run preference, select when to run the data source. Amazon DataZone can automatically publish these assets to the data catalog, but let’s select Run on demand so we can curate the metadata before publishing.
  3. Choose Next.
  4. Review all settings and choose Create data source.
  5. After the data source has been created, you can manually pull technical metadata from the Redshift Serverless workgroup by choosing Run.

When the data source has finished running, you can see the catalog_sales asset correctly added to the inventory.

Publish sales data to the Amazon DataZone data catalog

Open the catalog_sales asset to see details of the new asset (business metadata, technical metadata, and so on).

In a real-world scenario, this pre-publishing phase is when you can enrich the asset by providing more business context and information, such as a readme, glossaries, or metadata forms. For example, you can accept some of the automatically generated metadata recommendations and rename the asset or its columns to make them more readable, descriptive, and easy for a business user to search and understand.

For this post, simply choose Publish asset to complete the Sales team tasks.

Marketing team tasks

Let’s switch to the Marketing team and subscribe to the catalog_sales asset published by the Sales team. As a consumer team, the Marketing team will complete the following tasks:

  1. Create a marketing environment.
  2. Discover and subscribe to sales data.
  3. Query the data in Amazon Redshift.

Create a marketing environment

To subscribe and access Amazon DataZone assets, the Marketing team needs to create an environment.

  1. Choose the MarketingPRJ project.
  2. On the Environments page, choose Create environment.
  3. Enter a name (for example, MarketingDwhEnv) and optional description (for example, Environment DWH for Marketing).
  4. For Environment profile, choose MarketingEnvProfile.

As with data producers, data consumers can also benefit from a pre-configured profile (created and managed by the administrator) that speeds up environment creation and reduces the risk of errors.

  1. Review your data warehouse parameters to confirm everything is correct.
  2. Choose Create environment.

Discover and subscribe to sales data

Now that we have a consumer environment, let’s search for the catalog_sales table in the Amazon DataZone data catalog.

  1. Enter sales in the search bar.
  2. Choose the catalog_sales table.
  3. Choose Subscribe.
  4. In the pop-up window, choose your marketing consumer project, provide a reason for the subscription request, and choose Subscribe.

When you get a subscription request as a data producer, Amazon DataZone will notify you through a task in the sales producer project. Because you’re acting as both subscriber and publisher here, you will see a notification.

  1. Choose the notification, which will open the subscription request.

You can see details including which project has requested access, who is the requestor, and why access is needed.

  1. To approve, enter a message for approval and choose Approve.

Now that the subscription has been approved, let’s go back to the MarketingPRJ project. On the Subscribed data page, catalog_sales is listed as an approved asset, but access hasn’t been granted yet. If you choose the asset, you can see that Amazon DataZone is working on the backend to automatically grant access. When it’s complete, you’ll see the subscription as granted and the message “Asset added to 1 environment.”

Query data in Amazon Redshift

Now that the marketing project has access to the sales data, we can use the Amazon Redshift Query Editor V2 to analyze the sales data.

  1. Under MarketingPRJ, go to the Environments page and select the marketing environment.
  2. Under the analytics tools, choose Query data with Amazon Redshift, which redirects you to the query editor within the environment of the project.
  3. To connect to Amazon Redshift, choose your workgroup and select Federated user as the connection type.

When you’re connected, you will see the catalog_sales table under the public schema.

  1. To make sure that you have access to this table, run the following query:
SELECT * FROM catalog_sales LIMIT 10

As a consumer, you’re now able to explore data and create reports, or you can aggregate data and create new assets to publish in Amazon DataZone, becoming a producer of a new data product to share with other users and departments.

Clean up

To clean up your resources, complete the following steps:

  1. On the Amazon DataZone console, delete the projects used in this post. This will delete most project-related objects like data assets and environments.
  2. Clean up all Amazon Redshift resources (workgroup and namespace) to avoid incurring additional charges.

Conclusion

In this post, we demonstrated how you can get started with the new Amazon Redshift integration in Amazon DataZone. We showed how to streamline the experience for data producers and consumers and how to grant administrators control over data resources.

Embrace these enhancements and unlock the full potential of Amazon DataZone and Amazon Redshift for your data management needs.



About the author

Carmen is a Solutions Architect at AWS, based in Milan (Italy). She is a data lover who enjoys helping companies adopt cloud technologies, especially in data analytics and data governance. Outside of work, she is a creative person who loves being in contact with nature and occasionally practices adrenaline activities.

Monitoring Apache Iceberg metadata layer using AWS Lambda, AWS Glue, and AWS CloudWatch

Post Syndicated from Michael Greenshtein original https://aws.amazon.com/blogs/big-data/monitoring-apache-iceberg-metadata-layer-using-aws-lambda-aws-glue-and-aws-cloudwatch/

In the era of big data, data lakes have emerged as a cornerstone for storing vast amounts of raw data in its native format. They support structured, semi-structured, and unstructured data, offering a flexible and scalable environment for data ingestion from multiple sources. Data lakes provide a unified repository for organizations to store and use large volumes of data. This enables more informed decision-making and innovative insights through various analytics and machine learning applications.

Despite their advantages, traditional data lake architectures often grapple with challenges such as understanding deviations from the most optimal state of the table over time, identifying issues in data pipelines, and monitoring a large number of tables. As data volumes grow, the complexity of maintaining operational excellence also increases. Monitoring and tracking issues in the data management lifecycle are essential for achieving operational excellence in data lakes.

This is where Apache Iceberg comes into play, offering a new approach to data lake management. Apache Iceberg is an open table format designed specifically to improve the performance, reliability, and scalability of data lakes. It addresses many of the shortcomings of traditional data lakes by providing features such as ACID transactions, schema evolution, row-level updates and deletes, and time travel.

In this blog post, we’ll discuss how the metadata layer of Apache Iceberg can be used to make data lakes more efficient. You will learn about an open-source solution that collects important metrics from the Iceberg metadata layer. Based on the collected metrics, we will provide recommendations on how to improve the efficiency of Iceberg tables. Additionally, you will learn how to use the Amazon CloudWatch anomaly detection feature to detect ingestion issues.

Deep dive into Iceberg’s Metadata layer

Before diving into a solution, let’s understand how the Apache Iceberg metadata layer works. The Iceberg metadata layer provides an open specification instructing integrated big data engines such as Spark or Trino how to run read and write operations and how to resolve concurrency issues. It’s crucial for maintaining inter-operability between different engines. It stores detailed information about tables such as schema, partitioning, and file organization in versioned JSON and Avro files. This ensures that each change is tracked and reversible, enhancing data governance and auditability.

Apache Iceberg metadata layer architecture diagram

History and versioning: Iceberg’s versioning feature captures every change in table metadata as immutable snapshots, facilitating data integrity, historical views, and rollbacks.

File organization and snapshot management: Metadata closely manages data files, detailing file paths, formats, and partitions, supporting multiple file formats like Parquet, Avro, and ORC. This organization helps with efficient data retrieval through predicate pushdown, minimizing unnecessary data scans. Snapshot management allows concurrent data operations without interference, maintaining data consistency across transactions.

In addition to its core metadata management capabilities, Apache Iceberg also provides specialized metadata tables—snapshots, files, and partitions—that provide deeper insights and control over data management processes. These tables are dynamically generated and provide a live view of the metadata for query purposes, facilitating advanced data operations:

  • Snapshots table: This table lists all snapshots of a table, including snapshot IDs, timestamps, and operation types. It enables users to track changes over time and manage version history effectively.
  • Files table: The files table provides detailed information on each file in the table, including file paths, sizes, and partition values. It is essential for optimizing read and write performance.
  • Partitions table: This table shows how data is partitioned across different files and provides statistics for each partition, which is crucial for understanding and optimizing data distribution.

Metadata tables enhance Iceberg’s functionality by making metadata queries straightforward and efficient. Using these tables, data teams can gain precise control over data snapshots, file management, and partition strategies, further improving data system reliability and performance.

Before you get started

The next section describes a packaged open source solution using Apache Iceberg’s metadata layer and AWS services to enhance monitoring across your Iceberg tables.

Before we deep dive into the suggested solution, let’s mention Iceberg MetricsReporter, which is a native way to emit metrics for Apache Iceberg. It supports two types of reports: one for commits and one for scans. The default output is log based. It produces log files as a result of commit or scan operations. To submit metrics to CloudWatch or any other monitoring tool, users need to create and configure a custom MetricsReporter implementation. MetricsReporter is supported in Apache Iceberg v1.1.0 and later versions, and customers who want to use it must enable it through Spark configuration on their existing pipelines.

The following solution is deployed independently and doesn’t require any configuration changes to existing data pipelines. It can immediately start monitoring all the tables within the AWS account and AWS Region where it’s deployed. Compared to MetricsReporter, this solution adds between 20 and 80 seconds of latency to metrics arrival, but it integrates without custom configurations or changes to current workflows.

Solution overview

This solution is specifically designed for customers who run Apache Iceberg on Amazon Simple Storage Service (Amazon S3) and use AWS Glue as their data catalog.

Solution architecture diagram

Key features

This solution uses an AWS Lambda deployment package to collect metrics from Apache Iceberg tables. The metrics are then submitted to CloudWatch where you can create metrics visualizations to help recognize trends and anomalies over time.

The solution is designed to be lightweight, focusing on collecting metrics directly from the Iceberg metadata layer without scanning the actual data layer. This approach significantly reduces the compute capacity required, making it efficient and cost-effective. Key features of the solution include:

  • Time-series metrics collection: The solution monitors Iceberg tables continuously to identify trends and detect anomalies in data ingestion rates, partition skewness, and more.
  • Event-driven architecture: The solution uses Amazon EventBridge to launch a Lambda function when the state of an AWS Glue Data Catalog table changes. This ensures real-time metrics collection every time a transaction is committed to an Iceberg table.
  • Efficient data retrieval: The solution keeps compute requirements minimal by using AWS Glue interactive sessions and the pyiceberg library to directly access Iceberg metadata tables such as snapshots, partitions, and files.
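
To make the event-driven flow more concrete, the following is a minimal Python sketch of how a Lambda handler might read an EventBridge event for a Glue Data Catalog table change. It is not taken from the solution's repository: the `extract_table_ref` helper, the return values, and the rule pattern constant are hypothetical names used for illustration. The field names (`databaseName`, `tableName`) follow the shape of the Glue Data Catalog Table State Change event.

```python
# Hypothetical EventBridge rule pattern that matches Glue Data Catalog table changes.
EVENT_PATTERN = {
    "source": ["aws.glue"],
    "detail-type": ["Glue Data Catalog Table State Change"],
}


def extract_table_ref(event):
    """Return (database, table) from a Glue table state change event, or None."""
    if event.get("source") != "aws.glue":
        return None
    detail = event.get("detail", {})
    database = detail.get("databaseName")
    table = detail.get("tableName")
    if not database or not table:
        return None
    return database, table


def lambda_handler(event, context):
    """Entry point: decide which Iceberg table needs its metrics refreshed."""
    table_ref = extract_table_ref(event)
    if table_ref is None:
        # Not a table change we can act on, so skip it.
        return {"status": "ignored"}
    database, table = table_ref
    # In a real deployment, metrics collection from the Iceberg
    # metadata tables (snapshots, partitions, files) would start here.
    return {"status": "accepted", "table": f"{database}.{table}"}
```

Keeping the event parsing in a small pure function makes it straightforward to unit test the handler without any AWS dependencies.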

Metrics tracked

As of the blog release date, the solution collects over 25 metrics. These metrics are categorized into several groups:

  • Snapshot metrics: Include total and changes in data files, delete files, records added or removed, and size changes.
  • Partition and file metrics: Aggregated and per-partition metrics, such as average, maximum, and minimum record counts and file sizes, which help you understand data distribution and optimize storage.

To see the complete list of metrics, go to the GitHub repository.

Visualizing data with CloudWatch dashboards

The solution also provides a sample CloudWatch dashboard to visualize the collected metrics. Metrics visualization is important for real-time monitoring and detecting operational issues. The provided helper script simplifies the setup and deployment of the dashboard.

Amazon CloudWatch dashboard

You can go to the GitHub repository to learn more about how to deploy the solution in your AWS account.

What are the vital metrics for Apache Iceberg tables?

This section discusses specific metrics from Iceberg’s metadata and explains why they’re important for monitoring data quality and system performance. Each metric is broken down into three parts: insight, challenge, and action. This provides a clear path for practical application. This section covers only a subset of the metrics that the solution can collect. For a complete list, see the solution’s GitHub page.

1. snapshot.added_data_files, snapshot.added_records

  • Metric insight: The number of data files and number of records added to the table during the last transaction. The ingestion rate measures the speed at which new data is added to the data lake. This metric helps identify bottlenecks or inefficiencies in data pipelines, guiding capacity planning and scalability decisions.
  • Challenge: A sudden drop in the ingestion rate can indicate failures in data ingestion pipelines, source system outages, configuration errors or traffic spikes.
  • Action: Teams need to establish real-time monitoring and alert systems to detect drops in ingestion rates promptly, allowing quick investigations and resolutions.

2. files.avg_record_count, files.avg_file_size

  • Metric insight: These metrics provide insights into the distribution and storage efficiency of the table. Small file sizes might suggest excessive fragmentation.
  • Challenge: Excessively small file sizes can indicate inefficient data storage leading to increased read operations and higher I/O costs.
  • Action: Implementing regular data compaction processes helps consolidate small files, optimizing storage and enhancing content delivery speeds as demonstrated by a streaming service. The AWS Glue Data Catalog offers automatic compaction of Apache Iceberg tables. To learn more about compacting Apache Iceberg tables, see Enable compaction in Working with tables on the AWS Glue console.
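
As a rough illustration of how file-size metrics can signal fragmentation, the following Python sketch computes the average file size and the share of small files from a list of file sizes, such as the sizes reported by the Iceberg files metadata table. The function name, the returned keys, and the 32 MiB threshold are hypothetical choices for this example. They are not values used by the solution, and the right threshold depends on your target file size.

```python
from statistics import mean

MIB = 1024 * 1024


def summarize_file_sizes(file_sizes_bytes, small_file_threshold_bytes=32 * MIB):
    """Return the average file size and the share of files below the threshold."""
    if not file_sizes_bytes:
        return {"avg_file_size": 0.0, "small_file_ratio": 0.0}
    # Count the files that are smaller than the (assumed) threshold.
    small = sum(1 for size in file_sizes_bytes if size < small_file_threshold_bytes)
    return {
        "avg_file_size": mean(file_sizes_bytes),
        "small_file_ratio": small / len(file_sizes_bytes),
    }
```

A high small-file ratio alongside a low average file size is one possible trigger for scheduling compaction.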

3. partitions.skew_record_count, partitions.skew_file_count

  • Metric insight: The metrics indicate the asymmetry of the data distribution across the available table partitions. A skewness value of zero, or very close to zero, suggests that the data is balanced. Positive or negative skewness values might indicate a problem.
  • Challenge: Imbalances in data distribution across partitions can lead to inefficiencies and slow query responses.
  • Action: Regularly analyze data distribution metrics to adjust partitioning configuration. Apache Iceberg allows you to transform partitions dynamically, which enables optimization of table partitioning as query patterns or data volumes change, without impacting your existing data.
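
To show what a skewness value expresses, here is a small Python sketch that computes the moment-based (Fisher-Pearson) skewness of per-partition record counts. This post doesn't state which formula the solution uses for `partitions.skew_record_count`, so treat the formula as an assumption. The function name is hypothetical.

```python
from statistics import fmean


def skewness(values):
    """Population (Fisher-Pearson) skewness; 0.0 for symmetric or constant data."""
    if len(values) < 2:
        return 0.0
    mu = fmean(values)
    # Second and third central moments of the distribution.
    m2 = fmean([(v - mu) ** 2 for v in values])
    m3 = fmean([(v - mu) ** 3 for v in values])
    if m2 == 0:
        # All partitions hold the same number of records.
        return 0.0
    return m3 / m2 ** 1.5
```

For example, four partitions with 10 records each plus one partition with 1,000 records give a clearly positive skewness, whereas evenly sized partitions give zero.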

4. snapshot.deleted_records, snapshot.total_delete_files, snapshot.added_position_deletes

  • Metric insight: Deletion metrics in Apache Iceberg provide important information on the volume and nature of data deletions within a table. These metrics help track how often data is removed or updated, which is essential for managing data lifecycle and compliance with data retention policies.
  • Challenge: High values in these metrics can indicate excessive deletions or updates, which might lead to fragmentation and decreased query performance.
  • Action: To address these challenges, run compaction periodically to ensure deleted rows do not persist in new files. Regularly review and adjust data retention policies, and consider expiring old snapshots to keep only the necessary data files. You can run a compaction operation on specific partitions using the Amazon Athena OPTIMIZE statement.

Effective monitoring is essential for making informed decisions about necessary maintenance actions for Apache Iceberg tables. Determining the right timing for these actions is crucial. Implementing timely preventative maintenance ensures high operational efficiency of the data lake and helps to address potential issues before they become significant problems.

Using Amazon CloudWatch for anomaly detection and alerts

This section assumes that you have completed the solution setup and collected operational metrics from your Apache Iceberg tables into Amazon CloudWatch.

Now you can start setting up some alerts and detect anomalies.

We guide you on setting up the anomaly detection and configuring alerts in CloudWatch to monitor the snapshot.added_records metric, which indicates the ingestion rate of data written into an Apache Iceberg table.

Set up anomaly detection

CloudWatch anomaly detection applies machine learning algorithms to continuously analyze system metrics, determine normal baselines, and identify items that are outside of the established patterns. Here is how you configure it:

Amazon CloudWatch anomaly detection screenshot

  1. Select metrics: On the CloudWatch console, go to the Metrics tab and search for and select snapshot.added_records.
  2. Create anomaly detection models: Choose the Graphed metrics tab and choose the Pulse icon to enable anomaly detection.
  3. Set sensitivity: The second parameter of ANOMALY_DETECTION_BAND(m1, 5) adjusts the sensitivity of the anomaly detection by setting the width of the band in standard deviations. A larger value produces a wider band and fewer alerts. The goal is to balance detecting real issues and reducing false positives.

Configure alerts

After the anomaly detection model is set up, set up an alert to notify operations teams about potential issues:

  1. Create alarm: Choose the bell icon under Actions on the same Graphed metrics tab.
  2. Alarm settings: Set the alarm to notify the operations team when the snapshot.added_records metric is outside the anomaly detection band for two consecutive periods. This helps reduce the risk of false alerts.
  3. Alarm actions: Configure CloudWatch to send an alarm email to the operations team. In addition to sending emails, CloudWatch alarm actions can automatically launch remediation processes, such as scaling operations or initiating data compaction.
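
If you prefer to define the alarm in code rather than on the console, the following Python sketch builds the arguments for the CloudWatch `put_metric_alarm` API. The namespace, the `table_name` dimension, the alarm name, the 300-second period, and the `Sum` statistic are assumptions made for illustration. Check the solution's GitHub repository for the namespace and dimensions it actually publishes.

```python
def build_anomaly_alarm_kwargs(namespace, table_name, band_width=5, sns_topic_arn=None):
    """Build keyword arguments for CloudWatch put_metric_alarm (anomaly band alarm)."""
    kwargs = {
        "AlarmName": f"iceberg-added-records-anomaly-{table_name}",  # hypothetical name
        # Alarm when the metric leaves the band in either direction.
        "ComparisonOperator": "LessThanLowerOrGreaterThanUpperThreshold",
        # Two consecutive periods outside the band, as described above.
        "EvaluationPeriods": 2,
        "ThresholdMetricId": "ad1",
        "Metrics": [
            {
                "Id": "m1",
                "ReturnData": True,
                "MetricStat": {
                    "Metric": {
                        "Namespace": namespace,  # assumed namespace
                        "MetricName": "snapshot.added_records",
                        # Assumed dimension name.
                        "Dimensions": [{"Name": "table_name", "Value": table_name}],
                    },
                    "Period": 300,
                    "Stat": "Sum",
                },
            },
            {
                "Id": "ad1",
                "Expression": f"ANOMALY_DETECTION_BAND(m1, {band_width})",
                "Label": "snapshot.added_records (expected)",
                "ReturnData": True,
            },
        ],
    }
    if sns_topic_arn:
        # Notify the operations team through an SNS topic (for example, by email).
        kwargs["AlarmActions"] = [sns_topic_arn]
    return kwargs
```

With boto3 installed and credentials configured, you could pass the result to `boto3.client("cloudwatch").put_metric_alarm(**kwargs)`. Building the arguments separately keeps the alarm definition easy to review and test before anything is created in your account.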

Best practices

  • Regularly review and adjust models: As data patterns evolve, periodically review and adjust anomaly detection models and alarm settings so that they remain effective.
  • Comprehensive coverage: Ensure that all critical aspects of the data pipeline are monitored, not just a few metrics.
  • Documentation and communication: Maintain clear documentation of what each metric and alarm represents, and ensure that your operations team understands the monitoring setup and response procedures. Set up the alerting mechanisms to send notifications through appropriate channels such as email, corporate messenger, or telephone so your operations team stays informed and can quickly address issues.
  • Create playbooks and automate remediation tasks: Establish detailed playbooks that describe step-by-step responses for common scenarios identified by alerts. Additionally, automate remediation tasks where possible to speed up response times and reduce the manual burden on teams. This ensures consistent and effective responses to all incidents.

CloudWatch anomaly detection and alerting features help organizations proactively manage their data lakes. This ensures data integrity, reduces downtime, and maintains high data quality. As a result, it enhances operational efficiency and supports robust data governance.

Conclusion

In this blog post, we explored Apache Iceberg’s transformative impact on data lake management. Apache Iceberg addresses the challenges of big data with features like ACID transactions, schema evolution, and snapshot isolation, enhancing data reliability, query performance, and scalability.

We delved into Iceberg’s metadata layer and related metadata tables such as snapshots, files, and partitions that allow easy access to crucial information about the current state of the table. These metadata tables facilitate the extraction of performance-related data, enabling teams to monitor and optimize the data lake’s efficiency.

Finally, we showed you a practical solution for monitoring Apache Iceberg tables using Lambda, AWS Glue, and CloudWatch. This solution uses Iceberg’s metadata layer and CloudWatch monitoring capabilities to provide a proactive operational framework. This framework detects trends and anomalies, ensuring robust data lake management.


About the Author

Michael Greenshtein is a Senior Analytics Specialist at Amazon Web Services. He is an experienced data professional with over 8 years in cloud computing and data management. Michael is passionate about open-source technology and Apache Iceberg.

Manage Amazon Redshift provisioned clusters with Terraform

Post Syndicated from Amit Ghodke original https://aws.amazon.com/blogs/big-data/manage-amazon-redshift-provisioned-clusters-with-terraform/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it straightforward and cost-effective to analyze all your data using standard SQL and your existing extract, transform, and load (ETL); business intelligence (BI); and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

HashiCorp Terraform is an infrastructure as code (IaC) tool that lets you define cloud resources in human-readable configuration files that you can version, reuse, and share. You can then use a consistent workflow to provision and manage your infrastructure throughout its lifecycle.

In this post, we demonstrate how to use Terraform to manage common Redshift cluster operations, such as:

  • Creating a new provisioned Redshift cluster using Terraform code and adding an AWS Identity and Access Management (IAM) role to it
  • Scheduling pause, resume, and resize operations for the Redshift cluster

Solution overview

The following diagram illustrates the solution architecture for provisioning a Redshift cluster using Terraform.

Solution Overview

In addition to Amazon Redshift, the solution uses the following AWS services:

  • Amazon Elastic Compute Cloud (Amazon EC2) offers the broadest and deepest compute platform, with over 750 instances and choice of the latest processors, storage, networking, operating system (OS), and purchase model to help you best match the needs of your workload. For this post, we use an m5.xlarge instance with the Windows Server 2022 Datacenter Edition. The choice of instance type and Windows OS is flexible; you can choose a configuration that suits your use case.
  • IAM allows you to securely manage identities and access to AWS services and resources. We use IAM roles and policies to securely access services and perform relevant operations. An IAM role is an AWS identity that you can assume to gain temporary access to AWS services and resources. Each IAM role has a set of permissions defined by IAM policies. These policies determine the actions and resources the role can access.
  • AWS Secrets Manager allows you to securely store the user name and password needed to log in to Amazon Redshift.

In this post, we demonstrate how to set up an environment that connects AWS and Terraform. The following are the high-level tasks involved:

  1. Set up an EC2 instance with Windows OS in AWS.
  2. Install Terraform on the instance.
  3. Configure your environment variables (Windows OS).
  4. Define an IAM policy to have minimum access to perform activities on a Redshift cluster, including pause, resume, and resize.
  5. Establish an IAM role using the policy you created.
  6. Create a provisioned Redshift cluster using Terraform code.
  7. Attach the IAM role you created to the Redshift cluster.
  8. Write the Terraform code to schedule cluster operations like pause, resume, and resize.

Prerequisites

To complete the activities described in this post, you need an AWS account and administrator privileges on the account to use the key AWS services and create the necessary IAM roles.

Create an EC2 instance

We begin with creating an EC2 instance. Complete the following steps to create a Windows OS EC2 instance:

  1. On the Amazon EC2 console, choose Launch Instance.
  2. Choose a Windows Server Amazon Machine Image (AMI) that suits your requirements.
  3. Select an appropriate instance type for your use case.
  4. Configure the instance details:
    1. Choose the VPC and subnet where you want to launch the instance.
    2. Enable Auto-assign Public IP.
    3. For Add storage, configure the desired storage options for your instance.
    4. Add any necessary tags to the instance.
  5. For Configure security group, select or create a security group that allows the necessary inbound and outbound traffic to your instance.
  6. Review the instance configuration and choose Launch to start the instance creation process.
  7. For Select an existing key pair or create a new key pair, choose an existing key pair or create a new one.
  8. Choose Launch instance.
  9. When the instance is running, you can connect to it using the Remote Desktop Protocol (RDP) and the administrator password obtained from the Get Windows password option.

Install Terraform on the EC2 instance

Install Terraform on the Windows EC2 instance using the following steps:

  1. RDP into the EC2 instance you created.
  2. Install Terraform on the EC2 instance.

You need to update the environment variables to point to the directory where the Terraform executable is available.

  1. Under System Properties, on the Advanced tab, choose Environment Variables.

Environment Variables

  1. Choose the path variable.

Path Variables

  1. Choose New and enter the path where Terraform is installed. For this post, it’s in the C:\ directory.

Add Terraform to path variable

  1. Confirm Terraform is installed by entering the following command:

terraform -v

Check Terraform version

Optionally, you can use an editor like Visual Studio Code (VS Code) and add the Terraform extension to it.

Create a user for accessing AWS through code (AWS CLI and Terraform)

Next, we create an administrator user in IAM, which performs the operations on AWS through Terraform and the AWS Command Line Interface (AWS CLI). Complete the following steps:

  1. Create a new IAM user.
  2. On the IAM console, download and save the access key ID and secret access key.

Create New IAM User

  1. Install the AWS CLI.
  2. Launch the AWS CLI and run aws configure and pass the access key ID, secret access key, and default AWS Region.

This keeps the AWS credentials out of the Terraform code, so they aren’t visible in plain text and can’t be shared accidentally when the code is committed to a code repository.

AWS Configure

Create a user for Accessing Redshift through code (Terraform)

Because we’re creating a Redshift cluster and running subsequent operations on it, the code needs the cluster’s administrator user name and password (which are different from the IAM administrator credentials we created earlier). To handle them securely, we use Secrets Manager to store the user name and password. We write code in Terraform to access these credentials during the cluster create operation. Complete the following steps:

  1. On the Secrets Manager console, choose Secrets in the navigation pane.
  2. Choose Store a new secret.

Store a New Secret

  1. For Secret type, select Credentials for Amazon Redshift data warehouse.
  2. Enter your credentials.

Choose Secret Type

Set up Terraform

Complete the following steps to set up Terraform:

  1. Create a folder or directory for storing all your Terraform code.
  2. Open the VS Code editor and browse to your folder.
  3. Choose New File and enter a name for the file using the .tf extension.

Now we’re ready to start writing our code starting with defining providers. The providers definition is a way for Terraform to get the necessary APIs to interact with AWS.

  1. Configure a provider for Terraform:
terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "5.53.0"
    }
  }
}

# Configure the AWS Provider
provider "aws" {
  region = "us-east-1"
}
  1. Access the admin credentials for the Amazon Redshift admin user:
data "aws_secretsmanager_secret_version" "creds" {
  # Fill in the name you gave to your secret
  secret_id = "terraform-creds"
}

/* Decode the JSON secret string so individual keys can be referenced */
locals {
  terraform-creds = jsondecode(
    data.aws_secretsmanager_secret_version.creds.secret_string
  )
}

Create a Redshift cluster

To create a Redshift cluster, use the aws_redshift_cluster resource:

# Create an encrypted Amazon Redshift cluster

resource "aws_redshift_cluster" "dw_cluster" {
  cluster_identifier  = "tf-example-redshift-cluster"
  database_name       = "dev"
  master_username     = local.terraform-creds.username
  master_password     = local.terraform-creds.password
  node_type           = "ra3.xlplus"
  cluster_type        = "multi-node"
  publicly_accessible = false
  number_of_nodes     = 2
  encrypted           = true
  # Assumes a local named RedshiftClusterEncryptionKeySecret, which holds
  # your KMS key details, is defined elsewhere in your configuration
  kms_key_id                = local.RedshiftClusterEncryptionKeySecret.arn
  enhanced_vpc_routing      = true
  cluster_subnet_group_name = "<<your-cluster-subnet-groupname>>"
}

In this example, we create a two-node Redshift cluster called tf-example-redshift-cluster that uses the ra3.xlplus node type. We use the credentials from Secrets Manager and jsondecode to access these values. This makes sure the user name and password aren’t passed in plain text.

Add an IAM role to the cluster

Because we didn’t have the option to associate an IAM role during cluster creation, we do so now with the following code:

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
  cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
  iam_role_arns      = ["arn:aws:iam::yourawsaccountId:role/service-role/yourIAMrolename"]
}

Enable Redshift cluster operations

Performing operations on the Redshift cluster such as resize, pause, and resume on a schedule offers a more practical use of these operations. Therefore, we create two policies: one that allows the Amazon Redshift scheduler service and one that allows the cluster pause, resume, and resize operations. Then we create a role that has both policies attached to it.

You can also perform these steps directly on the console and then reference the resulting resources in your Terraform code. The following example demonstrates the code snippets to create the policies and a role, and then to attach the policy to the role.

  1. Create the Amazon Redshift scheduler policy document and create the role that assumes this policy:
# Define a policy document to establish the trust relationship between the role and the entity (Redshift scheduler)

data "aws_iam_policy_document" "assume_role_scheduling" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["scheduler.redshift.amazonaws.com"]
    }
    actions = ["sts:AssumeRole"]
  }
}

# Create a role that has the above trust relationship attached to it, so that it can invoke the Redshift scheduling service
resource "aws_iam_role" "scheduling_role" {
  name               = "redshift_scheduled_action_role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}
  1. Create a policy document and policy for Amazon Redshift operations:
/* Define the policy document for other Redshift operations */

data "aws_iam_policy_document" "redshift_operations_policy_definition" {
  statement {
    effect = "Allow"
    actions = [
      "redshift:PauseCluster",
      "redshift:ResumeCluster",
      "redshift:ResizeCluster",
    ]
    resources = ["arn:aws:redshift:*:youraccountid:cluster:*"]
  }
}

/* Create the policy and add the above data (JSON) to the policy */
resource "aws_iam_policy" "scheduling_actions_policy" {
  name   = "redshift_scheduled_action_policy"
  policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}
  3. Attach the policy to the IAM role:
/*connect the policy and the role*/
resource "aws_iam_role_policy_attachment" "role_policy_attach" {
  policy_arn = aws_iam_policy.scheduling_actions_policy.arn
  role       = aws_iam_role.scheduling_role.name
}
  4. Pause the Redshift cluster:
#pause a cluster
resource "aws_redshift_scheduled_action" "pause_operation" {
  name     = "tf-redshift-scheduled-action-pause"
  schedule = "cron(00 22 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    pause_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-pause that pauses the cluster at 10:00 PM every day as a cost-saving action.

  5. Resume the Redshift cluster:
#resume a cluster
resource "aws_redshift_scheduled_action" "resume_operation" {
  name     = "tf-redshift-scheduled-action-resume"
  schedule = "cron(15 07 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resume_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resume that resumes the cluster at 7:15 AM every day in time for business operations to start using the Redshift cluster.

  6. Resize the Redshift cluster:
#resize a cluster
resource "aws_redshift_scheduled_action" "resize_operation" {
  name     = "tf-redshift-scheduled-action-resize"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resize_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
      cluster_type       = "multi-node"
      node_type          = "ra3.xlplus"
      number_of_nodes    = 4 /*increase the number of nodes using resize operation*/
      classic            = true /*elastic resize is the default; set classic = true to use classic resize*/
    }
  }
}

In the preceding example, we created a scheduled action called tf-redshift-scheduled-action-resize that increases the number of nodes from 2 to 4. You can also perform other operations, such as changing the node type. By default, elastic resize is used; if you want to use classic resize, pass the parameter classic = true as shown in the preceding code. You can schedule this action ahead of peak periods so the cluster is resized appropriately for that duration, and then downsize using similar code during non-peak times.
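To check what a pair of pause and resume schedules means in practice, the following Python sketch reads the two cron expressions used earlier and works out how long the cluster stays paused each day. It assumes the six-field cron(minute hour day-of-month month day-of-week year) layout, assumes both schedules fire once a day, and assumes they are evaluated in the same time zone; the helper name parse_daily_cron is hypothetical.

```python
import re
from datetime import timedelta


def parse_daily_cron(expr: str) -> timedelta:
    """Return the time of day of a daily cron(...) expression as an offset from midnight."""
    match = re.fullmatch(r"cron\((.+)\)", expr.strip())
    if match is None:
        raise ValueError(f"not a cron() expression: {expr!r}")
    fields = match.group(1).split()
    if len(fields) != 6:
        raise ValueError("expected 6 fields: minute hour day-of-month month day-of-week year")
    minute, hour = int(fields[0]), int(fields[1])
    return timedelta(hours=hour, minutes=minute)


pause_at = parse_daily_cron("cron(00 22 * * ? *)")
resume_at = parse_daily_cron("cron(15 07 * * ? *)")

# The resume time falls on the following morning, so wrap around midnight.
paused = (resume_at - pause_at) % timedelta(days=1)
paused_hours = paused / timedelta(hours=1)

print(paused_hours)  # 9.25
```

With these schedules, the cluster is paused for 9 hours and 15 minutes per day.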

Test the solution

We apply the following code to test the solution. Change the resource details accordingly, such as account ID and Region name.

terraform {
  required_providers {
    aws = {
      source  = "hashicorp/aws"
      version = "5.53.0"
    }
  }
}

# Configure the AWS Provider
provider "aws" {
  region = "us-east-1"
}

# access secrets stored in secret manager
data "aws_secretsmanager_secret_version" "creds" {
  # Fill in the name you gave to your secret
  secret_id = "terraform-creds"
}

/*json decode to parse the secret*/
locals {
  terraform-creds = jsondecode(
    data.aws_secretsmanager_secret_version.creds.secret_string
  )
}

#Store the arn of the KMS key to be used for encrypting the redshift cluster

data "aws_secretsmanager_secret_version" "encryptioncreds" {
  secret_id = "RedshiftClusterEncryptionKeySecret"
}
locals {
  RedshiftClusterEncryptionKeySecret = jsondecode(
    data.aws_secretsmanager_secret_version.encryptioncreds.secret_string
  )
}

# Create an encrypted Amazon Redshift cluster
resource "aws_redshift_cluster" "dw_cluster" {
  cluster_identifier = "tf-example-redshift-cluster"
  database_name      = "dev"
  master_username    = local.terraform-creds.username
  master_password    = local.terraform-creds.password
  node_type          = "ra3.xlplus"
  cluster_type       = "multi-node"
  publicly_accessible = false
  number_of_nodes    = 2
  encrypted         = true
  kms_key_id        = local.RedshiftClusterEncryptionKeySecret.arn
  enhanced_vpc_routing = true
  cluster_subnet_group_name="redshiftclustersubnetgroup-yuu4sywme0bk"
}

#add IAM Role to the Redshift cluster

resource "aws_redshift_cluster_iam_roles" "cluster_iam_role" {
  cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
  iam_role_arns      = ["arn:aws:iam::youraccountid:role/service-role/yourrolename"]
}

#for audit logging please create an S3 bucket which has read write privileges for Redshift service, this example does not include S3 bucket creation.

resource "aws_redshift_logging" "redshiftauditlogging" {
  cluster_identifier   = aws_redshift_cluster.dw_cluster.cluster_identifier
  log_destination_type = "s3"
  bucket_name          = "your-s3-bucket-name"
}

#to do operations like pause, resume, resize on a schedule we need to first create a role that has permissions to perform these operations on the cluster

#define policy document to establish the Trust Relationship between the role and the entity (Redshift scheduler)

data "aws_iam_policy_document" "assume_role_scheduling" {
  statement {
    effect = "Allow"
    principals {
      type        = "Service"
      identifiers = ["scheduler.redshift.amazonaws.com"]
    }

    actions = ["sts:AssumeRole"]
  }
}

#create a role that has the above trust relationship attached to it, so that it can invoke the redshift scheduling service
resource "aws_iam_role" "scheduling_role" {
  name               = "redshift_scheduled_action_role"
  assume_role_policy = data.aws_iam_policy_document.assume_role_scheduling.json
}

/*define the policy document for other redshift operations*/

data "aws_iam_policy_document" "redshift_operations_policy_definition" {
  statement {
    effect = "Allow"
    actions = [
      "redshift:PauseCluster",
      "redshift:ResumeCluster",
      "redshift:ResizeCluster",
    ]

    resources =  ["arn:aws:redshift:*:youraccountid:cluster:*"]
  }
}

/*create the policy and add the above data (json) to the policy*/

resource "aws_iam_policy" "scheduling_actions_policy" {
  name   = "redshift_scheduled_action_policy"
  policy = data.aws_iam_policy_document.redshift_operations_policy_definition.json
}

/*connect the policy and the role*/

resource "aws_iam_role_policy_attachment" "role_policy_attach" {
  policy_arn = aws_iam_policy.scheduling_actions_policy.arn
  role       = aws_iam_role.scheduling_role.name
}

#pause a cluster

resource "aws_redshift_scheduled_action" "pause_operation" {
  name     = "tf-redshift-scheduled-action-pause"
  schedule = "cron(00 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    pause_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resume a cluster

resource "aws_redshift_scheduled_action" "resume_operation" {
  name     = "tf-redshift-scheduled-action-resume"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resume_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
    }
  }
}

#resize a cluster

resource "aws_redshift_scheduled_action" "resize_operation" {
  name     = "tf-redshift-scheduled-action-resize"
  schedule = "cron(15 14 * * ? *)"
  iam_role = aws_iam_role.scheduling_role.arn
  target_action {
    resize_cluster {
      cluster_identifier = aws_redshift_cluster.dw_cluster.cluster_identifier
      cluster_type = "multi-node"
      node_type = "ra3.xlplus"
      number_of_nodes = 4 /*increase the number of nodes using resize operation*/
      classic = true /*elastic resize is the default; set classic = true to use classic resize*/
    }
  }
}

Run terraform plan to see a list of changes that will be made, as shown in the following screenshot.

Terraform plan

After you have reviewed the changes, use terraform apply to create the resources you defined.

Terraform Apply

You will be asked to enter yes or no before Terraform starts creating the resources.

Confirmation of apply

You can confirm that the cluster is being created on the Amazon Redshift console.

redshift cluster creation

After the cluster is created, the IAM roles and schedules for pause, resume, and resize operations are added, as shown in the following screenshot.

Terraform actions

You can also view these scheduled operations on the Amazon Redshift console.

Scheduled Actions

Clean up

If you deployed the Redshift cluster, IAM roles, or any of the other associated resources by running terraform apply, run terraform destroy to tear them down and clean up your environment so that you avoid incurring charges on your AWS account.

Conclusion

Terraform offers a powerful and flexible solution for managing your infrastructure as code using a declarative approach, with a cloud-agnostic nature, resource orchestration capabilities, and strong community support. This post provided a comprehensive guide to using Terraform to deploy a Redshift cluster and perform important operations such as resize, resume, and pause on the cluster. Embracing IaC and using the right tools, such as Workflow Studio, VS Code, and Terraform, will enable you to build scalable and maintainable distributed applications, and automate processes.


About the Authors

Amit Ghodke is an Analytics Specialist Solutions Architect based out of Austin. He has worked with databases, data warehouses and analytical applications for the past 16 years. He loves to help customers implement analytical solutions at scale to derive maximum business value.

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Migrate workloads from AWS Data Pipeline

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/migrate-workloads-from-aws-data-pipeline/

AWS Data Pipeline helps customers automate the movement and transformation of data. With Data Pipeline, customers can define data-driven workflows, so that tasks can be dependent on the successful completion of previous tasks. Launched in 2012, Data Pipeline predates several popular Amazon Web Services (AWS) offerings for orchestrating data pipelines such as AWS Glue, AWS Step Functions, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA).

Data Pipeline has been a foundational service for getting customers off the ground with their extract, transform, and load (ETL) and infrastructure provisioning use cases. Some customers want a deeper level of control and specificity than is possible using Data Pipeline. With the recent advancements in the data industry, customers are looking for a more feature-rich platform to modernize their data pipelines to get them ready for data and machine learning (ML) innovation. This post explains how to migrate from Data Pipeline to alternate AWS services to serve the growing needs of data practitioners. The option you choose depends on your current workload on Data Pipeline. You can migrate typical use cases of Data Pipeline to AWS Glue, Step Functions, or Amazon MWAA.

Note that you will need to modify the configurations and code in the examples provided in this post based on your requirements. Before starting any production workloads after migration, you need to test your new workflows to ensure no disruption to production systems.

Migrating workloads to AWS Glue

AWS Glue is a serverless data integration service that helps analytics users to discover, prepare, move, and integrate data from multiple sources. It includes tooling for authoring, running jobs, and orchestrating workflows. With AWS Glue, you can discover and connect to hundreds of different data sources and manage your data in a centralized data catalog. You can visually create, run, and monitor ETL pipelines to load data into your data lakes. Also, you can immediately search and query cataloged data using Amazon Athena, Amazon EMR, and Amazon Redshift Spectrum.

We recommend migrating your Data Pipeline workload to AWS Glue when:

  • You’re looking for a serverless data integration service that supports various data sources, authoring interfaces including visual editors and notebooks, and advanced data management capabilities such as data quality and sensitive data detection.
  • Your workload can be migrated to AWS Glue workflows, jobs (in Python or Apache Spark) and crawlers (for example, your existing pipeline is built on top of Apache Spark).
  • You need a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
  • Your existing pipeline was created from a pre-defined template on the AWS Management Console for Data Pipeline, such as exporting a DynamoDB table to Amazon S3, or importing DynamoDB backup data from S3, and you’re looking for the same template.
  • Your workload doesn’t depend on a specific Hadoop ecosystem application such as Apache Hive.
  • Your workload doesn’t require orchestrating on-premises servers, user-managed Amazon Elastic Compute Cloud (Amazon EC2) instances, or a user-managed Amazon EMR cluster.

Example: Migrate EmrActivity on EmrCluster to export DynamoDB tables to S3

One of the most common workloads on Data Pipeline is to back up Amazon DynamoDB tables to Amazon Simple Storage Service (Amazon S3). Data Pipeline has a pre-defined template named Export DynamoDB table to S3 to export DynamoDB table data to a given S3 bucket.

The template uses EmrActivity (named TableBackupActivity) which runs on EmrCluster (named EmrClusterForBackup) and backs up data on DynamoDBDataNode to S3DataNode.

You can migrate these pipelines to AWS Glue because it natively supports reading from DynamoDB.

To define an AWS Glue job for the preceding use case:

  1. Open the AWS Glue console.
  2. Choose ETL jobs.
  3. Choose Visual ETL.
  4. For Sources, select Amazon DynamoDB.
  5. On the node Data source - DynamoDB, for DynamoDB source, select Choose the DynamoDB table directly, then select your source DynamoDB table from the menu.
  6. For Connection options, enter s3.bucket and dynamodb.s3.prefix.
  7. Choose + (plus) to add a new node.
  8. For Targets, select Amazon S3.
  9. On the node Data target - S3 bucket, for Format, select your preferred format, for example, Parquet.
  10. For S3 Target location, enter your destination S3 path.
  11. On the Job details tab, select an IAM role. If you don’t have an IAM role, follow Configuring IAM permissions for AWS Glue.
  12. Choose Save and Run.

Your AWS Glue job has been successfully created and started.

You might notice that there is no property to manage the read I/O rate. This is because the default DynamoDB reader used in AWS Glue Studio doesn’t scan the source DynamoDB table; instead, it uses the DynamoDB export feature.
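For readers who prefer a script over the visual editor, the connection options for the export-based reader can be sketched as a plain Python dictionary. The option keys below are written from memory of the AWS Glue DynamoDB export connector documentation and should be verified there before use; the table ARN, bucket, and prefix are placeholders, and dynamodb_export_options is a hypothetical helper.

```python
def dynamodb_export_options(table_arn: str, bucket: str, prefix: str) -> dict:
    """Build connection options for reading a table through DynamoDB export.

    The keys are assumptions based on the AWS Glue documentation, not values
    taken from this post.
    """
    return {
        "dynamodb.export": "ddb",          # use the export feature instead of scanning the table
        "dynamodb.tableArn": table_arn,    # source table
        "dynamodb.s3.bucket": bucket,      # staging bucket for the export
        "dynamodb.s3.prefix": prefix,      # staging prefix inside that bucket
        "dynamodb.unnestDDBJson": True,    # flatten the DynamoDB JSON structure
    }


options = dynamodb_export_options(
    table_arn="arn:aws:dynamodb:us-east-1:123456789012:table/your-table",  # placeholder
    bucket="your-staging-bucket",                                          # placeholder
    prefix="glue-export/",                                                 # placeholder
)
print(options)
```

In an AWS Glue job script, a dictionary like this would typically be passed as the connection options of the DynamoDB source.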

Example: Migrate EmrActivity on EmrCluster to import DynamoDB from S3

Another common workload on Data Pipeline is to restore DynamoDB tables using backup data on Amazon S3. Data Pipeline has a pre-defined template named Import DynamoDB backup data from S3 to import DynamoDB table data from a given S3 bucket.

The template uses EmrActivity (named TableLoadActivity) which runs on EmrCluster (named EmrClusterForLoad) and loads data from S3DataNode to DynamoDBDataNode.

You can migrate these pipelines to AWS Glue because it natively supports writing to DynamoDB.

As a prerequisite, create a destination DynamoDB table and catalog it in the AWS Glue Data Catalog using an AWS Glue crawler, the AWS Glue console, or the API. Then complete the following steps:

  1. Open the AWS Glue console.
  2. Choose ETL jobs.
  3. Choose Visual ETL.
  4. For Sources, select Amazon S3.
  5. On the node Data source - S3 bucket, for S3 URL, enter your S3 path.
  6. Choose + (plus) to add a new node.
  7. For Targets, select AWS Glue Data Catalog.
  8. On the node Data target - Data Catalog, for Database, select your destination database on Data Catalog.
  9. For Table, select your destination table on Data Catalog.
  10. On the Job details tab, select an IAM role. If you don’t have an IAM role, follow Configuring IAM permissions for AWS Glue.
  11. Choose Save and Run.

Your AWS Glue job has been successfully created and started.

Migrating workloads to Step Functions

AWS Step Functions is a serverless orchestration service that lets you build workflows for your business-critical applications. With Step Functions, you use a visual editor to build workflows and integrate directly with over 11,000 actions for over 250 AWS services, including AWS Lambda, Amazon EMR, DynamoDB, and more. You can use Step Functions for orchestrating data processing pipelines, handling errors, and working with the throttling limits on the underlying AWS services. You can create workflows that process and publish machine learning models, orchestrate micro-services, as well as control AWS services, such as AWS Glue, to create ETL workflows. You also can create long-running, automated workflows for applications that require human interaction.

We recommend migrating your Data Pipeline workload to Step Functions when:

  • You’re looking for a serverless, highly available workflow orchestration service.
  • You’re looking for a cost-effective solution that charges at single-task granularity.
  • Your workloads are orchestrating tasks for multiple AWS services, such as Amazon EMR, AWS Lambda, AWS Glue, or DynamoDB.
  • You’re looking for a low-code solution that comes with a drag-and-drop visual designer for workflow creation and doesn’t require learning new programming concepts.
  • You’re looking for a service that provides integrations with over 250 AWS services covering over 11,000 actions out-of-the-box, as well as allowing integrations with custom non-AWS services and activities.
  • You want to keep defining workflows in JSON, as you do in Data Pipeline. This allows you to store your workflows in source control, manage versions, control access, and automate with continuous integration and continuous delivery (CI/CD). Step Functions uses a syntax called Amazon States Language, which is fully based on JSON and allows a seamless transition between the textual and visual representations of the workflow.
  • Your workload requires orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.

With Step Functions, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.

For migrating activities on Data Pipeline managed resources, you can use AWS SDK service integration on Step Functions to automate resource provisioning and cleaning up. For migrating activities on on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster, you can install an SSM agent to the instance. You can initiate the command through the AWS Systems Manager Run Command from Step Functions. You can also initiate the state machine from the schedule defined in Amazon EventBridge.
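The Run Command approach can be sketched as a one-state Amazon States Language definition, built here as a Python dictionary and printed as JSON. This is a minimal sketch under stated assumptions: it uses the AWS SDK service integration for the Systems Manager SendCommand API with the AWS-RunShellScript document, and the instance ID and shell command are placeholders. Because SendCommand returns as soon as the command is submitted, a real workflow would likely add states that poll for completion.

```python
import json


def run_command_state_machine(instance_id: str, commands: list) -> dict:
    """Single Task state that sends a shell command to a managed instance."""
    return {
        "Comment": "Sketch: run a shell command on a user-managed instance",
        "StartAt": "RunShellCommand",
        "States": {
            "RunShellCommand": {
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:ssm:sendCommand",
                "Parameters": {
                    "DocumentName": "AWS-RunShellScript",
                    "InstanceIds": [instance_id],
                    "Parameters": {"commands": commands},
                },
                "End": True,
            }
        },
    }


definition = run_command_state_machine(
    instance_id="i-0123456789abcdef0",        # placeholder instance ID
    commands=["/opt/jobs/run_nightly.sh"],    # placeholder command
)
print(json.dumps(definition, indent=2))
```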

Example: Migrate HadoopActivity on EmrCluster

To migrate HadoopActivity on EmrCluster on Data Pipeline to Step Functions:

  1. Open the AWS Step Functions console.
  2. Choose State machines.
  3. Choose Create state machine.
  4. In the Choose a template wizard, search for emr, select Manage an EMR job, and choose Select.
  5. For Choose how to use this template, select Build on it.
  6. Choose Use template.
  7. For Create an EMR cluster state, configure API Parameters such as the EMR release label, EMR capacity, and IAM role based on the existing EmrCluster node configuration on Data Pipeline.
  8. For Run first step state, configure API Parameters such as the JAR file and arguments based on the existing HadoopActivity node configuration on Data Pipeline.
  9. If you have further activities configured on the existing HadoopActivity, repeat step 8.
  10. Choose Create.

Your state machine has been successfully configured. Learn more in Manage an Amazon EMR Job.
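The shape of the resulting workflow can be sketched as an Amazon States Language definition, built here as a Python dictionary. This is an illustration rather than the exact output of the console template: the state names, cluster settings, JAR path, and arguments are placeholders that you would replace with values from your EmrCluster and HadoopActivity definitions. The small dangling_transitions helper (hypothetical) checks that every transition points to a state that exists.

```python
import json


def emr_job_state_machine(jar: str, args: list) -> dict:
    """Create a cluster, run one Hadoop JAR step, then terminate the cluster."""
    return {
        "Comment": "Sketch: HadoopActivity on EmrCluster as a Step Functions workflow",
        "StartAt": "CreateCluster",
        "States": {
            "CreateCluster": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:createCluster.sync",
                "Parameters": {
                    "Name": "migrated-from-data-pipeline",  # placeholder
                    "ReleaseLabel": "emr-6.1.0",            # match your EmrCluster node
                    "Applications": [{"Name": "Hadoop"}],
                    "ServiceRole": "EMR_DefaultRole",
                    "JobFlowRole": "EMR_EC2_DefaultRole",
                    "Instances": {
                        "KeepJobFlowAliveWhenNoSteps": True,
                        "InstanceGroups": [
                            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
                            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
                        ],
                    },
                },
                "ResultPath": "$.cluster",
                "Next": "RunStep",
            },
            "RunStep": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:addStep.sync",
                "Parameters": {
                    "ClusterId.$": "$.cluster.ClusterId",
                    "Step": {
                        "Name": "hadoop-activity",
                        "ActionOnFailure": "CONTINUE",
                        "HadoopJarStep": {"Jar": jar, "Args": args},
                    },
                },
                "ResultPath": "$.step",
                "Next": "TerminateCluster",
            },
            "TerminateCluster": {
                "Type": "Task",
                "Resource": "arn:aws:states:::elasticmapreduce:terminateCluster",
                "Parameters": {"ClusterId.$": "$.cluster.ClusterId"},
                "End": True,
            },
        },
    }


def dangling_transitions(definition: dict) -> list:
    """Return the names of StartAt/Next targets that are not defined as states."""
    states = definition["States"]
    missing = [s["Next"] for s in states.values() if "Next" in s and s["Next"] not in states]
    if definition["StartAt"] not in states:
        missing.append(definition["StartAt"])
    return missing


definition = emr_job_state_machine(
    jar="s3://your-bucket/your-job.jar",                       # placeholder
    args=["s3://your-bucket/input", "s3://your-bucket/output"],  # placeholders
)
print(dangling_transitions(definition))  # []
print(json.dumps(definition, indent=2))
```

To migrate further activities, you could add more addStep.sync states between RunStep and TerminateCluster.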

Migrating workloads to Amazon MWAA

Amazon MWAA is a managed orchestration service for Apache Airflow that lets you use the Apache Airflow platform to set up and operate end-to-end data pipelines in the cloud at scale. Apache Airflow is an open-source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. Apache Airflow brings in new concepts like executors, pools, and SLAs that provide you with superior data orchestration capabilities. With Amazon MWAA, you can use Airflow and Python programming language to create workflows without having to manage the underlying infrastructure for scalability, availability, and security. Amazon MWAA automatically scales its workflow runtime capacity to meet your needs and is integrated with AWS security services to help provide you with fast and secure access to your data.

We recommend migrating your Data Pipeline workloads to Amazon MWAA when:

  • You’re looking for a managed, highly available service to orchestrate workflows written in Python.
  • You want to transition to a fully managed, widely adopted open source technology—Apache Airflow—for maximum portability.
  • You require a single platform that can handle all aspects of your data pipeline, including ingestion, processing, transfer, integrity testing, and quality checks.
  • You’re looking for a service designed for data pipeline orchestration with features such as rich UI for observability, restarts for failed workflows, backfills, retries for tasks, and lineage support with OpenLineage.
  • You’re looking for a service that comes with more than 1,000 pre-built operators and sensors, covering AWS as well as non-AWS services.
  • Your workload requires orchestrating on-premises servers, user-managed EC2 instances, or a user-managed EMR cluster.

Amazon MWAA workflows are defined as directed acyclic graphs (DAGs) using Python, so you can also treat them as source code. Airflow’s extensible Python framework enables you to build workflows connecting with virtually any technology. It comes with a rich user interface for viewing and monitoring workflows and can be easily integrated with version control systems to automate the CI/CD process. With Amazon MWAA, you can choose the same version of Amazon EMR that you’re currently using in Data Pipeline.

Example: Migrate HadoopActivity on EmrCluster

Complete the following steps in case you do not have existing MWAA environments:

  1. Create an AWS CloudFormation template on your computer by copying the template from the quick start guide into a local text file.
  2. On the CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and select the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE. The resource that will take the most time is the Airflow environment. While it’s being created, you can continue with the following steps, until you’re required to open the Airflow UI.

An Airflow workflow is based on a DAG, which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named emr_dag.py using a text editor with the following snippet, and configure the EMR-related parameters based on the existing Data Pipeline definition:
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.emr import (
        EmrCreateJobFlowOperator,
        EmrAddStepsOperator,
    )
    from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor
    from airflow.utils.dates import days_ago
    from datetime import timedelta
    import os
    DAG_ID = os.path.basename(__file__).replace(".py", "")
    SPARK_STEPS = [
        {
            'Name': 'calculate_pi',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': ['spark-example', 'SparkPi', '10'],
            },
        }
    ]
    JOB_FLOW_OVERRIDES = {
        'Name': 'my-demo-cluster',
        'ReleaseLabel': 'emr-6.1.0',
        'Applications': [
            {
                'Name': 'Spark'
            },
        ],
        'Instances': {
            'InstanceGroups': [
                {
                    'Name': "Master nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'MASTER',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 1,
                },
                {
                    'Name': "Core nodes",
                    'Market': 'ON_DEMAND',
                    'InstanceRole': 'CORE',
                    'InstanceType': 'm5.xlarge',
                    'InstanceCount': 2,
                }
            ],
            'KeepJobFlowAliveWhenNoSteps': False,
            'TerminationProtected': False,
        },
        'VisibleToAllUsers': True,
        'JobFlowRole': 'EMR_EC2_DefaultRole',
        'ServiceRole': 'EMR_DefaultRole'
    }
    with DAG(
        dag_id=DAG_ID,
        start_date=days_ago(1),
        schedule_interval='@once',
        dagrun_timeout=timedelta(hours=2),
        catchup=False,
        tags=['emr'],
    ) as dag:
        cluster_creator = EmrCreateJobFlowOperator(
            task_id='create_job_flow',
            job_flow_overrides=JOB_FLOW_OVERRIDES,
            aws_conn_id='aws_default',
        )
        step_adder = EmrAddStepsOperator(
            task_id='add_steps',
            job_flow_id=cluster_creator.output,
            aws_conn_id='aws_default',
            steps=SPARK_STEPS,
        )
        step_checker = EmrStepSensor(
            task_id='watch_step',
            job_flow_id=cluster_creator.output,
            step_id="{{ task_instance.xcom_pull(task_ids='add_steps')[0] }}",
            aws_conn_id='aws_default',
        )
        cluster_creator >> step_adder >> step_checker

Defining the schedule in Amazon MWAA is as simple as updating the schedule_interval parameter for the DAG. For example, to run the DAG daily, set schedule_interval='@daily'.
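When you carry a schedule over from Data Pipeline, you might start from a period string such as 1 day or 15 minutes. The following sketch shows one possible way to translate such a string into a value for schedule_interval: an Airflow preset where one exists, otherwise a timedelta. The period_to_schedule helper is hypothetical and handles only simple cases; note that a preset such as @daily runs at midnight, whereas a timedelta is measured from the DAG start date, so the two are not exact equivalents.

```python
from datetime import timedelta

# Airflow presets for single-unit periods.
PRESETS = {"hour": "@hourly", "day": "@daily", "week": "@weekly"}


def period_to_schedule(period: str):
    """Translate a period such as '1 day' or '15 minutes' into a schedule value."""
    count_text, unit = period.strip().lower().split()
    unit = unit.rstrip("s")  # accept both 'hour' and 'hours'
    count = int(count_text)
    if count == 1 and unit in PRESETS:
        return PRESETS[unit]
    if unit in {"minute", "hour", "day", "week"}:
        # timedelta takes plural keyword names: minutes, hours, days, weeks
        return timedelta(**{unit + "s": count})
    raise ValueError(f"unsupported period: {period!r}")


print(period_to_schedule("1 day"))       # @daily
print(period_to_schedule("15 minutes"))  # 0:15:00
```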

Now, you create a workflow that invokes the Amazon EMR step you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack followed by -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file emr_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn’t take longer than 30 minutes after you deployed the CloudFormation stack.

  4. Choose the environment link on the table to see the environment details.

It’s configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  5. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to sign you in.

If there are issues with the DAG file you created, Airflow displays an error at the top of the page indicating the lines affected. In that case, review the steps and upload the file again. After a few seconds, Airflow parses the new file and updates or removes the error banner.

Clean up

After you migrate your existing Data Pipeline workload and verify that the migration was successful, delete your pipelines in Data Pipeline to stop further runs and billing.

Conclusion

In this blog post, we outlined a few alternate AWS services for migrating your existing Data Pipeline workloads. You can migrate to AWS Glue to run and orchestrate Apache Spark applications, AWS Step Functions to orchestrate workflows involving various other AWS services, or Amazon MWAA to help manage workflow orchestration using Apache Airflow. By migrating, you will be able to run your workloads with a broader range of data integration functionalities. If you have additional questions, post in the comments or read about migration examples in our documentation.


About the authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team and AWS Data Pipeline team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Vaibhav Porwal is a Senior Software Development Engineer on the AWS Glue and AWS Data Pipeline team. He works on solving problems in the orchestration space by building low-cost, repeatable, scalable workflow systems that enable customers to create their ETL pipelines seamlessly.

Sriram Ramarathnam is a Software Development Manager on the AWS Glue and AWS Data Pipeline team. His team works on solving challenging distributed systems problems for data integration across AWS serverless and serverfull compute offerings.

Matt Su is a Senior Product Manager on the AWS Glue team and AWS Data Pipeline team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

How ActionIQ built a truly composable customer data platform using Amazon Redshift

Post Syndicated from Mackenzie Johnson original https://aws.amazon.com/blogs/big-data/how-actioniq-built-a-truly-composable-customer-data-platform-using-amazon-redshift/

This post is written in collaboration with Mackenzie Johnson and Phil Catterall from ActionIQ.

ActionIQ is a leading composable customer data platform (CDP) designed for enterprise brands to grow faster and deliver meaningful experiences for their customers. ActionIQ taps directly into a brand’s data warehouse to build smart audiences, resolve customer identities, and design personalized interactions to unlock revenue across the customer lifecycle. Enterprise brands including Albertsons, Atlassian, Bloomberg, e.l.f. Beauty, DoorDash, HP, and more use ActionIQ to drive growth through better customer experiences.

High costs associated with launching campaigns, the security risk of duplicating data, and the time spent on SQL requests have created a demand for a better solution for managing and activating customer data. Organizations are demanding secure, cost efficient, and time efficient solutions to power their marketing outcomes.

This post will demonstrate how ActionIQ built a connector for Amazon Redshift to tap directly into your data warehouse and deliver a secure, zero-copy CDP. It will cover how you can get started with building a truly composable CDP with Amazon Redshift—from the solution architecture to setting up and testing the connector.

The challenge

Copying or moving data means heavy and complex logistics, along with the added cost and security risks associated with replicating data. On the logistics side, data engineering teams have to set up additional extract, transform, and load (ETL) pipelines out of their Amazon Redshift warehouse into ActionIQ, then configure ActionIQ to ingest the data on a recurring basis. Additional ETL jobs mean more moving parts, which introduce more potential points of failure, such as breaking schema changes, partial data transfers, delays, and more. All of this requires additional observability overhead to help your team alert on and manage issues as they come up.

These additional ETL jobs add latency to the end-to-end process from data collection to activation, which makes it more likely that your campaigns are activating on stale data and missing key audience members. That has implications for the customer experience and directly affects your ability to drive revenue.

The solution

Our solution aims to reduce the logistics already discussed and enables up-to-the-minute data by establishing a secure connection and pushing queries directly down to your data warehouse. Instead of loading full datasets into ActionIQ, the query is pushed to the data warehouse, which does the hard querying and aggregation work, while ActionIQ waits for the result set.
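
To make the pushdown idea concrete, here is a minimal sketch in Python. It uses the standard library's in-memory SQLite database purely as a stand-in for the data warehouse. The purchases table, its made-up sample rows, and the audience_count helper are hypothetical and are not part of ActionIQ or Amazon Redshift. The point it illustrates is that the grouping and counting run inside the database, so only one aggregated row crosses the connection.

```python
import sqlite3

# SQLite stands in for the data warehouse in this sketch; the table,
# columns, and sample rows are hypothetical.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE purchases (profile_id INTEGER, amount REAL)")
conn.executemany(
    "INSERT INTO purchases VALUES (?, ?)",
    [(1, 120.0), (1, 30.0), (2, 15.0), (3, 300.0), (3, 5.0)],
)


def audience_count(connection, min_total):
    """Count profiles whose total spend is at least min_total.

    The aggregation runs inside the database; the caller receives a
    single row holding a single number, not the underlying purchases.
    """
    query = """
        SELECT COUNT(*) FROM (
            SELECT profile_id
            FROM purchases
            GROUP BY profile_id
            HAVING SUM(amount) >= ?
        ) AS qualifying
    """
    (count,) = connection.execute(query, (min_total,)).fetchone()
    return count


print(audience_count(conn, 100.0))
```

The alternative, fetching every purchase row and summing per profile in application code, would return the same number but would move the whole table over the network first.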

With Amazon Redshift as your data warehouse, you can run complex workloads with consistently high performance while minimizing the time and effort spent in copying data over to the data warehouse through the use of features like zero-ETL integration with transactional data stores, streaming ingestion, and data sharing. You can also train machine learning models and make predictions directly from your Amazon Redshift data warehouse using familiar SQL commands.

Solution architecture

Within AWS, ActionIQ has a virtual private cloud (VPC) and you have your own VPC. We work within our own private area in AWS, with our own locks and access restrictions. Because ActionIQ is going to have access to your Amazon Redshift data warehouse, this implies that an outside organization (ActionIQ) will be able to make direct database queries to the production database environment.

A solution architecture diagram showing AWS PrivateLink set up between ActionIQ's VPC and the customer's VPC

For your information security (infosec) teams to approve this design, we need very clear and tight guardrails to ensure that:

  • ActionIQ only has access to what is absolutely necessary
  • No unintended third party can access these assets

ActionIQ needs to communicate securely to satisfy every information security requirement. To do that, within those AWS environments, you must set up AWS PrivateLink with ActionIQ to create a secure connection between the two VPCs. PrivateLink sets up a secure tunnel between the two VPCs, thus avoiding any opening of either VPC to the public internet. After PrivateLink is set up, ActionIQ needs to be granted privileges to the relevant database objects in your Amazon Redshift data warehouse.

In Amazon Redshift, you must create a distinct database, a service account specifically for ActionIQ, and views to populate the data to be shared with ActionIQ. The views need to adhere to ActionIQ’s data model guidelines, which aren’t rigid, but nonetheless require some structure, such as a clear profile_id that is used in all the views for easy joins between the various data sets.

Getting started with ActionIQ

When starting a hybrid compute integration with Amazon Redshift, it’s key to align your data to ActionIQ’s table types in the following manner:

  • Customer base table: A single dimension table with one record per customer that contains all customers.
  • User info tables: Dimension tables that describe customers and join to the customer base table. They often contain slow-moving or static demographic information and are typically matched one to one with customer records.
  • Event tables: Fact or log-like tables that contain events or actions your customers take. The primary key is typically a user_id and timestamp.
  • Entity tables: Dimension tables that describe non-customer objects. They often provide additional information to augment the data in event tables. For example, an entity table could be a product table that contains product metadata and joins to a transaction event table on a product_id.

Visual representation of the relationships of the high level entities in the customer, event and product subject areas

Note: User info and event tables can join on any available identifier to the customer base table, not just the base user ID.
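
The four table types can be sketched as a small schema. The example below is illustrative only: it uses Python's built-in SQLite module as a stand-in for Amazon Redshift, and every table name, column name, and sample row (customer_base, user_info, products, transactions, home_region, and so on) is an assumption made for this sketch rather than ActionIQ's required naming.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    -- Customer base table: one record per customer.
    CREATE TABLE customer_base (profile_id INTEGER PRIMARY KEY, email TEXT);
    -- User info table: slow-moving attributes, one to one with customers.
    CREATE TABLE user_info (profile_id INTEGER, home_region TEXT);
    -- Entity table: describes a non-customer object (here, products).
    CREATE TABLE products (product_id INTEGER PRIMARY KEY, category TEXT);
    -- Event table: one row per customer action.
    CREATE TABLE transactions (profile_id INTEGER, event_ts TEXT, product_id INTEGER);

    INSERT INTO customer_base VALUES (1, 'a@example.com'), (2, 'b@example.com');
    INSERT INTO user_info VALUES (1, 'EU'), (2, 'US');
    INSERT INTO products VALUES (10, 'shoes'), (11, 'hats');
    INSERT INTO transactions VALUES
        (1, '2024-01-05T10:00:00', 10),
        (1, '2024-01-06T09:30:00', 11),
        (2, '2024-01-07T14:00:00', 10);
""")


def buyers_of(connection, category, region):
    """Return sorted profile_ids in a region with an event in a product category."""
    rows = connection.execute(
        """
        SELECT DISTINCT c.profile_id
        FROM customer_base AS c
        JOIN user_info AS u ON u.profile_id = c.profile_id
        JOIN transactions AS t ON t.profile_id = c.profile_id
        JOIN products AS p ON p.product_id = t.product_id
        WHERE p.category = ? AND u.home_region = ?
        ORDER BY c.profile_id
        """,
        (category, region),
    ).fetchall()
    return [profile_id for (profile_id,) in rows]


print(buyers_of(conn, "shoes", "EU"))
```

Here the user info and event tables join to the customer base table on profile_id, and the entity table joins to the event table on product_id, which mirrors the relationships described in the list.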

Now you can set up the connection and declare the views in the ActionIQ UI. After ActionIQ establishes a table with master profiles, users can begin to interpret those tables and work with them to build out campaigns.

Establish a secure connection

After setting up PrivateLink, the remaining steps to prepare for hybrid compute are the following:

  1. Create a separate database in Amazon Redshift to define the shared dataset with ActionIQ.
  2. Create a service account for ActionIQ in Amazon Redshift.
  3. Grant READ access to the service account for the dedicated database.
  4. Define the views that will be shared with ActionIQ.

Allow listing

If your data warehouse is on a private network, you must add ActionIQ’s IP addresses to your network’s allow list to allow ActionIQ to access your cloud warehouse. For more information on how to set this up, see Configure inbound rules for SQL clients.

Database setup

Create an Amazon Redshift user for ActionIQ

  1. Sign in to the Amazon Redshift console.
  2. From the navigation menu, choose the Query Editor and connect to your database.
  3. Create a user for ActionIQ:
    CREATE USER actioniq PASSWORD 'password';

  4. Grant permissions to the tables within a given schema you want to give ActionIQ access to:
    GRANT USAGE ON SCHEMA yourschema TO actioniq;
    GRANT SELECT ON ALL TABLES IN SCHEMA yourschema TO actioniq;

Run these commands in the query editor, replacing yourschema with the schema that holds the datasets you want to access through ActionIQ.

The result is that ActionIQ now has programmatic query access to a dedicated database in your Amazon Redshift data warehouse, and that access is limited to that database.

In order to make this easy to govern, we recommend the following guidelines on the shared views:

  • As much as possible, the shared objects should be views and not tables.
  • The views should never use SELECT *, but should explicitly specify each field desired in the view. This has multiple benefits:
    • The schema is robust; even if the underlying table changes, it won’t initiate a change in the shared view.
    • It makes it very clear which fields are accessible by ActionIQ and which are not, thereby enabling a proper governance approval process.
  • Limiting privileges to READ access means the data warehouse administrators can be structurally certain that the data views won’t change unless they want them to.

The importance of providing views instead of actual tables is two-fold:

  1. A view doesn’t replicate data. The whole point is to avoid data replication, and we don’t want to replicate data within Amazon Redshift either. With a view, which is essentially a query definition on top of the actual data tables, we avoid the need to replicate data at all. There is a legitimate question of “why not give access to tables directly?” which brings us to the second point.
  2. Tables and data schema change on their own schedule, and ActionIQ needs a stable data schema to work with. By defining a view, we’re also defining a contract for sharing data between you and ActionIQ. The underlying data table can change, and the view definition can absorb this change, without modifying the structure of what the view delivers. This stability is critical for any enterprise software as a service to work effectively with a large organization.
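
The "view as a contract" idea can be illustrated with a short sketch. It again uses Python's built-in SQLite module as a stand-in for Amazon Redshift, and the customers table, the shared_customers view, and the view_columns helper are hypothetical names chosen for this example. The view lists its columns explicitly, so a column added to the underlying table later does not change what the consumer sees.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (id INTEGER PRIMARY KEY, email TEXT, internal_notes TEXT);
    INSERT INTO customers VALUES (1, 'a@example.com', 'do not share');

    -- The shared view names every exposed column explicitly,
    -- and leaves internal_notes out.
    CREATE VIEW shared_customers AS
        SELECT id AS profile_id, email FROM customers;
""")


def view_columns(connection, view_name):
    """Return the column names a consumer sees when querying the view.

    view_name is interpolated directly, so it must be a trusted
    identifier; that is acceptable only because this is a sketch.
    """
    cursor = connection.execute(f"SELECT * FROM {view_name} LIMIT 0")
    return [column[0] for column in cursor.description]


before = view_columns(conn, "shared_customers")

# The underlying table evolves on its own schedule.
conn.execute("ALTER TABLE customers ADD COLUMN loyalty_tier TEXT")

after = view_columns(conn, "shared_customers")
print(before, after)
```

Both lists contain only profile_id and email. The new loyalty_tier column and the internal_notes column stay invisible until someone deliberately adds them to the view definition, which is the governance checkpoint the guidelines describe.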

On the ActionIQ side, there’s no caching or persisting of data of any kind. This means that ActionIQ launches a query whenever a scheduled campaign is launched and requires data, and whenever a user of the platform asks for an audience count. In other words, queries will generally happen during business hours, but technically can happen at any time.

Testing the connector

ActionIQ deploys the Amazon Redshift connector and runs test queries to validate that the connector works. After the audience is defined and validated, ActionIQ sends the SQL query to Amazon Redshift, which returns the result set. We also validate the results with Amazon Redshift to ensure that the logic works as intended.

With this, you experience a lean and more transparent deployment process. You can see the queries ActionIQ sends to Amazon Redshift, because the queries are logged. You can see what’s going on, what is attributable to ActionIQ, and can see the growth of adoption and usage.

Image showing connection set up to an Amazon Redshift database

A Connector defines the credentials and other parameters needed to connect to a cloud database or warehouse. The Connector screen is used to create, view and manage your connectors.

Key considerations

Organizations need strong data governance. ActionIQ requires a contract of what the data is going to look like within the defined view. With the dynamic nature of data, strong governance workflows with defined fields are required to run the connector smoothly to achieve the ultimate outcome—driving revenue through marketing campaigns.

ActionIQ is used as the central hub of marketing orchestration and activation, so it needs to process a lot of queries. Marketing activity can spike significantly, so it’s prudent to plan for the maximum load on the underlying database.

In one scenario, you might have spiky workloads. With Amazon Redshift Serverless, your data warehouse will be able to scale automatically to manage those spikes. That means Amazon Redshift can absorb large and sudden spikes in queries from ActionIQ without much technical planning.

If workload isolation is a priority and you want to run the ActionIQ workloads using dedicated compute resources, you can use the data sharing feature to create a data share that can be accessed by a dedicated Amazon Redshift Serverless endpoint. This allows ActionIQ to query up-to-date data from a separate Redshift Serverless instance without copying any data, while maintaining complete workload isolation.

The data team needs data to run business intelligence. ActionIQ is driving marketing activation and creating a new data set for the universal contact history—essentially the log of all marketing contacts from the activity. ActionIQ provides this dataset back to Amazon Redshift, which can then be included in the BI reports for ROI measurements.

Conclusion

For your information security teams, the ActionIQ Amazon Redshift connector presents a viable solution because ActionIQ doesn’t replicate the data, and the controls outlined establish how ActionIQ accesses the data. Key benefits include:

  • Control: Choose where data is stored and queried to improve security and fit existing technology investments.
  • Performance: Reduce operational effort, increase productivity and cut down on unnecessary technology costs.
  • Power: Use the auto-scaling capabilities of Amazon Redshift for running your workload.

For business teams, the ActionIQ Amazon Redshift connector is querying the freshest data possible. With the connector, there is zero data latency—an important consideration with key audience members that are primed to convert.

ActionIQ is excited to launch the Amazon Redshift connector to activate your data where it lives—within your Amazon Redshift data warehouse—for a zero-copy, real-time experience that drives outcomes with your customers. To learn more about how organizations are modernizing their data platforms using Amazon Redshift, visit the Amazon Redshift page.

Enhance your Amazon Redshift investment with ActionIQ.


About the authors

Mackenzie Johnson is a Senior Manager at ActionIQ. She is an innovative marketing strategist who’s passionate about the convergence of complementary technologies and amplifying joint value. With extensive experience across digital transformation storytelling, she thrives on educating enterprise businesses about the impact of CX based on a data-driven approach.

Phil Catterall is a Senior Product Manager at ActionIQ and leads product development on ActionIQ’s foundational data management, processing, and query federation capabilities. He’s passionate about designing and building scalable data products to empower business users in new ways.

Sain Das is a Senior Product Manager on the Amazon Redshift team and leads Amazon Redshift GTM for partner programs including the Powered by Amazon Redshift and Redshift Ready programs.

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation including parameters such as voltage, temperature, vibration, speed, and so on. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used to do predictive maintenance, device anomaly detection, real-time customer alerts, remote device management, and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of using Amazon OpenSearch Ingestion to build a reliable data analytics pipeline that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, the AWS IoT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster can be set up by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service cluster are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin implementing the solution, you need the following:

  • AWS IoT topic rule
  • Amazon MSK cluster with Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) authentication
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram shows a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we focus on the following three critical pieces of the architecture in depth:

1. Amazon MSK to OpenSearch Ingestion pipeline

2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service

3. Amazon OpenSearch Ingestion to Amazon S3

Solution walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic"
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define the capacity for your pipeline using OCUs. Each OCU provides 6 GB of memory and two virtual CPUs.

To use OpenSearch Ingestion auto scaling optimally, configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested:

  • If a topic has a large number of partitions (for example, more than 96, which is the maximum OCUs per pipeline), configure the pipeline with a minimum of 1 and a maximum of 96 OCUs. The pipeline can then automatically scale up or down within this range as needed.
  • If a topic has a low number of partitions (for example, fewer than 96), set the maximum number of OCUs equal to the number of partitions, so that each partition is processed by a dedicated OCU, enabling parallel processing and optimal performance.
  • If a pipeline ingests data from multiple topics, use the topic with the highest number of partitions as the reference for the maximum OCUs.

If higher throughput is required, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.
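
The sizing guidance above reduces to a small calculation. The following Python sketch is one way to express it; the function name recommended_max_ocus and the dictionary input are assumptions made for illustration, and the ceiling of 96 is simply the per-pipeline figure quoted in this post.

```python
MAX_OCUS_PER_PIPELINE = 96  # per-pipeline ceiling quoted in this post


def recommended_max_ocus(partitions_per_topic):
    """Return the maximum OCU setting suggested by the guidance above.

    partitions_per_topic maps a topic name to its partition count. The
    topic with the most partitions is the reference, and the result is
    capped at the per-pipeline ceiling.
    """
    if not partitions_per_topic:
        raise ValueError("at least one topic is required")
    if any(count < 1 for count in partitions_per_topic.values()):
        raise ValueError("partition counts must be positive")
    return min(max(partitions_per_topic.values()), MAX_OCUS_PER_PIPELINE)


print(recommended_max_ocus({"ev-device-topic": 24}))
```

A single 24-partition topic yields a maximum of 24 OCUs, two topics with 12 and 48 partitions yield 48, and a 200-partition topic is capped at 96.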

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline using OpenSearch as a sink with a dead letter queue (DLQ) in Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true
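
To see what the path_prefix pattern produces, the following Python sketch renders the same year/month/day/hour layout for a given timestamp. It is an illustration of the naming scheme only, not the service's implementation: the partition_prefix helper is hypothetical, and the use of UTC is an assumption, so check which timestamp and time zone your pipeline actually applies.

```python
from datetime import datetime, timezone


def partition_prefix(event_time, base="index_ev_pipe"):
    """Build a year/month/day/hour key prefix for a timezone-aware datetime.

    The time is converted to UTC first (an assumption of this sketch).
    """
    utc = event_time.astimezone(timezone.utc)
    return f"{base}/year={utc:%Y}/month={utc:%m}/day={utc:%d}/hour={utc:%H}"


print(partition_prefix(datetime(2024, 3, 7, 9, 15, tzinfo=timezone.utc)))
```

Key=value path segments like these are a common convention that lets query engines prune partitions, for example reading only the objects under one day or one hour.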

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, OpenSearch Service domain endpoint, and S3 bucket names as needed.

The entire OpenSearch Ingestion pipeline configuration can be copied directly into the ‘Pipeline configuration’ field in the AWS Management Console while creating the OpenSearch Ingestion pipeline.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-east-1:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Amazon Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The notification feature in OpenSearch Service allows you to create monitors that watch for certain conditions or changes in your data and launch alerts. For vehicle telemetry data, monitors can launch alerts for issues like battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.
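
The trigger condition in that battery example can be sketched in plain Python to show the logic a monitor would encode. This is not the OpenSearch alerting API; an actual monitor expresses the check as a query plus a trigger condition. The should_alert function, the tolerance, and the min_fraction threshold are all hypothetical values chosen for illustration.

```python
def should_alert(readings, expected_capacity, tolerance=0.05, min_fraction=0.10):
    """Decide whether a fleet-wide battery alert should fire.

    readings maps a vehicle ID to its measured capacity as a fraction of
    nominal capacity. A vehicle is flagged when it falls more than
    `tolerance` (relative) below the expected capacity, and the alert
    fires when at least `min_fraction` of vehicles are flagged.
    """
    if not readings:
        return False
    floor = expected_capacity * (1 - tolerance)
    flagged = sum(1 for capacity in readings.values() if capacity < floor)
    return flagged / len(readings) >= min_fraction


fleet = {"v1": 0.95, "v2": 0.80, "v3": 0.94, "v4": 0.96}
print(should_alert(fleet, expected_capacity=0.95))
```

Requiring a minimum fraction of flagged vehicles, rather than alerting on any single one, is what separates a possible manufacturing defect from an individual battery fault.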

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, energy consumption, and so on, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service.

OpenSearch dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases. Because the data lake acts as an immutable store, new data is continually added to Amazon S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here’s an example of a QuickSight dashboard for IoT device data. You can use such a dashboard to gain insights from historical data that can help with better vehicle and battery design.

The Amazon QuickSight public gallery shows examples of dashboards across different domains.

Consider OpenSearch Dashboards for your day-to-day operational use cases, where you need to identify issues and alert in near real time. Use Amazon QuickSight to analyze big data stored in a lake house and generate actionable insights from it.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Service, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

How PostNL processes billions of IoT events with Amazon Managed Service for Apache Flink

Post Syndicated from Çağrı Çakır original https://aws.amazon.com/blogs/big-data/how-postnl-processes-billions-of-iot-events-with-amazon-managed-service-for-apache-flink/

This post is co-written with Çağrı Çakır and Özge Kavalcı from PostNL.

PostNL is the designated universal postal service provider for the Netherlands and has three main business units offering postal delivery, parcel delivery, and logistics solutions for ecommerce and cross-border solutions. With 5,800 retail points, 11,000 mailboxes, and over 900 automated parcel lockers, the company plays an important role in the logistics value chain. It aims to be the delivery organization of choice by making it as easy as possible to send and receive parcels and mail. With almost 34,000 employees, PostNL is at the heart of society. On a typical weekday, the company delivers an average of 1.1 million parcels and 6.9 million letters across Belgium, the Netherlands, and Luxembourg.

In this post, we describe the legacy PostNL stream processing solution, its challenges, and why PostNL chose Amazon Managed Service for Apache Flink to help modernize their Internet of Things (IoT) data stream processing platform. We provide a reference architecture, describe the steps we took to migrate to Apache Flink, and the lessons learned along the way.

With this migration, PostNL has been able to build a scalable, robust, and extendable stream processing solution for their IoT platform. Apache Flink is a perfect fit for IoT. Scaling horizontally, it allows processing the sheer volume of data generated by IoT devices. With event time semantics, you can correctly handle events in the order they were generated, even from occasionally disconnected devices.

PostNL is excited about the potential of Apache Flink, and now plans to use Managed Service for Apache Flink with other streaming use cases and shift more business logic upstream into Apache Flink.

Apache Flink and Managed Service for Apache Flink

Apache Flink is a distributed computation framework that allows for stateful real-time data processing. It provides a single set of APIs for building batch and streaming jobs, making it straightforward for developers to work with bounded and unbounded data. Managed Service for Apache Flink is an AWS service that provides a serverless, fully managed infrastructure for running Apache Flink applications. Developers can build highly available, fault-tolerant, and scalable Apache Flink applications with ease and without needing to become an expert in building, configuring, and maintaining Apache Flink clusters on AWS.

The challenge of real-time IoT data at scale

Today, PostNL’s IoT platform, the Roller Cages solution, tracks more than 380,000 assets with Bluetooth Low Energy (BLE) technology in near real time. The IoT platform was designed to provide availability, geofencing, and bottom state events for each asset by using telemetry sensor data, such as GPS points and accelerometer readings, coming from Bluetooth devices. Those events are used by different internal consumers to make logistical operations more straightforward to plan, more efficient, and more sustainable.

PostNL Roller cages tracking solution

Tracking this high volume of assets emitting different sensor readings inevitably creates billions of raw IoT events for the IoT platform as well as for the downstream systems. Handling this load repeatedly both within the IoT platform and throughout the downstream systems was neither cost-efficient nor easy to maintain. To reduce the cardinality of events, the IoT platform uses stream processing to aggregate data over fixed time windows. These aggregations must be based on the moment when the device emitted the event. This type of aggregation based on event time becomes complex when messages may be delayed and arrive out of order, which may frequently happen with IoT devices that can get disconnected temporarily.
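As a rough illustration of aggregating by event time over fixed windows, the following plain-Python sketch counts detections per device in tumbling windows. It is not Flink code and not PostNL's implementation. The 10-second window size, the `cage-*` device IDs, and the function names are assumptions made for the example.

```python
from collections import defaultdict

WINDOW_SECONDS = 10  # hypothetical window size


def window_start(event_time, size=WINDOW_SECONDS):
    """Return the start of the fixed (tumbling) window containing event_time."""
    return event_time - (event_time % size)


def aggregate(events, size=WINDOW_SECONDS):
    """Count events per (device, window), keyed on the device's own timestamp.

    `events` is an iterable of (device_id, event_time_seconds) pairs in
    arrival order; arrival order does not affect the result.
    """
    counts = defaultdict(int)
    for device_id, event_time in events:
        counts[(device_id, window_start(event_time, size))] += 1
    return dict(counts)


# The event with timestamp 7 arrives after the one with timestamp 12,
# but is still assigned to the window starting at 0.
events = [("cage-1", 3), ("cage-1", 12), ("cage-1", 7), ("cage-2", 5)]
result = aggregate(events)
```

Four raw events collapse into three aggregates here. The hard part, discussed later in this post, is deciding when a window can be considered complete.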

The following diagram illustrates the overall flow from edge to the downstream systems.

PostNL IoT workflow

The workflow consists of the following components:

  1. The edge architecture includes IoT BLE devices that serve as sources of telemetry data, and gateway devices that connect these IoT devices to the IoT platform.
  2. Inlets contain a set of AWS services such as AWS IoT Core and Amazon API Gateway to collect IoT detections using MQTTS or HTTPS and deliver them to the source data stream using Amazon Kinesis Data Streams.
  3. The aggregation application filters IoT detections, aggregates them for a fixed time window, and sinks aggregations to the destination data stream.
  4. Event producers are the combination of different stateful services that generate IoT events such as geofencing, availability, bottom state, and in-transit.
  5. Outlets, including services such as Amazon EventBridge, Amazon Data Firehose, and Kinesis Data Streams, deliver produced events to consumers.
  6. Consumers, which are internal teams, interpret IoT events and build business logic based on them.

The core component of this architecture is the aggregation application. This component was originally implemented using a legacy stream processing technology. For several reasons, as we discuss shortly, PostNL decided to evolve this critical component. The journey of replacing the legacy stream processing with Managed Service for Apache Flink is the focus of the rest of this post.

The decision to migrate the aggregation application to Managed Service for Apache Flink

As the number of connected devices grows, so does the necessity for a robust and scalable platform capable of handling and aggregating massive volumes of IoT data. After thorough analysis, PostNL opted to migrate to Managed Service for Apache Flink, driven by several strategic considerations that align with evolving business needs:

  • Enhanced data aggregation – Using Apache Flink’s strong capabilities in real-time data processing enables PostNL to efficiently aggregate raw IoT data from various sources. The ability to extend the aggregation logic beyond what was provided by the current solution can unlock more sophisticated analytics and more informed decision-making processes.
  • Scalability – The managed service provides the ability to scale your application horizontally. This allows PostNL to handle increasing data volumes effortlessly as the number of IoT devices grows. This scalability means that data processing capabilities can expand in tandem with the business.
  • Focus on core business – By adopting a managed service, the IoT platform team can focus on implementing business logic and developing new use cases. The learning curve and overhead of operating Apache Flink at scale would have diverted valuable energy and resources from the relatively small team, slowing down the adoption process.
  • Cost-effectiveness – Managed Service for Apache Flink employs a pay-as-you-go model that aligns with operational budgets. This flexibility is particularly beneficial for managing costs in line with fluctuating data processing needs.

Challenges of handling late events

Common stream processing use cases require aggregating events based on when they were generated. This is called event time semantics. When implementing this type of logic, you may encounter the problem of delayed events, in which events reach your processing system late, long after other events generated around the same time.

Late events are common in IoT due to reasons inherent to the environment, such as network delays, device failures, temporarily disconnected devices, or downtime. IoT devices often communicate over wireless networks, which can introduce delays in transmitting data packets. And sometimes they may experience intermittent connectivity issues, resulting in data being buffered and sent in batches after connectivity is restored. This may result in events being processed out of order—some events may be processed several minutes after other events that were generated around the same time.

Imagine you want to aggregate events generated by devices within a specific 10-second window. If events can be several minutes late, how can you be sure you have received all events that were generated in those 10 seconds?

A simple implementation may just wait for several minutes, allowing late events to arrive. But this method means that you can’t calculate the result of your aggregation until several minutes later, increasing the output latency. Another solution would be waiting a few seconds, and then dropping any events arriving later.

Increasing latency or dropping events that may contain critical information are not palatable options for the business. The solution must be a good compromise, a trade-off between latency and completeness.
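The trade-off can be made concrete with a small simulation. The sketch below is plain Python, not Flink code. It closes a window only after event time has advanced a configurable amount past the window end, which is similar in spirit to a bounded out-of-orderness watermark in Apache Flink. The function name, window size, and timestamps are hypothetical.

```python
def run(arrivals, window_size, max_out_of_orderness):
    """Simulate event-time windows that wait a bounded time for late events.

    `arrivals` lists event timestamps in the order they arrive.
    Returns (emitted, dropped): emitted is a list of (window_start, count)
    in emission order, dropped lists timestamps that arrived too late.
    """
    open_windows = {}  # window start -> event count
    emitted, dropped = [], []
    watermark = float("-inf")  # "no events older than this are expected"

    for event_time in arrivals:
        start = event_time - event_time % window_size
        if start + window_size <= watermark:
            dropped.append(event_time)  # its window was already emitted
        else:
            open_windows[start] = open_windows.get(start, 0) + 1

        # The watermark trails the highest timestamp seen so far.
        watermark = max(watermark, event_time - max_out_of_orderness)

        # Emit every window that ends at or before the watermark.
        for s in sorted(open_windows):
            if s + window_size <= watermark:
                emitted.append((s, open_windows.pop(s)))

    return emitted, dropped


arrivals = [1, 4, 12, 8, 25, 9, 31]  # 8 and 9 arrive out of order
short_wait = run(arrivals, window_size=10, max_out_of_orderness=5)
long_wait = run(arrivals, window_size=10, max_out_of_orderness=20)
```

With the short wait, the first window is emitted sooner but the event with timestamp 9 is dropped. With the long wait, nothing is dropped, but the first window is emitted only after the event with timestamp 31 arrives.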

Apache Flink offers event time semantics out of the box. In contrast to other stream processing frameworks, Flink offers multiple options for dealing with late events. We dive into how Apache Flink deals with late events next.

A powerful stream processing API

Apache Flink provides a rich set of operators and libraries for common data processing tasks, including windowing, joins, filters, and transformations. It also includes over 40 connectors for various data sources and sinks, including streaming systems like Apache Kafka and Amazon Managed Streaming for Apache Kafka, or Kinesis Data Streams, databases, and also file system and object stores like Amazon Simple Storage Service (Amazon S3).

But the most important characteristic for PostNL is that Apache Flink offers different APIs with different levels of abstraction. You can start with a higher level of abstraction, such as SQL or the Table API. These APIs abstract streaming data as more familiar tables, making them easier to learn for simpler use cases. If your logic becomes more complex, you can switch to the lower level of abstraction of the DataStream API, where streams are represented natively, closer to the processing happening inside Apache Flink. If you need the finest-grained level of control over how each single event is handled, you can switch to the ProcessFunction API.

A key learning has been that choosing one level of abstraction for your application is not an irreversible architectural decision. In the same application, you can mix different APIs, depending on the level of control you need at that specific step.

Scaling horizontally

To process billions of raw events and grow with the business, the ability to scale was an essential requirement for PostNL. Apache Flink is designed to scale horizontally, distributing processing and application state across multiple processing nodes, with the ability to scale out further when the workload grows.

For this particular use case, PostNL had to aggregate the sheer volume of raw events with similar characteristics and over time, to reduce their cardinality and make the data flow manageable for the other systems downstream. These aggregations go beyond simple transformations that handle one event at a time. They require a framework capable of stateful stream processing. This is exactly the type of use case Apache Flink was designed for.

Advanced event time semantics

Apache Flink emphasizes event time processing, which enables accurate and consistent handling of data with respect to the time it occurred. By providing built-in support for event time semantics, Flink can handle out-of-order events and late data gracefully. This capability was fundamental for PostNL. As mentioned, IoT-generated events may arrive late and out of order. However, the aggregation logic must be based on the moment the measurement was actually taken by the device (the event time), and not on when it’s processed.

Resiliency and guarantees

PostNL had to make sure no data sent from the device is lost, even in case of failure or restart of the application. Apache Flink offers strong fault tolerance guarantees through its distributed snapshot-based checkpointing mechanism. In the event of failures, Flink can recover the state of the computations and achieve exactly-once semantics of the result. For example, each event from a device is neither missed nor counted twice, even in the event of an application failure.

The journey of choosing the right Apache Flink API

A key requirement of the migration was reproducing exactly the behavior of the legacy aggregation application, as expected by the downstream systems that can’t be modified. This introduced several additional challenges, in particular around windowing semantics and late event handling.

As we have seen, in IoT, events may be out of order by several minutes. Apache Flink offers two high-level concepts for implementing event time semantics with out-of-order events: watermarks and allowed lateness.

Apache Flink provides a range of flexible APIs with different levels of abstraction. After some initial research, Flink-SQL and the Table API were discarded. These higher levels of abstraction provide advanced windowing and event time semantics, but couldn’t provide the fine-grained control PostNL needed to reproduce exactly the behavior of the legacy application.

The lower level of abstraction of the DataStream API also offers windowing aggregation capabilities, and allows you to customize the behaviors with custom triggers, evictors, and handling late events by setting an allowed lateness.

Unfortunately, the legacy application was designed to handle late events in a peculiar way. The result was a hybrid event time and processing time logic that couldn’t be easily reproduced using high-level Apache Flink primitives.

Fortunately, Apache Flink offers a further lower level of abstraction, the ProcessFunction API. With this API, you have the finest-grained control on application state, and you can use timers to implement virtually any custom time-based logic.

PostNL decided to go in this direction. The aggregation was implemented using a KeyedProcessFunction, which provides a way to perform arbitrary stateful processing on keyed streams (logically partitioned streams). Raw events from each IoT device are aggregated based on their event time (the timestamp written on the event by the source device), and the results of each window are emitted based on processing time (the current system time).
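To give a feel for this hybrid pattern, here is a minimal plain-Python sketch. It only mimics the general shape of a keyed process function with per-key state and processing-time timers. It does not use the Flink API and does not reproduce PostNL's actual logic, which this post does not detail. The class name, method names, window size, and emit delay are all hypothetical.

```python
class HybridAggregator:
    """Group events by event-time window, emit results on processing time."""

    def __init__(self, window_size, emit_delay):
        self.window_size = window_size
        self.emit_delay = emit_delay
        self.state = {}   # (key, window_start) -> event count
        self.timers = {}  # (key, window_start) -> processing time to emit at

    def process_element(self, key, event_time, now):
        """Handle one event; `now` is the current processing (system) time."""
        start = event_time - event_time % self.window_size
        slot = (key, start)
        if slot not in self.state:
            # First event for this slot: create state and register a timer.
            self.state[slot] = 0
            self.timers[slot] = now + self.emit_delay
        self.state[slot] += 1

    def on_processing_time(self, now):
        """Fire all due timers: emit each aggregate and clear its state."""
        due = sorted(s for s, fire_at in self.timers.items() if fire_at <= now)
        out = []
        for slot in due:
            del self.timers[slot]
            out.append((slot[0], slot[1], self.state.pop(slot)))
        return out


agg = HybridAggregator(window_size=10, emit_delay=30)
agg.process_element("cage-1", event_time=3, now=100)
agg.process_element("cage-1", event_time=7, now=110)
agg.process_element("cage-1", event_time=14, now=112)
first = agg.on_processing_time(130)

# A late event for the already-emitted window produces a follow-up result.
agg.process_element("cage-1", event_time=5, now=140)
second = agg.on_processing_time(170)
```

In this sketch a late event is never dropped. It opens a fresh aggregate for the same event-time window, which is emitted after its own delay. This is only one of many possible policies for late events.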

This fine-grained control finally allowed PostNL to reproduce exactly the behavior expected by the downstream applications.

The journey to production readiness

Let’s explore the journey of migrating to Managed Service for Apache Flink, from the start of the project to the rollout to production.

Identifying requirements

The first step of the migration process focused on thoroughly understanding the existing system’s architecture and performance metrics. The goal was to provide a seamless transition to Managed Service for Apache Flink with minimal disruption to ongoing operations.

Understanding Apache Flink

PostNL needed to familiarize themselves with Managed Service for Apache Flink and with Apache Flink’s stream processing capabilities, including built-in windowing strategies, aggregation functions, the differences between event time and processing time, and finally KeyedProcessFunction and the mechanisms for handling late events.

Different options were considered, using primitives provided by Apache Flink out of the box, for event time logic and late events. The biggest requirement was to reproduce exactly the behavior of the legacy application. The ability to switch to using a lower level of abstraction helped. Using the finest-grained control allowed by the ProcessFunction API, PostNL was able to handle late events exactly as the legacy application.

Designing and implementing ProcessFunction

The business logic is designed using ProcessFunction to emulate the peculiar behavior of the legacy application in handling late events without excessively delaying the initial results. PostNL decided to use Java for the implementation, because Java is the primary language for Apache Flink. Apache Flink allows you to develop and test your application locally, in your preferred integrated development environment (IDE), using all the available debug tools, before deploying it to Managed Service for Apache Flink. Java 11 with Maven compiler was used for implementation. For more information about IDE requirements, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API).

Testing and validation

The following diagram shows the architecture used to validate the new application.

Testing architecture

To validate the behavior of the ProcessFunction and late event handling mechanisms, integration tests were designed to run both the legacy application and the Managed Service for Flink application in parallel (Steps 3 and 4). This parallel execution allowed PostNL to directly compare the results generated by each application under identical conditions. Multiple integration test cases push data to the source stream (2) in parallel (7) and wait until their aggregation window is complete, then they pull the aggregated results from the destination stream to compare (8). Integration tests are automatically triggered by the CI/CD pipeline after deployment of the infrastructure is complete. During the integration tests, the primary focus was on achieving data consistency and processing accuracy between the legacy application and the Managed Service for Flink application. The output streams, aggregated data, and processing latencies were compared to validate that the migration didn’t introduce any unexpected discrepancies. For writing and running the integration tests, Robot Framework, an open source automation framework, was utilized.
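The comparison step of such a test can be sketched as follows. This is an illustrative plain-Python helper, not the Robot Framework tests PostNL used. The record shape `(device_id, window_start, count)` and the function name are assumptions.

```python
from collections import Counter


def diff_outputs(legacy, candidate):
    """Compare two lists of aggregate records, ignoring their order.

    Returns the records the candidate failed to produce ("missing") and
    the records it produced that the legacy application did not
    ("unexpected"). Duplicates are counted, so an aggregate emitted twice
    is reported.
    """
    legacy_counts, candidate_counts = Counter(legacy), Counter(candidate)
    return {
        "missing": sorted((legacy_counts - candidate_counts).elements()),
        "unexpected": sorted((candidate_counts - legacy_counts).elements()),
    }


# Hypothetical records pulled from the two destination streams.
legacy = [("cage-1", 0, 2), ("cage-1", 10, 1), ("cage-2", 0, 4)]
candidate = [("cage-2", 0, 4), ("cage-1", 0, 2), ("cage-1", 10, 2)]
report = diff_outputs(legacy, candidate)
```

An empty report on both sides means the two applications produced the same set of aggregates for the test input.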

After the integration tests pass, there is one more validation layer: end-to-end tests. Similar to the integration tests, end-to-end tests are automatically invoked by the CI/CD pipeline after the deployment of the platform infrastructure is complete. This time, multiple end-to-end test cases send data to AWS IoT Core (1) in parallel (9), then compare against the aggregated results that were dumped from the output stream into the destination S3 bucket (5, 6, 10).

Deployment

PostNL decided to run the new Flink application in shadow mode. The new application ran for some time in parallel with the legacy application, consuming exactly the same inputs, and sending output from both applications to a data lake on Amazon S3. This allowed them to compare the results of the two applications using real production data, and also to test the stability and performance of the new one.

Performance optimization

During migration, the PostNL IoT platform team learned how the Flink application can be fine-tuned for optimal performance, considering factors such as data volume, processing speed, and efficient late event handling. A particularly interesting aspect was to verify that the state size wasn’t increasing unbounded over the long term. A risk of using the finest-grained control of ProcessFunction is state leak. This happens when your implementation, directly controlling the state in the ProcessFunction, misses some corner cases where a state is never deleted. This causes the state to grow unbounded. Because streaming applications are designed to run continuously, an expanding state can degrade performance and eventually exhaust memory or local disk space.
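The effect of a state leak can be shown with a toy example. The following plain-Python sketch, which is not Flink code, keeps one state entry per device and window. It compares an implementation that never deletes entries with one that removes windows older than a retention horizon. The function name, retention value, and input data are hypothetical.

```python
def final_state_size(events, window_size, retention=None):
    """Return how many state entries remain after processing `events`.

    `events` is a list of (key, event_time) pairs in increasing time order.
    With retention=None nothing is ever deleted (the leaky variant).
    Otherwise, windows that ended more than `retention` seconds before the
    current event are removed.
    """
    state = {}  # (key, window_start) -> event count
    for key, event_time in events:
        start = event_time - event_time % window_size
        state[(key, start)] = state.get((key, start), 0) + 1
        if retention is not None:
            horizon = event_time - retention
            expired = [s for s in state if s[1] + window_size <= horizon]
            for slot in expired:
                del state[slot]
    return len(state)


# One device emitting every 5 seconds across 100 ten-second windows.
events = [("cage-1", t) for t in range(0, 1000, 5)]
leaky = final_state_size(events, window_size=10)
bounded = final_state_size(events, window_size=10, retention=20)
```

The leaky variant ends with one entry for every window ever seen, and keeps growing for as long as the stream runs. The bounded variant stays at a handful of entries regardless of how long it runs.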

With this phase of testing, PostNL found the right balance of application parallelism and resources—including compute, memory, and storage—to process the normal daily workload profile without lag, and handle occasional peaks without over-provisioning, optimizing both performance and cost-effectiveness.

Final switch

After running the new application in shadow mode for some time, the team decided the application was stable and emitting the expected output. The PostNL IoT platform finally switched over to production and shut down the legacy application.

Key takeaways

Among the several learnings gathered in the journey of adopting Managed Service for Apache Flink, some are particularly important and are proving key when expanding to new and diverse use cases:

  • Understand event time semantics – A deep understanding of event time semantics is crucial in Apache Flink for accurately implementing time-dependent data operations. This knowledge makes sure events are processed correctly relative to when they actually occurred.
  • Use the powerful Apache Flink API – Apache Flink’s API allows for the creation of complex, stateful streaming applications beyond basic windowing and aggregations. It’s important to fully grasp the extensive capabilities offered by the API to tackle sophisticated data processing challenges.
  • With power comes more responsibility – The advanced functionality of Apache Flink’s API brings significant responsibility. Developers must make sure applications are efficient, maintainable, and stable, requiring careful resource management and adherence to best practices in coding and system design.
  • Don’t mix event time and processing time logic – Combining event time and processing time for data aggregation presents unique challenges. It prevents you from using higher-level functionalities provided out of the box by Apache Flink. The lowest level of abstraction among Apache Flink APIs allows for implementing custom time-based logic, but requires careful design to achieve accurate and timely results, alongside extensive testing to validate good performance.

Conclusion

In the journey of adopting Apache Flink, the PostNL team learned how the powerful Apache Flink APIs allow you to implement complex business logic. The team came to appreciate how Apache Flink can be used to solve a diverse range of problems, and they are now planning to extend it to more stream processing use cases.

With Managed Service for Apache Flink, the team was able to focus on the business value and implementing the required business logic, without worrying about the heavy lifting of setting up and managing an Apache Flink cluster.

To learn more about Managed Service for Apache Flink and choosing the right managed service option and API for your use case, see What is Amazon Managed Service for Apache Flink. To experience hands-on how to develop, deploy, and operate Apache Flink applications on AWS, see the Amazon Managed Service for Apache Flink Workshop.


About the Authors

Çağrı Çakır is the Lead Software Engineer for the PostNL IoT platform, where he manages the architecture that processes billions of events each day. As an AWS Certified Solutions Architect Professional, he specializes in designing and implementing event-driven architectures and stream processing solutions at scale. He is passionate about harnessing the power of real-time data, and dedicated to optimizing operational efficiency and innovating scalable systems.

Özge Kavalcı works as Senior Solution Engineer for the PostNL IoT platform and loves to build cutting-edge solutions that integrate with the IoT landscape. As an AWS Certified Solutions Architect, she specializes in designing and implementing highly scalable serverless architectures and real-time stream processing solutions that can handle unpredictable workloads. To unlock the full potential of real-time data, she is dedicated to shaping the future of IoT integration.

Amit Singh works as a Senior Solutions Architect at AWS with enterprise customers on the value proposition of AWS, and participates in deep architectural discussions to make sure solutions are designed for successful deployment in the cloud. This includes building deep relationships with senior technical individuals to enable them to be cloud advocates. In his free time, he likes to spend time with his family and learn more about everything cloud.

Lorenzo Nicora works as Senior Streaming Solutions Architect at AWS helping customers across EMEA. He has been building cloud-centered, data-intensive systems for several years, working in the finance industry both through consultancies and for fintech product companies. He has used open-source technologies extensively and contributed to several projects, including Apache Flink.

Improve your Amazon OpenSearch Service performance with OpenSearch Optimized Instances

Post Syndicated from Jatinder Singh original https://aws.amazon.com/blogs/big-data/improve-your-amazon-opensearch-service-performance-with-opensearch-optimized-instances/

Amazon OpenSearch Service introduced the OpenSearch Optimized Instances (OR1), which deliver price-performance improvements over existing instances. The newly introduced OR1 instances are tailored for heavy indexing use cases like log analytics and observability workloads.

OR1 instances use a local and a remote store. The local storage uses Amazon Elastic Block Store (Amazon EBS) volumes of type gp3 or io1, and the remote storage uses Amazon Simple Storage Service (Amazon S3). For more details about OR1 instances, refer to Amazon OpenSearch Service Under the Hood: OpenSearch Optimized Instances (OR1).

In this post, we conduct experiments using OpenSearch Benchmark to demonstrate how the OR1 instance family improves indexing throughput and overall domain performance.

Getting started with OpenSearch Benchmark

OpenSearch Benchmark, a tool provided by the OpenSearch Project, comprehensively gathers performance metrics from OpenSearch clusters, including indexing throughput and search latency. Whether you’re tracking overall cluster performance, informing upgrade decisions, or assessing the impact of workflow changes, this utility proves invaluable.

In this post, we compare the performance of two clusters: one powered by memory-optimized instances and the other by OR1 instances. The dataset comprises HTTP server logs from the 1998 World Cup website. With the OpenSearch Benchmark tool, we conduct experiments to assess various performance metrics, such as indexing throughput, search latency, and overall cluster efficiency. Our aim is to determine the most suitable configuration for our specific workload requirements.

You can install OpenSearch Benchmark directly on a host running Linux or macOS, or you can run OpenSearch Benchmark in a Docker container on any compatible host.

OpenSearch Benchmark includes a set of workloads that you can use to benchmark your cluster performance. Workloads contain descriptions of one or more benchmarking scenarios that use a specific document corpus to perform a benchmark against your cluster. The document corpus contains indexes, data files, and operations invoked when the workflow runs.

When assessing your cluster’s performance, it is recommended to use a workload similar to your cluster’s use cases, which can save you time and effort. Consider the following criteria to determine the best workload for benchmarking your cluster:

  • Use case – Selecting a workload that mirrors your cluster’s real-world use case is essential for accurate benchmarking. By simulating heavy search or indexing tasks typical for your cluster, you can pinpoint performance issues and optimize settings effectively. This approach makes sure benchmarking results closely match actual performance expectations, leading to more reliable optimization decisions tailored to your specific workload needs.
  • Data – Use a data structure similar to that of your production workloads. OpenSearch Benchmark provides examples of documents within each workload so you can understand the mapping and compare it with your own data mapping and structure. Every benchmark workload is composed of directories and files that you can use to compare data types and index mappings.
  • Query types – Understanding your query pattern is crucial for detecting the most frequent search query types within your cluster. Employing a similar query pattern for your benchmarking experiments is essential.

Solution overview

The following diagram explains how OpenSearch Benchmark connects to your OpenSearch domain to run workload benchmarks.

Scope of solution

The workflow comprises the following steps:

  1. The first step involves running OpenSearch Benchmark using a specific workload from the workloads repository. The invoke operation collects data about the performance of your OpenSearch cluster according to the selected workload.
  2. OpenSearch Benchmark ingests the workload dataset into your OpenSearch Service domain.
  3. OpenSearch Benchmark runs a set of predefined test procedures to capture OpenSearch Service performance metrics.
  4. When the workload is complete, OpenSearch Benchmark outputs all related metrics to measure the workload performance. Metric records are by default stored in memory, or you can set up an OpenSearch Service domain to store the generated metrics and compare multiple workload executions.

In this post, we used the http_logs workload to conduct performance benchmarking. The dataset comprises 247 million documents designed for ingestion and offers a set of sample queries for benchmarking. Follow the steps outlined in the OpenSearch Benchmark User Guide to deploy OpenSearch Benchmark and run the http_logs workload.

Prerequisites

You should have the following prerequisites:

In this post, we deployed OpenSearch Benchmark in an AWS Cloud9 host using an Amazon Linux 2 instance of type m6i.2xlarge with 8 vCPUs, 32 GiB of memory, and 512 GiB of storage.

Performance analysis using the OR1 instance type in OpenSearch Service

In this post, we conducted a performance comparison between two different configurations of OpenSearch Service:

  • Configuration 1 – Cluster manager nodes and three data nodes of memory-optimized r6g.large instances
  • Configuration 2 – Cluster manager nodes and three data nodes of or1.large instances

In both configurations, we use the same number and type of cluster manager nodes: three c6g.xlarge.

You can set up different configurations with the supported instance types in OpenSearch Service to run performance benchmarks.

The following table summarizes our OpenSearch Service configuration details.

Setting                          | Configuration 1 | Configuration 2
Number of cluster manager nodes  | 3               | 3
Type of cluster manager nodes    | c6g.xlarge      | c6g.xlarge
Number of data nodes             | 3               | 3
Type of data node                | r6g.large       | or1.large
Data node: EBS volume size (gp3) | 200 GB          | 200 GB
Multi-AZ with standby enabled    | Yes             | Yes

Now let’s examine the performance details between the two configurations.

Performance benchmark comparison

The http_logs dataset contains HTTP server logs from the 1998 World Cup website between April 30, 1998 and July 26, 1998. Each request consists of a timestamp field, client ID, object ID, size of the request, method, status, and more. The uncompressed size of the dataset is 31.1 GB with 247 million JSON documents. The amount of load sent to both domain configurations is identical. The following table displays the amount of time taken to run various aspects of an OpenSearch workload on our two configurations.

Category                     | Metric name                                 | Configuration 1 runtime (3 × r6g.large data nodes) | Configuration 2 runtime (3 × or1.large data nodes) | Performance difference
Indexing                     | Cumulative indexing time of primary shards  | 207.93 min    | 142.50 min    | 31%
Indexing                     | Cumulative flush time of primary shards     | 21.17 min     | 2.31 min      | 89%
Garbage Collection           | Total Young Gen GC time                     | 43.14 sec     | 24.57 sec     | 43%
bulk-index-append            | p99 latency                                 | 10857.2 ms    | 2455.12 ms    | 77%
query                        | Mean Throughput                             | 29.76 ops/sec | 36.24 ops/sec | 22%
query-match_all(default)     | p99 latency                                 | 40.75 ms      | 32.99 ms      | 19%
query-term                   | p99 latency                                 | 7675.54 ms    | 4183.19 ms    | 45%
query-range                  | p99 latency                                 | 59.5316 ms    | 51.2864 ms    | 14%
query-hourly_aggregation     | p99 latency                                 | 5308.46 ms    | 2985.18 ms    | 44%
query-multi_term_aggregation | p99 latency                                 | 8506.4 ms     | 4264.44 ms    | 50%

The benchmarks show a notable enhancement across various performance metrics. Specifically, OR1.large data nodes demonstrate a 31% reduction in indexing time for primary shards compared to r6g.large data nodes. OR1.large data nodes also exhibit a 43% improvement in garbage collection efficiency and significant enhancements in query performance, including term, range, and aggregation queries.
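The percentages in the table appear to be relative changes against Configuration 1. The short Python sketch below shows that calculation for a few rows. The helper names are ours, and the rounding to whole percentages is an assumption about how the table was produced.

```python
def reduction_pct(baseline, candidate):
    """Percentage by which a time or latency dropped relative to baseline."""
    return (baseline - candidate) / baseline * 100


def increase_pct(baseline, candidate):
    """Percentage by which a throughput rose relative to baseline."""
    return (candidate - baseline) / baseline * 100


# Values taken from the benchmark table (Configuration 1, Configuration 2).
indexing = round(reduction_pct(207.93, 142.50))      # cumulative indexing time, min
flush = round(reduction_pct(21.17, 2.31))            # cumulative flush time, min
bulk_p99 = round(reduction_pct(10857.2, 2455.12))    # bulk-index-append p99, ms
throughput = round(increase_pct(29.76, 36.24))       # query mean throughput, ops/sec
```

Note that for times and latencies a lower value is better, whereas for throughput a higher value is better, so the two helpers compute the change in opposite directions.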

The extent of improvement depends on the workload. Therefore, make sure to run custom workloads as expected in your production environments in terms of indexing throughput, type of search queries, and concurrent requests.

Migration journey to OR1

The OR1 instance family is available in OpenSearch Service 2.11 or higher. Usually, if you’re using OpenSearch Service and you want to benefit from newly released features in a specific version, you would follow the supported upgrade paths to upgrade your domain.

However, to use the OR1 instance type, you need to create a new domain with OR1 instances and then migrate your existing domain to the new domain. The migration journey to OpenSearch Service domain using an OR1 instance is similar to a typical OpenSearch Service migration scenario. Critical aspects involve determining the appropriate size for the target environment, selecting suitable data migration methods, and devising a seamless cutover strategy. These elements provide optimal performance, smooth data transition, and minimal disruption throughout the migration process.

To migrate data to a new OR1 domain, you can use the snapshot restore option or use Amazon OpenSearch Ingestion to migrate the data from your source.

For instructions on migration, refer to Migrating to Amazon OpenSearch Service.

Clean up

To avoid incurring continued AWS usage charges, make sure you delete all the resources you created as part of this post, including your OpenSearch Service domain.

Conclusion

In this post, we ran a benchmark to review the performance of the OR1 instance family compared to the memory-optimized r6g instance. We used OpenSearch Benchmark, a comprehensive tool for gathering performance metrics from OpenSearch clusters.

Learn more about how OR1 instances work and experiment with OpenSearch Benchmark to make sure your OpenSearch Service configuration matches your workload demand.


About the Authors

Jatinder Singh is a Senior Technical Account Manager at AWS and finds satisfaction in aiding customers in their cloud migration and innovation endeavors. Beyond his professional life, he relishes spending moments with his family and indulging in hobbies such as reading, culinary pursuits, and playing chess.

Hajer Bouafif is an Analytics Specialist Solutions Architect at Amazon Web Services. She focuses on Amazon OpenSearch Service and helps customers design and build well-architected analytics workloads in diverse industries. Hajer enjoys spending time outdoors and discovering new cultures.

Puneetha Kumara is a Senior Technical Account Manager at AWS, with over 15 years of industry experience, including roles in cloud architecture, systems engineering, and container orchestration.

Manpreet Kour is a Senior Technical Account Manager at AWS and is dedicated to ensuring customer satisfaction. Her approach involves a deep understanding of customer objectives, aligning them with software capabilities, and effectively driving customer success. Outside of her professional endeavors, she enjoys traveling and spending quality time with her family.

Author data integration jobs with an interactive data preparation experience with AWS Glue visual ETL

Post Syndicated from Chiho Sugimoto original https://aws.amazon.com/blogs/big-data/author-data-integration-jobs-with-an-interactive-data-preparation-experience-with-aws-glue-visual-etl/

We are excited to announce a new capability of the AWS Glue Studio visual editor that offers a new visual user experience. Now you can author data preparation transformations and edit them with the AWS Glue Studio visual editor. The AWS Glue Studio visual editor is a graphical interface that enables you to create, run, and monitor data integration jobs in AWS Glue.

The new data preparation interface in AWS Glue Studio provides an intuitive, spreadsheet-style view for interactively working with tabular data. Within this interface, you can visually inspect tabular data samples, validate recipe steps through real-time runs, and author data preparation recipes without writing code. Within the new experience, you can choose from hundreds of prebuilt transformations. This allows data analysts and data scientists to rapidly construct the necessary data preparation steps to meet their business needs. After you complete authoring the recipes, AWS Glue Studio will automatically generate the Python script to run the recipe data transformations as part of AWS Glue extract, transform, and load (ETL) jobs.

In this post, we show how to use this new feature to build a visual ETL job that preprocesses data to meet the business needs for an example use case, entirely within the AWS Glue Studio console, without the overhead of manual script coding.

Example use case

A fictional e-commerce company sells apparel and allows customers to leave text reviews and star ratings for each product, to help other customers to make informed purchase decisions. To simulate this, we will use a sample synthetic review dataset, which includes different products and customer reviews.

In this scenario, you’re a data analyst in this company. Your role involves preprocessing raw customer review data to prepare it for downstream analytics. This requires transforming the data by normalizing columns through actions such as casting columns to appropriate data types, splitting a single column into multiple new columns, and adding computed columns based on other columns. To quickly create an ETL job for these business requirements, you use AWS Glue Studio to inspect the data and author data preparation recipes.

The AWS Glue job will be configured to output the file to Amazon Simple Storage Service (Amazon S3) in a preferred format and automatically create a table in the AWS Glue Data Catalog. This Data Catalog table will be shared with your analyst team, allowing them to query the table using Amazon Athena.

Prerequisites

For this tutorial, you need an S3 bucket to store output from the AWS Glue ETL job and Athena queries, and a Data Catalog database to create new tables. You also need to create AWS Identity and Access Management (IAM) roles for the AWS Glue job and AWS Management Console user.

Create an S3 bucket to store output from the AWS Glue ETL jobs and Athena query results

You can either create a new S3 bucket or use an existing bucket to store output from the AWS Glue ETL job and Athena queries. In the following steps, replace <glue-etl-output-s3-bucket> and <athena-query-output-s3-bucket> with the names of your S3 buckets.

Create a Data Catalog database

You can either create a new Data Catalog database or use an existing database to create tables. In the following steps, replace <your_database> with the name of your database.

Create an IAM role for the AWS Glue job

Complete the following steps to create an IAM role for the AWS Glue job:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose AWS service.
  4. For Service or use case, choose Glue.
  5. Choose Next.
  6. For Add permissions, choose AWSGlueServiceRole, then choose Next.
  7. For Role name, enter a role name (for this post, GlueJobRole-recipe-demo).
  8. Choose Create role.
  9. Choose the created IAM role.
  10. Under Permissions policies, choose Add permissions and Create inline policy.
  11. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>/*",
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>"
                ]
            }
        ]
    }

  12. Choose Next.
  13. For Policy name, enter a name for your policy.
  14. Choose Create policy.
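If you prefer to generate the inline policy rather than edit the placeholder by hand, the following minimal Python sketch builds the same policy document shown in the preceding steps. The function name `build_glue_job_policy` and the sample bucket name are illustrative assumptions, not part of the original walkthrough; the statements themselves mirror the JSON above.

```python
import json

# Source dataset prefix used in this post (read by the AWS Glue job).
SOURCE_PREFIX_ARN = "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"


def build_glue_job_policy(output_bucket: str) -> dict:
    """Return the inline policy for the AWS Glue job role.

    output_bucket replaces the <glue-etl-output-s3-bucket> placeholder.
    """
    bucket_arn = f"arn:aws:s3:::{output_bucket}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [SOURCE_PREFIX_ARN],
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject",
                ],
                # Object-level ARN plus the bucket ARN itself, as in the policy above.
                "Resource": [f"{bucket_arn}/*", bucket_arn],
            },
        ],
    }


if __name__ == "__main__":
    # "my-glue-output-bucket" is a hypothetical bucket name.
    print(json.dumps(build_glue_job_policy("my-glue-output-bucket"), indent=4))
```

You can paste the printed JSON into the policy editor in step 11.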

Create an IAM role for the console user

Complete the following steps to create the IAM role to interact with the console:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Choose Create role.
  3. For Trusted entity type, choose the entity of your choice.
  4. For Add permissions, add the following AWS managed policies:
    1. AmazonAthenaFullAccess
    2. AWSGlueConsoleFullAccess
  5. Choose Next.
  6. For Role name, enter a role name of your choice.
  7. Choose Create role.
  8. Choose the created IAM role.
  9. Under Permissions policies, choose Add permissions and Create inline policy.
  10. For Policy editor, choose JSON, and enter the following policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "iam:PassRole"
                ],
                "Resource": [
                    "arn:aws:iam::<account-id>:role/GlueJobRole-recipe-demo"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject"
                ],
                "Resource": [
                    "arn:aws:s3:::aws-bigdata-blog/generated_synthetic_reviews/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:List*",
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::<glue-etl-output-s3-bucket>/*",
                    "arn:aws:s3:::<athena-query-output-s3-bucket>/*"
                ]
            }
        ]
    }

  11. Choose Next.
  12. For Policy name, enter a name for your policy.
  13. Choose Create policy.

The S3 bucket and IAM roles required for this tutorial have been created and configured. Switch to the console user role that you set up and proceed with the steps in the following sections.

Author and run a data integration job using the interactive data preparation experience

Let’s create an AWS Glue ETL job in AWS Glue Studio. In this ETL job, we load S3 Parquet files as the source, process the data using recipe steps, and write the output to Amazon S3 as Parquet. You can configure all these steps in the visual editor in AWS Glue Studio. We use the new data preparation authoring capabilities to create recipes that meet our specific business needs for data transformations. This exercise will demonstrate how you can develop data preparation recipes in AWS Glue Studio that are tailored to your use case and can be readily incorporated into scalable ETL jobs. Complete the following steps:

  1. On the AWS Glue Studio console, choose Visual ETL in the navigation pane.
  2. Under Create job, choose Visual ETL.
  3. At the top of the job, replace “Untitled job” with a name of your choice.
  4. On the Job Details tab, under Basic properties, specify the IAM role that the job will use (GlueJobRole-recipe-demo).
  5. Choose Save.
  6. On the Visual tab, choose the plus sign to open the Add nodes menu. Search for s3 and add Amazon S3 as a Source.
    1. For S3 source type, choose S3 location.
    2. For S3 URL, specify s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/.
    3. For Data format, select Parquet.
  7. As a child of this source, search in the Add nodes menu for recipe and add the Data Preparation Recipe transform.
  8. In the Data preview window, choose Start session if it has not been started.
    1. If it hasn’t been started, Start a data preview session will be displayed on the Data Preparation Recipe transform.
    2. Choose your IAM role for the AWS Glue job.
    3. Choose Start session.
  9. After your data preview session has been started, on the Data Preparation Recipe transform, choose Author Recipe to open the data preparation recipe editor.

This will initialize a session using a subset of the data. After session initialization, the AWS Glue Studio console provides an interactive interface that enables intuitive construction of recipe steps for AWS Glue ETL jobs.

As described in our example use case, you’re authoring recipes to preprocess customer review data for analysis. Upon reviewing the spreadsheet-style data preview, you notice the product_title column contains values like business formal pants, plain and business formal jeans, patterned, with the product name and sub-attribute separated by a comma. To better structure this data for downstream analysis, you decide to split the product_title column on the comma delimiter to create separate columns for the product name and sub-attribute. This will allow for easier filtering and aggregation by product type or attribute during analysis.

On the spreadsheet-style UI, you can check the statistics of each column, such as Min, Median, Max, cardinality, and value distribution, for a subset of the data. This provides useful insights about the data to inform transformation decisions. When reviewing the statistics for the review_year column, you notice it contains a wide range of values spanning over 15 years. To enable easier analysis of seasonal and weekly trends, you decide to derive new columns showing the week number and day of the week computed from the review_date column.

Moreover, for convenience of downstream analysis, you decide to change the data type of the customer_id and product_id columns from string to integer. Converting data types is a common task in ETL workflows for analytics. The data preparation recipes in AWS Glue Studio provide a wide variety of common ETL transformations like renaming columns, deleting columns, sorting, and reordering columns. Feel free to browse the data preparation UI to discover other available recipes that can help transform your data.

Let’s see how to implement the recipe steps in the Data Preparation Recipe transform to meet these requirements.

  1. Select the customer_id column and choose the Change type recipe step.
    1. For Change type to, choose integer.
    2. Choose Apply to add the recipe step.
  2. Select the product_id column and choose the Change type recipe step.
    1. For Change type to, choose integer.
    2. Choose Apply.
  3. Select the product_title column and choose On a single delimiter under SPLIT.
    1. For Delimiter, select Enter custom value and enter ,.
    2. Choose Apply.
  4. Select the review_date column and choose Week number under EXTRACT.
    1. For Destination column, enter review_date_week_number.
    2. Choose Apply.
  5. Select the review_date column and choose Day of week under EXTRACT.
    1. For Destination column, enter review_date_week_day.
    2. Choose Apply.

After these recipe steps are applied, you can see that the customer_id and product_id columns have been converted to integer, the product_title column has been split into product_title_1 and product_title_2, and review_date_week_number and review_date_week_day have been added. While authoring data preparation recipe steps, you can view tabular data and inspect whether the recipe steps are working as expected. This enables interactive validation of recipe steps against the data subset previewed in the UI during the recipe authoring process.
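To make the effect of the five recipe steps concrete, here is a rough plain-Python sketch of the same transformations applied to a single record. It is not the script that AWS Glue Studio generates. The function name `apply_recipe_steps` and the sample values are hypothetical, the output column names follow those used in the Athena queries later in this post, and the use of ISO week numbering (Monday = 1) is an assumption, because the post does not state which convention the recipe uses.

```python
from datetime import date


def apply_recipe_steps(row: dict) -> dict:
    """Apply the five recipe steps from this walkthrough to one record."""
    out = dict(row)

    # Steps 1-2: Change type, string -> integer.
    out["customer_id"] = int(row["customer_id"])
    out["product_id"] = int(row["product_id"])

    # Step 3: Split on a single delimiter (the first comma only).
    # Whitespace around the delimiter is kept as-is in this sketch.
    name, _, attribute = row["product_title"].partition(",")
    out["product_title_1"] = name
    out["product_title_2"] = attribute

    # Steps 4-5: Extract week number and day of week from review_date.
    # Assumption: ISO calendar (weeks start on Monday, Monday = 1).
    iso = row["review_date"].isocalendar()
    out["review_date_week_number"] = iso[1]
    out["review_date_week_day"] = iso[2]
    return out


if __name__ == "__main__":
    # Hypothetical record; only the product_title value comes from the post.
    sample = {
        "customer_id": "12345",
        "product_id": "678",
        "product_title": "business formal pants, plain",
        "review_date": date(2008, 3, 14),
    }
    print(apply_recipe_steps(sample))
```

Running a few representative records through a sketch like this can help you decide which recipe steps you need before authoring them in the console.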

  1. Choose Done authoring recipe to close the interface.

Now, on the Script tab in the AWS Glue Studio console, you can see the script generated from the recipe steps. AWS Glue Studio automatically converts the recipe steps configured through the UI into Python code. This allows you to build ETL jobs using the wide range of transformations available in data preparation recipes, without having to manually code the logic yourself.

  1. Choose Save to save the job.
  2. On the Visual tab, search in the Add nodes menu for s3 and add Amazon S3 as a Target.
    1. For Format, choose Parquet.
    2. For Compression Type, choose Snappy.
    3. For S3 Target Location, select your output S3 location s3://<glue-etl-output-s3-bucket>/output/.
    4. For Data Catalog update options, choose Create a table in the Data Catalog and on subsequent runs, update the schema and add new partitions.
    5. For Database, choose the database of your choice.
    6. For Table name, enter data_preparation_recipe_demo_tbl.
    7. Under Partition keys, choose Add a partition key, and select review_year.
  3. Choose Save, then choose Run to run the job.

Up to this point, we have created and run the ETL job. After the job finishes running, a table named data_preparation_recipe_demo_tbl is available in the Data Catalog. The table has the partition column review_year with partitions for the years 2000–2016. Let’s move on to the next step and query the table.

Run queries on the output data with Athena

Now that the AWS Glue ETL job is complete, let’s query the transformed output data. As a sample analysis, let’s find the three most reviewed items in 2008 across all marketplaces and calculate the average star rating for each. Then, for the most reviewed item of 2008, we find its five most reviewed sub-attributes. This demonstrates querying the newly processed dataset to derive insights.

  1. On the Athena console, run the following query against the table:
    SELECT count(*) AS count, product_title_1, avg(star_rating) AS ave 
    FROM <your_database>.data_preparation_recipe_demo_tbl 
    WHERE review_year = 2008
    GROUP BY product_title_1
    ORDER BY count DESC
    LIMIT 3;

This query counts the number of reviews in 2008 for each product_title_1 and returns the top three most reviewed items. It also calculates the average star_rating for each of the top three items. The query will return results as shown in the following screenshot.

The item made with natural materials heels is the most reviewed item. Now let’s query the five most reviewed sub-attributes for it.

  1. Run the following query against the table:
    SELECT count(*) AS count, product_title_2, avg(star_rating) AS ave 
    FROM <your_database>.data_preparation_recipe_demo_tbl
    WHERE review_year = 2008 
    AND product_title_1 = 'made with natural materials heels'
    GROUP BY product_title_2
    ORDER BY count DESC
    LIMIT 5;

The query will return results as shown in the following screenshot.

The query results show that for the top reviewed item made with natural materials heels, the top five most reviewed sub-attributes in 2008 were draped, asymmetric, muted, polka-dotted, and oversized. Of these top five sub-attributes, draped had the highest average star rating.
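If you want to experiment with the shape of the first query before running it in Athena, the following sketch runs an equivalent aggregation against an in-memory SQLite table. The rows are invented purely for illustration and are not taken from the review dataset. SQLite is a stand-in here, so details may differ from Athena, and the count alias is renamed to `review_count` in this sketch.

```python
import sqlite3

# Hypothetical rows: (product_title_1, product_title_2, star_rating, review_year).
SAMPLE_ROWS = [
    ("item a", "red", 5, 2008),
    ("item a", "blue", 4, 2008),
    ("item a", "red", 3, 2008),
    ("item b", "red", 2, 2008),
    ("item b", "green", 4, 2008),
    ("item c", "blue", 5, 2008),
    ("item d", "blue", 1, 2007),
]

TOP_ITEMS_SQL = """
SELECT count(*) AS review_count, product_title_1, avg(star_rating) AS ave
FROM data_preparation_recipe_demo_tbl
WHERE review_year = ?
GROUP BY product_title_1
ORDER BY review_count DESC
LIMIT 3
"""


def top_items(year: int) -> list:
    """Return the three most reviewed items for a year from the sample rows."""
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE data_preparation_recipe_demo_tbl ("
            "product_title_1 TEXT, product_title_2 TEXT, "
            "star_rating INTEGER, review_year INTEGER)"
        )
        conn.executemany(
            "INSERT INTO data_preparation_recipe_demo_tbl VALUES (?, ?, ?, ?)",
            SAMPLE_ROWS,
        )
        return conn.execute(TOP_ITEMS_SQL, (year,)).fetchall()
    finally:
        conn.close()


if __name__ == "__main__":
    for review_count, title, ave in top_items(2008):
        print(review_count, title, ave)
```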

Through this walkthrough, we were able to quickly build an ETL job and generate datasets that fulfill analytics needs, without the overhead of manual script coding.

Clean up

If you no longer need this solution, you can delete the following resources created in this tutorial:

  • S3 bucket (s3://<glue-etl-output-s3-bucket>, s3://<athena-query-output-s3-bucket>)
  • IAM roles for the AWS Glue job (GlueJobRole-recipe-demo) and the console user
  • AWS Glue ETL job
  • Data Catalog database (<your_database>) and table (data_preparation_recipe_demo_tbl)

Conclusion

In this post, we introduced the new AWS Glue data preparation authoring experience, which lets you create low-code and no-code data integration recipe transformations directly on the AWS Glue Studio console. We demonstrated how you can use this feature to quickly build ETL jobs and generate datasets that meet your business needs without time-consuming manual coding.

The AWS Glue data preparation authoring experience is now publicly available. Try out this new capability and discover recipes that can facilitate your data transformations.

To learn more about using the interactive data preparation authoring experience in AWS Glue Studio, check out the following video and read the AWS News Blog.


About the Authors

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Fabrizio Napolitano is a Principal Specialist Solutions Architect for Data Analytics at AWS. He has worked in the analytics domain for the last 20 years, now focusing on helping Canadian public sector organizations innovate with data. Quite by surprise, he became a Hockey Dad after moving to Canada.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Gal Heyne is a Technical Product Manager for AWS Data Processing services with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data services products.

Accelerate query performance with Apache Iceberg statistics on the AWS Glue Data Catalog

Post Syndicated from Sotaro Hikita original https://aws.amazon.com/blogs/big-data/accelerate-query-performance-with-apache-iceberg-statistics-on-the-aws-glue-data-catalog/

Today, we are pleased to announce a new capability for the AWS Glue Data Catalog: generating column-level aggregation statistics for Apache Iceberg tables to accelerate queries. These statistics are used by the cost-based optimizer (CBO) in Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings.

Apache Iceberg is an open table format that provides the capability of ACID transactions on your data lakes. It’s designed to process large analytics datasets and is efficient for even small row-level operations. It also enables useful features such as time-travel, schema evolution, hidden partitioning, and more.

AWS has invested in service integration with Iceberg to enable Iceberg workloads based on customer feedback. One example is the AWS Glue Data Catalog. The Data Catalog is a centralized repository that stores metadata about your organization’s datasets, making the data visible, searchable, and queryable for users. The Data Catalog supports Iceberg tables and tracks the table’s current metadata. It also allows automatic compaction of individual small files produced by each transactional write on tables into a few large files for faster read and scan operations.

In 2023, the Data Catalog announced support for column-level statistics for non-Iceberg tables. That feature collects table statistics used by the query engine’s CBO. Now, the Data Catalog expands this support to Iceberg tables. The column statistics that the Data Catalog generates for Iceberg tables are based on the Puffin spec and stored on Amazon Simple Storage Service (Amazon S3) with other table data. This way, various engines supporting Iceberg can use and update them.

This post demonstrates how column-level statistics for Iceberg tables work with Redshift Spectrum. Furthermore, we showcase the performance benefit of the Iceberg column statistics with the TPC-DS dataset.

How Iceberg table column statistics work

The AWS Glue Data Catalog generates table column statistics using the Theta Sketch algorithm from Apache DataSketches to estimate the number of distinct values (NDV) and stores them in a Puffin file.

For SQL planners, NDV is an important statistic to optimize query planning. There are a few scenarios where NDV statistics can potentially optimize query performance. For example, when joining two tables on a column, the optimizer can use the NDV to estimate the selectivity of the join. If one table has a low NDV for the join column compared to the other table, the optimizer may choose to use a broadcast join instead of a shuffle join, reducing data movement and improving query performance. Moreover, when there are more than two tables to be joined, the optimizer can estimate the output size of each join and plan an efficient join order. Furthermore, NDV can be used for various optimizations such as group by, distinct, and count queries.
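As a rough illustration of how NDV feeds into join planning, the following sketch applies a classic textbook estimate for equi-join output size: the product of the row counts divided by the larger NDV of the join column. This is a generic formula that assumes uniformly distributed values, not a description of how Redshift Spectrum's optimizer actually computes cardinalities, and the table sizes are hypothetical.

```python
def estimate_join_rows(
    left_rows: int, left_ndv: int, right_rows: int, right_ndv: int
) -> float:
    """Textbook equi-join size estimate: |L| * |R| / max(NDV_L, NDV_R).

    Assumes join-key values are uniformly distributed and that the smaller
    set of distinct values is contained in the larger one.
    """
    if left_ndv <= 0 or right_ndv <= 0:
        raise ValueError("NDV must be positive")
    return left_rows * right_rows / max(left_ndv, right_ndv)


if __name__ == "__main__":
    # Hypothetical fact table: 1,000,000 rows, 50,000 distinct customer keys.
    # Joined with the full 50,000-row dimension table:
    print(estimate_join_rows(1_000_000, 50_000, 50_000, 50_000))
    # Joined with a filtered dimension of 500 rows (500 distinct keys):
    print(estimate_join_rows(1_000_000, 50_000, 500, 500))
```

With estimates like these, a planner can prefer to run the join with the smaller expected output first, or decide that one side is small enough to broadcast.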

However, calculating NDV continuously with 100% accuracy requires O(N) space complexity. Instead, Theta Sketch is an efficient algorithm that allows you to estimate the NDV in a dataset without needing to keep all the distinct values in memory or on storage. The key idea behind Theta Sketch is to hash the data into a range between 0–1, and then select only a small portion of the hashed values based on a threshold (denoted as θ). By analyzing this small subset of data, the Theta Sketch algorithm can provide an accurate estimate of the NDV in the original dataset.
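The idea can be sketched in a few lines of Python. The following is a simplified, KMV-style illustration of the thresholding described above, not the Apache DataSketches implementation: the class name, the choice of SHA-256 as the hash, and the parameter `k` (the maximum number of retained hashes) are all assumptions made for this example.

```python
import hashlib


class ThetaSketch:
    """Simplified theta sketch: keep at most k hash values below theta."""

    def __init__(self, k: int = 1024):
        self.k = k
        self.theta = 1.0      # current threshold in (0, 1]
        self.hashes = set()   # retained hash values, all < theta

    @staticmethod
    def _hash(value) -> float:
        # Map the value to a pseudo-uniform number in [0, 1).
        digest = hashlib.sha256(str(value).encode("utf-8")).digest()
        return int.from_bytes(digest[:8], "big") / 2**64

    def update(self, value) -> None:
        h = self._hash(value)
        if h >= self.theta or h in self.hashes:
            return  # above the threshold, or a duplicate
        self.hashes.add(h)
        if len(self.hashes) > self.k:
            # Lower theta to the largest retained hash and drop that hash.
            # (A heap would avoid the linear scan; kept simple here.)
            self.theta = max(self.hashes)
            self.hashes.remove(self.theta)

    def estimate(self) -> float:
        # While theta is 1.0 nothing was discarded, so the count is exact.
        # Otherwise scale the retained count by the sampled fraction theta.
        return len(self.hashes) / self.theta


if __name__ == "__main__":
    sketch = ThetaSketch(k=1024)
    for i in range(100_000):
        sketch.update(f"user-{i}")
        sketch.update(f"user-{i}")  # duplicates do not change the estimate
    print(f"estimated NDV: {sketch.estimate():.0f} (true NDV: 100000)")
```

The sketch stores at most `k` numbers regardless of input size, and its relative error shrinks roughly in proportion to 1/√k, which is the trade-off between space and accuracy that makes this approach attractive for large tables.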

Iceberg’s Puffin file is designed to store information such as indexes and statistics as blobs. One of the representative blob types that can be stored is apache-datasketches-theta-v1, which contains a serialized Theta Sketch for estimating the NDV. Puffin files are linked to a snapshot-id in Iceberg’s metadata and are used by the query engine’s CBO to optimize query plans.

Leverage Iceberg column statistics through Amazon Redshift

To demonstrate the performance benefit of this capability, we employ the industry-standard TPC-DS 3 TB dataset. We compare the query performance with and without Iceberg column statistics for the tables by running queries in Redshift Spectrum. We have included the queries used in this post, and we recommend trying your own queries by following the workflow.

The overall steps are as follows:

  1. Run an AWS Glue job that extracts the TPC-DS dataset from a public Amazon S3 bucket and saves it as Iceberg tables in your S3 bucket. The AWS Glue Data Catalog stores the metadata location of those tables. Query these tables using Amazon Redshift Spectrum.
  2. Generate column statistics: Employ the enhanced capabilities of the AWS Glue Data Catalog to generate column statistics for each table. This generates Puffin files that store Theta Sketches.
  3. Query with Amazon Redshift Spectrum: Evaluate the performance benefit of column statistics on query performance by utilizing Amazon Redshift Spectrum to run queries on the dataset.

The following diagram illustrates the architecture.

To try this new capability, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Run an AWS Glue job to create Iceberg tables for the 3TB TPC-DS dataset in your S3 bucket. The Data Catalog stores those tables’ metadata location.
  3. Run queries on Redshift Spectrum and note the query duration.
  4. Generate Iceberg column statistics for Data Catalog tables.
  5. Run queries on Redshift Spectrum and compare the query duration with the previous run.
  6. Optionally, schedule AWS Glue column statistics jobs using AWS Lambda and an Amazon EventBridge schedule.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs. Note that this CloudFormation template requires an AWS Region with at least three Availability Zones. The template generates the following resources:

  • A virtual private cloud (VPC), public subnet, private subnets, and route tables
  • An Amazon Redshift Serverless workgroup and namespace
  • An S3 bucket to store the TPC-DS dataset, column statistics, job scripts, and so on
  • Data Catalog databases
  • An AWS Glue job to extract the TPC-DS dataset from the public S3 bucket and save the data as Iceberg tables in your S3 bucket
  • AWS Identity and Access Management (IAM) roles and policies
  • A Lambda function and EventBridge schedule to run AWS Glue column statistics generation on a schedule

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack.
  3. Choose Next.
  4. Leave the parameters as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Run an AWS Glue job to create Iceberg tables for the 3TB TPC-DS dataset

When the CloudFormation stack creation is complete, run the AWS Glue job to create Iceberg tables for the TPC-DS dataset. This AWS Glue job extracts the TPC-DS dataset from the public S3 bucket and transforms the data into Iceberg tables. Those tables are loaded into your S3 bucket and registered to the Data Catalog.

To run the AWS Glue job, complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose InitialDataLoadJob-<your-stack-name>.
  3. Choose Run.

This AWS Glue job can take around 30 minutes to complete. The process is complete when the job processing status shows as Succeeded.

The AWS Glue job creates tables storing the TPC-DS dataset in two identical databases: tpcdsdbnostats and tpcdsdbwithstats. The tables in tpcdsdbnostats will have no generated statistics, and we use them as reference. We generate statistics on tables in tpcdsdbwithstats. Confirm the creation of those two databases and underlying tables on the AWS Glue console. At this time, those databases hold the same data and there are no statistics generated on the tables.

Run queries on Redshift Spectrum without statistics

In the previous steps, you set up a Redshift Serverless workgroup with the given RPU (128 by default), prepared the TPC-DS 3TB dataset in your S3 bucket, and created Iceberg tables (which currently don’t have statistics).

To run your query in Amazon Redshift, complete the following steps:

  1. Download the Amazon Redshift queries.
  2. In the Redshift query editor v2, run the queries listed in the Redshift Query for tables without column statistics section in the downloaded file redshift-tpcds-sample.sql.
  3. Note the query runtime of each query.

Generate Iceberg column statistics

To generate statistics on the Data Catalog tables, complete the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose the tpcdsdbwithstats database to view all available tables.
  3. Select any of these tables (for example, call_center).
  4. Go to Column statistics – new and choose Generate statistics.
  5. Keep the default options:
    1. For Choose columns, select Table (All columns).
    2. For Row sampling options, select All rows.
    3. For IAM role, choose AWSGluestats-blog-<your-stack-name>.
  6. Choose Generate statistics.

You’ll be able to see the status of the statistics generation run as shown in the following screenshot.

After you generate the Iceberg table column statistics, you should be able to see detailed column statistics for that table.

Following the statistics generation, you will find an <id>.stat file in the AWS Glue table’s underlying data location in Amazon S3. This file is a Puffin file that stores the Theta Sketch data structure. Query engines can use this Theta Sketch to efficiently estimate the NDV when operating on the table, which helps optimize query performance.

Repeat the previous steps to generate statistics for all tables, such as catalog_sales, catalog_returns, warehouse, item, date_dim, store_sales, customer, customer_address, web_sales, time_dim, ship_mode, web_site, and web_returns. Alternatively, you can manually run the Lambda function that instructs AWS Glue to generate column statistics for all tables. We discuss the details of this function later in this post.
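If you would rather script this repetition yourself, a loop along the following lines could start one statistics run per table. This is a minimal sketch, not the Lambda function shipped with the CloudFormation template. It assumes the boto3 AWS Glue client's `start_column_statistics_task_run` operation with its `DatabaseName`, `TableName`, and `Role` parameters; the helper name `start_statistics_runs` is hypothetical, and the table list contains only the tables named in this post.

```python
# Tables named in this post; the full TPC-DS schema contains more.
TABLES = [
    "call_center", "catalog_sales", "catalog_returns", "warehouse", "item",
    "date_dim", "store_sales", "customer", "customer_address", "web_sales",
    "time_dim", "ship_mode", "web_site", "web_returns",
]


def start_statistics_runs(glue_client, database: str, tables: list, role: str) -> dict:
    """Start one column statistics task run per table.

    glue_client is expected to behave like boto3.client("glue").
    Returns a mapping of table name -> task run ID.
    """
    run_ids = {}
    for table in tables:
        response = glue_client.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role,  # IAM role that AWS Glue assumes to generate statistics
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids


# Example usage (requires boto3 and AWS credentials; not run here):
#
#   import boto3
#   run_ids = start_statistics_runs(
#       boto3.client("glue"),
#       "tpcdsdbwithstats",
#       TABLES,
#       "AWSGluestats-blog-<your-stack-name>",
#   )
```

Passing the client in as a parameter keeps the function easy to exercise with a stub before pointing it at a real account.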

After you generate statistics for all tables, you can assess the query performance for each query.

Run queries on Redshift Spectrum with statistics

In the previous steps, you set up a Redshift Serverless workgroup with the given RPU (128 by default), prepared the TPC-DS 3TB dataset in your S3 bucket, and created Iceberg tables with column statistics.

To run the provided query using Redshift Spectrum on the statistics tables, complete the following steps:

  1. In the Redshift query editor v2, run the queries listed in the Redshift Query for tables with column statistics section in the downloaded file redshift-tpcds-sample.sql.
  2. Note the query runtime of each query.

With Redshift Serverless 128 RPU and the TPC-DS 3TB dataset, we conducted sample runs for 10 selected TPC-DS queries where NDV information was expected to be beneficial. We ran each query 10 times. The results shown in the following table are sorted by the percentage of the performance improvement for the queries with column statistics.

TPC-DS 3TB Queries | Without Column Statistics | With Column Statistics | Performance Improvement (%)
Query 16 | 305.0284 | 51.7807 | 489.1
Query 75 | 398.0643 | 110.8366 | 259.1
Query 78 | 169.8358 | 52.8951 | 221.1
Query 95 | 35.2996 | 11.1047 | 217.9
Query 94 | 160.52 | 57.0321 | 181.5
Query 68 | 14.6517 | 7.4745 | 96.0
Query 4 | 217.8954 | 121.996 | 78.6
Query 72 | 123.8698 | 76.215 | 62.5
Query 29 | 22.0769 | 14.8697 | 48.5
Query 25 | 43.2164 | 32.8602 | 31.5

The results demonstrated clear performance benefits, with improvements ranging from 31.5% to 489.1%.
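The improvement percentages in the table are consistent with measuring the runtime saved relative to the runtime with column statistics. The following short Python sketch reproduces that arithmetic for a few rows; the function name is hypothetical and the formula is inferred from the table values rather than stated in the post.

```python
def improvement_pct(without_stats: float, with_stats: float) -> float:
    """Runtime saved, expressed as a percentage of the runtime with statistics."""
    return round((without_stats - with_stats) / with_stats * 100, 1)


# (runtime without statistics, runtime with statistics) taken from the table above
runtimes = {
    "Query 16": (305.0284, 51.7807),
    "Query 68": (14.6517, 7.4745),
    "Query 25": (43.2164, 32.8602),
}

if __name__ == "__main__":
    for query, (without_stats, with_stats) in runtimes.items():
        print(query, improvement_pct(without_stats, with_stats))
```

Read this way, an improvement of 489.1% for query 16 means the query ran roughly 5.9 times faster with column statistics.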

To dive deep, let’s explore query 16, which showed the highest performance benefit:

TPC-DS Query 16:

select
   count(distinct cs_order_number) as "order count"
  ,sum(cs_ext_ship_cost) as "total shipping cost"
  ,sum(cs_net_profit) as "total net profit"
from
   "awsdatacatalog"."tpcdsdbwithstats"."catalog_sales" cs1
  ,"awsdatacatalog"."tpcdsdbwithstats"."date_dim"
  ,"awsdatacatalog"."tpcdsdbwithstats"."customer_address"
  ,"awsdatacatalog"."tpcdsdbwithstats"."call_center"
where
    d_date between '2000-2-01' 
    and dateadd(day, 60, cast('2000-2-01' as date))
    and cs1.cs_ship_date_sk = d_date_sk
    and cs1.cs_ship_addr_sk = ca_address_sk
    and ca_state = 'AL'
    and cs1.cs_call_center_sk = cc_call_center_sk
    and cc_county in ('Dauphin County','Levy County','Luce County','Jackson County',
                    'Daviess County')
and exists (select *
            from "awsdatacatalog"."tpcdsdbwithstats"."catalog_sales" cs2
            where cs1.cs_order_number = cs2.cs_order_number
            and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
and not exists(select *
               from "awsdatacatalog"."tpcdsdbwithstats"."catalog_returns" cr1
               where cs1.cs_order_number = cr1.cr_order_number)
order by count(distinct cs_order_number)
limit 100;

You can compare the query plans with and without column statistics by running the query with the EXPLAIN command.

The following screenshot shows the results without column statistics.

The following screenshot shows the results with column statistics.

You can observe some notable differences as a result of using column statistics. At a high level, the overall estimated cost of the query is significantly reduced, from 20633217995813352.00 to 331727324110.36.

The two query plans chose different join strategies.

The following is one line included in the query plan without column statistics:

XN Hash Join DS_DIST_BOTH (cost=45365031.50 rows=10764790749 width=44)
  Outer Dist Key: "outer".cs_order_number
  Inner Dist Key: volt_tt_61c54ae740984.cs_order_number
  Hash Cond: (("outer".cs_order_number = "inner".cs_order_number) AND ("outer".cs_warehouse_sk = "inner".cs_warehouse_sk))

The following is the corresponding line in the query plan with column statistics:

XN Hash Join DS_BCAST_INNER (cost=307193250965.64..327130154786.68 rows=17509398 width=32)
  Hash Cond: (("outer".cs_order_number = "inner".cs_order_number) AND ("outer".cs_warehouse_sk = "inner".cs_warehouse_sk))

The query plan for the table without column statistics used DS_DIST_BOTH when joining large tables, whereas the query plan for the table with column statistics chose DS_BCAST_INNER. The join order has also changed based on the column statistics. Those join strategy and join order changes are mainly driven by more accurate join cardinality estimations, which are possible with column statistics, and result in a more optimized query plan.

Schedule AWS Glue column statistics runs

Maintaining up-to-date column statistics is crucial for optimal query performance. This section guides you through automating the process of generating Iceberg table column statistics using Lambda and EventBridge Scheduler. This automation keeps your column statistics up to date without manual intervention.

The required Lambda function and EventBridge schedule are already created through the CloudFormation template. The Lambda function is used to invoke the AWS Glue column statistics run. First, complete the following steps to explore how the Lambda function is configured:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Open the function GlueTableStatisticsFunctionv1.

For a clearer understanding of the Lambda function, we recommend reviewing the code in the Code section and examining the environment variables under Configuration.

As shown in the following code snippet, the Lambda function invokes the start_column_statistics_task_run API through the AWS SDK for Python (Boto3) library.
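A minimal sketch of such a handler might look like the following. It is not the code deployed by the CloudFormation template: the environment variable names (GLUE_DATABASE_NAME, GLUE_TABLE_NAMES, GLUE_STATS_ROLE_ARN) and the optional glue_client parameter are assumptions made for this illustration. Only the start_column_statistics_task_run call and its DatabaseName, TableName, and Role parameters come from Boto3.

```python
import os


def lambda_handler(event, context, glue_client=None):
    """Start a column statistics task run for each configured table."""
    if glue_client is None:
        # Imported lazily so the function can be exercised with a stub client.
        import boto3
        glue_client = boto3.client("glue")

    # Hypothetical environment variable names, chosen for this sketch.
    database = os.environ["GLUE_DATABASE_NAME"]
    tables = [t.strip() for t in os.environ["GLUE_TABLE_NAMES"].split(",") if t.strip()]
    role_arn = os.environ["GLUE_STATS_ROLE_ARN"]

    run_ids = {}
    for table in tables:
        response = glue_client.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role_arn,
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids
```

Keeping the table list in an environment variable means the schedule can cover additional tables without a code change.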

Next, complete the following steps to explore how the EventBridge schedule is configured:

  1. On the EventBridge console, choose Schedules under Scheduler in the navigation pane.
  2. Locate the schedule created by the CloudFormation template.

This page is where you manage and configure the schedules for your events. As shown in the following screenshot, the schedule is configured to invoke the Lambda function daily at a specific time, in this case 08:27 PM UTC. This makes sure the AWS Glue column statistics run takes place on a regular and predictable basis.
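If you prefer to define a similar schedule programmatically, the following Python sketch builds the request for the EventBridge Scheduler create_schedule API. The helper names, the schedule name, and the ARNs in the usage note are placeholders invented for this example; the daily 20:27 UTC time mirrors the schedule described above.

```python
import json


def daily_cron(hour: int, minute: int) -> str:
    """Build an EventBridge Scheduler cron expression that fires once a day."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    # Fields: minutes hours day-of-month month day-of-week year
    return f"cron({minute} {hour} * * ? *)"


def build_schedule_request(name, lambda_arn, role_arn, hour, minute):
    """Assemble keyword arguments for the scheduler client's create_schedule call."""
    return {
        "Name": name,
        "ScheduleExpression": daily_cron(hour, minute),
        "ScheduleExpressionTimezone": "UTC",
        "FlexibleTimeWindow": {"Mode": "OFF"},
        "Target": {
            "Arn": lambda_arn,      # Lambda function to invoke
            "RoleArn": role_arn,    # role that Scheduler assumes to invoke it
            "Input": json.dumps({}),
        },
    }
```

With real ARNs in place, the request could be submitted with boto3.client("scheduler").create_schedule(**build_schedule_request("glue-stats-daily", lambda_arn, role_arn, 20, 27)).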

Clean up

When you have finished all the above steps, remember to clean up all the AWS resources you created using AWS CloudFormation:

  1. Delete the CloudFormation stack.
  2. Delete the S3 bucket storing the Iceberg table for the TPC-DS dataset and the AWS Glue job script.

Conclusion

This post introduced a new feature in the Data Catalog that enables you to create Iceberg table column-level statistics. The Iceberg table stores a Theta Sketch in a Puffin file, which can be used to estimate NDV efficiently. The Redshift Spectrum CBO can use that estimate to optimize the query plan, resulting in improved query performance and potential cost savings.

Try out this new feature in the Data Catalog to generate column-level statistics and improve query performance, and let us know your feedback in the comments section. Visit the AWS Glue Catalog documentation to learn more.


About the Authors

Sotaro Hikita is a Solutions Architect. He supports customers in a wide range of industries, especially the financial industry, to build better solutions. He is particularly passionate about big data technologies and open source software.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Kyle Duong is a Senior Software Development Engineer on the AWS Glue and AWS Lake Formation team. He is passionate about building big data technologies and distributed systems.

Kalaiselvi Kamaraj is a Senior Software Development Engineer with Amazon. She has worked on several projects within the Amazon Redshift query processing team and is currently focusing on performance-related projects for Redshift data lakes.

Sandeep Adwankar is a Senior Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Integrate your data and collaborate using data preparation in AWS Glue Studio

Post Syndicated from Veliswa Boya original https://aws.amazon.com/blogs/aws/integrate-your-data-and-collaborate-using-data-preparation-in-aws-glue-studio/

Today, we announce the general availability of data preparation authoring in AWS Glue Studio Visual ETL. This is a new no-code data preparation user experience for business users and data analysts with a spreadsheet-style UI that runs data integration jobs at scale on AWS Glue for Spark. The new visual data preparation experience makes it easier for data analysts and data scientists to clean and transform data to prepare it for analytics and machine learning (ML). Within this new experience, you can choose from hundreds of pre-built transformations to automate data preparation tasks, all without the need to write any code.

Business analysts can now collaborate with data engineers to build data integration jobs. Data engineers can use the Glue Studio visual flow-based view to define connections to the data and set the ordering of the data flow process. Business analysts can use the data preparation experience to define the data transformation and output. Additionally, you can import your existing AWS Glue DataBrew data cleansing and preparation “recipes” to the new AWS Glue data preparation experience. This way, you can continue to author them directly in AWS Glue Studio and then scale up recipes to process petabytes of data at the lower price point for AWS Glue jobs.

Visual ETL prerequisites (environment setup)
The visual ETL needs the AWSGlueConsoleFullAccess IAM managed policy attached to the users and roles that will access AWS Glue.


This policy grants these users and roles full access to AWS Glue and read access to Amazon Simple Storage Service (Amazon S3) resources.

Advanced visual ETL flows
Once the appropriate AWS Identity and Access Management (IAM) role permissions have been defined, author the visual ETL using AWS Glue Studio.

Extract
Create an Amazon S3 node by selecting the Amazon S3 node from the list of Sources.


Select the newly created node and browse for an S3 dataset. After the file has been uploaded successfully, choose Infer schema to configure the source node. The visual interface then shows a preview of the data contained in the .csv file.

Earlier, I created an S3 bucket in the same Region as the AWS Glue visual ETL and uploaded a .csv file, visual ETL conference data.csv, containing the data that I will be visualizing.

It’s important to set up the role permissions as detailed in the previous step to grant AWS Glue access to read the S3 bucket. Without performing this step, you’ll get an error that ultimately prevents you from seeing the data preview.

Transform
After the node has been configured, add a Data Preparation Recipe and start a data preview session. Starting this session typically takes about 2 to 3 minutes.


Once the data preview session is ready, choose Author Recipe to start an authoring session and add transformations once the data frame is complete. During the authoring session, you can view the data, apply transformation steps, and view the transformed data interactively. You can undo, redo, and reorder the steps. You can visualize the data type of the column and the statistical properties of each column.


You can start applying transformation steps to your data such as changing formats from lowercase to uppercase, changing the sort order, and more, by choosing Add step. All your data preparation steps will be tracked in the recipe.
I wanted a view of conferences that will be hosted in South Africa, so I created two recipes to filter by condition where the Location column has values equal to “South Africa”, and the Comments column contains a value.


Load
Once you’ve prepared your data interactively, you can share your work with data engineers who can extend it with more advanced visual ETL flows and custom code to seamlessly integrate it into their production data pipelines.

Now available
The AWS Glue data preparation authoring experience is now publicly available in all commercial AWS Regions where AWS Glue DataBrew is available. To learn more, visit AWS Glue.

For more information, visit the AWS Glue Developer Guide and send feedback to AWS re:Post for AWS Glue or through your usual AWS support contacts.

— Veliswa

Amazon DataZone introduces OpenLineage-compatible data lineage visualization in preview

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/amazon-datazone-introduces-openlineage-compatible-data-lineage-visualization-in-preview/

We are excited to announce the preview of API-driven, OpenLineage-compatible data lineage in Amazon DataZone to help you capture, store, and visualize lineage of data movement and transformations of data assets on Amazon DataZone.

With the Amazon DataZone OpenLineage-compatible API, domain administrators and data producers can capture and store lineage events beyond what is available in Amazon DataZone, including transformations in Amazon Simple Storage Service (Amazon S3), AWS Glue, and other AWS services. This provides a comprehensive view for data consumers browsing in Amazon DataZone, who can gain confidence in an asset’s origin, and for data producers, who can assess the impact of changes to an asset by understanding its usage.

In this post, we discuss the latest features of data lineage in Amazon DataZone, its compatibility with OpenLineage, and how to get started capturing lineage from other services such as AWS Glue, Amazon Redshift, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA) into Amazon DataZone through the API.

Why it matters to have data lineage

Data lineage gives you an overarching view into data assets, allowing you to see the origin of objects and their chain of connections. Data lineage enables tracking the movement of data over time, providing a clear understanding of where the data originated, how it has changed, and its ultimate destination within the data pipeline. With transparency around data origination, data consumers gain trust that the data is correct for their use case. Data lineage information is captured at levels such as tables, columns, and jobs, allowing you to conduct impact analysis and respond to data issues because, for example, you can see how one field impacts downstream sources. This equips you to make well-informed decisions before committing changes and avoid unwanted changes downstream.

Data lineage in Amazon DataZone is an API-driven, OpenLineage-compatible feature that helps you capture and visualize lineage events from OpenLineage-enabled systems or through an API, to trace data origins, track transformations, and view cross-organizational data consumption. The lineage visualized includes activities inside the Amazon DataZone business data catalog. Lineage captures the assets cataloged as well as the subscribers to those assets and to activities that happen outside the business data catalog captured programmatically using the API.

Additionally, Amazon DataZone versions lineage with each event, enabling you to visualize lineage at any point in time or compare transformations across an asset’s or job’s history. This historical lineage provides a deeper understanding of how data has evolved, which is essential for troubleshooting, auditing, and enforcing the integrity of data assets.

The following screenshot shows an example lineage graph visualized with the Amazon DataZone data catalog.

Introduction to OpenLineage compatible data lineage

The need to capture data lineage consistently across various analytical services and combine them into a unified object model is key in uncovering insights from the lineage artifact. OpenLineage is an open source project that offers a framework to collect and analyze lineage. It also offers reference implementation of an object model to persist metadata along with integration to major data and analytics tools.

The following are key concepts in OpenLineage:

  • Lineage events – OpenLineage captures lineage information through a series of events. An event is anything that represents a specific operation performed on the data that occurs in a data pipeline, such as data ingestion, transformation, or data consumption.
  • Lineage entities – Entities in OpenLineage represent the various data objects involved in the lineage process, such as datasets and tables.
  • Lineage runs – A lineage run represents a specific run of a data pipeline or a job, encompassing multiple lineage events and entities.
  • Lineage form types – Form types, or facets, provide additional metadata or context about lineage entities or events, enabling richer and more descriptive lineage information. OpenLineage offers facets for runs, jobs, and datasets, with the option to build custom facets.
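To make these concepts concrete, the following Python sketch assembles a minimal OpenLineage run event as a dictionary: one run, one job, and lists of input and output datasets. The field names follow the OpenLineage RunEvent object model; the producer URL, the schemaURL version, and the namespace and dataset names in the example are assumptions chosen for illustration, and facets are left out for brevity.

```python
import json
import uuid
from datetime import datetime, timezone


def build_run_event(job_namespace, job_name, inputs, outputs,
                    event_type="COMPLETE", run_id=None):
    """Build a minimal OpenLineage run event.

    inputs and outputs are lists of (namespace, name) tuples describing datasets.
    """
    return {
        "eventType": event_type,  # for example START, COMPLETE, or FAIL
        "eventTime": datetime.now(timezone.utc).isoformat(),
        # Placeholder producer and an assumed spec version, for illustration only.
        "producer": "https://example.com/hypothetical-producer",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "run": {"runId": run_id or str(uuid.uuid4())},
        "job": {"namespace": job_namespace, "name": job_name},
        "inputs": [{"namespace": ns, "name": name} for ns, name in inputs],
        "outputs": [{"namespace": ns, "name": name} for ns, name in outputs],
    }


if __name__ == "__main__":
    event = build_run_event(
        "example-namespace",
        "inventory_insights_job",
        inputs=[("example-namespace", "inventory")],
        outputs=[("example-namespace", "inventory_insights")],
    )
    print(json.dumps(event, indent=2))
```

Events that share the same runId describe the same lineage run, which is how a START event and its later COMPLETE event are tied together.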

The Amazon DataZone data lineage API is OpenLineage compatible and extends OpenLineage’s functionality by providing a materialization endpoint to persist the lineage outputs in an extensible object model. OpenLineage offers integrations for certain sources, and integration of these sources with Amazon DataZone is straightforward because the Amazon DataZone data lineage API understands the format and translates to the lineage data model.

The following diagram illustrates an example of the Amazon DataZone lineage data model.

In Amazon DataZone, every lineage node represents an underlying resource—there is a 1:1 mapping of the lineage node with a logical or physical resource such as table, view, or asset. A node can represent a specific job with a specific run, a table or asset, or a subscription target.

Each version of a node captures what happened to the underlying resource at that specific timestamp. In Amazon DataZone, lineage not only shares the story of data movement outside it, but it also represents the lineage of activities inside Amazon DataZone, such as asset creation, curation, publishing, and subscription.

To hydrate the lineage model in Amazon DataZone, two types of lineage are captured:

  • Lineage activities inside Amazon DataZone – This includes assets added to the catalog and published, along with details about their subscriptions, all of which are captured automatically. When you’re in the producer project context (for example, if the project you’ve selected is the owning project of the asset you are browsing and you’re a member of that project), you will see two states of the dataset node:
    • The inventory asset type node defines the asset in the catalog that is in an unpublished stage. Other users can’t subscribe to the inventory asset. To learn more, refer to Creating inventory and published data in Amazon DataZone.
    • The published asset type represents the actual asset that is discoverable by data users across the organization. This is the asset type that other project members can subscribe to. If you are a consumer and not part of the producing project of that asset, you will only see the published asset node.
  • Lineage activities outside of Amazon DataZone – These can be captured programmatically using the PostLineageEvent API. With these events captured either upstream or downstream of cataloged assets, data producers and consumers get a comprehensive view of data movement to check the origin of data or its consumption. We discuss how to use the API to capture lineage events later in this post.
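As a rough illustration of the programmatic path, the following Python sketch sends an OpenLineage-formatted event to Amazon DataZone through the Boto3 post_lineage_event call. The wrapper function, its optional datazone_client parameter, and the domain ID shown in the usage note are hypothetical; the sketch assumes your credentials are allowed to post lineage events to the domain.

```python
import json


def post_lineage_event(domain_id, event, datazone_client=None):
    """Serialize an OpenLineage event and post it to an Amazon DataZone domain."""
    if datazone_client is None:
        # Imported lazily so the function can be exercised with a stub client.
        import boto3
        datazone_client = boto3.client("datazone")
    return datazone_client.post_lineage_event(
        domainIdentifier=domain_id,
        event=json.dumps(event).encode("utf-8"),  # the API expects the event as bytes
    )
```

For example, post_lineage_event("dzd_your_domain_id", run_event) would submit a run event dictionary that follows the OpenLineage format, where dzd_your_domain_id stands in for your own domain identifier.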

There are two different types of lineage nodes available in Amazon DataZone:

  • Dataset node – In Amazon DataZone, lineage visualizes nodes that represent tables and views. Depending on the context of the project, the producers will be able to view both the inventory and published asset, whereas consumers can only view the published asset. When you first open the lineage tab on the asset details page, the cataloged dataset node will be the starting point for lineage graph traversal upstream or downstream. Dataset nodes include lineage nodes automated from Amazon DataZone and custom lineage nodes:
    • Automated dataset nodes – These nodes include information about AWS Glue or Amazon Redshift assets published in the Amazon DataZone catalog. They’re automatically generated and include a corresponding AWS Glue or Amazon Redshift icon within the node.
    • Custom dataset nodes – These nodes include information about assets that are not published in the Amazon DataZone catalog. They’re created manually by domain administrators (producers) and are represented by a default custom asset icon within the node. These are essentially custom lineage nodes created using the OpenLineage event format.
  • Job (run) node – This node captures the details of the job, which represents the latest run of a particular job and its run details. This node also captures multiple runs of the job and can be viewed on the History tab of the node details. Node details are made visible when you choose the icon.

Visualizing lineage in Amazon DataZone

Amazon DataZone offers a comprehensive experience for data producers and consumers. The asset details page provides a graphical representation of lineage, making it straightforward to visualize data relationships upstream or downstream. The asset details page provides the following capabilities to navigate the graph:

  • Column-level lineage – You can expand column-level lineage when available in dataset nodes. This automatically shows relationships with upstream or downstream dataset nodes if source column information is available.
  • Column search – If the dataset has more than 10 columns, the node presents pagination to navigate to columns not initially presented. To quickly view a particular column, you can search on the dataset node that lists just the searched column.
  • View dataset nodes only – If you want to filter out the job nodes, you can choose the Open view control icon in the graph viewer and toggle the Display dataset nodes only option. This will remove all the job nodes from the graph and let you navigate just the dataset nodes.
  • Details pane – Each lineage node captures and displays the following details:
    • Every dataset node has three tabs: Lineage info, Schema, and History. The History tab lists the different versions of lineage events captured for that node.
    • The job node has a details pane to display job details with the tabs Job info and History. The details pane also captures queries or expressions run as part of the job.
  • Version tabs – All lineage nodes in Amazon DataZone data lineage will have versioning, captured as history, based on lineage events captured. You can view lineage at a selected timestamp that opens a new tab on the lineage page to help compare or contrast between the different timestamps.

The following screenshot shows an example of data lineage visualization.

You can experience the visualization with sample data by choosing Preview on the Lineage tab and choosing the Try sample lineage link. This opens a new browser tab with sample data to test and learn about the feature with or without a guided tour, as shown in the following screenshot.

Solution overview

Now that we understand the capabilities of the new data lineage feature in Amazon DataZone, let’s explore how you can get started in capturing lineage from AWS Glue tables and ETL (extract, transform, and load) jobs, Amazon Redshift, and Amazon MWAA.

The getting started scripts are also available in Amazon DataZone’s new GitHub repository.

Prerequisites

For this walkthrough, you should have the following prerequisites:

If the AWS account you use to follow this post uses AWS Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.

Launch the CloudFormation stack

To create your resources for this use case using AWS CloudFormation, complete the following steps:

  1. Launch the CloudFormation stack in us-east-1:
  2. For Stack name, enter a name for your stack.
  3. Choose Next.
  4. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  5. Choose Create stack.

Wait for the stack formation to finish provisioning the resources. When you see the CREATE_COMPLETE status, you can proceed to the next steps.

Capture lineage from AWS Glue tables

For this example, we use CloudShell, which is a browser-based shell, to run the commands necessary to harvest lineage metadata from AWS Glue tables. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the AWSomeRetailCrawler crawler created by the CloudFormation template.
  3. Choose Run.

When the crawler is complete, you’ll see a Succeeded status.

Now let’s harvest the lineage metadata using CloudShell.

  1. Download the extract_glue_crawler_lineage.py file.
  2. On the Amazon DataZone console, open CloudShell.
  3. On the Actions menu, choose Upload file.
  4. Upload the extract_glue_crawler_lineage.py file.

  5. Run the following commands:
    sudo yum -y install python3
    python3 -m venv env
    . env/bin/activate
    pip install boto3

You should get the following results.

  1. After all the libraries and dependencies are configured, run the following command to harvest the lineage metadata from the inventory table:
    python extract_glue_crawler_lineage.py -d awsome_retail_db -t inventory -r us-east-1 -i dzd_Your_domain

  2. The script asks for verification of the settings provided; enter Yes.

You should receive a notification indicating that the script ran successfully.

After you capture the lineage information from the Inventory table, complete the following steps to run the data source.

  1. On the Amazon DataZone data portal, open the Sales project.
  2. On the Data tab, choose Data sources in the navigation pane.
  3. Select your data source job and choose Run.

For this example, we had a data source job called SalesDLDataSourceV2 already created pointing to the awesome_retail_db database. To learn more about how to create data source jobs, refer to Create and run an Amazon DataZone data source for the AWS Glue Data Catalog.

After the job runs successfully, you should see a confirmation message.

Now let’s view the lineage diagram generated by Amazon DataZone.

  1. On the Data inventory tab, choose the Inventory table.
  2. On the Inventory asset page, choose the new Lineage tab.

On the Lineage tab, you can see that Amazon DataZone created three nodes:

  • Job / Job run – This is based on the AWS Glue crawler used to harvest the asset technical metadata
  • Dataset – This is based on the S3 object that contains the data related to this asset
  • Table – This is the AWS Glue table created by the crawler

If you choose the Dataset node, Amazon DataZone offers information about the S3 object used to create the asset.

Capture data lineage for AWS Glue ETL jobs

In the previous section, we covered how to generate a data lineage diagram on top of a data asset. Now let’s see how we can create one for an AWS Glue job.

The CloudFormation template that we launched earlier created an AWS Glue job called Inventory_Insights. This job gets data from the Inventory table and creates a new table called Inventory_Insights with the aggregated data of the total products available in all the stores.

The CloudFormation template also copied the openlineage-spark_2.12-1.9.1.jar file to the S3 bucket created for this post. This file is necessary to generate lineage metadata from the AWS Glue job. We use version 1.9.1, which is compatible with AWS Glue 3.0, the version used to create the AWS Glue job for this post. If you’re using a different version of AWS Glue, you need to download the corresponding OpenLineage Spark plugin file that matches your AWS Glue version.

The OpenLineage Spark plugin is not able to extract data lineage from AWS Glue Spark jobs that use AWS Glue DynamicFrames. Use Spark SQL DataFrames instead.

  1. Download the extract_glue_spark_lineage.py file.
  2. On the Amazon DataZone console, open CloudShell.
  3. On the Actions menu, choose Upload file.
  4. Upload the extract_glue_spark_lineage.py file.
  5. On the CloudShell console, run the following command (if your CloudShell session expired, you can open a new session):
    python extract_glue_spark_lineage.py --region "us-east-1" --domain-identifier 'dzd_Your Domain'

  6. Confirm the information shown by the script by entering yes.

You will see the following message; this means that the script is ready to get the AWS Glue job lineage metadata after you run it.

Now let’s run the AWS Glue job created by the CloudFormation template.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the Inventory_Insights job and choose Run job.

On the Job details tab, you will notice that the job has the following configuration:

  • Key --conf with value spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener --conf spark.openlineage.transport.type=console --conf spark.openlineage.facets.custom_environment_variables=[AWS_DEFAULT_REGION;GLUE_VERSION;GLUE_COMMAND_CRITERIA;GLUE_PYTHON_VERSION;]
  • Key --user-jars-first with value true
  • Dependent JARs path set as the S3 path s3://{your bucket}/lib/openlineage-spark_2.12-1.9.1.jar
  • The AWS Glue version set as 3.0
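If you want to apply the same configuration to your own job, the following Python sketch assembles equivalent job parameters as a dictionary that could be passed as the job's default arguments. The helper name and the bucket name are placeholders; the sketch assumes that the Dependent JARs path setting corresponds to the --extra-jars job parameter and uses the full spark.extraListeners property name for the listener.

```python
def openlineage_job_arguments(bucket, jar_name="openlineage-spark_2.12-1.9.1.jar"):
    """Build AWS Glue job parameters that enable the OpenLineage Spark listener."""
    env_vars = [
        "AWS_DEFAULT_REGION",
        "GLUE_VERSION",
        "GLUE_COMMAND_CRITERIA",
        "GLUE_PYTHON_VERSION",
    ]
    # Glue accepts a single --conf key, so additional Spark settings are chained
    # inside its value with further "--conf" separators.
    conf_value = " --conf ".join([
        "spark.extraListeners=io.openlineage.spark.agent.OpenLineageSparkListener",
        "spark.openlineage.transport.type=console",
        "spark.openlineage.facets.custom_environment_variables=["
        + ";".join(env_vars) + ";]",
    ])
    return {
        "--conf": conf_value,
        "--user-jars-first": "true",
        "--extra-jars": f"s3://{bucket}/lib/{jar_name}",
    }


if __name__ == "__main__":
    for key, value in openlineage_job_arguments("your-bucket").items():
        print(key, "=", value)
```

The console transport writes lineage events to the job's logs, which is what allows an external script to pick them up after the job runs.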

During the run of the job, you will see the following output on the CloudShell console.

This means that the script has successfully harvested the lineage metadata from the AWS Glue job.

Now let’s create an AWS Glue table based on the data created by the AWS Glue job. For this example, we use an AWS Glue crawler.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Select the AWSomeRetailCrawler crawler created by the CloudFormation template and choose Run.

When the crawler is complete, you will see the following message.

Now let’s open the Amazon DataZone portal to see how the diagram is represented in Amazon DataZone.

  1. On the Amazon DataZone portal, choose the Sales project.
  2. On the Data tab, choose Inventory data in the navigation pane.
  3. Choose the inventory insights asset.

On the Lineage tab, you can see the diagram created by Amazon DataZone. It shows three nodes:

  • The AWS Glue crawler used to create the AWS Glue table
  • The AWS Glue table created by the crawler
  • The Amazon DataZone cataloged asset

  1. To see the lineage information about the AWS Glue job that you ran to create the inventory_insights table, choose the arrows icon on the left side of the diagram.

Now you can see the full lineage diagram for the Inventory_insights table.

  1. Choose the blue arrow icon in the inventory node to the left of the diagram.

You can see the evolution of the columns and the transformations that they had.

When you choose any of the nodes that are part of the diagram, you can see more details. For example, the inventory_insights node shows the following information.

Capture lineage from Amazon Redshift

Let’s explore how to generate a lineage diagram from Amazon Redshift. In this example, we use AWS Cloud9 because it allows us to configure the connection to the virtual private cloud (VPC) where our Redshift cluster resides. For more information about AWS Cloud9, refer to the AWS Cloud9 User Guide.

The CloudFormation template included as part of this post doesn’t cover the creation of a Redshift cluster or the creation of the tables used in this section. To learn more about how to create a Redshift cluster, see Step 1: Create a sample Amazon Redshift cluster. We use the following query to create the tables needed for this section of the post:

create schema market;

create table market.retail_sales (
  id BIGINT primary key,
  name character varying not null
);

create table market.online_sales (
  id BIGINT primary key,
  name character varying not null
);

/* Important: insert some data into the tables */
INSERT INTO market.retail_sales
VALUES (123, 'item1');

INSERT INTO market.online_sales
VALUES (234, 'item2');

create table market.sales AS
Select id, name from market.retail_sales
Union ALL
Select id, name from market.online_sales;

Remember to add the IP address of your AWS Cloud9 environment to the security group with access to the Redshift cluster.

  1. Download the requirements.txt and extract_redshift_lineage.py files.
  2. On the File menu, choose Upload Local Files.
  3. Upload the requirements.txt and extract_redshift_lineage.py files.
  4. Run the following commands:
    # Install Python 
    sudo yum -y install python3
    
    # dependency set up 
    python3 -m venv env 
    . env/bin/activate
    
    pip install -r requirements.txt

You should be able to see the following messages.

  1. To set the AWS credentials, run the following command:
    export AWS_ACCESS_KEY_ID=<<Your Access Key>>
    export AWS_SECRET_ACCESS_KEY=<<Your Secret Access Key>>
    export AWS_SESSION_TOKEN=<<Your Session Token>>

  2. Run the extract_redshift_lineage.py script to harvest the metadata necessary to generate the lineage diagram:
    python extract_redshift_lineage.py \
     -r region \
     -i dzd_your_dz_domain_id \
     -n your-redshift-cluster-endpoint \
     -t your-rs-port \
     -d your-database \
     -s the-starting-date

  3. Next, you will be prompted to enter the user name and password for the connection to your Amazon DataZone database.
  4. When you receive a confirmation message, enter yes.

If the configuration was done correctly, you will see the following confirmation message.

Now let’s see how the diagram was created in Amazon DataZone.

  1. On the Amazon DataZone data portal, open the Sales project.
  2. On the Data tab, choose Data sources.
  3. Run the data source job.

For this post, we already created a data source job called Sales_DW_Enviroment-default-datasource to add the Redshift data source to our Amazon DataZone project. To learn how to create a data source job, refer to Create and run an Amazon DataZone data source for Amazon Redshift.

After you run the job, you’ll see the following confirmation message.

  1. On the Data tab, choose Inventory data in the navigation pane.
  2. Choose the total_sales asset.
  3. Choose the Lineage tab.

Amazon DataZone creates a three-node lineage diagram for the total sales table; you can choose any node to view its details.

  1. Choose the arrows icon next to the Job / Job run node to view a more complete lineage diagram.
  2. Choose the Job / Job run node.

The Job Info section shows the query that was used to create the total sales table.
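To give a feel for the kind of metadata behind this diagram, the following Python sketch builds a minimal OpenLineage-style run event for the CREATE TABLE AS statement used earlier, with the two source tables as inputs and market.sales as the output. It is an illustration only: the producer URI, job name, namespace format, and spec version are assumptions, and the extract_redshift_lineage.py script may build its events differently. The call to the Amazon DataZone PostLineageEvent API is left commented out because it needs AWS credentials and a real domain ID.

```python
import json
import uuid
from datetime import datetime, timezone


def build_lineage_event(namespace, job_name, inputs, outputs):
    """Build a minimal OpenLineage-style COMPLETE run event as a dict."""
    return {
        "eventType": "COMPLETE",
        "eventTime": datetime.now(timezone.utc).isoformat(),
        # Placeholder producer and assumed spec version, for illustration only.
        "producer": "https://example.com/hypothetical-producer",
        "schemaURL": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunEvent",
        "run": {"runId": str(uuid.uuid4())},
        "job": {"namespace": namespace, "name": job_name},
        "inputs": [{"namespace": namespace, "name": name} for name in inputs],
        "outputs": [{"namespace": namespace, "name": name} for name in outputs],
    }


event = build_lineage_event(
    namespace="redshift://your-redshift-cluster-endpoint:your-rs-port",  # placeholder
    job_name="create_market_sales",  # hypothetical job name
    inputs=["your-database.market.retail_sales", "your-database.market.online_sales"],
    outputs=["your-database.market.sales"],
)
payload = json.dumps(event)

# Publishing requires AWS credentials and a real domain ID, so it is not run here:
# import boto3
# boto3.client("datazone", region_name="region").post_lineage_event(
#     domainIdentifier="dzd_your_dz_domain_id", event=payload.encode("utf-8"))
print(payload)
```

The two inputs and one output correspond to the three tables involved in the UNION ALL query, which is what the lineage diagram connects through the job node.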

Capture lineage from Amazon MWAA

Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Amazon MWAA is a managed service for Airflow that lets you use your current Airflow platform to orchestrate your workflows. OpenLineage supports integration with Airflow 2.6.3 using the openlineage-airflow package, and the same can be enabled on Amazon MWAA as a plugin. Once enabled, the plugin converts Airflow metadata to OpenLineage events, which are consumable by DataZone.PostLineageEvent.

The following diagram shows the setup required in Amazon MWAA to capture data lineage using OpenLineage and publish it to Amazon DataZone.

The workflow uses an Amazon MWAA DAG to invoke a data pipeline. The process is as follows:

  1. The openlineage-airflow plugin is configured on Amazon MWAA as a lineage backend. Metadata about the DAG run is passed to the plugin, which converts it into OpenLineage format.
  2. The collected lineage information is written to the Amazon CloudWatch log group of the Amazon MWAA environment.
  3. A helper function captures the lineage information from the log file and publishes it to Amazon DataZone using the PostLineageEvent API.

The example in this post uses Amazon MWAA version 2.6.3 and OpenLineage plugin version 1.4.1. For other Airflow versions supported by OpenLineage, refer to Supported Airflow versions.

Configure the OpenLineage plugin on Amazon MWAA to capture lineage

When harvesting lineage with OpenLineage, you need to set up a Transport configuration, which tells OpenLineage where to emit the events, for example the console or an HTTP endpoint. You can use ConsoleTransport, which logs the OpenLineage events to the Amazon MWAA task CloudWatch log group. A helper function can then publish them to Amazon DataZone.

Specify the following in the requirements.txt file added to the S3 bucket configured for Amazon MWAA:

openlineage-airflow==1.4.1

In the Airflow logging configuration section under the MWAA configuration for the Airflow environment, enable Airflow task logs with log level INFO. The following screenshot shows a sample configuration.

A successful configuration will add a plugin to Airflow, which can be verified from the Airflow UI by choosing Plugins on the Admin menu.

In this post, we use a sample DAG to hydrate data to Redshift tables. The following screenshot shows the DAG in graph view.

Run the DAG and upon successful completion of a run, open the Amazon MWAA task CloudWatch log group for your Airflow environment (airflow-env_name-task) and filter based on the expression console.py to select events emitted by OpenLineage. The following screenshot shows the results.

Publish lineage to Amazon DataZone

Now that you have the lineage events emitted to CloudWatch, the next step is to publish them to Amazon DataZone to associate them to a data asset and visualize them on the business data catalog.

  1. Download the files requirements.txt and extract_airflow_lineage.py and gather environment details like the AWS Region, Amazon MWAA environment name, and Amazon DataZone domain ID.
  2. The Amazon MWAA environment name can be obtained from the Amazon MWAA console.
  3. The Amazon DataZone domain ID can be obtained from Amazon DataZone service console or from the Amazon DataZone portal.
  4. Navigate to CloudShell and choose Upload files on the Actions menu to upload the files requirements.txt and extract_airflow_lineage.py.

  5. After the files are uploaded, run the following script to filter lineage events from the Airflow task logs and publish them to Amazon DataZone:
    # Set up virtual env and install dependencies
    python -m venv env
    . env/bin/activate
    pip install -r requirements.txt
    
    # run the script
    python extract_airflow_lineage.py \
      --region us-east-1 \
      --domain-identifier your_domain_identifier \
      --airflow-environment-name your_airflow_environment_name

The extract_airflow_lineage.py script filters the lineage events from the Amazon MWAA task log group and publishes the lineage to the specified domain within Amazon DataZone.
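The filtering step can be sketched in a few lines of Python. This is not the code of the helper script; it is a minimal illustration that assumes each log message emitted by the console transport contains the text console.py followed by the OpenLineage event as a JSON object. The sample log lines and their layout are made up for the example.

```python
import json

MARKER = "console.py"  # source file name expected in ConsoleTransport log lines


def extract_lineage_events(log_messages):
    """Return the OpenLineage events (as dicts) found in Airflow task log messages."""
    decoder = json.JSONDecoder()
    events = []
    for message in log_messages:
        marker_pos = message.find(MARKER)
        if marker_pos == -1:
            continue  # not emitted by the console transport
        # Assumed layout: the JSON payload starts at the first '{"' after the marker.
        json_start = message.find('{"', marker_pos)
        if json_start == -1:
            continue
        try:
            event, _ = decoder.raw_decode(message[json_start:])
        except json.JSONDecodeError:
            continue  # skip lines that only look like JSON
        if "eventType" in event:
            events.append(event)
    return events


# Hypothetical log messages in an assumed Airflow log layout.
sample_logs = [
    "[2024-06-01, 10:00:00 UTC] {{taskinstance.py:1327}} INFO - Executing task",
    "[2024-06-01, 10:00:05 UTC] {{console.py:29}} INFO - "
    '{"eventType": "COMPLETE", "job": {"namespace": "airflow", "name": "sample_dag.load"}}',
]
for event in extract_lineage_events(sample_logs):
    print(event["eventType"], event["job"]["name"])
```

Each extracted event could then be serialized and passed to the PostLineageEvent API for the target domain.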

Visualize lineage on Amazon DataZone

After the lineage is published to Amazon DataZone, open your Amazon DataZone project, navigate to the Data tab, and choose a data asset that was accessed by the Amazon MWAA DAG. In this case, it is a subscribed asset.

Navigate to the Lineage tab to visualize the lineage published to Amazon DataZone.

Choose a node to look at additional lineage metadata. In the following screenshot, we can observe the producer of the lineage has been marked as airflow.

Conclusion

In this post, we shared the preview feature of data lineage in Amazon DataZone, how it works, and how you can capture lineage events from AWS Glue, Amazon Redshift, and Amazon MWAA to be visualized as part of the asset browsing experience.

To learn more about Amazon DataZone and how to get started, refer to the Getting started guide. Check out the YouTube playlist for some of the latest demos of Amazon DataZone and short descriptions of the capabilities available.


About the Authors

Leonardo Gomez is a Principal Analytics Specialist at AWS, with over a decade of experience in data management. Specializing in data governance, he assists customers worldwide in maximizing their data’s potential while promoting data democratization. Connect with him on LinkedIn.

Priya Tiruthani is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about building innovative products to simplify customers’ end-to-end data journey, especially around data governance and analytics. Outside of work, she enjoys being outdoors to hike, capturing nature’s beauty, and, more recently, playing pickleball.

Ron Kyker is a Principal Engineer with Amazon DataZone at AWS, where he helps drive innovation, solve complex problems, and set the bar for engineering excellence for his team. Outside of work, he enjoys board gaming with friends and family, movies, and wine tasting.

Srinivasan Kuppusamy is a Senior Cloud Architect – Data at AWS ProServe, where he helps customers solve their business problems using the power of AWS Cloud technology. His areas of interests are data and analytics, data governance, and AI/ML.

Amazon Managed Service for Apache Flink now supports Apache Flink version 1.19

Post Syndicated from Francisco Morillo original https://aws.amazon.com/blogs/big-data/amazon-managed-service-for-apache-flink-now-supports-apache-flink-version-1-19/

Apache Flink is an open source distributed processing engine, offering powerful programming interfaces for both stream and batch processing, with first-class support for stateful processing and event time semantics. Apache Flink supports multiple programming languages (Java, Python, Scala, and SQL) and multiple APIs with different levels of abstraction, which can be used interchangeably in the same application.

Amazon Managed Service for Apache Flink offers a fully managed, serverless experience in running Apache Flink applications and now supports Apache Flink 1.19.1, the latest stable version of Apache Flink at the time of writing. AWS led the community release of version 1.19.1, which introduces a number of bug fixes over version 1.19.0, released in March 2024.

In this post, we discuss some of the interesting new features and configuration changes available for Managed Service for Apache Flink introduced with this new release. In every Apache Flink release, there are exciting new experimental features. However, in this post, we are going to focus on the features most accessible to the user with this release.

Connectors

With the release of version 1.19.1, the Apache Flink community also released new connector versions for the 1.19 runtime. Starting from 1.16, Apache Flink introduced a new connector version numbering, following the pattern <connector-version>-<flink-version>. It’s recommended to use connectors for the runtime version you are using. Refer to Using Apache Flink connectors to stay updated on any future changes regarding connector versions and compatibility.

SQL

Apache Flink 1.19 brings new features and improvements, particularly in the SQL API. These enhancements are designed to provide more flexibility, better performance, and ease of use for developers working with Flink’s SQL API. In this section, we delve into some of the most notable SQL enhancements introduced in this release.

State TTL per operator

Configuring state TTL at the operator level was introduced in Apache Flink 1.18 but wasn’t easily accessible to the end-user. To modify an operator TTL, you had to export the plan at development time, modify it manually, and force Apache Flink to use the edited plan instead of generating a new one when the application starts. The new features added to Flink SQL in 1.19 simplify this process by allowing TTL configurations directly through SQL hints, eliminating the need for JSON plan manipulation.

The following code shows examples of how to use SQL hints to set state TTL:

-- State TTL for Joins
SELECT /*+ STATE_TTL('Orders' = '1d', 'Customers' = '20d') */ 
  *
FROM Orders 
LEFT OUTER JOIN Customers 
  ON Orders.o_custkey = Customers.c_custkey;

-- State TTL for Aggregations
SELECT /*+ STATE_TTL('o' = '1d') */ 
  o_orderkey, SUM(o_totalprice) AS revenue 
FROM Orders AS o 
GROUP BY o_orderkey;

Session window table-valued functions

Windows are at the heart of processing infinite streams in Apache Flink, splitting the stream into finite buckets for computations. Before 1.19, Apache Flink provided the following types of window table-valued functions (TVFs):

  • Tumble windows – Fixed-size, non-overlapping windows
  • Hop windows – Fixed-size, overlapping windows with a specified hop interval
  • Cumulate windows – Increasingly larger windows that start at the same point but grow over time

Apache Flink 1.19 enhances its SQL capabilities by supporting session window TVFs in streaming mode, allowing for more sophisticated and flexible windowing operations directly within SQL queries. Applications can now create dynamic windows that group elements based on session gaps. The following code shows an example:

-- Session window with partition keys
SELECT 
  * 
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES));

-- Apply aggregation on the session windowed table with partition keys
SELECT 
  window_start, window_end, item, SUM(price) AS total_price
FROM TABLE(
  SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES))
GROUP BY item, window_start, window_end;
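To make the session semantics concrete, the following plain-Python sketch (not Flink code) groups bids into per-item sessions with a 5-minute gap and sums the price per session, mirroring the aggregation query above. The sample bids are invented, and the handling of events that arrive exactly one gap apart is an assumption of this sketch.

```python
from datetime import datetime, timedelta


def session_windows(events, gap):
    """Group (key, timestamp, price) events into per-key session windows.

    A new session starts when the time since the previous event of the same
    key exceeds `gap`. Returns (key, window_start, window_end, total_price)
    tuples, where window_end is the last event time plus the gap.
    """
    sessions = []
    open_sessions = {}  # key -> [start, last_event_time, total]
    for key, ts, price in sorted(events, key=lambda e: (e[0], e[1])):
        current = open_sessions.get(key)
        if current is not None and ts - current[1] <= gap:
            # Event falls inside the open session: extend it.
            current[1] = ts
            current[2] += price
        else:
            # Close the previous session for this key (if any) and open a new one.
            if current is not None:
                sessions.append((key, current[0], current[1] + gap, current[2]))
            open_sessions[key] = [ts, ts, price]
    for key, (start, last, total) in open_sessions.items():
        sessions.append((key, start, last + gap, total))
    return sorted(sessions)


t0 = datetime(2024, 1, 1, 8, 0)
bids = [
    ("A", t0, 4.0),
    ("A", t0 + timedelta(minutes=2), 2.0),
    ("A", t0 + timedelta(minutes=10), 5.0),  # more than 5 minutes later: new session
    ("B", t0 + timedelta(minutes=1), 3.0),
]
for item, start, end, total in session_windows(bids, timedelta(minutes=5)):
    print(item, start.time(), end.time(), total)
```

Unlike tumble or hop windows, the window boundaries here depend entirely on the data: item A produces two sessions of different lengths because its third bid arrives after the gap has elapsed.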

Mini-batch optimization for regular joins

When using the Table API or SQL, regular joins—standard equi-joins like a table SQL join, where time is not a factor—may induce a considerable overhead for the state backend, especially when using RocksDB.

Normally, Apache Flink processes standard joins one record at a time, looking up the state for a matching record in the other side of the join, updating the state with the input record, and emitting the resulting record. This may add considerable pressure on RocksDB, with multiple reads and writes for each record.

Apache Flink 1.19 introduces the ability to use mini-batch processing with equi-joins (FLIP-415). When enabled, Apache Flink will process regular joins not one record at a time, but in small batches, substantially reducing the pressure on the RocksDB state backend. Mini-batching adds some latency, which is controllable by the user. See, for example, the following SQL code (embedded in Java):

TableConfig tableConfig = tableEnv.getConfig();
tableConfig.set("table.exec.mini-batch.enabled", "true");
tableConfig.set("table.exec.mini-batch.allow-latency", "5s");
tableConfig.set("table.exec.mini-batch.size", "5000");

tableEnv.executeSql("CREATE TEMPORARY VIEW ab AS " +
  "SELECT a.id as a_id, a.a_content, b.id as b_id, b.b_content " +
  "FROM a LEFT JOIN b ON a.id = b.id");

With this configuration, Apache Flink will buffer up to 5,000 records or up to 5 seconds, whichever comes first, before processing the join for the entire mini-batch.
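The "size or latency, whichever comes first" rule can be illustrated with a small plain-Python buffer. This is a conceptual sketch, not how Apache Flink implements mini-batching internally: the class name, the callback, and the explicit `now` argument (used instead of a real timer so the behavior is easy to follow) are all assumptions of the example.

```python
class MiniBatchBuffer:
    """Buffer records and flush when max_size or max_latency_s is reached."""

    def __init__(self, max_size, max_latency_s, process_batch):
        self.max_size = max_size
        self.max_latency_s = max_latency_s
        self.process_batch = process_batch  # called with each flushed batch
        self.records = []
        self.first_arrival = None

    def add(self, record, now):
        """Add a record; `now` is the current time in seconds."""
        if not self.records:
            self.first_arrival = now
        self.records.append(record)
        self.maybe_flush(now)

    def maybe_flush(self, now):
        """Flush if the batch is full or its oldest record has waited long enough."""
        if not self.records:
            return
        full = len(self.records) >= self.max_size
        expired = now - self.first_arrival >= self.max_latency_s
        if full or expired:
            batch, self.records = self.records, []
            self.process_batch(batch)


batches = []
buffer = MiniBatchBuffer(max_size=3, max_latency_s=5.0, process_batch=batches.append)
buffer.add("a", now=0.0)
buffer.add("b", now=1.0)
buffer.add("c", now=2.0)   # size limit reached: first batch is flushed
buffer.add("d", now=3.0)
buffer.maybe_flush(now=8.0)  # latency limit reached: second batch is flushed
print(batches)
```

Processing a whole batch at once is what lets the join amortize state lookups across records, at the cost of the added latency bounded by the allow-latency setting.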

In Apache Flink 1.19, mini-batching only works for regular joins, not windowed or temporal joins. Mini-batching is disabled by default, so you have to explicitly enable it and set the batch size and latency for Flink to use it. Also, mini-batch settings are global and apply to all regular joins in your application. At the time of writing, it’s not possible to set mini-batching per join statement.

AsyncScalarFunction

Before version 1.19, an important limitation of SQL and the Table API, compared to the Java DataStream API, was the lack of asynchronous I/O support. Any request to an external system, for example a database, a REST API, or an AWS API call through the AWS SDK, was synchronous and blocking: an Apache Flink subtask waited for the response before completing the processing of a record and proceeding to the next one. In practice, the round-trip latency of each request was added to the processing latency of every record. Apache Flink’s Async I/O API removes this limitation, but it’s only available for the DataStream API and Java. Until version 1.19, there was no simple, efficient workaround in SQL, the Table API, or Python.

Apache Flink 1.19 introduces the new AsyncScalarFunction, a user-defined function (UDF) that can be implemented using non-blocking calls to the external system, to support use cases similar to asynchronous I/O in SQL and the Table API.

This new type of UDF is only available in streaming mode. At the moment, it only supports ordered output. DataStream Async I/O also supports unordered output, which may further reduce latency when strict ordering isn’t required.
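The benefit of non-blocking calls with ordered output can be shown with a short Python asyncio sketch. It does not use any Flink API (AsyncScalarFunction itself is implemented in Java); the lookup function is a hypothetical stand-in for a call to an external system, and the delays are invented. Requests are issued concurrently, yet results are returned in the order of the inputs.

```python
import asyncio
import time


async def lookup(key, delay_s):
    """Stand-in for a non-blocking call to an external system (hypothetical)."""
    await asyncio.sleep(delay_s)
    return f"enriched-{key}"


async def enrich_in_order(requests):
    """Issue all lookups concurrently; results come back in input order."""
    return await asyncio.gather(*(lookup(key, delay) for key, delay in requests))


requests = [("a", 0.3), ("b", 0.1), ("c", 0.2)]
start = time.perf_counter()
results = asyncio.run(enrich_in_order(requests))
elapsed = time.perf_counter() - start

print(results)
print(f"elapsed: {elapsed:.2f}s")
```

With blocking calls, the three lookups would take roughly the sum of their delays. Here the total is close to the slowest single lookup, because the waits overlap, while the output order still matches the input order.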

Python 3.11 support

Python 3.11 is now supported, and Python 3.7 support has been completely removed (FLINK-33029). Managed Service for Apache Flink currently uses the Python 3.11 runtime to run PyFlink applications. Python 3.11 is a bugfix only version of the runtime. Python 3.11 introduced several performance improvements and bug fixes, but no API breaking changes.

Performance improvements: Dynamic checkpoint interval

Apache Flink 1.19 improves checkpoint behavior by letting an application adjust its checkpointing interval dynamically, based on whether the source is processing backlog data (FLIP-309).

In Apache Flink 1.19, you can now specify different checkpointing intervals based on whether a source operator is processing backlog data. This flexibility optimizes job performance by reducing checkpoint frequency during backlog phases, enhancing overall throughput. Extending checkpoint intervals allows Apache Flink to prioritize processing throughput over frequent state snapshots, thereby improving efficiency and performance.

To enable it, you need to define the execution.checkpointing.interval parameter for regular intervals and execution.checkpointing.interval-during-backlog to specify a longer interval when sources report processing backlog.

For example, if you want to run checkpoints every 60 seconds during normal processing, but extend to 10 minutes during the processing of backlogs, you can set the following:

  • execution.checkpointing.interval = 60s
  • execution.checkpointing.interval-during-backlog = 10m

In Amazon Managed Service for Apache Flink, the default checkpointing interval is configured by the application configuration (60 seconds by default). You don’t need to set the configuration parameter. To set a longer checkpointing interval during backlog processing, you can raise a support case to modify execution.checkpointing.interval-during-backlog. See Modifiable Flink configuration properties for further details about modifying Apache Flink configurations.

At the time of writing, dynamic checkpointing intervals are only supported by Apache Kafka source and FileSystem source connectors. If you use any other source connector, intervals during backlog are ignored, and Apache Flink runs a checkpoint at the default interval during backlog processing.
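The resulting behavior can be summarized as a small decision function. The following Python sketch is only a model of the rules described above, using the 60-second and 10-minute values from the example; the function name and the source labels are assumptions made for illustration.

```python
# Sources that support dynamic intervals, per the text above (at the time of writing).
BACKLOG_AWARE_SOURCES = {"kafka", "filesystem"}


def effective_checkpoint_interval_s(source, processing_backlog,
                                    interval_s=60, interval_during_backlog_s=600):
    """Return the checkpoint interval (in seconds) that applies right now."""
    if processing_backlog and source in BACKLOG_AWARE_SOURCES:
        return interval_during_backlog_s
    # Regular processing, or a source that does not support the backlog interval.
    return interval_s


for source, backlog in [("kafka", False), ("kafka", True), ("kinesis", True)]:
    print(source, backlog, effective_checkpoint_interval_s(source, backlog))
```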

In Apache Flink, checkpoints are always injected in the flow from the sources. This feature only involves source connectors. The sink connectors you use in your application don’t affect this feature. For a deep dive into the Apache Flink checkpoint mechanism, see Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints.

More troubleshooting information: Job initialization and checkpoint traces

With FLIP-384, Apache Flink 1.19 introduces trace reporters, which show checkpointing and job initialization traces. As of 1.19, this trace information can be sent to the logs using Slf4j. In Managed Service for Apache Flink, this is now enabled by default. You can find checkpoint and job initialization details in Amazon CloudWatch Logs, with the other logs from the application.

Checkpoint traces contain valuable information about each checkpoint. You can find similar information on the Apache Flink Dashboard, but only for the latest checkpoints and only while the application is running. Conversely, in the logs, you can find the full history of checkpoints. The following is an example of a checkpoint trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, 
  name=Checkpoint, 
  startTsMillis=1718779769305, 
  endTsMillis=1718779769542, 
  attributes={
    jobId=1b418a2404cbcf47ef89071f83f2dff9, 
    checkpointId=9774, 
    checkpointStatus=COMPLETED, 
    fullSize=9585, 
    checkpointedSize=9585
  }
}

Job initialization traces are generated when the job starts and recovers the state from a checkpoint or savepoint. You can find valuable statistics you can’t normally find elsewhere, including the Apache Flink Dashboard. The following is an example of a job initialization trace:

SimpleSpan{
  scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker,
  name=JobInitialization,
  startTsMillis=1718781201463,
  endTsMillis=1718781409657,
  attributes={
    maxReadOutputDataDurationMs=89,
    initializationStatus=COMPLETED,
    fullSize=26167879378,
    sumMailboxStartDurationMs=621,
    sumGateRestoreDurationMs=29,
    sumDownloadStateDurationMs=199482,
    sumRestoredStateSizeBytes.LOCAL_MEMORY=46764,
    checkpointId=270,
    sumRestoredStateSizeBytes.REMOTE=26167832614,
    maxDownloadStateDurationMs=199482,
    sumReadOutputDataDurationMs=90,
    maxRestoredStateSizeBytes.REMOTE=26167832614,
    maxInitializeStateDurationMs=201122,
    sumInitializeStateDurationMs=201241,
    jobId=8edb291c9f1c91c088db51b48de42308,
    maxGateRestoreDurationMs=22,
    maxMailboxStartDurationMs=391,
    maxRestoredStateSizeBytes.LOCAL_MEMORY=46764
  }
}

Checkpoint and job initialization traces are logged at INFO level. You can find them in CloudWatch Logs only if you configure a logging level of INFO or DEBUG in your Managed Service for Apache Flink application.
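Because the traces are plain text in the logs, you can extract their fields with a few lines of code. The following Python sketch parses the key=value pairs of a trace and computes the duration of the checkpoint from the example above. It assumes the trace text has the shape shown in this post; the regular expression is an illustration, not an official format specification.

```python
import re

# A key (letters, digits, underscores, dots) followed by '=' and a simple value.
FIELD = re.compile(r"([\w.]+)=([^,\s{}]+)")


def parse_span(text):
    """Parse the key=value pairs of a logged SimpleSpan into a flat dict."""
    fields = {}
    for key, value in FIELD.findall(text):
        fields[key] = int(value) if value.isdigit() else value
    return fields


trace = (
    "SimpleSpan{scope=org.apache.flink.runtime.checkpoint.CheckpointStatsTracker, "
    "name=Checkpoint, startTsMillis=1718779769305, endTsMillis=1718779769542, "
    "attributes={jobId=1b418a2404cbcf47ef89071f83f2dff9, checkpointId=9774, "
    "checkpointStatus=COMPLETED, fullSize=9585, checkpointedSize=9585}}"
)
span = parse_span(trace)
duration_ms = span["endTsMillis"] - span["startTsMillis"]
print(span["name"], span["checkpointId"], span["checkpointStatus"], f"{duration_ms} ms")
```

Applied to the job initialization trace, the same subtraction gives 1718781409657 - 1718781201463 = 208,194 ms, most of which is accounted for by the state download (sumDownloadStateDurationMs=199482).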

Managed Service for Apache Flink behavior change

As a fully managed service, Managed Service for Apache Flink controls some runtime configuration parameters to guarantee the stability of your application. For details about the Apache Flink settings that can be modified, see Apache Flink settings.

With the 1.19 runtime, if you programmatically modify a configuration parameter that is directly controlled by Managed Service for Apache Flink, you receive an explicit ProgramInvocationException when the application starts, explaining what parameter is causing the problem and preventing the application from starting. With runtime 1.18 or earlier, changes to parameters controlled by the managed service were silently ignored.

To learn more about how Managed Service for Apache Flink handles configuration changes in runtime 1.19 or later, refer to FlinkRuntimeException: “Not allowed configuration change(s) were detected”.

Conclusion

In this post, we explored some of the new relevant features and configuration changes introduced with Apache Flink 1.19, now supported by Managed Service for Apache Flink. This latest version brings numerous enhancements aimed at improving performance, flexibility, and usability for developers working with Apache Flink.

With the support of Apache Flink 1.19, Managed Service for Apache Flink now supports the latest released Apache Flink version. We have seen some of the interesting new features available for Flink SQL and PyFlink.

You can find more details about recent releases in the Apache Flink blog and release notes.

If you’re new to Apache Flink, we recommend our guide to choosing the right API and language and following the getting started guide to start using Managed Service for Apache Flink.

If you’re already running an application in Managed Service for Apache Flink, you can safely upgrade it in-place to the new 1.19 runtime.


About the Authors

Francisco Morillo is a Streaming Solutions Architect at AWS, specializing in real-time analytics architectures. With over five years in the streaming data space, Francisco has worked as a data analyst for startups and as a big data engineer for consultancies, building streaming data pipelines. He has deep expertise in Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. Francisco collaborates closely with AWS customers to build scalable streaming data solutions and advanced streaming data lakes, ensuring seamless data processing and real-time insights.

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-centered, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Enhance data security with fine-grained access controls in Amazon DataZone

Post Syndicated from Deepmala Agarwal original https://aws.amazon.com/blogs/big-data/enhance-data-security-with-fine-grained-access-controls-in-amazon-datazone/

Fine-grained access control is a crucial aspect of data security for modern data lakes and data warehouses. As organizations handle vast amounts of data across multiple data sources, the need to manage sensitive information has become increasingly important. Making sure the right people have access to the right data, without exposing sensitive information to unauthorized individuals, is essential for maintaining data privacy, compliance, and security.

Today, Amazon DataZone has introduced fine-grained access control, providing you granular control over your data assets in the Amazon DataZone business data catalog across data lakes and data warehouses. With the new capability, data owners can now restrict access to specific records of data at row and column levels, instead of granting access to the entire data asset. For example, if your data contains columns with sensitive information such as personally identifiable information (PII), you can restrict access to only the necessary columns, making sure sensitive information is protected while still allowing access to non-sensitive data. Similarly, you can control access at the row level, allowing users to see only the records that are relevant to their role or task.

In this post, we discuss how to implement fine-grained access control with row and column asset filters using this new feature in Amazon DataZone.

Row and column filters

Row filters enable you to restrict access to specific rows based on criteria you define. For instance, if your table contains data for two regions (America and Europe) and you want to make sure that employees in Europe only access data relevant to their region, you can create a row filter that keeps only the rows where the region is Europe (for example, region = 'Europe'). This way, employees in Europe won’t have access to America’s data.

Column filters allow you to limit access to specific columns within your data assets. For example, if your table includes sensitive information such as PII, you can create a column filter to exclude PII columns. This makes sure subscribers can only access non-sensitive data.

The row and column asset filters in Amazon DataZone enable you to control who can access what using a consistent, business user-friendly mechanism for all of your data across AWS data lakes and data warehouses. To use fine-grained access control in Amazon DataZone, you can create row and column filters on top of your data assets in the Amazon DataZone business data catalog. When a user requests a subscription to your data asset, you can approve the subscription by applying the appropriate row and column filters. Amazon DataZone enforces these filters using AWS Lake Formation and Amazon Redshift, making sure the subscriber can only access the rows and columns that they are authorized to use.
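Conceptually, a subscription approved with filters behaves like a view that first keeps the permitted rows and then keeps the permitted columns. The following Python sketch models that on an in-memory list of records. It is not how Amazon DataZone, Lake Formation, or Amazon Redshift enforce the filters, and the table, column names, and values are invented for the example.

```python
def apply_asset_filters(rows, row_predicate, included_columns):
    """Keep rows that satisfy row_predicate, then keep only included_columns."""
    return [
        {column: row[column] for column in included_columns}
        for row in rows
        if row_predicate(row)
    ]


# Hypothetical table with a PII column (email).
orders = [
    {"order_id": 1, "region": "Europe", "email": "a@example.com", "amount": 120},
    {"order_id": 2, "region": "America", "email": "b@example.com", "amount": 80},
    {"order_id": 3, "region": "Europe", "email": "c@example.com", "amount": 45},
]

europe_view = apply_asset_filters(
    orders,
    row_predicate=lambda row: row["region"] == "Europe",  # row filter
    included_columns=["order_id", "region", "amount"],    # column filter without PII
)
print(europe_view)
```

The subscriber only ever sees the filtered result: rows for other regions and the email column are absent rather than masked.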

Solution overview

To demonstrate the new capability, we consider a sample customer use case where an electronics ecommerce platform is looking to implement fine-grained access controls using Amazon DataZone. The customer has multiple product categories, each operated by different divisions of the company. The platform governance team wants to make sure each division has visibility only to data belonging to their own categories. Additionally, the platform governance team needs to adhere to the finance team requirements that pricing information should be visible only to the finance team.

The sales team, acting as the data producer, has published an AWS Glue table called Product Sales that contains data for both Laptops and Servers categories to the Amazon DataZone business data catalog using the project Product-Sales. The analytics teams in both the laptop and server divisions need to access this data for their respective analytics projects. The data owner’s objective is to grant data access to consumers based on the division they belong to. This means giving access to only rows of data with laptop sales to the laptops sales analytics team, and rows with server sales to the server sales analytics team. Additionally, the data owner wants to restrict both teams from accessing the pricing data. This post demonstrates the implementation steps to achieve this use case in Amazon DataZone.

The steps to configure this solution are as follows:

  1. The publisher creates asset filters for limiting access:
    1. We create two row filters: a Laptop Only row filter that limits access to only the rows of data with laptop sales, and a Server Only row filter that limits access to the rows of data with server sales.
    2. We also create a column filter called exclude-price-columns that excludes the price-related columns from the Product Sales asset.
  2. Consumers discover and request subscriptions:
    1. The analyst from the laptops division requests a subscription to the Product Sales data asset.
    2. The analyst from the servers division also requests a subscription to the Product Sales data asset.
    3. Both subscription requests are sent to the publisher for approval.
  3. The publisher approves the subscriptions and applies the appropriate filters:
    1. The publisher approves the request from the analyst in the laptops division, applying the Laptop Only row filter and the exclude-price-columns column filter.
    2. The publisher approves the request from the analyst in the servers division, applying the Server Only row filter and the exclude-price-columns column filter.
  4. Consumers access the authorized data in Amazon Athena:
    1. After the subscription is approved, we query the data in Athena to make sure that the analyst from the laptops division can now access only the product sales data for the Laptops category.
    2. Similarly, the analyst from the servers division can access only the product sales data for the Servers category.
    3. Both consumers can see all columns except the price-related columns, as per the applied column filter.

The following diagram illustrates the solution architecture and process flow.

Prerequisites

To follow along with this post, the publisher of the product sales data asset must have published a sales dataset in Amazon DataZone.

Publisher creates asset filters for limiting access

In this section, we detail the steps the publisher takes to create asset filters.

Create row filters

This dataset contains the product categories Laptops and Servers. We want to restrict each subscriber’s access to the rows of the product category they are authorized to see. We use the row filter feature in Amazon DataZone to achieve this.

Amazon DataZone allows you to create row filters that can be used when approving subscriptions to make sure that the subscriber can only access rows of data as defined in the row filters. To create a row filter, complete the following steps:

  1. On the Amazon DataZone console, navigate to the product-sales project (the project to which the asset belongs).
  2. Navigate to the Data tab for the project.
  3. Choose Inventory data in the navigation pane, then the asset Product Sales, where you want to create the row filter.

You can add row filters for assets of type AWS Glue tables or Redshift tables.

  1. On the asset detail page, on the Asset filters tab, choose Add asset filter.

We create two row filters, one each for the Laptops and Servers categories.

  1. Complete the following steps to create a laptop only asset row filter:
    1. Enter a name for this filter (Laptop Only).
    2. Enter a description of the filter (Allow rows with product category as Laptop Only).
    3. For the filter type, select Row filter.
    4. For the row filter expression, enter one or more expressions:
      1. Choose the column Product Category from the column dropdown menu.
      2. Choose the operator = from the operator dropdown menu.
      3. Enter the value Laptops in the Value field.
    5. If you need to add another condition to the filter expression, choose Add condition. For this post, we create a filter with one condition.
    6. When using multiple conditions in the row filter expression, choose And or Or to link the conditions.
    7. You can also define the subscriber visibility. For this post, we kept the default value (No, show values to subscriber).
    8. Choose Create asset filter.
  2. Repeat the same steps to create a row filter called Server Only, except this time enter the value Servers in the Value field.
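To make the effect of these two row filters concrete, the following is a minimal Python sketch that models a row filter expression as a list of conditions linked by And or Or and applies it to a few sample rows. It illustrates the filtering semantics only and is not how Amazon DataZone implements or enforces filters. The sample rows, the column names (product_category, product_name), and the helper functions are assumptions made for this example.

```python
# Hypothetical sample rows; column names and values are assumptions.
ROWS = [
    {"product_category": "Laptops", "product_name": "Laptop A"},
    {"product_category": "Servers", "product_name": "Server B"},
    {"product_category": "Laptops", "product_name": "Laptop C"},
]

# Operators supported by this sketch (the console offers its own set).
OPERATORS = {
    "=": lambda actual, expected: actual == expected,
    "!=": lambda actual, expected: actual != expected,
}


def row_matches(row, conditions, link="and"):
    """Evaluate (column, operator, value) conditions linked by 'and' or 'or'."""
    results = [OPERATORS[op](row[col], value) for col, op, value in conditions]
    return all(results) if link == "and" else any(results)


def apply_row_filter(rows, conditions, link="and"):
    """Keep only the rows that satisfy the filter expression."""
    return [row for row in rows if row_matches(row, conditions, link)]


# One condition each, mirroring the Laptop Only and Server Only filters.
laptop_only = [("product_category", "=", "Laptops")]
server_only = [("product_category", "=", "Servers")]

laptop_rows = apply_row_filter(ROWS, laptop_only)
server_rows = apply_row_filter(ROWS, server_only)

print([row["product_name"] for row in laptop_rows])
print([row["product_name"] for row in server_rows])
```

Linking both conditions with Or would return every row in this sample, while linking them with And would return none, because no row belongs to both categories.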

Create column filters

Next, we create column filters to restrict access to columns with price-related data. Complete the following steps:

  1. In the same asset, add another asset filter of type column filter.
  2. On the Asset filters tab, choose Add asset filter.
  3. For Name, enter a name for the filter (for this post, exclude-price-columns).
  4. For Description, enter a description of the filter (for this post, exclude price data columns).
  5. For the filter type, select Column to create the column filter. This will display all the available columns in the data asset’s schema.
  6. Select all columns except the price-related ones.
  7. Choose Create asset filter.
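A column filter is effectively an include list: the subscriber sees the selected columns and nothing else. The following Python sketch shows that idea on a hypothetical schema. The column names (including unit_price and total_price as the price-related columns) and the helper functions are assumptions for illustration, not the actual schema of the Product Sales asset or an Amazon DataZone API.

```python
# Hypothetical schema for the asset; the real column names may differ.
SCHEMA = ["product_category", "product_name", "units_sold", "unit_price", "total_price"]

# Columns treated as price-related in this sketch (an assumption).
PRICE_COLUMNS = {"unit_price", "total_price"}


def build_included_columns(schema, excluded):
    """Select every column except the excluded ones, preserving schema order."""
    return [column for column in schema if column not in excluded]


def apply_column_filter(rows, included):
    """Project each row onto the included columns only."""
    return [{column: row[column] for column in included} for row in rows]


included_columns = build_included_columns(SCHEMA, PRICE_COLUMNS)

sample_rows = [
    {
        "product_category": "Laptops",
        "product_name": "Laptop A",
        "units_sold": 3,
        "unit_price": 1200,
        "total_price": 3600,
    }
]
filtered_rows = apply_column_filter(sample_rows, included_columns)

print(included_columns)
print(filtered_rows)
```

Because the filter lists the columns to include rather than the ones to hide, a column added to the asset later would stay hidden in this model until someone adds it to the list.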

Consumers discover and request subscriptions

In this section, we switch to the role of an analyst from the laptop division who is working within the project Sales Analytics - Laptops. As the data consumer, we search the catalog to find the Product Sales data asset and request access by subscribing to it.

  1. Log in to your project as a consumer and search for the Product Sales data asset.
  2. On the Product Sales data asset details page, choose Subscribe.
  3. For Project, choose Sales Analytics - Laptops.
  4. For Reason for request, enter the reason for the subscription request.
  5. Choose Subscribe to submit the subscription request.

Publisher approves subscriptions with filters

After the subscription request is submitted, the publisher will receive the request, and they can approve it by following these steps:

  1. As the publisher, open the project product-sales.
  2. On the Data tab, choose Incoming requests in the left navigation pane.
  3. Locate the request and choose View request. You can filter by Pending to see only requests that are still open.

This opens the details of the request, where you can see details like who requested the access, for what project, and the reason for the request.

  1. To approve the request, there are two options:
    1. Full access – If you choose to approve the subscription with full access option, the subscriber will get access to all the rows and columns in our data asset.
    2. Approve with row and column filters – To limit access to specific rows and columns of data, you can choose the option to approve with row and column filters. For this post, we use both filters that we created earlier.
  2. Select Choose filter, then on the dropdown menu, choose the Laptop Only row filter and the exclude-price-columns column filter.
  3. Choose Approve to approve the request.

After access is granted and fulfilled, the subscription looks as shown in the following screenshot.

  1. Now let’s log in as a consumer from the server division.
  2. Repeat the same steps, but this time, while approving the subscription, the publisher of sales data approves with the Server Only row filter. The other steps remain the same.

Consumers access authorized data in Athena

Now that we have successfully published an asset to the Amazon DataZone catalog and subscribed to it, we can analyze it. Let’s log in as a consumer from the laptop division.

  1. In the Amazon DataZone data portal, choose the consumer project Sales Analytics - Laptops.
  2. On the Schema tab, we can view the subscribed assets.
  3. Choose the project Sales Analytics - Laptops and choose the Overview tab.
  4. In the right pane, open the Athena environment.

We can now run queries on the subscribed table.

  1. Choose the table under Tables and views, then choose Preview to view the SELECT statement in the query editor.
  2. Run a query as the consumer of Sales Analytics - Laptops. The results contain only rows with the product category Laptops.

Under Tables and views, you can expand the table product_sales. The price-related columns are not visible in the Athena environment for querying.

  1. Next, you can switch to the role of an analyst from the server division and analyze the dataset in a similar way.
  2. We run the same query and see that under product_category, the analyst can see Servers only.
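The outcome for the two analysts can be summarized with a small Python sketch that pairs each subscribing project with its approved filters and computes what that project sees. This is an illustrative model under stated assumptions, not the mechanism Amazon DataZone or Athena uses. The sample rows, the unit_price column, and the server project name Sales Analytics - Servers are hypothetical, and the sketch assumes the server subscription was approved with the same column filter.

```python
# Hypothetical rows standing in for the product_sales table.
ROWS = [
    {"product_category": "Laptops", "product_name": "Laptop A", "unit_price": 1200},
    {"product_category": "Servers", "product_name": "Server B", "unit_price": 5400},
]

# Filters approved for each subscribing project.
# "Sales Analytics - Servers" is an assumed name for the server division project.
APPROVED_FILTERS = {
    "Sales Analytics - Laptops": {"category": "Laptops", "hidden": {"unit_price"}},
    "Sales Analytics - Servers": {"category": "Servers", "hidden": {"unit_price"}},
}


def visible_rows(project):
    """Apply the project's row filter, then drop the hidden columns."""
    grant = APPROVED_FILTERS[project]
    return [
        {col: val for col, val in row.items() if col not in grant["hidden"]}
        for row in ROWS
        if row["product_category"] == grant["category"]
    ]


laptop_view = visible_rows("Sales Analytics - Laptops")
server_view = visible_rows("Sales Analytics - Servers")

print(laptop_view)
print(server_view)
```

Both projects subscribe to the same underlying asset. Only the filters chosen at approval time differ, which is why the same query returns different rows for each analyst.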

Conclusion

Amazon DataZone offers a straightforward way to implement fine-grained access controls on top of your data assets. This feature allows you to define column-level and row-level filters to enforce data privacy before the data is available to data consumers. Amazon DataZone fine-grained access control is generally available in all AWS Regions that support Amazon DataZone.

Try out the fine-grained access control feature in your own use case, and let us know your feedback in the comments section.


About the Authors

Deepmala Agarwal works as an AWS Data Specialist Solutions Architect. She is passionate about helping customers build out scalable, distributed, and data-driven solutions on AWS. When not at work, Deepmala likes spending time with family, walking, listening to music, watching movies, and cooking!

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Utkarsh Mittal is a Senior Technical Product Manager for Amazon DataZone at AWS. He is passionate about building innovative products that simplify customers’ end-to-end analytics journeys. Outside of the tech world, Utkarsh loves to play music, with drums being his latest endeavor.