Tag Archives: Big Data

How and Why Netflix Built a Real-Time Distributed Graph: Part 1 — Ingesting and Processing Data…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/how-and-why-netflix-built-a-real-time-distributed-graph-part-1-ingesting-and-processing-data-80113e124acc

How and Why Netflix Built a Real-Time Distributed Graph: Part 1 — Ingesting and Processing Data Streams at Internet Scale

Authors: Adrian Taruc and James Dalton

This is the first entry of a multi-part blog series describing how we built a Real-Time Distributed Graph (RDG). In Part 1, we will discuss the motivation for creating the RDG and the architecture of the data processing pipeline that populates it.

Introduction

The Netflix product experience historically consisted of a single core offering: streaming video on demand. Our members logged into the app, browsed, and watched titles such as Stranger Things, Squid Game, and Bridgerton. Although this is still the core of our product, our business has changed significantly over the last few years. For example, we introduced ad-supported plans, live programming events (e.g., Jake Paul vs. Mike Tyson and NFL Christmas Day Games), and mobile games as part of a Netflix subscription. This evolution of our business has created a new class of problems where we have to analyze member interactions with the app across different business verticals. Let’s walk through a simple example scenario:

  1. Imagine a Netflix member logging into the app on their smartphone and beginning to watch an episode of Stranger Things.
  2. Eventually, they decide to watch on a bigger screen, so they log into the app on a smart TV in their home and continue watching the same episode.
  3. Finally, after completing the episode, they log into the app on their tablet and play the game “Stranger Things: 1984”.

We want to know that these three activities belong to the same member, despite occurring at different times and across various devices. In a traditional data warehouse, these events would land in at least two different tables and may be processed at different cadences. But in a graph system, they become connected almost instantly. Ultimately, analyzing member interactions in the app across domains empowers Netflix to create more personalized and engaging experiences.

In the early days of our business expansion, discovering these relationships and contextual insights was extremely difficult. Netflix is famous for adopting a microservices architecture — hundreds of microservices developed and maintained by hundreds of individual teams. Some notable benefits of microservices are:

  1. Service Decomposition: The overall platform is separated into smaller services, each responsible for a specific business capability. This modularity allows for independent service development, deployment, and scaling.
  2. Data Isolation: Each service manages its own data, reducing interdependencies. This allows teams to choose the most suitable data schemas and storage technologies for their services.

However, these benefits also led to drawbacks for our data science and engineering partners. In practice, the separation of business concerns and service development ultimately resulted in a separation of data. Manually stitching data together from our data warehouse and siloed databases was an onerous task for our partners. Our data engineering team recognized we needed a solution to process and store our enormous swath of interconnected data while enabling fast querying to discover insights. Although we could have structured the data in various ways, we ultimately settled on a graph representation. We believe a graph offers key advantages, specifically:

  • Relationship-Centric Queries: Graphs enable fast “hops” across multiple nodes and edges without expensive joins or manual denormalization that would be required in table-based data models.
  • Flexibility as Relationships Grow: As new connections and entities emerge, graphs can quickly adapt without significant schema changes or re-architecture.
  • Pattern and Anomaly Detection: Our stakeholders’ use cases often require identifying hidden relationships, cycles, or groupings in the data — capabilities much more naturally expressed and efficiently executed using graph traversals than siloed point lookups.

This is why we set out to build a Real-Time Distributed Graph, or “RDG” for short.

Ingestion and Processing

Three main layers in the system power the RDG:

  1. Ingestion and Processing — receive events from disparate upstream data sources and use them to generate graph nodes and edges.
  2. Storage — write nodes and edges to persistent data stores.
  3. Serving — expose ways for internal clients to query graph nodes and edges.

The rest of this post will focus on the first layer, while subsequent posts in this blog series will cover the other layers. The diagram below depicts a high-level overview of the ingestion and processing pipeline:

Building and updating the RDG in real-time requires continuously processing vast volumes of incoming data. Batch processing systems and traditional data warehouses cannot offer the low latency needed to maintain an up-to-date graph that supports real-time applications. We opted for a stream processing architecture, enabling us to update the graph’s data as events happen, thus minimizing delay and ensuring the system reflects the latest member interactions within the Netflix app.

Kafka as the Ingestion Backbone

Member actions in the Netflix app are published to our API Gateway, which then writes them as records to Apache Kafka topics. Kafka is the mechanism through which internal data applications can consume these events. It provides durable, replayable streams that downstream processors, such as Apache Flink jobs, can consume in real-time.

Our team’s applications consume several different Kafka topics, each generating up to roughly 1 million messages per second. Topic records are encoded in the Apache Avro format, and Avro schemas are persisted in an internal centralized schema registry. In order to strike a balance between maintaining data availability and managing the financial expenses of storage infrastructure, we tailor retention policies for each topic according to its throughput and record size. We also persist topic records to Apache Iceberg data warehouse tables, which allows us to backfill data in scenarios where older data is no longer available in the Kafka topics.

Processing Data with Apache Flink

The event records in the Kafka streams are ingested by Flink jobs. We chose Flink because of its strong capabilities around near-real-time event processing. There is also robust internal platform support for Flink within Netflix, which allows jobs to integrate with Kafka and various storage backends seamlessly. At a high level, the anatomy of an RDG Flink job looks like this:

For the sake of simplicity, the diagram above depicts a basic flow in which a member logs into their Netflix account and begins watching an episode of Stranger Things. Reading the diagram from left to right:

  • The actions of logging into the app and watching the Stranger Things episode are ultimately written as events to Kafka topics.
  • The Flink job consumes event records from the upstream Kafka topics.
  • Next, we have a series of Flink processor functions that:
  1. Apply filtering and projections to remove noise based on the individual fields that are present — or in some cases, not present — in the events.
  2. Enrich events with additional metadata, which are stored and accessed by the processor functions via side inputs.
  3. Transform events into graph primitives — nodes representing entities (e.g., member accounts and show/movie titles), and edges representing relationships or interactions between them. In this example, the diagram only shows a few nodes and an edge to keep things simple. However, in reality, we create and update up to a few dozen different nodes and edges, depending on the member actions that occurred within the Netflix app.
  4. Buffer, detect, and deduplicate overlapping updates that occur to the same nodes and edges within a small, configurable time window. This step reduces the data throughput we publish downstream. It is implemented using stateful process functions and timers.
  5. Publish nodes and edges records to Data Mesh, an abstraction layer that connects data applications and storage systems. We write a total (nodes + edges) of more than 5 million records per second to Data Mesh, which handles persisting the records to various data stores that other internal services can query.

From One Job to Many: Scaling Flink the Hard Way

Initially, we tried having just one Flink job that consumed all the Kafka source topics. However, this quickly became a big operational headache since different topics can have different data volumes and throughputs at different times during the day. Consequently, tuning the monolithic Flink job became extremely difficult — we struggled to find CPU, memory, job parallelism, and checkpointing interval configurations that ensured job stability.

Instead, we pivoted to having a 1:1 mapping from the Kafka source topic to the consuming Flink job. Although this led to additional operational overhead due to more jobs to develop and deploy, each job has been much simpler to maintain, analyze, and tune.

Similarly, each node and edge type is written to a separate Kafka topic. This means we have significantly more Kafka topics to manage. However, we decided the tradeoff of having bespoke tuning and scaling per topic was worth it. We also designed the graph data model to be as generic and flexible as possible, so adding new types of nodes and edges would be an infrequent operation.

Acknowledgements

We would be remiss if we didn’t give a special shout-out to our stunning colleagues who work on the internal Netflix data platform. Building the RDG was a multi-year effort that required us to design novel solutions, and the investments and foundations from our platform teams were critical to its successful creation. You make the lives of Netflix data engineers much easier, and the RDG would not exist without your diligent collaboration!

Thanks for reading the first season of the RDG blog series; stay tuned for Season 2, where we will go over the storage layer containing the graph’s various nodes and edges.


How and Why Netflix Built a Real-Time Distributed Graph: Part 1 — Ingesting and Processing Data… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Scaling Muse: How Netflix Powers Data-Driven Creative Insights at Trillion-Row Scale

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/scaling-muse-how-netflix-powers-data-driven-creative-insights-at-trillion-row-scale-aa9ad326fd77

By Andrew Pierce, Chris Thrailkill, Victor Chiapaikeo

At Netflix, we prioritize getting timely data and insights into the hands of the people who can act on them. One of our key internal applications for this purpose is Muse. Muse’s ultimate goal is to help Netflix members discover content they’ll love by ensuring our promotional media is as effective and authentic as possible. It achieves this by equipping creative strategists and launch managers with data-driven insights showing which artwork or video clips resonate best with global or regional audiences and flagging outliers such as potentially misleading (clickbait-y) assets. These kinds of applications fall under Online Analytical Processing (OLAP), a category of systems designed for complex querying and data exploration. However, enabling Muse to support new, more advanced filtering and grouping capabilities while maintaining high performance and data accuracy has been a challenge. Previous posts have touched on artwork personalization and our impressions architecture. In this post, we’ll discuss some steps we’ve taken to evolve the Muse data serving layer to enable new capabilities while maintaining high performance and data accuracy.

Muse application

An Evolving Architecture

Like many early analytics applications, Muse began as a simple dashboard powered by batch data pipelines (Spark¹) and a modest Druid² cluster. As the application evolved, so did user demands. Users wanted new features like outlier detection and notification delivery, media comparison and playback, and advanced filtering, all while requiring lower latency and supporting ever-growing datasets (in the order of trillions of rows a year). One of the most challenging requirements was enabling dynamic analysis of promotional media performance by “audience” affinities: internally defined, algorithmically inferred labels representing collections of viewers with similar tastes. Answering questions like “Does specific promotional media resonate more with Character Drama fans or Pop Culture enthusiasts?” required augmenting already voluminous impression and playback data. Supporting filtering and grouping by these many-to-many audience relationships led to a combinatorial explosion in data volume, pushing the limits of our original architecture.

To address these complexities and support the evolving needs of our users, we undertook a significant evolution of Muse’s architecture. Today’s Muse is a React app that queries a GraphQL layer served with a set of Spring Boot GRPC microservices. In the remainder of this post, we’ll focus on steps we took to scale the data microservice, its backing ETL, and our Druid cluster. Specifically, we’ve changed the data model to rely on HyperLogLog (HLL) sketches, used Hollow for access to in-memory, precomputed aggregates, and taken a series of steps to tune Druid. To ensure the accuracy of these changes, we relied heavily on internal debugging tools to validate pre- and post-changes.

Muse’s Current Architecture

Moving to HyperLogLog (HLL) Sketches for Distinct Counts

Some of the most important metrics we track are impressions, the number of times an asset is shown to a user within a time window, and qualified plays, which links a playback event with a minimum duration back to a specific impression. Calculating these metrics requires counting distinct users. However, performing distinct counts in distributed systems is resource-intensive and challenging. For instance, to determine how many unique profiles have ever seen a particular asset, we need to compare each new set of profile ids with those from all days before it, potentially spanning months or even years.

For performance, we can trade accuracy. The Apache Datasketches library allows us to get distinct count estimates that are within a 1–2% error. This is tunable with a precision parameter called logK (0.8% in our case with logK of 17). We build sketches in two places:

  1. During Druid ingest: we use the HLLSketchBuild aggregator with Druid rollup set to true to reduce our data in preparation for fast distinct counting
  2. During our Spark ETL: we persist precomputed aggregates like all-time impressions per asset in the form of HLL sketches. Each day, we merge a new HLL sketch into the existing one using a combination of hll_union and hll_union_agg (functions added by our very own Ryan Berti)
We use Datasketches in our ETL and serving systems

HLL has been a huge performance boost for us both within the serving and ETL layer. Across our most common OLAP query patterns, we’ve seen latencies reduce by approx 50%. Nevertheless, running APPROX_COUNT_DISTINCT over large date ranges on the Druid cluster for very large titles exhausts limited threads, especially in high-concurrency situations. To further offload Druid query volume and preserve cluster threads, we’ve also relied extensively on the Hollow library.

Hollow as a Read-Only Key Value Store for Precomputed Aggregates

Our in-house Hollow³ infrastructure allows us to easily create Hollow feeds — essentially highly compressed and performant in-memory key/value stores — from Iceberg⁴ tables. In this setup, dedicated producer servers listen for changes to Iceberg tables, and when updates occur, they push the latest data to downstream consumers. On the consumer side, our Spring Boot applications listen to announcements from these producers and automatically refresh in-memory caches with the latest dataset.

This architecture has enabled us to migrate several data access patterns from Druid to Hollow, specifically ones with a limited number of parameter combinations per title. One of these was fetching distinct filter dimensions. For example, while most Netflix-branded titles are released globally, licensed titles often have rights restrictions that limit their availability to specific countries and time windows. As a result, a particular licensed title might only be available to members in Germany and Luxembourg.

Distinct countries queried from a Hollow feed for the assets for Manta Manta

In the past, retrieving these distinct country values per asset required issuing a SELECT DISTINCT query to our Druid cluster. With Hollow, we maintain a feed of distinct dimension values, allowing us to perform stream operations like the one below directly on a cached dataset.

/**
* Returns the possible filter values for a dimension such as countries
*/
public List<Dimension> getDimensions(long movieId, String dimensionId) {
// Access in-memory Hollow feed with near instant query time
Map<String, List<Dimension>> dimensions = dimensionsHollowConsumer.lookup(movieId);
return dimensions.getOrDefault(dimensionId, List.of()).stream()
.sorted(Comparator.comparing(Dimension::getName))
.toList();
}

Although it adds complexity to our service by requiring more intricate request routing and a higher memory footprint, pre-computed aggregates have given us greater stability and performance. In the case of fetching distinct dimensions, we’ve observed query times drop from hundreds of milliseconds to just tens of milliseconds. More importantly, this shift has offloaded high concurrency demands from our Druid cluster, resulting in more consistent query performance. In addition to this use case, cached pre-computed aggregates also power features such as retrieving recently launched titles, accessing all-time asset metrics, and serving various pieces of title metadata.

Tuning Druid

Even with the efficiencies gained from HLL sketches and Hollow feeds, ensuring that our Druid cluster operates performantly has been an ongoing challenge. Fortunately, at Netflix, we are in the company of multiple Apache Druid PMC members like Maytas Monsereenusorn and Jesse Tuğlu who have helped us wring out every ounce of performance. Some of the key optimizations we’ve implemented include:

  • Increasing broker count relative to historical nodes: We aim for a broker-to-historical ratio close to the recommended 1:15, which helps improve query throughput.
  • Tuning segment sizes: By targeting the 300–700 MB “sweet spot” for segment sizes, primarily using the tuningConfig.targetRowsPerSegment parameter during ingestion — we ensure that each segment a single historical thread scans is not overly large.
  • Leveraging Druid lookups for data enrichment: Since joins can be prohibitively expensive in Druid, we use lookups at query time for any key column enrichment.
  • Optimizing search predicates: We ensure that all search predicates operate on physical columns rather than virtual ones, creating necessary columns during ingestion with transformSpec.transforms.
  • Filtering and slimming data sources at ingest: By applying filters within transformSpec.filter and removing all unused columns in dimensionsSpec.dimensions, we keep our data sources lean and improve the possibility of higher rollup yield.
  • Use of multi-value dimensions: Exploiting the Druid multi-value dimension feature was key to overcoming the “many-to-many” combinatorial quandary when integrating audience filtering and grouping functionality mentioned in the “An Evolving Architecture” section above.

Together, these optimizations, combined with previous ones, have decreased our p99 Druid latencies by roughly 50%.

Validation & Rollout

Rolling out these changes to our metrics system required a thorough validation and release strategy. Our approach prioritized both data integrity and user trust, leveraging a blend of automation, targeted tooling, and incremental exposure to production traffic. At the core of our strategy was a parallel stack deployment: both the legacy and new metric stacks operated side-by-side within the Muse Data microservice. This setup allowed us to validate data quality, monitor real-world performance, and mitigate risk by enabling seamless fallback at any stage.

We adopted a two-pronged validation process:

  • Automated Offline Validation: Using Jupyter Notebooks, we automated the sampling and comparison of key metrics across both the legacy and new stacks. Our sampling set included a representative mix: recently accessed titles, high-profile launches, and edge-case titles with unique handling requirements. This allowed us to catch subtle discrepancies in metrics early in the process. Iterative testing on this set guided fixes, such as tuning the HLL logK parameter and benchmarking end-to-end latency improvements.
  • In-App Data Comparison Tooling: To facilitate rapid triage, we built a developer-facing comparison feature within our application that displays data from both the legacy and new metric stacks side by side. The tool automatically highlights any significant differences, making it easy to quickly spot and investigate discrepancies identified during offline validation or reported by users.

We implemented several release best practices to mitigate risk and maintain stability:

  • Staggered Implementation by Application Segment: We developed and deployed the new metric stack in stages, focusing on specific application segments. This meant building out support for asset types like artwork and video separately and then further dividing by CEE phase (Explore, Exploit). By implementing changes segment by segment, we were able to isolate issues early, validate each piece independently, and reduce overall risk during the migration.
  • Shadow Testing (“Dark Launch”): Prior to exposing the new stack to end users, we mirrored production traffic asynchronously to the new implementation. This allowed us to validate real-world latency and catch potential faults in a live environment, without impacting the actual user experience.
  • Granular Feature Flagging: We implemented fine-grained feature flags to control exposure within each segment. This allowed us to target specific user groups or titles and instantly roll back or adjust the rollout scope if any issues were detected, ensuring rapid mitigation with minimal disruption.

Learnings and Next Steps

Our journey with Muse tested the limits of several parts of the stack: the ETL layer, the Druid layer, and the data serving layer. While some choices, like leveraging Netflix’s in-house Hollow infrastructure, were influenced by available resources, simple principles like offloading query volume, pre-filtering of rows and columns before Druid rollup, and optimizing search predicates (along with a bit of HLL magic) went a long way in allowing us to support new capabilities while maintaining performance. Additionally, engineering best practices like producing side-by-side implementations and backwards-compatible changes enabled us to roll out revisions steadily while maintaining rigorous validation standards. Looking ahead, we’ll continue to build on this foundation by supporting a wider range of content types like Live and Games, incorporating synopsis data, deepening our understanding of how assets work together to influence member choosing, and incorporating new metrics to distinguish between “effective” and “authentic” promotional assets, in service of helping members find content that truly resonates with them.

¹ Apache Spark is an open-source analytics engine for processing large-scale data, enabling tasks like batch processing, machine learning, and stream processing.

² Apache Druid is a high-performance, real-time analytics database designed for quickly querying large volumes of data.

³ Hollow is a Java library for efficient in-memory storage and access to moderately sized, read-only datasets, making it ideal for high-performance data retrieval.

⁴ Apache Iceberg is an open-source table format designed for large-scale analytical datasets stored in data lakes. It provides a robust and reliable way to manage data in formats like Parquet or ORC within cloud object storage or distributed file systems.


Scaling Muse: How Netflix Powers Data-Driven Creative Insights at Trillion-Row Scale was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Export JMX metrics from Kafka connectors in Amazon Managed Streaming for Apache Kafka Connect with a custom plugin

Post Syndicated from Jaydev Nath original https://aws.amazon.com/blogs/big-data/export-jmx-metrics-from-kafka-connectors-in-amazon-managed-streaming-for-apache-kafka-connect-with-a-custom-plugin/

Organizations use streaming applications to process and analyze data in real time and adopt the Amazon MSK Connect feature of Amazon Managed Streaming for Apache Kafka (Amazon MSK) to run fully managed Kafka Connect workloads on AWS. Message brokers like Apache Kafka allow applications to handle large volumes and diverse types of data efficiently and enable timely decision-making and instant insights. It’s crucial to monitor the performance and health of each component to help ensure the seamless operation of data streaming pipelines.

Amazon MSK is a fully managed service that simplifies the deployment and operation of Apache Kafka clusters on AWS. It simplifies building and running applications that use Apache Kafka to process streaming data. Amazon MSK Connect simplifies the deployment, monitoring, and automatic scaling of connectors that transfer data between Apache Kafka clusters and external systems such as databases, file systems, and search indices. Amazon MSK Connect is fully compatible with Kafka Connect and supports Amazon MSK, Apache Kafka, and Apache Kafka compatible clusters. Amazon MSK Connect uses a custom plugin as the container for connector implementation logic.

Custom MSK connect plugins use Java Management Extensions (JMX) to expose runtime metrics. While Amazon MSK Connect sends a set of connect metrics to Amazon CloudWatch, it currently does not support exporting the JMX metrics emitted by the connector plugins natively. These metrics can be exported by modifying the custom connect plugin code directly, but it requires maintenance overhead because the plugin code needs to be modified every time it’s updated. In this post, we demonstrate an optimal approach by extending a custom connect plugin with additional modules to export JMX metrics and publish them to CloudWatch as custom metrics. These additional JMX metrics emitted by the custom connectors provide rich insights into their performance and health of the connectors. In this post, we demonstrate how you can export the JMX metrics for Debezium connector when used with MSK Connect.

Understanding JMX

Before we dive deep into exporting JMX metrics, let’s understand how JMX works. JMX is a technology that you can use to monitor and manage Java applications. Key components involved in JMX monitoring are:

  • Managed beans (MBeans) are Java objects that represent the metrics of the Java application being monitored. They contain the actual data points of the resources being monitored.
  • JMX server creates and registers the MBeans with the PlatformMBeanServer. The Java application that is being monitored acts as the JMX server and exposes the MBeans.
  • MBeanServer or JMX registry is the central registry that keeps track of all the registered MBeans in the JMX server. It is the access point for all the MBeans within the Java virtual machine (JVM).
  • JMXConnectorServer acts as a bridge between the JMX client and the JMX server and enables remote access to the exposed MBeans. JMXConnectorServerFactory creates and manages the JMXConnectorServer. It allows for the customization of the server’s properties and uses the JMXServiceURL to define the endpoint where the JMX client can connect to the JMX server.
  • JMXServiceURL provides the necessary information such as the protocol, host, and port for the client to connect to the JMX server and access the desired MBeans.
  • JMX client is an external application or tool that connect to the JMX server to access and monitor the exposed metrics.

JMX monitoring involves the steps shown in the following figure:

JMX architecture diagram showing connection flow from client to server with MBeans

JMX monitoring steps include:

  1. The Java application acting as the JMX server creates and configures MBeans for the desired metrics.
  2. JMX server registers the MBeans with the JMX registry.
  3. JMXConnectorServerFactory creates the JMXConnectorServer that defines the JMXServiceURL that provides the entry point details for the JMX client.
  4. JMXClient connects to the JMX registry in the JMX server using the JMXServiceURL and the JMXConnectorServer.
  5. The JMX server handles client requests, interacting with the JMX registry to retrieve the MBean data.

Solution overview

This method of wrapping supported Kafka connectors with custom code that exposes connector-specific operational metrics enables teams to get better insights by correlating various connector metrics with cloud-centered metrics in monitoring systems such as Amazon CloudWatch. This approach enables consistent monitoring across different components of the change data capture (CDC) pipeline, ultimately feeding metrics into unified dashboards while respecting each connector’s architectural philosophy. The consolidated metrics can be delivered to CloudWatch or the monitoring tool of your choice including partner specific application performance management (APM) tools such as Datadog, New Relic, and so on.

We have the working implementation of this same approach with two popular connectors: Debezium source connector and MongoDB Sink Connector. You can find the Github sample and ready to use plugins built for each in the repository. Review the README file for this custom implementation for more details.

For example, our custom implementation for the MongoDB Sink Connector adds a metrics export layer that calculates critical performance indicators such as latest-kafka-time-difference-ms – which measures the latency between Kafka message timestamps and connector processing time by subtracting the connector’s current clock time from the last received record’s timestamp. This custom wrapper around the MongoDB Sink Connector enables exporting relevant JMX metrics and publishing them as custom metrics to CloudWatch. We’ve open sourced this solution on GitHub, along with a ready-to-use plugin and detailed configuration guidance in the README.

CDC is the process of identifying and capturing changes made in a database and delivering those changes in real time to a downstream system. Debezium is an open source distributed platform built on top of Apache Kafka that provides CDC functionality. It provides a set of connectors to track and stream changes from databases to Kafka.

In the next section, we dive deep into the implementation details of how to export JMX metrics from Debezium MySQL Connector deployed as a custom plugin in Amazon MSK Connect. The connector plugin takes care of creating and configuring the MBeans and registering them with the JMX registry.

The following diagram shows the workflow of using Debezium MySQL Connector as a custom plugin in Amazon MSK Connect for CDC from an Amazon Aurora MySQL-Compatible Edition data source.

Data flow diagram illustrating custom Amazon MSK Connect plugin integrating Aurora, Kafka, and CloudWatch metrics

  1. MySQL binary log (binlog) is enabled in Amazon Aurora for MySQL to record all the operations in the order in which they are committed to the database.
  2. The Debezium connector plugin component of the MSK Connect custom plugin continuously monitors the MySQL database, captures the row-level changes by reading the MySQL bin logs, and streams them as change events to Kafka topics in Amazon MSK.
  3. We’ll build a custom module to enable JMX monitoring on the Debezium connector. This module will act as a JMX client to retrieve the JMX metrics from the connector and publish them as custom metrics to CloudWatch.

The Debezium connector provides three types of metrics in addition to the built-in support for default Kafka and Kafka Connect JMX metrics.

  • Snapshot metrics provide information about connector operation while performing a snapshot.
  • Streaming metrics provide information about connector operation when the connector is reading the binlog.
  • Schema history metrics provide information about the status of the connector’s schema history.

In this solution, we export the MilliSecondsBehindSource streaming metrics emitted by the Debezium MySQL connector. This metric provides the number of milliseconds that the connector is lagging behind the change events in the database.

Prerequisites

Following are the prerequisites you need:

  • Access to the AWS account where you want to set up this solution.
  • You have set up the source database and MSK cluster by following this setup instructions in the MSK Connect workshop.

Create a custom plugin

Creating a custom plugin for Amazon MSK Connect for the solution involves the following steps:

  1. Create a custom module: Create a new Maven module or project that will contain your custom code to:
    1. Enable JMX monitoring in the connector application by starting the JMX server.
    2. Create a Remote Method Invocation (RMI) registry to enable the access to the JMX metrics to the clients.
    3. Create a JMX metrics exporter to query the JMX metrics by connecting to the JMX server and push the metrics to CloudWatch as custom metrics.
    4. Schedule to run the JMX metrics exporter at a configured interval.
  2. Package and deploy the custom module as an MSK Connect custom plugin.
  3. Create a connector using the custom plugin to capture CDC from the source, stream it and validate the metrics in Amazon CloudWatch.

This custom module extends the connector functionality to export the JMX metrics without requiring any changes in the underlying connector implementation. This helps ensure that upgrading the custom plugin requires only upgrading the plugin version in the pom.xml of the custom module.

Let’s deep dive and understand the implementation of each step mentioned above.

1. Create a custom module

Create a new Maven project with dependencies on Debezium MySQL Connector to enable JMX monitoring, Kafka Connect API for configuration, and CloudWatch AWS SDK to push the metrics to CloudWatch.
Set up a JMX connector server to enable JMX monitoring: To enable JMX monitoring, the JMX server needs to be started at the time of initializing the connector. This is usually done by setting the environment variables with JMX options as described in Monitoring Debezium. In the case of an Amazon MSK Connect custom plugin, JMX monitoring is enabled programmatically at the time of connector plugin initialization. To achieve this:

  • Extend the MySqlConnector class and override the start which is the connector’s entry point to execute custom code.
public class DebeziumMySqlMetricsConnector extends MySqlConnector{
@Override
	public void start(Map<String, String> props) {
  • In the start method of the custom connector class (DebeziumMySqlMetricsConnector) that we are creating, set the following parameters to allow customization of the JMX Server properties by retrieving connector configuration from a config file.

connect.jmx.port – The port number on which the RMI registry needs to be created. JMXConnectorServer would listen to the incoming connections on this port.

database.server.name – Name of the database that is the source for the CDC.

It also retrieves the CloudWatch configuration related properties that will be used while pushing the JMX metrics to CloudWatch.

cloudwatch.namespace.name – CloudWatch NameSpace to which the metrics need to be pushed as custom metrics

cloudwatch.region – CloudWatch Region where the custom namespace is created in your AWS account

connectJMXPort = Integer.parseInt(props.getOrDefault(CONNECT_JMX_PORT_KEY, String.valueOf(DEFAULT_JMX_PORT)));
databaseServerName = props.getOrDefault(DATABASE_SERVER_NAME_KEY, "");
cwNameSpace = props.getOrDefault(CW_NAMESPACE_KEY, DEFAULT_CW_NAMESPACE);
cwRegion = props.getOrDefault(CW_REGION_KEY, null);
  • Create an RMI registry on the specified port (connectJMXPort). This registry is used by the JMXConnectorServer to store the RMI objects corresponding to the MBeans in the JMX registry. This allows the JMX clients to look up and access the MBeans on the PlatformMBeanServer.

LocateRegistry.createRegistry(connectJMXPort);

  • Retrieve the PlatformMBeanServer and construct the JMXServiceURL which is in the format service:jmx:rmi://localhost/jndi/rmi://localhost:<<jmx.port>>/jmxrmi. Create a new JMXConnectorServer instance using the JMXConnectorServerFactory and the JMXServiceURL and start the JMXConnectorServer instance.
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
String jmxServiceURL = String.format(JMX_URL_TEMPLATE, connectJMXPort);
JMXServiceURL url = new JMXServiceURL(jmxServiceURL);
JMXConnectorServer svr = JMXConnectorServerFactory.newJMXConnectorServer(url, null, mbs);
svr.start();

Implement JMX metrics exporter: Create a JMX client to connect to the JMX server, query the MilliSecondBehindSource metric from the JMX server, convert it into the required format, and export it to CloudWatch.

  • Connect to the JMX Server using the JMXConnectorFactory and JMXServiceURL
JMXServiceURL jmxUrl = new JMXServiceURL(String.format(JMX_URL_TEMPLATE,DebeziumMySqlMetricsConnector.getConnectJMXPort()));
JMXConnector jmxConnector = JMXConnectorFactory.connect(jmxUrl, null);
jmxConnector.connect();
  • Query the MBean object that holds the corresponding metric, for example, MilliSecondsBehindSource, and retrieve the metric value using sample code provided in msk-connect-custom-plugin-jmx. (you can choose one or more metrics).
  • Schedule the execution of your JMX metrics exporter at regular intervals.

getScheduler().schedule(new JMXMetricsExporter(), SCHEDULER_INITIAL_DELAY, SCHEDULER_PERIOD);

Export metrics to CloudWatch: Implement the logic to push relevant JMX metrics to CloudWatch. You can use the AWS SDK for Java to interact with the CloudWatch PutMetricData API or use the CloudWatch Logs subscription filter to ingest the metrics from a dedicated Kafka topic.

Dimension dimension = Dimension.builder()
.name("DBServerName")
.value(DebeziumMySqlMetricsConnector.getDatabaseServerName())
.build();
MetricDatum datum = MetricDatum.builder()
	     .metricName("MilliSecondsBehindSource")
	     .unit(StandardUnit.NONE)
	     .value(Double.valueOf(msBehindSource))
	     .timestamp(instant)
	     .dimensions(dimension).build();
PutMetricDataRequest request = PutMetricDataRequest.builder()
	  .namespace(DebeziumMySqlMetricsConnector.getCWNameSpace())
	  .metricData(datum).build();
cw.putMetricData(request);

For more information, see the sample implementation for the custom module in aws-samples in GitHub. This sample also provides custom plugins packaged with two different versions of Debezium MySQL connector (debezium-connector-mysql-2.5.2.Final-plugin and debezium-connector-mysql-2.7.3.Final-plugin) and the following steps would explain the steps to build a custom plugin using your custom code.

2. Package the custom module and Debezium MySQL connector as a custom plugin

Build and package the Maven project with the custom code as a JAR file and include the JAR file in the debezium-connector-mysql-2.5.2.Final-plugin folder downloaded from maven repo. Package the updated debezium-connector-mysql-2.5.2.Final-plugin as a ZIP file (Amazon MSK Connect accepts custom plugins in ZIP or JAR format). Alternatively, you can use the prebuiltcustom-debezium-mysql-connector-plugin.zip available in GitHub.

Choose the Debezium connector version (2.5 or 2.7) that fits your requirement.

When you have to upgrade to a new version of the Debezium MySQL connector, you can update the version of the dependency and build the custom module and deploy it. By doing this, you can maintain the custom plugin without modifying the original connector code. The GitHub samples provide ready-to-use plugins for two Debezium connector versions. However, you can follow the same approach to upgrade to the latest connector version as well.

Create a custom plugin in Amazon MSK

  1. If you have set up your AWS resources by following the Getting Started lab, open Amazon S3 console and locate the bucket msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium .
  2. Upload the custom plugin created in the previous section custom-debezium-mysql-connector-plugin.zip to msk-lab-${ACCOUNT_ID}-plugins-bucket/debezium, as shown in the following figure.

msk-lab-s3-plugin-bucket

  1. Switch to the Amazon MSK console and choose Custom plugins in the navigation pane. Choose Create custom plugin and, browse the S3 bucket that you created above and select the custom plugin ZIP file you just uploaded.

custom-connector-plugin-s3-object

  1. Enter custom-debezium-mysql-connector-plugin for the plugin name. Optionally, enter a description and choose Create Custom Plugin.

msk-connect-create-custom-plugin-console

  1. After a few seconds you should see the plugin is created and the status is Active.
  2. Customize the worker configuration for the connector by following the instructions in the Customize worker configuration lab.

3. Create an Amazon MSK connector

The next step is to create an MSK connector.

  1. From the MSK section choose Connectors, then choose Create connector. Choose custom-debezium-mysql-connector-plugin from the list of Custom plugins, then choose Next.

msk-plugin-create

  1. Enter custom-debezium-mysql-connector in the Name textbox, and a description for the connector.

connector-properties-console-in-MSK-connect

  1. Select the MSKCluster-msk-connect-lab from the listed MSK clusters. From the Authentication dropdown, select IAM.
  2. Copy the following configuration and paste it in the connector configuration textbox.
  • Replace the <Your Aurora MySQL database endpoint>, <Your Database Password>, <Your MSK Bootstrap Server Address>, and <Your CloudWatch Region>placeholders with the corresponding details for the resources in your account.
  • Review the topic.prefix, database.user, topic.prefix, database.server.id, database.server.name, database.port, database.include.listparameters in the configuration. These parameters are configured with the values used in the workshop. Update them with the details corresponding to your configuration if you have customized it in your account.
  • Note that the connector.classparameter is updated with the qualified name of the subclass of MySqlConnector class that you created in the custom module.
  • The connect.jmx.portparameter specifies the default port to start the JMX server. You can configure this to any available port.
connector.class=com.amazonaws.msk.debezium.mysql.connect.DebeziumMySqlMetricsConnector tasks.max=1
include.schema.changes=true
topic.prefix=salesdb
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
database.user=master
database.server.id=123456
database.server.name=salesdb
database.port=3306
key.converter.schemas.enable=false
database.hostname=<Your Aurora MySQL database endpoint>
database.password=<Your Database Password>
value.converter.schemas.enable=false
database.include.list=salesdb
schema.history.internal.kafka.topic=internal.dbhistory.salesdb
schema.history.internal.kafka.bootstrap.servers=<Your MSK Bootstrap Server Address>
schema.history.internal.producer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.consumer.sasl.mechanism=AWS_MSK_IAM
schema.history.internal.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
schema.history.internal.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
schema.history.internal.consumer.security.protocol=SASL_SSL
schema.history.internal.producer.security.protocol=SASL_SSL
connect.jmx.port=7098
cloudwatch.namespace.name=MSK_Connect
cloudwatch.region=<Your CloudWatch Region>

connector-properties-configuration-settings

5. Follow the remaining instructions from the Create MSK Connector lab and create the connector. Verify that the connector status changes to Running.

Debezium MySQL custom connector version (2.7.3) provides additional flexibility to configure optional properties that can be added to your MSK connector configuration and selectively include and exclude metrics to emit to CloudWatch. The following are the example configuration properties that can be used with version 2.7.3 :

  • cloudwatch.debezium.streaming.metrics.include – A comma-separated list of streaming metrics type that must be exported to CloudWatch as custom metrics.
  • cloudwatch.debezium.streaming.metrics.exclude – Specify a comma-separated list of streaming metrics types to exclude from being sent to CloudWatch as custom metrics.
  • Similarly include and exclude properties for snapshot metrics type are cloudwatch.debezium.snapshot.metrics.include and cloudwatch.debezium.snapshot.metrics.exclude
  • Include and exclude properties for schema history metrics type are cloudwatch.debezium.schema.history.metrics.include and cloudwatch.debezium.schema.history.metrics.exclude

The following is a sample configuration excerpt.

  "cloudwatch.debezium.streaming.metrics.include": "LastTransactionId, TotalNumberOfEventsSeen, MilliSecondsBehindSource,CapturedTables",
  "cloudwatch.debezium.streaming.metrics.exclude": "LastTransactionId",
  "cloudwatch.debezium.schema.history.metrics.exclude": "MilliSecondsSinceLastAppliedChange",

Review the GitHub README file for more details on the use of these properties with MSK connector configurations.

Verify the replication in the Kafka cluster and CloudWatch metrics

Follow the instructions in the Verify the replication in the Kafka cluster lab to set up a client and make changes to the source DB and verify that the changes are captured and sent to Kafka topics by the connector.

To verify that the connector has published the JMX metrics to CloudWatch, go to the CloudWatch console and choose Metrics in the navigation pane, then choose All Metrics. Under Custom namespace, you can see MSK_Connect with database name as the dimension. Select the database name to view the metrics.

Amazon CloudWatch interface with time series graph and MSK Connect metric details

Select the MilliSecondBehindSource metric with statistic as Average in the Graphed Metric to plot the graph. You can verify that the MilliSecondBehindSource metric value is greater than zero whenever any operation is being performed on the source database and returns to zero during the idle time.

 Amazon CloudWatch console showing custom metric visualization with detailed controls and timeline analysis

Clean up

Delete the resources that you created such as the Aurora DB, Amazon MSK Cluster and connectors by following the instructions at Cleanup in the Amazon MSK Connect lab if you have been following along to set up the solution on your account.

Conclusion

In this post, we showed you how to extend the Debezium MySQL connector plugin with an additional module to export the JMX metrics to CloudWatch as custom metrics. As a next step, you can create a CloudWatch alarm to monitor the metrics and take remediation actions when the alarm is triggered. In addition to exporting the JMX metrics to CloudWatch, you can export these metrics to third-party applications such as Prometheus or DataDog using CloudWatch Metric Streams. You can follow a similar approach to export the JMX metrics of other connectors from MSK Connect. You can learn more about creating your own connectors by visiting the Connector Developer Guide and how to deploy them as custom plugins in the MSK Connect documentation.


About the authors

Jaydev NathJaydev Nath is a Solutions Architect at AWS, where he works with ISV customers to build secure, scalable, reliable, and cost-efficient cloud solutions. He brings strong expertise in building SaaS architecture on AWS with a focus on Generative AI and data analytics technologies to help deliver practical, valuable business outcomes for customers.

David John Chakram is a Principal Solutions Architect at AWS. He specializes in building data platforms and architecting seamless data ecosystems. With a profound passion for databases, data analytics, and machine learning, he excels at transforming complex data challenges into innovative solutions and driving businesses forward with data-driven insights.

Sharmila Shanmugam is a Solutions Architect at Amazon Web Services. She is passionate about solving the customers’ business challenges with technology and automation and reduce the operational overhead. In her current role, she helps customers across industries in their digital transformation journey and build secure, scalable, performant and optimized workloads on AWS.

Express brokers for Amazon MSK: Turbo-charged Kafka scaling with up to 20 times faster performance

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/express-brokers-for-amazon-msk-turbo-charged-kafka-scaling-with-up-to-20-times-faster-performance/

Managing and scaling data streams efficiently is a cornerstone of success for many organizations. Apache Kafka has emerged as a leading platform for real-time data streaming, offering unmatched scalability and reliability. However, setting up and scaling Kafka clusters can be challenging, requiring significant time, expertise, and resources. This is where Amazon Managed Streaming for Apache Kafka (Amazon MSK) Express brokers come into play.

Express brokers are a new broker type in Amazon MSK that are designed to simplify Kafka deployment and scaling.

In this post, we walk you through the implementation of MSK Express brokers, highlighting their core features, benefits, and best practices for rapid Kafka scaling.

Key features of MSK Express brokers

MSK Express brokers revolutionize Kafka cluster management by delivering exceptional performance and operational simplicity. With up to three times more throughput per broker, Express brokers can sustainably handle an impressive 500 MBps ingress and 1000 MBps egress on m7g.16xl instances, setting new standards for data streaming performance.

Their standout feature is their fast scaling capability—up to 20 times faster than standard Kafka brokers—allowing rapid cluster expansion within minutes. This is complemented by 90% faster recovery from failures and built-in three-way replication, providing robust reliability for mission-critical applications.

Express brokers eliminate traditional storage management responsibility by offering unlimited storage without pre-provisioning, while simplifying operations through preconfigured best practices and automated cluster management. With full compatibility with existing Kafka APIs and comprehensive monitoring through Amazon CloudWatch and Prometheus, MSK Express brokers provide an ideal solution for organizations seeking a highly-performant and low-maintenance data streaming infrastructure.

Comparison with traditional Kafka deployment

Although Kafka provides robust fault-tolerance mechanisms, its traditional architecture, where brokers store data locally on attached storage volumes, can lead to several issues impacting the availability and resiliency of the cluster. The following diagram compares the deployment architecture.

Comparison with traditional Kafka deployment

The traditional architecture comes with the following limitations:

  • Extended recovery times – When a broker fails, recovery requires copying data from surviving replicas to the newly assigned broker. This replication process can be time-consuming, particularly for high-throughput workloads or in cases where recovery requires a new volume, resulting in extended recovery periods and reduced system availability.
  • Suboptimal load distribution – Kafka achieves load balancing by redistributing partitions across brokers. However, this rebalancing operation can strain system resources and take considerable time due to the volume of data that must be transferred between nodes.
  • Complex scaling operations – Expanding a Kafka cluster requires adding brokers and redistributing existing partitions across the new nodes. For large clusters with substantial data volumes, this scaling operation can impact performance and require significant time to complete.

MSK Express brokers offers fully managed and highly available Regional Kafka storage. This significantly decouples compute and storage resources, addressing the aforementioned challenges and improving the availability and resiliency of Kafka clusters. The benefits include:

  • Faster and more reliable broker recovery – When Express brokers recover, they do so in up to 90% less time than standard brokers and place negligible strain on the clusters’ resources, which makes recovery faster and more reliable.
  • Efficient load balancing – Load balancing in MSK Express brokers is faster and less resource-intensive, enabling more frequent and seamless load balancing operations.
  • Faster scaling – MSK Express brokers enable efficient cluster scaling through rapid broker addition, minimizing data transfer overhead and partition rebalancing time. New brokers become operational quickly due to accelerated catch-up processes, resulting in faster throughput improvements and minimal disruption during scaling operations.

Scaling use case example

Consider a use case requiring 300 MBps data ingestion on a Kafka topic. We implemented this using an MSK cluster with three m7g.4xlarge Express brokers. The configuration included a topic with 3,000 partitions and 24-hour data retention, with each broker initially managing 1,000 partitions.

To prepare for anticipated midday peak traffic, we needed to double the cluster capacity. This scenario highlights one of Express brokers’ key advantages: rapid, safe scaling without disrupting application traffic or requiring extensive advance planning. During this scenario, the cluster was actively handling approximately 300 MBps of ingestion. The following graph shows the total ingress on this cluster and the number of partitions it is holding across three brokers.

Scaling use case example

The scaling process involved two main steps:

  • Adding three additional brokers to the cluster, which completed in approximately 18 minutes
  • Using Cruise Control to redistribute the 3,000 partitions evenly across all six brokers, which took about 10 minutes

Scaling use case example

As shown in the following graph, the scaling operation completed smoothly, with partition rebalancing occurring rapidly across all six brokers while maintaining uninterrupted producer traffic.

Scaling use case example

Notably, throughout the entire process, we observed no disruption to producer traffic. The entire operation to double the cluster’s capacity was completed in just 28 minutes, demonstrating MSK Express brokers’ ability to scale efficiently with minimal impact on ongoing operations.

Best practices

Consider the following guidelines to adopt MSK Express brokers:

  • When implementing new streaming workloads on Kafka, select MSK Express brokers as your default option. If uncertain about your workload requirements, begin with express.m7g.large instances.
  • Use the Amazon MSK sizing tool to calculate optimal broker count and type for your workload. Although this provides a good baseline, always validate through load testing that simulates your real-world usage patterns.
  • Review and implement MSK Express broker best practices.
  • Choose larger instance types for high-throughput workloads. A smaller number of large instances is preferable to many smaller instances, because fewer total brokers can simplify cluster management operations and reduce operational overhead.

Conclusion

MSK Express brokers represent a significant advancement in Kafka deployment and management, offering a compelling solution for organizations seeking to modernize their data streaming infrastructure. Through its innovative architecture that decouples compute and storage, MSK Express brokers deliver simplified operations, superior performance, and rapid scaling capabilities.

The key advantages demonstrated throughout this post—including 3 times higher throughput, 20 times faster scaling, and 90% faster recovery times—make MSK Express brokers an attractive option for both new Kafka implementations and migrations from traditional deployments.

As organizations continue to face growing demands for real-time data processing, MSK Express brokers provide a future-proof solution that combines the reliability of Kafka with the operational simplicity of a fully managed service.

To get started, refer to Amazon MSK Express brokers.


About the Author

masudursMasudur Rahaman Sayem is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures-2/

Data is the fuel for AI; modern data is even more important for generative AI and advanced data analytics, producing more accurate, relevant, and impactful results. Modern data comes in various forms: real-time, unstructured, or user-generated. Each form requires a different solution. AWS’s data journey began with Amazon Simple Storage Service (Amazon S3) in 2006, marking the start of cloud-based data storage at scale. Since then, AWS has expanded its data offerings to cover the entire data lifecycle, offering a comprehensive ecosystem of services designed to harness the full potential of modern data, from ingestion and storage to processing and analysis, supporting the entire lifecycle of AI-driven innovation.

In this blog post, we will cover some AWS use cases for modern data architectures, showing how AWS enables organizations to leverage the power of data and generative AI technologies.

Key considerations when choosing a database for your generative AI applications

This blog focuses on selecting the right database for generative AI applications and provide knowledge that can enhance your understanding, guide your decision making, and ultimately lead to more successful AI projects. Selecting the right database for generative AI applications is not just about storage; it significantly impacts performance, scalability, ease of integration, and overall effectiveness of the AI solution.

Diagram that shows the key steps in a RAG workflow

Figure 1. Diagram that shows the key steps in a RAG workflow

Take me to this blog

Strategies for building a data mesh-based enterprise solution on AWS

Adopting a data mesh architecture can enhance an organization’s ability to manage data effectively, leading to improved performance, innovation, and overall business success. In this guidance, you will discover some strategies to build data mesh solutions on AWS.

Screenshot showing the AWS Prescriptive Guidance data mesh strategies page

Figure 2. The data mesh organizes data into domains, where data are seen as quality products to expose for consumption

Take me to this guidance

Optimizing storage price and performance with Amazon S3

Amazon S3 is an object storage service that supports multiple use cases, including data architectures. Big data pipelines can use Amazon S3 to store input, output, and intermediate results. Machine learning systems use Amazon S3 to process application logs and build the datasets both for experimentation and for production model training. Given the importance of the service and the number of use cases that a foundational storage service can support, we want to share best practices, performance optimization, and cost optimization strategies to work with Amazon S3. This video shows how Anthropic designs its architecture around Amazon S3 in their data architecture.

Storage class comparison chart showing classes of Amazon S3 options

Figure 3. Workloads with predictable patterns often have low retrieval rates for long periods of time after, so we can design to adopt cheaper storage classes for them

Take me to this video

If you are curious about the underlying architecture of Amazon S3 and want to drill down into its internal design, you can watch the re:Invent video Dive deep on Amazon S3.

How HPE Aruba Supply Chain optimized cost and performance by migrating to an AWS modern data architecture

This is an AWS case study on how HPE Aruba Supply Chain successfully re-architected and deployed their data solution by adopting a modern data architecture on AWS. The new solution has helped Aruba integrate data from multiple sources, along with optimizing their cost, performance, and scalability. This has also allowed the Aruba Supply Chain leadership to receive in-depth and timely insights for better decision-making, thereby elevating the customer experience.

Reference architecture diagram showing HPE Aruba Supply Chain's architecture, featuring Amazon S3

Figure 4. Reference architecture diagram showing HPE Aruba Supply Chain’s architecture, featuring Amazon S3

Take me to this blog

AWS Modern Data Architecture Immersion Day

This workshop highlights advantage of adopting a modern data architecture on AWS. By integrating the flexibility of a data lake with specialized analytics services, organizations can significantly enhance their data-driven decision-making capabilities. We encourage everyone to explore how this architecture can streamline their analytics processes and support diverse use cases, from real-time insights to advanced machine learning. It’s an excellent opportunity to leverage modern data architecture.

Diagram showing AWS services in a flywheel

Figure 5. Data architectures are fundamental to power use cases ranging from analytics to machine learning

Take me to this workshop

See you next time!

Thanks for reading! In the next blog, we will cover some tips on how to get the best out of your developer experience on AWS. To revisit any of our previous posts or explore the entire series, visit the Let’s Architect! page.

Modernize your legacy databases with AWS data lakes, Part 2: Build a data lake using AWS DMS data on Apache Iceberg

Post Syndicated from Shaheer Mansoor original https://aws.amazon.com/blogs/big-data/modernize-your-legacy-databases-with-aws-data-lakes-part-2-build-a-data-lake-using-aws-dms-data-on-apache-iceberg/

This is part two of a three-part series where we show how to build a data lake on AWS using a modern data architecture. This post shows how to load data from a legacy database (SQL Server) into a transactional data lake (Apache Iceberg) using AWS Glue. We show how to build data pipelines using AWS Glue jobs, optimize them for both cost and performance, and implement schema evolution to automate manual tasks. To review the first part of the series, where we load SQL Server data into Amazon Simple Storage Service (Amazon S3) using AWS Database Migration Service (AWS DMS), see Modernize your legacy databases with AWS data lakes, Part 1: Migrate SQL Server using AWS DMS.

Solution overview

In this post, we go over the process of building a data lake, providing the rationale behind the different decisions, and share best practices when building such a solution.

The following diagram illustrates the different layers of the data lake.

Overall Architecture

To load data into the data lake, AWS Step Functions can define a workflow, Amazon Simple Queue Service (Amazon SQS) can track the order of incoming files, and AWS Glue jobs and the Data Catalog can be used create the data lake silver layer. AWS DMS produces files and writes these files to the bronze bucket (as we explained in Part 1).

We can turn on Amazon S3 notifications and push the new arriving file names to an SQS first-in-first-out (FIFO) queue. A Step Functions state machine can consume messages from this queue to process the files in the order they arrive.

For processing the files, we need to create two types of AWS Glue jobs:

  • Full load – This job loads the entire table data dump into an Iceberg table. Data types from the source are mapped to an Iceberg data type. After the data is loaded, the job updates the Data Catalog with the table schemas.
  • CDC – This job loads the change data capture (CDC) files into the respective Iceberg tables. The AWS Glue job implements the schema evolution feature of Iceberg to handle schema changes such as addition or deletion of columns.

As in Part 1, the AWS DMS jobs will place the full load and CDC data from the source database (SQL Server) in the raw S3 bucket. Now we process this data using AWS Glue and save it to the silver bucket in Iceberg format. AWS Glue has a plugin for Iceberg; for details, see Using the Iceberg framework in AWS Glue.

Along with moving data from the bronze to the silver bucket, we also create and update the Data Catalog for further processing the data for the gold bucket.

The following diagram illustrates how the full load and CDC jobs are defined inside the Step Functions workflow.

Step Functions for loading data into the lake

In this post, we discuss the AWS Glue jobs for defining the workflow. We recommend using AWS Step Functions Workflow Studio, and setting up Amazon S3 event notifications and an SNS FIFO queue to receive the filename as messages.

Prerequisites

To follow the solution, you need the following prerequisites set up as well as certain access rights and AWS Identity and Access Management (IAM) privileges:

  • An IAM role to run Glue jobs
  • IAM privileges to create AWS DMS resources (this role was created in Part 1 of this series; you can use the same role here)
  • The AWS DMS job from Part 1 working and producing files for the source database on Amazon S3.

Create an AWS Glue connection for the source database

We need to create a connection between AWS Glue and the source SQL Server database so the AWS Glue job can query the source for the latest schema while loading the data files. To create the connection, follow these steps:

  1. On the AWS Glue console, choose Connections in the navigation pane.
  2. Choose Create custom connector.
  3. Give the connection a name and choose JDBC as the connection type.
  4. In the JDBC URL section, enter the following string and replace the name of your source database endpoint and database that was set up in Part 1: jdbc:sqlserver://{Your RDS End Point Name}:1433/{Your Database Name}.
  5. Select Require SSL connection, then choose Create connector.

Clue Connections

Create and configure the full load AWS Glue job

Complete the following steps to create the full load job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose Script editor and select Spark.
  3. Choose Start fresh and select Create script.
  4. Enter a name for the full load job and choose the IAM role (mentioned in the prerequisites) for running the job.
  5. Finish creating the job.
  6. On the Job details tab, expand Advanced properties.
  7. In the Connections section, add the connection you created.
  8. Under Job parameters, pass the following arguments to the job:
    1. target_s3_bucket – The silver S3 bucket name.
    2. source_s3_bucket – The raw S3 bucket name.
    3. secret_id – The ID of the AWS Secrets Manager secret for the source database credentials.
    4. dbname – The source database name.
    5. datalake-formats – This sets the data format to iceberg.

Glue Job Parameters

The full load AWS Glue job starts after the AWS DMS task reaches 100%. The job loops over the files located in the raw S3 bucket and processes them one at time. For each file, the job infers the table name from the file name and gets the source table schema, including column names and primary keys.

If the table has one or more primary keys, the job creates an equivalent Iceberg table. If the job has no primary key, the file is not processed. In our use case, all the tables have primary keys, so we enforce this check. Depending on your data, you might need to handle this scenario differently.

You can use the following code to process the full load files. To start the job, choose Run.

import sys, boto3, json
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

#Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket'])
dbname = "AdventureWorks"
schema = "HumanResources"

#Initialize parameters
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns

#Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

#Helper Function: Load Iceberg table with Primary key(s)
def load_table(full_load_data_df, dbname, table_name):

    try:
        full_load_data_df = full_load_data_df.drop(*drop_column_list)
        full_load_data_df.createOrReplaceTempView('full_data')

        query = """
        CREATE TABLE IF NOT EXISTS glue_catalog.{0}.{1}
        USING iceberg
        LOCATION "s3://{2}/{0}/{1}"
        AS SELECT * FROM full_data
        """.format(dbname, table_name, target_s3_bucket)
        spark.sql(query)
        
        #Update Table property to accept Schema Changes
        spark.sql("""ALTER TABLE glue_catalog.{0}.{1} SET TBLPROPERTIES (
                      'write.spark.accept-any-schema'='true'
                    )""".format(dbname, table_name))
        
    except Exception as ex:
        print(ex)
        failed_table = {"table_name": table_name, "Reason": ex}
        unprocessed_tables.append(failed_table)
        
def get_table_key(host, port, username, password, dbname):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    df_table_pkeys = spark.sql("select c.TABLE_NAME, C.COLUMN_NAME as primary_key FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY'")
    return df_table_pkeys


#Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(dbname))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)


#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Initialize primary keys for all tables
df_table_pkeys = get_table_key(host, port, username, password, dbname)

#Read Full load csv files from s3
s3 = boto3.client('s3')
full_load_tables = s3.list_objects_v2(Bucket=source_s3_bucket, Prefix="raw/{0}/{1}".format(args['dbname'], args['schema']))

#Loop over files
for item in full_load_tables['Contents']:
    pkey_list = []
    table_name = item["Key"].split("/")[3].lower()
    print("Table name {0}".format(table_name))
    current_table_df = df_table_pkeys.where(df_table_pkeys.TABLE_NAME == table_name)

    # Only Process tables with at least 1 Primary key
    if not current_table_df.isEmpty():
        for i in current_table_df.collect():
            pkey_list.append(i["primary_key"])
    else:
        failed_table = {"table_name": table_name, "Reason": "No primary key"}
        unprocessed_tables.append(failed_table)
        # ToDo Handle these cases

    full_data_path = "s3://{0}/{1}".format(source_s3_bucket, item['Key'])
    full_load_data_df = (spark
                        .read
                        .option("header", True)
                        .option("inferSchema", True)
                        .option("recursiveFileLookup", "true")
                        .csv(full_data_path)
                        )

    primary_key = ",".join(pkey_list)

    if table_name not in unprocessed_tables:
        load_table(full_load_data_df, dbname, table_name)

When the job is complete, it creates the database and tables in the Data Catalog, as shown in the following screenshot.

Data lake silver layer data

Create and configure the CDC AWS Glue job

The CDC AWS Glue job is created similar to the full load job. As with the full load AWS Glue job, you need to use the source database connection and pass the job parameters with one additional parameter, cdc_file, which contains the location of the CDC file to be processed. Because a CDC file can contain data for multiple tables, the job loops over the tables in a file and loads the table metadata from the source table ( RDS column names).

If the CDC operation is DELETE, the job deletes the records from the Iceberg table. If the CDC operation is INSERT or UPDATE, the job merges the data into the Iceberg table.

You can use the following code to process the CDC files. To start the job, choose Run

import sys
import boto3
import json
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import SparkSession

# Get the arguments passed to the script
args = getResolvedOptions(sys.argv, ['JOB_NAME',
                           'target_s3_bucket',
                           'secret_id',
                           'source_s3_bucket',
                           'cdc_file'])
dbname = "AdventureWorks"
schema = "HumanResources"
target_s3_bucket = args['target_s3_bucket']
source_s3_bucket = args['source_s3_bucket']
secret_id = args['secret_id']
cdc_file = args['cdc_file']
unprocessed_tables = []
drop_column_list = ['db', 'table_name', 'schema_name', 'Op', 'last_update_time']  # DMS added columns
source_s3_cdc_file_key = "raw/AdventureWorks/cdc/" + cdc_file



# Helper Function: Get Credentials from Secrets Manager
def get_db_credentials(secret_id):
    secretsmanager = boto3.client('secretsmanager')
    response = secretsmanager.get_secret_value(SecretId=secret_id)
    secrets = json.loads(response['SecretString'])
    return secrets['host'], int(secrets['port']), secrets['username'], secrets['password']

# Helper Function: Column names from RDS
def get_table_colums(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    columns = list((row.COLUMN_NAME) for (index, row) in spark.sql("select TABLE_NAME, TABLE_CATALOG, COLUMN_NAME from TABLE_COLUMNS where TABLE_NAME = '{0}' and TABLE_CATALOG = '{1}'".format(table, dbname)).select("COLUMN_NAME").toPandas().iterrows())
    return columns

# Helper Function: Get Colum names and datatypes from RDS
def get_table_colum_datatypes(table, host, port, username, password, dbname):

    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, dbname)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.COLUMNS', properties= connectionProperties).createOrReplaceTempView("TABLE_COLUMNS")
    return spark.sql("select TABLE_NAME, COLUMN_NAME, DATA_TYPE from TABLE_COLUMNS WHERE TABLE_NAME ='{0}'".format(table))

# Helper Function: Setup the primary key condition
def get_iceberg_table_condition(database, tablename):
    
    jdbc_url = "jdbc:sqlserver://{0}:{1};databaseName={2}".format(host, port, database)
    
    connectionProperties = {
      "user" : username,
      "password" : password
    }
    
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.TABLE_CONSTRAINTS', properties=connectionProperties).createOrReplaceTempView("TABLE_CONSTRAINTS")
    spark.read.jdbc(url=jdbc_url, table='INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE', properties=connectionProperties).createOrReplaceTempView("CONSTRAINT_COLUMN_USAGE")
    
    condition = ''
    
    for key in spark.sql("select C.COLUMN_NAME FROM TABLE_CONSTRAINTS T JOIN CONSTRAINT_COLUMN_USAGE C ON C.CONSTRAINT_NAME=T.CONSTRAINT_NAME WHERE T.CONSTRAINT_TYPE='PRIMARY KEY' AND c.TABLE_NAME = '{0}'".format(table)).collect():
        condition += "target.{0} = source.{0} and".format(key.COLUMN_NAME)
    return condition[:-4]

    
# Read incoming data from Amazon S3
def read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key):
    
    inputDf = (spark
                    .read
                    .option("header", False)
                    .option("inferSchema", True)
                    .option("recursiveFileLookup", "true")
                    .csv("s3://" + source_s3_bucket + "/" + source_s3_cdc_file_key)
                    )
    return inputDf

# Setup Spark configuration for reading and writing Iceberg tables
spark = (
    SparkSession.builder
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.glue_catalog.warehouse", "s3://{0}".format(target_s3_bucket))
    .config("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .getOrCreate()
)

#Initialize MSSQL credentials
host, port, username, password = get_db_credentials(secret_id)

#Read the cdc file 
cdc_df = read_cdc_S3(source_s3_bucket, source_s3_cdc_file_key)

tables = cdc_df.toPandas()._c1.unique().tolist()

#Loop over tables in the cdc file
for table in tables:
    #Create dataframes for delets and for inserts and updates
    table_df_deletes = cdc_df.where((cdc_df._c1 == table) & (cdc_df._c0 == "D")).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    table_df_upserts = cdc_df.where((cdc_df._c1 == table) & ((cdc_df._c0 == "I") | (cdc_df._c0 == "U"))).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3])
    
    #Update column names for the dataframes
    columns = get_table_colums(table, host, port, username, password, dbname) 
    selectExpr = [] 

    for column in columns: 
        selectExpr.append(cdc_df.where((cdc_df._c1 == table)).drop(cdc_df.columns[0], cdc_df.columns[1], cdc_df.columns[2], cdc_df.columns[3]).columns[columns.index(column)] + " as " + column)

    table_df_deletes = table_df_deletes.selectExpr(selectExpr) 
    table_df_upserts = table_df_upserts.selectExpr(selectExpr)
    
    #Process Deletes
    if table_df_deletes.count() > 0:
        
        print("Delete Triggered")
        table_df_deletes.createOrReplaceTempView('deleted_rows')
        
        sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                        USING (SELECT * FROM deleted_rows) source
                        ON {2}
                        WHEN MATCHED 
                        THEN DELETE""".format(database, table.lower(), get_iceberg_table_condition(database, table.lower()))
        spark.sql(sql_string)
    
    if table_df_upserts.count() > 0:
        print("Upsert triggered")

        #Upsert Records when there are Schema Changes
        if len(table_df_upserts.columns) != len(columns):

            #Handle column deletes
            if len(table_df_upserts.columns) < len(columns):

                drop_columns = list(set(columns) - set(table_df_upserts.columns))

                for drop_column in drop_columns:
                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    DROP COLUMN {2}""".format(dbname.lower(), table.lower(), drop_column)
                    spark.sql(sql_string)

            #Handle column additions
            elif len(table_df_upserts.columns) > len(columns):

                column_datatype_df = get_table_colum_datatypes(table, host, port, username, password, dbname)
                add_columns = list(set(table_df_upserts.columns) - set(columns))

                for add_column in add_columns:

                    #Set Iceberg data type
                    data_type = list((row.DATA_TYPE) for (index, row) in column_datatype_df.filter("COLUMN_NAME='{0}'".format(add_column)).select("DATA_TYPE").toPandas().iterrows())[0]

                    # Convert MSSQL Datatypes to Iceberg supported datatypes
                    if data_type.lower() in ["varchar", "char"]:
                        data_type = "string"

                    if data_type.lower() in ["bigint"]:
                        data_type = "long"

                    if data_type.lower() in ["array"]:
                        data_type = "list"

                    sql_string = """
                                    ALTER TABLE glue_catalog.{0}.{1}
                                    ADD COLUMN {2} {3}""".format(dbname.lower(), table.lower(), add_column, data_type)
                    spark.sql(sql_string)
                    
            #Create statement to update columns
            update_table_column_list = ""
            insert_column_list = ""
            columns = get_table_colums(table, host, port, username, password, dbname)             

            for column in columns:

                update_table_column_list+="""target.{0}=source.{0},""".format(column)
                insert_column_list+="""source.{0},""".format(column)

            table_df_upserts.createOrReplaceTempView('updated_rows')

            sql_string = """MERGE INTO glue_catalog.{0}.{1} target
                            USING (SELECT * FROM updated_rows) source
                            ON {2}
                            WHEN MATCHED 
                            THEN UPDATE SET {3} 
                            WHEN NOT MATCHED THEN INSERT ({4}) VALUES ({5})""".format(dbname.lower(), 
                                                                                      table.lower(), 
                                                                                      get_iceberg_table_condition(dbname.lower(), table.lower()), 
                                                                                      update_table_column_list.rstrip(","), 
                                                                                      ",".join(columns), 
                                                                                      insert_column_list.rstrip(","))

            spark.sql(sql_string)

    
print("CDC job complete")

The Iceberg MERGE INTO syntax can handle cases where a new column is added. For more details on this feature, see the Iceberg MERGE INTO syntax documentation. If the CDC job needs to process many tables in the CDC file, the job can be multi-threaded to process the file in parallel.

 

Configure EventBridge notifications, SQS queue, and Step Functions state machine

You can use EventBridge notifications to send notifications to EventBridge when certain events occur on S3 buckets, such as when new objects are created and deleted. For this post, we’re interested in the events when new CDC files from AWS DMS arrive in the bronze S3 bucket. You can create event notifications for new objects and insert the file names into an SQS queue. A Lambda function within Step Functions would consume from the queue, extract the file name, start a CDC Glue job, and pass the file name as a parameter to the job.

AWS DMS CDC files contain database insert, update, and delete statements. We need to process these in order, so we use an SQS FIFO queue, which preserves the order of messages in which they arrive. You can also configure Amazon SQS to set a time to live (TTL); this parameter defines how long a message stays in the queue before it expires.

Another important parameter to consider when configuring an SQS queue is the message visibility timeout value. While a message is being processed, it disappears from the queue to make sure that the message isn’t consumed by multiple consumers (AWS Glue jobs in our case). If the message is consumed successfully, it should be deleted from the queue before the visibility timeout. However, if the visibility timeout expires and the message isn’t deleted, the message reappears in the queue. In our solution, this timeout must be greater than the time it takes for the CDC job to process a file.

Lastly, we recommend using Step Functions to define a workflow for handling the full load and CDC files. Step Functions has built-in integrations to other AWS services like Amazon SQS, AWS Glue, and Lambda, which makes it a good candidate for this use case.

The Step Functions state machine starts with checking the status of the AWS DMS task. The AWS DMS tasks can be queried to check the status of the full load, and we check the value of the parameter FullLoadProgressPercent. When this value gets to 100%, we can start processing the full load files. After the AWS Glue job processes the full load files, we start polling the SQS queue to check the size of the queue. If the queue size is greater than 0, this means new CDC files have arrived and we can start the AWS Glue CDC job to process these files. The AWS Glue jobs processes the CDC files and deletes the messages from the queue. When the queue size reaches 0, the AWS Glue job exits and we loop in the Step Functions workflow to check the SQS queue size.

Because the Step Functions state machine is supposed to run indefinitely, it’s good to keep in mind that there will be service limits you need to adhere to. Namely, the maximum runtime, which is 1 year, and maximum run history size, i.e., state transitions or events for a state machine which is 25,000. We recommend adding an additional step at the end to check if either of these conditions are being met to stop the current state machine run and start a new one.

The following diagram illustrates how you can use Step Functions state machine history size to monitor and start a new Step Functions state machine run.

Step Functions Workflow

Configure the pipeline

The pipeline needs to be configured to address cost, performance, and resilience goals. You might want a pipeline that can load fresh data into the data lake and make it available quickly, and you might also want to optimize costs by loading large chunks of data into the data lake. At the same time, you should make the pipeline resilient and be able to recover in case of failures. In this section, we cover the different parameters and recommended settings to achieve these goals.

Step Functions is designed to process incoming AWS DMS CDC files by running AWS Glue jobs. AWS Glue jobs can take a couple of minutes to boot up, and when they’re running, it’s efficient to process large chunks of data. You can configure AWS DMS to write CSV files to Amazon S3 by configuring the following AWS DMS task parameters:

  • CdcMaxBatchInterval – Defines the maximum time limit AWS DMS will wait before writing a batch to Amazon S3
  • CdcMinFileSize – Defines the minimum file size AWS DMS will write to Amazon S3

Whichever condition is met first will invoke the write operation. If you want to prioritize data freshness, you should have a short CdcMaxBatchInterval value (10 seconds) and a small CdcMinFileSize value (1–5 MB). This will result in many small CSV files being written to Amazon S3 and will invoke a lot of AWS Glue jobs to process the data, making the extract, transform, and load (ETL) process faster. If you want to optimize costs, you should have a moderate CdcMaxBatchInterval (minutes) and a large CdcMinFileSize value (100–500 MB). In this scenario, we start a few AWS Glue jobs that will process large chunks of data, making the ETL flow more efficient. In a real-world use case, the required values for these parameters might fall somewhere that’s a good compromise between throughput and cost. You can configure these parameters when creating a target endpoint using the AWS DMS console, or by using the create-endpoint command in the AWS Command Line Interface (AWS CLI).

For the full list of parameters, see Using Amazon S3 as a target for AWS Database Migration Service.

Choosing the right AWS Glue worker types for the full load and CDC jobs is also crucial for performance and cost optimization. The AWS Glue (Spark) workers range from G1X to G8X, which have an increasing number of data processing units (DPUs). Full load files are usually much larger in size compared to CDC files, and therefore it’s more cost- and performance-effective to select a larger worker. For CDC files, it would be more cost-effective to select a smaller worker because files sizes are smaller.

You should design the Step Functions state machine in such a way that if anything fails, the pipeline can be redeployed after repair and resume processing from where it left off. One important parameter here is TTL for the messages in the SQS queue. This parameter defines how long a message stays in the queue before expiring. In case of failures, we want this parameter to be long enough for us to deploy a fix. Amazon SQS has a maximum of 14 days for a message’s TTL. We recommend setting this to a large enough value to minimize messages being expired in case of pipeline failures.

Clean up

Complete the following steps to clean up the resources you created in this post:

  1. Delete the AWS Glue jobs:
    1. On the AWS Glue console, choose ETL jobs in the navigation pane.
    2. Select the full load and CDC jobs and on the Actions menu, choose Delete.
    3. Choose Delete to confirm.
  2. Delete the Iceberg tables:
    1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
    2. Choose the database in which the Iceberg tables reside.
    3. Select the tables to delete, choose Delete, and confirm the deletion.
  3. Delete the S3 bucket:
    1. On the Amazon S3 console, choose Buckets in the navigation pane.
    2. Choose the silver bucket and empty the files in the bucket.
    3. Delete the bucket.

Conclusion

In this post, we showed how to use AWS Glue jobs to load AWS DMS files into a transactional data lake framework such as Iceberg. In our setup, AWS Glue provided highly scalable and simple-to-maintain ETL jobs. Furthermore, we share a proposed solution using Step Functions to create an ETL pipeline workflow, with Amazon S3 notifications and an SQS queue to capture newly arriving files. We shared how to design this system to be resilient towards failures and to automate one of the most time-consuming tasks in maintaining a data lake: schema evolution.

In Part 3, we will share how to process the data lake to create data marts.


About the Authors

Shaheer Mansoor is a Senior Machine Learning Engineer at AWS, where he specializes in developing cutting-edge machine learning platforms. His expertise lies in creating scalable infrastructure to support advanced AI solutions. His focus areas are MLOps, feature stores, data lakes, model hosting, and generative AI.

Anoop Kumar K M is a Data Architect at AWS with focus in the data and analytics area. He helps customers in building scalable data platforms and in their enterprise data strategy. His areas of interest are data platforms, data analytics, security, file systems and operating systems. Anoop loves to travel and enjoys reading books in the crime fiction and financial domains.

Sreenivas Nettem is a Lead Database Consultant at AWS Professional Services. He has experience working with Microsoft technologies with a specialization in SQL Server. He works closely with customers to help migrate and modernize their databases to AWS.

How Getir unleashed data democratization using a data mesh architecture with Amazon Redshift

Post Syndicated from Asser Moustafa original https://aws.amazon.com/blogs/big-data/how-getir-unleashed-data-democratization-using-a-data-mesh-architecture-with-amazon-redshift/

This blog post is co-written with Pinar Yasar from Getir.

Amazon Redshift is a fully managed cloud data warehouse that’s used by tens of thousands of customers for price-performance, scale, and advanced data analytics. Amazon Redshift enables data warehousing by seamlessly integrating with other data stores and services in the modern data organization through features such as Zero-ETL, data sharing, streaming ingestion, data lake integration, and Redshift ML.

In this post, we explain how ultrafast delivery pioneer, Getir, unleashed the power of data democratization on a large scale through their data mesh architecture using Amazon Redshift.

We start by introducing Getir and their vision—to seamlessly, securely, and efficiently share business data across different teams within the organization for BI, extract, transform, and load (ETL), and other use cases. We’ll then explore how Amazon Redshift data sharing powered the data mesh architecture that allowed Getir to achieve this transformative vision. We will also explain how Getir’s data mesh architecture enabled data democratization, shorter time-to-market, and cost-efficiencies. Next, we’ll provide a broader overview of modern data trends reinforced by Getir’s vision. In conclusion, we’ll offer some thoughts on how you can apply a similar approach to eliminate costly and barrier-inducing data silos using Amazon Redshift.

Who is Getir?

Getir is an ultrafast delivery pioneer that revolutionized last-mile delivery in 2015 with its 10-minute grocery delivery proposition.Getir’s story started in Istanbul, and they have launched multiple products since inception: GetirFood, GetirMore, GetirWater, GetirLocals, GetirBitaksi (taxi service), GetirDrive (car rental service), and GetirJobs (recruitment).

Getir serves dozens of cities throughout the world with more than 30,000 employees. The following figure shows the Getir app.

Figure 1: Getir app

Figure 1: Getir app

Overview of Getir’s main use case

Getir’s business is characterized by a tremendous volume of data generation and growth, in addition to ample opportunities to gain valuable insights. However, siloing this data and creating friction for teams trying to access the information they needed wasn’t a viable option. Allowing teams to duplicate data wherever required can be an anti-pattern, leading to operational complexity, cost overruns, and fragile data storage bloat.

Similarly, relying on dedicated teams to create data extracts or insights for downstream consumers introduces bottlenecks, stifles innovation, and increases the time-to-market. This approach isn’t optimal for a data-driven organization like Getir, which needs to empower its teams with seamless access to the information they require to drive the business forward. The various business lines within the organization made it abundantly clear that they wanted unfettered access to the company’s entire data ecosystem in a secure, cost-efficient, near real-time, and well-governed manner.

Furthermore, the organization was anticipating the emergence of data-as-a-serviceservice and generative AI use cases in the near future. This would necessitate the ability to securely share and potentially monetize the company’s data with external partners, such as franchises.

Overview of Getir’s use of Amazon Redshift and modern data architecture

To strike a balance that addresses these concerns and enables Getir teams to effectively use the wealth of data to generate meaningful insights and drive strategic decision-making across the organization, we chose a data mesh architecture.

Getir’s data analytics environment encompasses hundreds of terabytes of data, thousands of tables, and billions upon billions of data rows. Additionally, it processes millions of messaging events daily, all of which must be ingested, refined, and made available to analysts querying multiple Amazon Redshift warehouses. The end-to-end service level agreements (SLAs) for this data ecosystem can be extremely aggressive, with requirements that can be as stringent as single-digit minutes to single-digit seconds. This underscores the scale and complexity of Getir’s data analytics capabilities, which must operate with the utmost efficiency and responsiveness to meet the demands of the business. We were able to easily implement the envisioned data mesh architecture using Amazon Redshift’s native data sharing capabilities.

Figure 2: Data mesh architecture using Amazon Redshift data sharing

Figure 2: Data mesh architecture using Amazon Redshift data sharing

As the preceding diagram shows, at the heart of Getir’s architecture, was an ETL Redshift data warehouse that was used for various data sets from all over the organization, creating a refined 360-degree view of critical assets. It also was a producer for downstream Redshift data warehouses.

The demand was quite heavy on this main ETL cluster, so we relied on data sharing to isolate noisy workloads on a different Redshift data warehouse without having to duplicate the data on the main ETL cluster.

Using Redshift data sharing, individual business line teams could now rely solely on their dedicated Redshift cluster to provide them with their own data and analytics capabilities, but also the refined 360-degree views of data generated from all over the organization—without any data duplication or overstepping compute boundaries. BI analysts gained access to all of the data they needed to power their most complex dashboards with consistent performance free of noisy jobs. Additional warehouses were integrated into the data mesh for visualization, reporting, and machine learning.

Another benefit of Amazon Redshift data sharing and the data mesh architecture, was the relative ease with which we were able to maintain a chargeback model for ensuring costs were spread fairly across different teams.

Finally, the data sharing capability also enabled the seamless propagation of newly created tables within a schema to the subscribed consumers.

Modern data trends reinforced by Getir’s case study

Getir’s case study showcases the strategic uses of a data mesh architecture and Amazon Redshift, but more importantly provides tremendous insights into five key trends across all industries as modern data organizations move away from costly data silos that hinder collaboration, business insights, and time-to-market. As highlighted in the following diagram, those trends are 1/interconnected, purpose-built data stores that enable users to access data regardless of its physical location, 2/data democratization empowering users with self-service analytics capabilities, 3/real-time insights to drive greater value from data, 4/resilient data services ensuring business continuity, 5/leveraging generative AI to extract even deeper insights from data more expeditiously.

Figure 3: Key trends in the modern data organization reinforced by Getir's use case and solution

Figure 3: Key trends in the modern data organization reinforced by Getir’s use case and solution

As Getir showed, the modern data organization is adopting data architectures that democratize data securely and enable self-service analytics. To realize data’s true potential, the modern data organization has progressed beyond basic dashboarding and reporting on limited, point-in-time data sets, and evolved to use more sophisticated ETL processes that can ingest data from diverse sources. Near real-time analytics in addition to predictive models have become standard fare, significantly reducing the time to actionable insights.

Furthermore, the data landscape has been democratized to empower analysts in numerous ways through the rise of transactional data lakes powered by open table formats such as Apache Iceberg and the assistance of generative AI. This holistic approach has elevated data organizations’ capabilities well beyond traditional reporting, unlocking greater business value from the wealth of data available.

Using generative AI with data mesh architecture

In addition to the five key trends previously mentioned, the present-day data landscape is characterized by three key facts that are leading data organizations like Getir to increasingly harness the power of generative AI to drive the next evolution of data-informed decision-making.

Data is an organization’s most valuable asset and the ability to effectively use data is central to an organization’s success and growth. Data analytics and insights are absolutely crucial to strengthening and expanding the business. Deriving meaningful insights from data is essential for making informed, strategic decisions. Democratizing data and enabling self-service analytics can greatly expand the range of business insights, while reducing the time to market for those insights. Empowering users across the organization to access and analyze data can unlock tremendous value. Generative AI’s ability to respond to natural language prompts, explore and analyze complex data, and summarize lengthy content makes it a valuable tool for translating large amounts of data into valuable insights. However, the true potential of generative AI for organizations lies in Retrieval Augmented Generation (RAG).

Out of the box, generative AI models start with a relatively generic knowledge base, which can lead to unreliable or inaccurate information. RAG addresses this by introducing the model to additional datasets that are specific to the organization or context. This allows generative AI models to produce far more accurate, attributable, and highly contextualized outputs to support decision-making.

Data mesh architecture can play a crucial role in enabling and facilitating RAG. By facilitating access to multiple data sources within the organization, the data mesh provides the necessary fuel for the generative AI model to draw from, resulting in more reliable and insightful information. This, in turn, empowers data-driven decision-making and helps organizations harness the full potential of their data assets.

Conclusion

In this post, we examined how Getir implemented a data mesh architecture and Amazon Redshift data sharing to meet their evolving data requirements. This entailed dedicated data warehouses tailored to different business lines and needs, while maintaining robust data governance and secure data access. Additionally, we highlighted the key industry trends that Getir’s case study reinforces across the broader data landscape. For more information, contact AWS or connect with your AWS Technical Account Manager or Solutions Architect, who will be happy to provide more detailed guidance and support.


About the Authors

Asser Moustafa is a Principal Worldwide Specialist Solutions Architect at AWS, based in Dallas, Texas, USA. He partners with customers worldwide, advising them on all aspects of their data architectures, migrations, and strategic data visions to help organizations adopt cloud-based solutions, maximize the value of their data assets, modernize legacy infrastructures, and implement cutting-edge capabilities like machine learning and advanced analytics. Prior to joining AWS, Asser held various data and analytics leadership roles, completing an MBA from New York University and an MS in Computer Science from Columbia University in New York. He is passionate about empowering organizations to become truly data-driven and unlock the transformative potential of their data.

Pinar Yasar is the Data Engineering Manager at Getir. Her passion is to accelerate self-service analytics for her internal customers and build highly scalable and cost-effective solutions in the cloud.

Building a scalable streaming data platform that enables real-time and batch analytics of electric vehicles on AWS

Post Syndicated from Ayush Agrawal original https://aws.amazon.com/blogs/big-data/building-a-scalable-streaming-data-platform-that-enables-real-time-and-batch-analytics-of-electric-vehicles-on-aws/

The automobile industry has undergone a remarkable transformation because of the increasing adoption of electric vehicles (EVs). EVs, known for their sustainability and eco-friendliness, are paving the way for a new era in transportation. As environmental concerns and the push for greener technologies have gained momentum, the adoption of EVs has surged, promising to reshape our mobility landscape.

The surge in EVs brings with it a profound need for data acquisition and analysis to optimize their performance, reliability, and efficiency. In the rapidly evolving EV industry, the ability to harness, process, and derive insights from the massive volume of data generated by EVs has become essential for manufacturers, service providers, and researchers alike.

As the EV market is expanding with many new and incumbent players trying to capture the market, the major differentiating factor will be the performance of the vehicles.

Modern EVs are equipped with an array of sensors and systems that continuously monitor various aspects of their operation including parameters such as voltage, temperature, vibration, speed, and so on. From battery management to motor performance, these data-rich machines provide a wealth of information that, when effectively captured and analyzed, can revolutionize vehicle design, enhance safety, and optimize energy consumption. The data can be used to do predictive maintenance, device anomaly detection, real-time customer alerts, remote device management, and monitoring.

However, managing this deluge of data isn’t without its challenges. As the adoption of EVs accelerates, the need for robust data pipelines capable of collecting, storing, and processing data from an exponentially growing number of vehicles becomes more pronounced. Moreover, the granularity of data generated by each vehicle has increased significantly, making it essential to efficiently handle the ever-increasing number of data points. The challenges include not only the technical intricacies of data management but also concerns related to data security, privacy, and compliance with evolving regulations.

In this blog post, we delve into the intricacies of building a reliable data analytics pipeline that can scale to accommodate millions of vehicles, each generating hundreds of metrics every second using Amazon OpenSearch Ingestion. We also provide guidelines and sample configurations to help you implement a solution.

Of the prerequisites that follow, the IOT topic rule and the Amazon Managed Streaming for Apache Kafka (Amazon MSK) cluster can be set up by following How to integrate AWS IoT Core with Amazon MSK. The steps to create an Amazon OpenSearch Service cluster are available in Creating and managing Amazon OpenSearch Service domains.

Prerequisites

Before you begin the implementing the solution, you need the following:

  • IOT topic rule
  • Amazon MSK Simple Authentication and Security Layer/Salted Challenge Response Mechanism (SASL/SCRAM) cluster
  • Amazon OpenSearch Service domain

Solution overview

The following architecture diagram provides a scalable and fully managed modern data streaming platform. The architecture uses Amazon OpenSearch Ingestion to stream data into OpenSearch Service and Amazon Simple Storage Service (Amazon S3) to store the data. The data in OpenSearch powers real-time dashboards. The data can also be used to notify customers of any failures occurring on the vehicle (see Configuring alerts in Amazon OpenSearch Service). The data in Amazon S3 is used for business intelligence and long-term storage.

Architecture diagram

In the following sections, we focus on the following three critical pieces of the architecture in depth:

1. Amazon MSK to OpenSearch ingestion pipeline

2. Amazon OpenSearch Ingestion pipeline to OpenSearch Service

3. Amazon OpenSearch Ingestion to Amazon S3

Solution Walkthrough

Step 1: MSK to Amazon OpenSearch Ingestion pipeline

Because each electric vehicle streams massive volumes of data to Amazon MSK clusters through AWS IoT Core, making sense of this data avalanche is critical. OpenSearch Ingestion provides a fully managed serverless integration to tap into these data streams.

The Amazon MSK source in OpenSearch Ingestion uses Kafka’s Consumer API to read records from one or more MSK topics. The MSK source in OpenSearch Ingestion seamlessly connects to MSK to ingest the streaming data into OpenSearch Ingestion’s processing pipeline.

The following snippet illustrates the pipeline configuration for an OpenSearch Ingestion pipeline used to ingest data from an MSK cluster.

While creating an OpenSearch Ingestion pipeline, add the following snippet in the Pipeline configuration section.

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                  
      topics: 
         - name: "ev-device-topic " 
           group_id: "opensearch-consumer" 
           serde_format: json                 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam:: ::<<account-id>>:role/opensearch-pipeline-Role"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:<<region>>:<<account-id>>:cluster/<<name>>/<<id>>" 

When configuring Amazon MSK and OpenSearch Ingestion, it’s essential to establish an optimal relationship between the number of partitions in your Kafka topics and the number of OpenSearch Compute Units (OCUs) allocated to your ingestion pipelines. This optimal configuration ensures efficient data processing and maximizes throughput. You can read more about it in Configure recommended compute units (OCUs) for the Amazon MSK pipeline.

Step 2: OpenSearch Ingestion pipeline to OpenSearch Service

OpenSearch Ingestion offers a direct method for streaming EV data into OpenSearch. The OpenSearch sink plugin channels data from multiple sources directly into the OpenSearch domain. Instead of manually provisioning the pipeline, you define the capacity for your pipeline using OCUs. Each OCU provides 6 GB of memory and two virtual CPUs. To use OpenSearch Ingestion auto-scaling optimally, it’s essential to configure the maximum number of OCUs for a pipeline based on the number of partitions in the topics being ingested. If a topic has a large number of partitions (for example, more than 96, which is the maximum OCUs per pipeline), it’s recommended to configure the pipeline with a maximum of 1–96 OCUs. This way, the pipeline can automatically scale up or down within this range as needed. However, if a topic has a low number of partitions (for example, fewer than 96), it’s advisable to set the maximum number of OCUs to be equal to the number of partitions. This approach ensures that each partition is processed by a dedicated OCU enabling parallel processing and optimal performance. In scenarios where a pipeline ingests data from multiple topics, the topic with the highest number of partitions should be used as a reference to configure the maximum OCUs. Additionally, if higher throughput is required, you can create another pipeline with a new set of OCUs for the same topic and consumer group, enabling near-linear scalability.

OpenSearch Ingestion provides several pre-defined configuration blueprints that can help you quickly build your ingestion pipeline on AWS

The following snippet illustrates pipeline configuration for an OpenSearch Ingestion pipeline using OpenSearch as a SINK with a dead letter queue (DLQ) to Amazon S3. When a pipeline encounters write errors, it creates DLQ objects in the configured S3 bucket. DLQ objects exist within a JSON file as an array of failed events.

sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<domain-name>>.<<region>>.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in Ohan S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam:: <<account-id>>:role/<<role-name>>"

Step 3: OpenSearch Ingestion to Amazon S3

OpenSearch Ingestion offers a built-in sink for loading streaming data directly into S3. The service can compress, partition, and optimize the data for cost-effective storage and analytics in Amazon S3. Data loaded into S3 can be partitioned for easier query isolation and lifecycle management. Partitions can be based on vehicle ID, date, geographic region, or other dimensions as needed for your queries.

The following snippet illustrates how we’ve partitioned and stored EV data in Amazon S3.

- s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "evbucket"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

The pipeline can be created following the steps in Creating Amazon OpenSearch Ingestion pipelines.

The following is the complete pipeline configuration, combining the configuration of all three steps. Update the Amazon Resource Names (ARNs), AWS Region, Open Search Service domain endpoint, and S3 names as needed.

The entire OpenSearch Ingestion pipeline configuration can be directly copied into the ‘Pipeline configuration’ field in the AWS Management Console while creating the OpenSearch Ingestion pipeline

version: "2"
msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true           # Default is false  
      topics: 
         - name: "<<msk-topic-name>>" 
           group_id: "opensearch-consumer" 
           serde_format: json        
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
        # Provide the region of the domain. 
        region: "<<region>>" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-east-1:<<account-id>>:cluster/<<cluster-name>>/<<cluster-id>>" 
  processor:
      - parse_json:
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          hosts: [ "https://<<opensearch-service-domain-endpoint>>.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
            sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>" 
          # Provide the region of the domain. 
            region: "<<region>>" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # index name can be auto-generated from topic name 
          index: "index_ev_pipe-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          #distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in Ohan S3 bucket 
          dlq: 
            s3: 
            # Provide an S3 bucket 
              bucket: "<<bucket-name>>"
            # Provide a key path prefix for the failed requests
              key_path_prefix: "oss-pipeline-errors/dlq"
            # Provide the region of the bucket.
              region: "<<region>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
              sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
      - s3:
            aws:
              # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
                sts_role_arn: "arn:aws:iam::<<account-id>>:role/<<role-name>>"
              # Provide the region of the domain.
                region: "<<region>>"
            # Replace with the bucket to send the logs to
            bucket: "<<bucket-name>>"
            object_key:
              # Optional path_prefix for your s3 objects
              path_prefix: "index_ev_pipe/year=%{yyyy}/month=%{MM}/day=%{dd}/hour=%{HH}"
            threshold:
              event_collect_timeout: 60s
            codec:
              parquet:
                auto_schema: true

Real-time analytics

After the data is available in OpenSearch Service, you can build real-time monitoring and notifications. OpenSearch Service has robust support for multiple notification channels, allowing you to receive alerts through services like Slack, Chime, custom webhooks, Microsoft Teams, email, and Amazon Simple Notification Service (Amazon SNS).

The following screenshot illustrates supported notification channels in OpenSearch Service.

The notification feature in OpenSearch Service allows you to create monitors that will watch for certain conditions or changes in your data and launch alerts, such as monitoring vehicle telemetry data and launching alerts for issues like battery degradation or abnormal energy consumption. For example, you can create a monitor that analyzes battery capacity over time and notifies the on-call team using Slack if capacity drops below expected degradation curves in a significant number of vehicles. This could indicate a potential manufacturing defect requiring investigation.

In addition to notifications, OpenSearch Service makes it easy to build real-time dashboards to visually track metrics across your fleet of vehicles. You can ingest vehicle telemetry data like location, speed, fuel consumption, and so on, and visualize it on maps, charts, and gauges. Dashboards can provide real-time visibility into vehicle health and performance.

The following screenshot illustrates creating a sample dashboard on OpenSearch Service

Opensearch Dashboard

A key benefit of OpenSearch Service is its ability to handle high sustained ingestion and query rates with millisecond latencies. It distributes incoming vehicle data across data nodes in a cluster for parallel processing. This allows OpenSearch to scale out to handle very large fleets while still delivering the real-time performance needed for operational visibility and alerting.

Batch analytics

After the data is available in Amazon S3, you can build a secure data lake to power a variety of analytics use cases deriving powerful insights. As an immutable store, new data is continually stored in S3 while existing data remains unaltered. This serves as a single source of truth for downstream analytics.

For business intelligence and reporting, you can analyze trends, identify insights, and create rich visualizations powered by the data lake. You can use Amazon QuickSight to build and share dashboards without needing to set up servers or infrastructure. Here’s an example of a Quicksight dashboard for IoT device data. For example, you can use a dashboard to gain insights from historical data that can help with better vehicle and battery design.

The Amazon Quicksight public gallery shows examples of dashboards across different domains.

You should consider Amazon OpenSearch dashboards for your operational day-to-day use cases to identify issues and alert in near real time whereas Amazon Quicksight should be used to analyze big data stored in a lake house and generate actionable insights from them.

Clean up

Delete the OpenSearch pipeline and Amazon MSK cluster to stop incurring costs on these services.

Conclusion

In this post, you learned how Amazon MSK, OpenSearch Ingestion, OpenSearch Services, and Amazon S3 can be integrated to ingest, process, store, analyze, and act on endless streams of EV data efficiently.

With OpenSearch Ingestion as the integration layer between streams and storage, the entire pipeline scales up and down automatically based on demand. No more complex cluster management or lost data from bursts in streams.

See Amazon OpenSearch Ingestion to learn more.


About the authors

Ayush Agrawal is a Startups Solutions Architect from Gurugram, India with 11 years of experience in Cloud Computing. With a keen interest in AI, ML, and Cloud Security, Ayush is dedicated to helping startups navigate and solve complex architectural challenges. His passion for technology drives him to constantly explore new tools and innovations. When he’s not architecting solutions, you’ll find Ayush diving into the latest tech trends, always eager to push the boundaries of what’s possible.

Fraser SequeiraFraser Sequeira is a Solutions Architect with AWS based in Mumbai, India. In his role at AWS, Fraser works closely with startups to design and build cloud-native solutions on AWS, with a focus on analytics and streaming workloads. With over 10 years of experience in cloud computing, Fraser has deep expertise in big data, real-time analytics, and building event-driven architecture on AWS.

Detect and handle data skew on AWS Glue

Post Syndicated from Salim Tutuncu original https://aws.amazon.com/blogs/big-data/detect-and-handle-data-skew-on-aws-glue/

AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns. Even though the biggest issue is often having nodes running out of disk during shuffling, which leads to nodes falling like dominoes and job failures, it’s also important to mention that data skew is hidden. The stealthy nature of data skew means it can often go undetected because monitoring tools might not flag an uneven distribution as a critical issue, and logs don’t always make it evident. As a result, a developer may observe that their AWS Glue jobs are completing without apparent errors, yet the system could be operating far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate data skew.

How to detect data skew

When an AWS Glue job has issues with local disks (split disk issues), doesn’t scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to be able to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it you need to enable Spark UI event logs for your job runs. It is enabled by default on Glue console and once enabled, Spark event log files will be created during the job run and stored in your S3 bucket. Then, those logs are parsed, and you can use the AWS Glue serverless Spark UI to visualize them. You can refer to this blogpost for more details. In those jobs where the AWS Glue serverless Spark UI does not work as it has a limit of 512 MB of logs, you can set up the Spark UI using an EC2 instance.

You can use the Spark UI to identify which tasks are taking longer to complete than others, and if the data distribution among partitions is balanced or not (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task is taking a lot more time than the others, which can indicate data skew.

Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.

These metrics represent the task-related metrics below which a certain percentage of tasks completed. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see very biased values in each percentile. In the preceding example, it didn’t write many shuffle files (less than 50 MiB) in Min, 25th percentile, Median, and 75th percentile. However, in Max, it wrote 460 MiB, 10 times the 75th percentile. It means there was at least one task (or up to 25% of tasks) that wrote much bigger shuffle files than the rest of the tasks. You can also see that the duration of the tax in Max is 46 seconds and the Median is 2 seconds. These are all indicators that your dataset may have data skew.

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog or just use Spark methods to load the files such as Parquet or CSV that you want to analyze. You can use a similar script to the following to detect data skew from the partition size perspective; the more important issue is related to data skew while shuffling, and this script does not detect that kind of skew:

from pyspark.sql.functions import spark_partition_id, asc, desc
#input_dataframe being the dataframe where you want to check for data skew
partition_sizes_df=input_dataframe\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .withColumnRenamed("count","partition_size")
#calculate average and standar deviation for the partition sizes
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0]

""" 
 the code calculates the absolute difference between each value in the "partition_size" column and the calculated average (avg_size).
 then, calculates twice the standard deviation (std_dev_size) and use 
 that as a boolean mask where the condition checks if the absolute difference is greater than twice the standard deviation
 in order to mark a partition 'skewed'
"""
skewed_partitions_df = partition_sizes_df.filter(abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size)
if skewed_partitions_df.count() > 0:
    skewed_partitions = [row["partition_id"] for row in skewed_partitions_df.collect()]
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

You can calculate the average and standard deviation of partition sizes using the agg() function and identify partitions with significantly different sizes using the filter() function, and you can print their indexes if any skewed partitions are detected. Otherwise, the output prints that no data skew is detected.

This code assumes that your data is structured, and you may need to modify it if your data is of a different type.

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you’re using latest AWS Glue version, for example AWS Glue 4.0 based on Spark 3.3 has enabled by default some configs like Adaptative Query Execution (AQE) that can help improve performance when data skew is present.

The following are some of the techniques that you can employ to handle data skew:

  • Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
  • Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages because in large datasets, a single aggregation operation (like sum, average, or count) can be resource-intensive. In those cases, you can perform intermediate actions. This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
  • Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
  • Using broadcast join – If your dataset is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you have the option to either provide a hint to use broadcast join or adjust the threshold value to accommodate your dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
  • Salting – This involves adding a random prefix to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the prefix to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.

The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, ceil, rand, concat, col

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts['count'] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Splitting the dataset
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data
skewed_data_subset = skewed_data_subset.withColumn("salt", ceil((rand() * 10) % num_salts))
skewed_data_subset = skewed_data_subset.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))

# Replicate skewed rows in non-skewed dataset
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))
    return replicated_df.drop("salt")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data
result_skewed = skewed_data_subset.join(replicated_non_skewed_data, "salted_key")

# Perform regular join on non-skewed data
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results
final_result = result_skewed.union(result_non_skewed)

In this code, we first define a salt value, which can be a random integer or any other value. We then add a salt column to our DataFrame using the withColumn() function, where we set the value of the salt column to a random number using the rand() function with a fixed seed. The function replicate_salt_rows is defined to replicate each row in the non-skewed dataset (non_skewed_data) num_salts times. This ensures that each key in the non-skewed data has matching salted keys. Finally, a join operation is performed on the salted_key column between the skewed and non-skewed datasets. This join is more balanced compared to a direct join on the original key, because salting and replication have mitigated the data skew.

The rand() function used in this example generates a random number between 0–1 for each row, so it’s important to use a fixed seed to achieve consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. Heavily skewed key2 identified and salted into key2_0, key2_1, and key2_2, balancing the data distribution and preventing any single node from being overloaded. After processing, the results can be aggregated back, so that that the final output is consistent with the unsalted key values.

Other techniques to use on skewed data during the join operation

When you’re performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, there are automatic optimizations for trying to optimize Data Skew issues on joins. Those can be tuned because they have dedicated configs on Apache Spark.

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.


About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based from Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.

How the GoDaddy data platform achieved over 60% cost reduction and 50% performance boost by adopting Amazon EMR Serverless

Post Syndicated from Brandon Abear original https://aws.amazon.com/blogs/big-data/how-the-godaddy-data-platform-achieved-over-60-cost-reduction-and-50-performance-boost-by-adopting-amazon-emr-serverless/

This is a guest post co-written with Brandon Abear, Dinesh Sharma, John Bush, and Ozcan IIikhan from GoDaddy.

GoDaddy empowers everyday entrepreneurs by providing all the help and tools to succeed online. With more than 20 million customers worldwide, GoDaddy is the place people come to name their ideas, build a professional website, attract customers, and manage their work.

At GoDaddy, we take pride in being a data-driven company. Our relentless pursuit of valuable insights from data fuels our business decisions and ensures customer satisfaction. Our commitment to efficiency is unwavering, and we’ve undertaken an exciting initiative to optimize our batch processing jobs. In this journey, we have identified a structured approach that we refer to as the seven layers of improvement opportunities. This methodology has become our guide in the pursuit of efficiency.

In this post, we discuss how we enhanced operational efficiency with Amazon EMR Serverless. We share our benchmarking results and methodology, and insights into the cost-effectiveness of EMR Serverless vs. fixed capacity Amazon EMR on EC2 transient clusters on our data workflows orchestrated using Amazon Managed Workflows for Apache Airflow (Amazon MWAA). We share our strategy for the adoption of EMR Serverless in areas where it excels. Our findings reveal significant benefits, including over 60% cost reduction, 50% faster Spark workloads, a remarkable five-times improvement in development and testing speed, and a significant reduction in our carbon footprint.

Background

In late 2020, GoDaddy’s data platform initiated its AWS Cloud journey, migrating an 800-node Hadoop cluster with 2.5 PB of data from its data center to EMR on EC2. This lift-and-shift approach facilitated a direct comparison between on-premises and cloud environments, ensuring a smooth transition to AWS pipelines, minimizing data validation issues and migration delays.

By early 2022, we successfully migrated our big data workloads to EMR on EC2. Using best practices learned from the AWS FinHack program, we fine-tuned resource-intensive jobs, converted Pig and Hive jobs to Spark, and reduced our batch workload spend by 22.75% in 2022. However, scalability challenges emerged due to the multitude of jobs. This prompted GoDaddy to embark on a systematic optimization journey, establishing a foundation for more sustainable and efficient big data processing.

Seven layers of improvement opportunities

In our quest for operational efficiency, we have identified seven distinct layers of opportunities for optimization within our batch processing jobs, as shown in the following figure. These layers range from precise code-level enhancements to more comprehensive platform improvements. This multi-layered approach has become our strategic blueprint in the ongoing pursuit of better performance and higher efficiency.

Seven layers of improvement opportunities

The layers are as follows:

  • Code optimization – Focuses on refining the code logic and how it can be optimized for better performance. This involves performance enhancements through selective caching, partition and projection pruning, join optimizations, and other job-specific tuning. Using AI coding solutions is also an integral part of this process.
  • Software updates – Updating to the latest versions of open source software (OSS) to capitalize on new features and improvements. For example, Adaptive Query Execution in Spark 3 brings significant performance and cost improvements.
  • Custom Spark configurations Tuning of custom Spark configurations to maximize resource utilization, memory, and parallelism. We can achieve significant improvements by right-sizing tasks, such as through spark.sql.shuffle.partitions, spark.sql.files.maxPartitionBytes, spark.executor.cores, and spark.executor.memory. However, these custom configurations might be counterproductive if they are not compatible with the specific Spark version.
  • Resource provisioning time The time it takes to launch resources like ephemeral EMR clusters on Amazon Elastic Compute Cloud (Amazon EC2). Although some factors influencing this time are outside of an engineer’s control, identifying and addressing the factors that can be optimized can help reduce overall provisioning time.
  • Fine-grained scaling at task level Dynamically adjusting resources such as CPU, memory, disk, and network bandwidth based on each stage’s needs within a task. The aim here is to avoid fixed cluster sizes that could result in resource waste.
  • Fine-grained scaling across multiple tasks in a workflow Given that each task has unique resource requirements, maintaining a fixed resource size may result in under- or over-provisioning for certain tasks within the same workflow. Traditionally, the size of the largest task determines the cluster size for a multi-task workflow. However, dynamically adjusting resources across multiple tasks and steps within a workflow result in a more cost-effective implementation.
  • Platform-level enhancements – Enhancements at preceding layers can only optimize a given job or a workflow. Platform improvement aims to attain efficiency at the company level. We can achieve this through various means, such as updating or upgrading the core infrastructure, introducing new frameworks, allocating appropriate resources for each job profile, balancing service usage, optimizing the use of Savings Plans and Spot Instances, or implementing other comprehensive changes to boost efficiency across all tasks and workflows.

Layers 1–3: Previous cost reductions

After we migrated from on premises to AWS Cloud, we primarily focused our cost-optimization efforts on the first three layers shown in the diagram. By transitioning our most costly legacy Pig and Hive pipelines to Spark and optimizing Spark configurations for Amazon EMR, we achieved significant cost savings.

For example, a legacy Pig job took 10 hours to complete and ranked among the top 10 most expensive EMR jobs. Upon reviewing TEZ logs and cluster metrics, we discovered that the cluster was vastly over-provisioned for the data volume being processed and remained under-utilized for most of the runtime. Transitioning from Pig to Spark was more efficient. Although no automated tools were available for the conversion, manual optimizations were made, including:

  • Reduced unnecessary disk writes, saving serialization and deserialization time (Layer 1)
  • Replaced Airflow task parallelization with Spark, simplifying the Airflow DAG (Layer 1)
  • Eliminated redundant Spark transformations (Layer 1)
  • Upgraded from Spark 2 to 3, using Adaptive Query Execution (Layer 2)
  • Addressed skewed joins and optimized smaller dimension tables (Layer 3)

As a result, job cost decreased by 95%, and job completion time was reduced to 1 hour. However, this approach was labor-intensive and not scalable for numerous jobs.

Layers 4–6: Find and adopt the right compute solution

In late 2022, following our significant accomplishments in optimization at the previous levels, our attention moved towards enhancing the remaining layers.

Understanding the state of our batch processing

We use Amazon MWAA to orchestrate our data workflows in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks referred to as workflows. In this post, the terms workflow and job are used interchangeably, referring to the Directed Acyclic Graphs (DAGs) consisting of tasks orchestrated by Amazon MWAA. For each workflow, we have sequential or parallel tasks, and even a combination of both in the DAG between create_emr and terminate_emr tasks running on a transient EMR cluster with fixed compute capacity throughout the workflow run. Even after optimizing a portion of our workload, we still had numerous non-optimized workflows that were under-utilized due to over-provisioning of compute resources based on the most resource-intensive task in the workflow, as shown in the following figure.

This highlighted the impracticality of static resource allocation and led us to recognize the necessity of a dynamic resource allocation (DRA) system. Before proposing a solution, we gathered extensive data to thoroughly understand our batch processing. Analyzing the cluster step time, excluding provisioning and idle time, revealed significant insights: a right-skewed distribution with over half of the workflows completing in 20 minutes or less and only 10% taking more than 60 minutes. This distribution guided our choice of a fast-provisioning compute solution, dramatically reducing workflow runtimes. The following diagram illustrates step times (excluding provisioning and idle time) of EMR on EC2 transient clusters in one of our batch processing accounts.

Furthermore, based on the step time (excluding provisioning and idle time) distribution of the workflows, we categorized our workflows into three groups:

  • Quick run – Lasting 20 minutes or less
  • Medium run – Lasting between 20–60 minutes
  • Long run – Exceeding 60 minutes, often spanning several hours or more

Another factor we needed to consider was the extensive use of transient clusters for reasons such as security, job and cost isolation, and purpose-built clusters. Additionally, there was a significant variation in resource needs between peak hours and periods of low utilization.

Instead of fixed-size clusters, we could potentially use managed scaling on EMR on EC2 to achieve some cost benefits. However, migrating to EMR Serverless appears to be a more strategic direction for our data platform. In addition to potential cost benefits, EMR Serverless offers additional advantages such as a one-click upgrade to the newest Amazon EMR versions, a simplified operational and debugging experience, and automatic upgrades to the latest generations upon rollout. These features collectively simplify the process of operating a platform on a larger scale.

Evaluating EMR Serverless: A case study at GoDaddy

EMR Serverless is a serverless option in Amazon EMR that eliminates the complexities of configuring, managing, and scaling clusters when running big data frameworks like Apache Spark and Apache Hive. With EMR Serverless, businesses can enjoy numerous benefits, including cost-effectiveness, faster provisioning, simplified developer experience, and improved resilience to Availability Zone failures.

Recognizing the potential of EMR Serverless, we conducted an in-depth benchmark study using real production workflows. The study aimed to assess EMR Serverless performance and efficiency while also creating an adoption plan for large-scale implementation. The findings were highly encouraging, showing EMR Serverless can effectively handle our workloads.

Benchmarking methodology

We split our data workflows into three categories based on total step time (excluding provisioning and idle time): quick run (0–20 minutes), medium run (20–60 minutes), and long run (over 60 minutes). We analyzed the impact of the EMR deployment type (Amazon EC2 vs. EMR Serverless) on two key metrics: cost-efficiency and total runtime speedup, which served as our overall evaluation criteria. Although we did not formally measure ease of use and resiliency, these factors were considered throughout the evaluation process.

The high-level steps to assess the environment are as follows:

  1. Prepare the data and environment:
    1. Choose three to five random production jobs from each job category.
    2. Implement required adjustments to prevent interference with production.
  2. Run tests:
    1. Run scripts over several days or through multiple iterations to gather precise and consistent data points.
    2. Perform tests using EMR on EC2 and EMR Serverless.
  3. Validate data and test runs:
    1. Validate input and output datasets, partitions, and row counts to ensure identical data processing.
  4. Gather metrics and analyze results:
    1. Gather relevant metrics from the tests.
    2. Analyze results to draw insights and conclusions.

Benchmark results

Our benchmark results showed significant enhancements across all three job categories for both runtime speedup and cost-efficiency. The improvements were most pronounced for quick jobs, directly resulting from faster startup times. For instance, a 20-minute (including cluster provisioning and shut down) data workflow running on an EMR on EC2 transient cluster of fixed compute capacity finishes in 10 minutes on EMR Serverless, providing a shorter runtime with cost benefits. Overall, the shift to EMR Serverless delivered substantial performance improvements and cost reductions at scale across job brackets, as seen in the following figure.

Historically, we devoted more time to tuning our long-run workflows. Interestingly, we discovered that the existing custom Spark configurations for these jobs did not always translate well to EMR Serverless. In cases where the results were insignificant, a common approach was to discard previous Spark configurations related to executor cores. By allowing EMR Serverless to autonomously manage these Spark configurations, we often observed improved outcomes. The following graph shows the average runtime and cost improvement per job when comparing EMR Serverless to EMR on EC2.

Per Job Improvement

The following table shows a sample comparison of results for the same workflow running on different deployment options of Amazon EMR (EMR on EC2 and EMR Serverless).

Metric EMR on EC2
(Average)
EMR Serverless
(Average)
EMR on EC2 vs
EMR Serverless
Total Run Cost ($) $ 5.82 $ 2.60 55%
Total Run Time (Minutes) 53.40 39.40 26%
Provisioning Time (Minutes) 10.20 0.05 .
Provisioning Cost ($) $ 1.19 . .
Steps Time (Minutes) 38.20 39.16 -3%
Steps Cost ($) $ 4.30 . .
Idle Time (Minutes) 4.80 . .
EMR Release Label emr-6.9.0 .
Hadoop Distribution Amazon 3.3.3 .
Spark Version Spark 3.3.0 .
Hive/HCatalog Version Hive 3.1.3, HCatalog 3.1.3 .
Job Type Spark .

AWS Graviton2 on EMR Serverless performance evaluation

After seeing compelling results with EMR Serverless for our workloads, we decided to further analyze the performance of the AWS Graviton2 (arm64) architecture within EMR Serverless. AWS had benchmarked Spark workloads on Graviton2 EMR Serverless using the TPC-DS 3TB scale, showing a 27% overall price-performance improvement.

To better understand the integration benefits, we ran our own study using GoDaddy’s production workloads on a daily schedule and observed an impressive 23.8% price-performance enhancement across a range of jobs when using Graviton2. For more details about this study, see GoDaddy benchmarking results in up to 24% better price-performance for their Spark workloads with AWS Graviton2 on Amazon EMR Serverless.

Adoption strategy for EMR Serverless

We strategically implemented a phased rollout of EMR Serverless via deployment rings, enabling systematic integration. This gradual approach let us validate improvements and halt further adoption of EMR Serverless, if needed. It served both as a safety net to catch issues early and a means to refine our infrastructure. The process mitigated change impact through smooth operations while building team expertise of our Data Engineering and DevOps teams. Additionally, it fostered tight feedback loops, allowing prompt adjustments and ensuring efficient EMR Serverless integration.

We divided our workflows into three main adoption groups, as shown in the following image:

  • Canaries This group aids in detecting and resolving any potential problems early in the deployment stage.
  • Early adopters This is the second batch of workflows that adopt the new compute solution after initial issues have been identified and rectified by the canaries group.
  • Broad deployment rings The largest group of rings, this group represents the wide-scale deployment of the solution. These are deployed after successful testing and implementation in the previous two groups.

Rings

We further broke down these workflows into granular deployment rings to adopt EMR Serverless, as shown in the following table.

Ring # Name Details
Ring 0 Canary Low adoption risk jobs that are expected to yield some cost saving benefits.
Ring 1 Early Adopters Low risk Quick-run Spark jobs that expect to yield high gains.
Ring 2 Quick-run Rest of the Quick-run (step_time <= 20 min) Spark jobs
Ring 3 LargerJobs_EZ High potential gain, easy move, medium-run and long-run Spark jobs
Ring 4 LargerJobs Rest of the medium-run and long-run Spark jobs with potential gains
Ring 5 Hive Hive jobs with potentially higher cost savings
Ring 6 Redshift_EZ Easy migration Redshift jobs that suit EMR Serverless
Ring 7 Glue_EZ Easy migration Glue jobs that suit EMR Serverless

Production adoption results summary

The encouraging benchmarking and canary adoption results generated considerable interest in wider EMR Serverless adoption at GoDaddy. To date, the EMR Serverless rollout remains underway. Thus far, it has reduced costs by 62.5% and accelerated total batch workflow completion by 50.4%.

Based on preliminary benchmarks, our team expected substantial gains for quick jobs. To our surprise, actual production deployments surpassed projections, averaging 64.4% faster vs. 42% projected, and 71.8% cheaper vs. 40% predicted.

Remarkably, long-running jobs also saw significant performance improvements due to the rapid provisioning of EMR Serverless and aggressive scaling enabled by dynamic resource allocation. We observed substantial parallelization during high-resource segments, resulting in a 40.5% faster total runtime compared to traditional approaches. The following chart illustrates the average enhancements per job category.

Prod Jobs Savings

Additionally, we observed the highest degree of dispersion for speed improvements within the long-run job category, as shown in the following box-and-whisker plot.

Whisker Plot

Sample workflows adopted EMR Serverless

For a large workflow migrated to EMR Serverless, comparing 3-week averages pre- and post-migration revealed impressive cost savings—a 75.30% decrease based on retail pricing with 10% improvement in total runtime, boosting operational efficiency. The following graph illustrates the cost trend.

Although quick-run jobs realized minimal per-dollar cost reductions, they delivered the most significant percentage cost savings. With thousands of these workflows running daily, the accumulated savings are substantial. The following graph shows the cost trend for a small workload migrated from EMR on EC2 to EMR Serverless. Comparing 3-week pre- and post-migration averages revealed a remarkable 92.43% cost savings on the retail on-demand pricing, alongside an 80.6% acceleration in total runtime.

Sample workflows adopted EMR Serverless 2

Layer 7: Platform-wide improvements

We aim to revolutionize compute operations at GoDaddy, providing simplified yet powerful solutions for all users with our Intelligent Compute Platform. With AWS compute solutions like EMR Serverless and EMR on EC2, it provided optimized runs of data processing and machine learning (ML) workloads. An ML-powered job broker intelligently determines when and how to run jobs based on various parameters, while still allowing power users to customize. Additionally, an ML-powered compute resource manager pre-provisions resources based on load and historical data, providing efficient, fast provisioning at optimum cost. Intelligent compute empowers users with out-of-the-box optimization, catering to diverse personas without compromising power users.

The following diagram shows a high-level illustration of the intelligent compute architecture.

Insights and recommended best-practices

The following section discusses the insights we’ve gathered and the recommended best practices we’ve developed during our preliminary and wider adoption stages.

Infrastructure preparation

Although EMR Serverless is a deployment method within EMR, it requires some infrastructure preparedness to optimize its potential. Consider the following requirements and practical guidance on implementation:

  • Use large subnets across multiple Availability Zones – When running EMR Serverless workloads within your VPC, make sure the subnets span across multiple Availability Zones and are not constrained by IP addresses. Refer to Configuring VPC access and Best practices for subnet planning for details.
  • Modify maximum concurrent vCPU quota For extensive compute requirements, it is recommended to increase your max concurrent vCPUs per account service quota.
  • Amazon MWAA version compatibility When adopting EMR Serverless, GoDaddy’s decentralized Amazon MWAA ecosystem for data pipeline orchestration created compatibility issues from disparate AWS Providers versions. Directly upgrading Amazon MWAA was more efficient than updating numerous DAGs. We facilitated adoption by upgrading Amazon MWAA instances ourselves, documenting issues, and sharing findings and effort estimates for accurate upgrade planning.
  • GoDaddy EMR operator To streamline migrating numerous Airflow DAGs from EMR on EC2 to EMR Serverless, we developed custom operators adapting existing interfaces. This allowed seamless transitions while retaining familiar tuning options. Data engineers could easily migrate pipelines with simple find-replace imports and immediately use EMR Serverless.

Unexpected behavior mitigation

The following are unexpected behaviors we ran into and what we did to mitigate them:

  • Spark DRA aggressive scaling For some jobs (8.33% of initial benchmarks, 13.6% of production), cost increased after migrating to EMR Serverless. This was due to Spark DRA excessively assigning new workers briefly, prioritizing performance over cost. To counteract this, we set maximum executor thresholds by adjusting spark.dynamicAllocation.maxExecutor, effectively limiting EMR Serverless scaling aggression. When migrating from EMR on EC2, we suggest observing the max core count in the Spark History UI to replicate similar compute limits in EMR Serverless, such as --conf spark.executor.cores and --conf spark.dynamicAllocation.maxExecutors.
  • Managing disk space for large-scale jobs When transitioning jobs that process large data volumes with substantial shuffles and significant disk requirements to EMR Serverless, we recommend configuring spark.emr-serverless.executor.disk by referring to existing Spark job metrics. Furthermore, configurations like spark.executor.cores combined with spark.emr-serverless.executor.disk and spark.dynamicAllocation.maxExecutors allow control over the underlying worker size and total attached storage when advantageous. For example, a shuffle-heavy job with relatively low disk usage may benefit from using a larger worker to increase the likelihood of local shuffle fetches.

Conclusion

As discussed in this post, our experiences with adopting EMR Serverless on arm64 have been overwhelmingly positive. The impressive results we’ve achieved, including a 60% reduction in cost, 50% faster runs of batch Spark workloads, and an astounding five-times improvement in development and testing speed, speak volumes about the potential of this technology. Furthermore, our current results suggest that by widely adopting Graviton2 on EMR Serverless, we could potentially reduce the carbon footprint by up to 60% for our batch processing.

However, it’s crucial to understand that these results are not a one-size-fits-all scenario. The enhancements you can expect are subject to factors including, but not limited to, the specific nature of your workflows, cluster configurations, resource utilization levels, and fluctuations in computational capacity. Therefore, we strongly advocate for a data-driven, ring-based deployment strategy when considering the integration of EMR Serverless, which can help optimize its benefits to the fullest.

Special thanks to Mukul Sharma and Boris Berlin for their contributions to benchmarking. Many thanks to Travis Muhlestein (CDO), Abhijit Kundu (VP Eng), Vincent Yung (Sr. Director Eng.), and Wai Kin Lau (Sr. Director Data Eng.) for their continued support.


About the Authors

Brandon Abear is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He enjoys all things big data. In his spare time, he enjoys traveling, watching movies, and playing rhythm games.

Dinesh Sharma is a Principal Data Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about user experience and developer productivity, always looking for ways to optimize engineering processes and saving cost. In his spare time, he loves reading and is an avid manga fan.

John Bush is a Principal Software Engineer in the Data & Analytics (DnA) organization at GoDaddy. He is passionate about making it easier for organizations to manage data and use it to drive their businesses forward. In his spare time, he loves hiking, camping, and riding his ebike.

Ozcan Ilikhan is the Director of Engineering for the Data and ML Platform at GoDaddy. He has over two decades of multidisciplinary leadership experience, spanning startups to global enterprises. He has a passion for leveraging data and AI in creating solutions that delight customers, empower them to achieve more, and boost operational efficiency. Outside of his professional life, he enjoys reading, hiking, gardening, volunteering, and embarking on DIY projects.

Harsh Vardhan is an AWS Solutions Architect, specializing in big data and analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Sliding window rate limits in distributed systems

Post Syndicated from Grab Tech original https://engineering.grab.com/frequency-capping

Like many other companies, Grab uses marketing communications to notify users of promotions or other news. If a user receives these notifications from multiple companies, it would be a form of information overload and they might even start considering these communications as spam. Over time, this could lead to some users revoking their consent to receive marketing communications altogether. Hence, it is important to find a rate-limited solution that sends the right amount of communications to our users.

Background

In Grab, marketing emails and push notifications are part of carefully designed campaigns to ensure that users get the right notifications (i.e. based on past orders or usage patterns). Trident is Grab’s in-house tool to compose these campaigns so that they run efficiently at scale. An example of a campaign is scheduling a marketing email blast to 10 million users at 4 pm. Read more about Trident’s architecture here.

Trident relies on Hedwig, another in-house service, to deliver the messages to users. Hedwig does the heavy lifting of delivering large amounts of emails and push notifications to users while maintaining a high query per second (QPS) rate and minimal delay. The following high-level architectural illustration demonstrates the interaction between Trident and Hedwig.

Diagram of data interaction between Trident and Hedwig

The aim is to regulate the number of marketing comms sent to users daily and weekly, tailored based on their interaction patterns with the Grab superapp.

Solution

Based on their interaction patterns with our superapp, we have clustered users into a few segments.

For example:

New: Users recently signed up to the Grab app but haven’t taken any rides yet.
Active: Users who took rides in the past month.

With these metrics, we came up with optimal daily and weekly frequency limit values for each clustered user segment. The solution discussed in this article ensures that the comms sent to a user do not exceed the daily and weekly thresholds for the segment. This is also called frequency capping.

However, frequency capping can be split into two sub-problems:

Efficient storage of clustered user data

With a huge customer base of over 270 million users, storing the user segment membership information has to be cost-efficient and memory-sleek. Querying the segment to which a user belongs should also have minimal latency.

Persistent tracking of comms sent per user

To stay within the daily and weekly thresholds, we need to actively track the number of comms sent to each user, which can be referred to make rate limiting decisions. The rate limiting logic should also have minimal latency, be cost efficient, and not take up too much memory storage.

Optimising storage of user segment data

The problem here is figuring out which segment a particular user belongs to and ensuring that the user doesn’t appear in more than one segment. There are two options that suit our needs and we’ll explain more about each option, as well as what was the best option for us.

Bloom filter 

A Bloom filter is a space-efficient probabilistic data structure that addresses this problem well. Simply put, Bloom filters internally use arrays to track memberships of the elements.

For our scenario, each user segment would need its own bloom filter. We used this bloom filter calculator to estimate the memory required for each bloom filter. We found that we needed approximately 1 GB of memory and 23 hash functions to accurately represent the membership information of 270 million users in an array. Additionally, this method guarantees a false positive rate of  1.0E-7, which means 1 in 1 million elements may get wrong membership results because of hash collision.

With Grab’s existing segments, this approach needs 4GB of memory, which may increase as we increase the number of segments in the future. Moreover, the potential hash collision needs to be handled by increasing the memory size with even more hash functions. Another thing to note is that Bloom filters do not support deletion so every time a change needs to be done, you need to create a new version of the Bloom filter. Although Bloom filters have many advantages, these shortcomings led us to explore another approach.

Roaring bitmaps Roaring bitmaps are sets of unsigned integers consisting of containers of disjoint subsets, which can store large amounts of data in a compressed form. Essentially, roaring bitmaps could reduce memory storage significantly and overcome the hash collision problem. To understand the intuition behind this, first, we need to know how bitmaps work and the possible drawbacks behind it.

To represent a list of numbers as a bitmap, we first need to create an array with a size equivalent to the largest element in the list. For every element in the list, we then mark the bit value as 1 in the corresponding index in the array. While bitmaps work very well for storing integers in closer intervals, they occupy more space and become sparse when storing integer ranges with uneven distribution, as shown in the image below.

Diagram of bitmaps with uneven distribution

To reduce memory footprint and improve the performance of bitmaps, there are compression techniques such as Run-Length Encoding (RLE), and Word Aligned Hybrid (WAH). However, this would require additional effort to implement, whereas using roaring bitmaps would solve these issues.

Roaring bitmaps’ hybrid data storage approach offers the following advantages:

  • Faster set operations (union, intersection, differencing).
  • Better compression ratio when handling mixed datasets (both dense and sparse data distribution).
  • Ability to scale to large datasets without significant performance loss.

To summarise, roaring bitmaps can store positive integers from 0 to (2^32)-1. Each positive integer value is converted to a 32-bit binary, where the 16 Most Significant Bits (MSB) are used as the key and the remaining 16 Least Significant Bits (LSB) are represented as the value. The values are then stored in an array, a bitmap, or used to run containers with RLE encoding data structures.

If the number of integers mapped to the key is less than 4096, then all the integers are stored in an array in sorted order and converted into a bitmap container in the runtime as the size exceeds. Roaring bitmap analyses the distribution of set bits in the bitmap container i.e. if the continuous interval of set bits is more than a given threshold, the bitmap container can be more efficiently represented using the RLE container. Internally, the RLE container uses an array where the even indices store the beginning of the runs and the odd indices represent the length of the runs. This enables the roaring bitmap to dynamically switch between the containers to optimise storage and performance.

The following diagram shows how a set of elements with different distributions are stored in roaring bitmaps.

Diagram of how roaring bitmaps store elements with different distributions

In Grab, we developed a microservice that abstracts roaring bitmaps implementations and provides an API to check set membership and enumeration of elements in the sets. Check out this blog to learn more about it.

Distributed rate limiting

The second part of the problem involves rate limiting the number of communication messages sent to users on a daily or weekly basis and each segment has specific daily and weekly limits. By utilising roaring bitmaps, we can determine the segment to which a user belongs. After identifying the appropriate segment, we will apply the personalised limits to the user using a distributed rate limiter, which will be discussed in further detail in the following sections.

Choosing the right datastore

Based on our use case, Amazon ElasticCache for Redis and DynamoDB were two viable options for storing the sent communication messages count per user. However, we decided to choose Redis due to a number of factors:

  • Higher throughput at lower latency – Redis shards data across nodes in the cluster.
  • Cost-effective – Usage of Lua script reduces unnecessary data transfer overheads.
  • Better at handling spiky rate limiting workloads at scale.

Distributed rate limiter

To appropriately limit the comms our users receive, we needed a rate limiting algorithm, which could execute directly in the datastore cluster, then return the results in the application logic for further processing. The two rate limiting algorithms we considered were the sliding window rate limiter and sliding log rate limiter.

The sliding window rate limiter algorithm divides time into a fixed-size window (we defined this as 1 minute) and counts the number of requests within each window. On the other hand, the sliding log maintains a log of each request timestamp and counts the number of requests between two timestamp ranges, providing a more fine-grained method of rate limiting. Although sliding log consumes more memory to store the log of request timestamp, we opted for the sliding log approach as the accuracy of the rate limiting was more important than memory consumption.

The sliding log rate limiter utilises a Redis sorted set data structure to efficiently track and organise request logs. Each timestamp in milliseconds is stored as a unique member in the set. The score assigned to each member represents the corresponding timestamp, allowing for easy sorting in ascending order. This design choice optimises the speed of search operations when querying for the total request count within specific time ranges.

Sliding Log Rate limiter Algorithm:

Input:
  # user specific redis key where the request timestamp logs are stored as sorted set
  keys => user_redis_key

  # limit_value is the limit that needs to be applied for the user
  # start_time_in_millis is the starting point of the time window
  # end_time_in_millis is the ending point of the time window
  # current_time_in_millis is the current time the request is sent
  # eviction_time_in_millis, members in the set whose value is less than this will be evicted from the set

  args => limit_value, start_time_in_millis, end_time_in_millis, current_time_in_millis, eviction_time_in_millis

Output:
  # 0 means not_allowed and 1 means allowed
  response => 0 / 1

Logic:
  # zcount fetches the count of the request timestamp logs falling between the start and the end timestamp
  request_count = zcount user_redis_key start_time_in_millis end_time_in_millis

  response = 0
  # if the count of request logs is less than allowed limits then record the usage by adding current timestamp in sorted set

  if request_count < limit_value then
    zadd user_redis_key current_time_in_millis current_time_in_millis
    response = 1

  # zremrangebyscore removes the members in the sorted set whose score is less than eviction_time_in_millis

  zremrangebyscore user_redis_key -inf eviction_time_in_millis
  return response

This algorithm takes O(log n) time complexity, where n is the number of request logs stored in the sorted set. It is not possible to evict entries in the sorted set like how we have time-to-live (TTL) for Redis keys. To prevent the size of the sorted set from increasing over time, we have a fixed variable eviction_time_in_millis that is passed to the script. The zremrangebyscore command then deletes members from the sorted set whose score is less than eviction_time_in_millis in O(log n) time complexity.

Lua script optimisations

In Redis Cluster mode, all Redis keys accessed by a Lua script must be present on the same node, and they should be passed as part of the KEYS input array of the script. If the script attempts to access keys located on different nodes within the cluster, a CROSSSLOT error will be thrown. Redis keys, or userIDs, are distributed across multiple nodes in the cluster so it is not feasible to send a batch of userIDs within the same Lua script for rate limiting, as this might result in a CROSSSLOT error.

Invoking a separate Lua script call for each user is a possible approach, but it incurs a significant number of network calls, which can be optimised further with the following approach:

  1. Upload the Lua script into the Redis server during the server startup with the SCRIPT LOAD command and we get the SHA1 hash of the script if the upload is successful.
  2. The SHA1 hash can then be used to invoke the Lua script with the EVALSHA command passing the keys and arguments as script input.
  3. Redis pipelining takes in multiple EVALSHA commands that call the Lua script and each invocation corresponds to a userID for getting the rate limiting result.
  4. Redis pipelining groups the EVALSHA Redis commands with Redis keys located on the same nodes internally. It then sends the grouped commands in a single network call to the relevant nodes within the Redis cluster and provides the rate limiting outcome to the client.

Since Redis operates on a single thread, any long-running Lua script can cause other Redis commands to be blocked until the script completes execution. Thus, it’s optimal for the Lua script to execute in under 5 milliseconds. Additionally, the current time is passed as an argument to the script to account for potential variations in time when the script is executed on a node’s replica, which could be caused by clock drift.

By bringing together roaring bitmaps and the distributed rate limiter, this is what our final solution looks like:

Our final solution using roaring bitmaps and distributed rate limiter

The roaring bitmaps structure is serialised and stored in an AWS S3 bucket, which is then downloaded in the instance during server startup. After which, triggering a user segment membership check can simply be done with a local method call. The configuration service manages the mapping information between the segment and allowed rate limiting values.

Whenever a marketing message needs to be sent to a user, we first find the segment to which the user belongs, retrieve the defined rate limiting values from the configuration service, then execute the Lua script to get the rate limiting decision. If there is enough quota available for the user, we send the comms.

The architecture of the messaging service looks something like this:

Architecture of the messaging service

Impact

In addition to decreasing the unsubscription rate, there was a significant enhancement in the latency of sending communications. Eliminating redundant communications also alleviated the system load, resulting in a reduction of the delay between the scheduled time and the actual send time of comms.

Conclusion

Applying rate limiters to safeguard our services is not only a standard practice but also a necessary process. Many times, this can be achieved by configuring the rate limiters at the instance level. The need for rate limiters for business logic may not be as common, but when you need it, the solution must be lightning-fast, and capable of seamlessly operating within a distributed environment.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Road localisation in GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/road-localisation-grabmaps

Introduction

In 2022, Grab achieved self-sufficiency in its Geo services. As part of this transition, one crucial step was moving towards using an internally-developed map tailored specifically to the market in which Grab operates. Now that we have full control over the map layer, we can add more data to it or improve it according to the needs of the services running on top. One key aspect that this transition unlocked for us was the possibility of creating hyperlocal data at map level.

For instance, by determining the country to which a road belongs, we can now automatically infer the official language of that country and display the street name in that language. In another example, knowing the country for a specific road, we can automatically infer the driving side (left-handed or right-handed) leading to an improved navigation experience. Furthermore, this capability also enables us to efficiently handle various scenarios. For example, if we know that a road is part of a gated community, an area where our driver partners face restricted access, we can prevent the transit through that area.

These are just some examples of the possibilities from having full control over the map layer. By having an internal map, we can align our maps with specific markets and provide better experiences for our driver-partners and customers.

Background

For all these to be possible, we first needed to localise the roads inside the map. Our goal was to include hyperlocal data into the map, which refers to data that is specific to a certain area, such as a country, city, or even a smaller part of the city like a gated community. At the same time, we aimed to deliver our map with a high cadence, thus, we needed to find the right way to process this large amount of data while continuing to create maps in a cost-effective manner.

Solution

In the following sections of this article, we will use an extract from the Southeast Asia map to provide visual representations of the concepts discussed.

In Figure 1, Image 1 shows a visualisation of the road network, the roads belonging to this area. The coloured lines in Image 2 represent the borders identifying the countries in the same area. Overlapping the information from Image 1 and Image 2, we can extrapolate and say that the entire surface included in a certain border could have the same set of common properties as shown in Image 3. In Image 4, we then proceed with adding localised roads for each area.

Figure 1 – Map of Southeast Asia

For this to be possible, we have to find a way to localise each road and identify its associated country. Once this localisation process is complete, we can replicate all this information specific to a given border onto each individual road. This information includes details such as the country name, driving side, and official language. We can go even further and infer more information, and add hyperlocal data. For example, in Vietnam, we can automatically prevent motorcycle access on the motorways.

Assigning each road on the map to a specific area, such as a country, service area, or subdivision, presents a complex task. So, how can we efficiently accomplish this?

Implementation

The most straightforward approach would be to test the inclusion of each road into each area boundary, but that is easier said than done. With close to 30 million road segments in the Southeast Asia map and over 10 thousand areas, the computational cost of determining inclusion or intersection between a polyline and a polygon is expensive.

Our solution to this challenge involves replacing the expensive yet precise operation with a decent approximation. We introduce a proxy entity, the geohash, and we use it to approximate the areas and also to localise the roads.

We replace the geometrical inclusion with a series of simpler and less expensive operations. First, we conduct an inexpensive precomputation where we identify all the geohases that belong to a certain area or within a defined border. We then identify the geohashes to which the roads belong to. Finally, we use these precomputed values to assign roads to their respective areas. This process is also computationally inexpensive.

Given the large area we process, we leverage big data techniques to distribute the execution across multiple nodes and thus speed up the operation. We want to deliver the map daily and this is one of the many operations that are part of the map-making process.

What is a geohash?

To further understand our implementation we will first explain the geohash concept. A geohash is a unique identifier of a specific region on the Earth. The basic idea is that the Earth is divided into regions of user-defined size and each region is assigned a unique id, which is known as its geohash. For a given location on earth, the geohash algorithm converts its latitude and longitude into a string.

Geohashes uses a Base-32 alphabet encoding system comprising characters ranging from 0 to 9 and A to Z, excluding “A”, “I”, “L” and “O”. Imagine dividing the world into a grid with 32 cells. The first character in a geohash identifies the initial location of one of these 32 cells. Each of these cells are then further subdivided into 32 smaller cells.This subdivision process continues and refines to specific areas in the world. Adding characters to the geohash sub-divides a cell, effectively zooming in to a more detailed area.

The precision factor of the geohash determines the size of the cell. For instance, a precision factor of one creates a cell 5,000 km high and 5,000 km wide. A precision factor of six creates a cell 0.61km high and 1.22 km wide. Furthermore, a precision factor of nine creates a cell 4.77 m high and 4.77 m wide. It is important to note that cells are not always square and can have varying dimensions.

In Figure 2, we have exemplified a geohash 6 grid and its code is wsdt33.

Figure 2 – An example of geohash code wsdt33

Using less expensive operations

Calculating the inclusion of the roads inside a certain border is an expensive operation. However, quantifying the exact expense is challenging as it depends on several factors. One factor is the complexity of the border. Borders are usually irregular and very detailed, as they need to correctly reflect the actual border. The complexity of the road geometry is another factor that plays an important role as roads are not always straight lines.

Figure 3 – Roads to localise

Since this operation is expensive both in terms of cloud cost and time to run, we need to identify a cheaper and faster way that would yield similar results. Knowing that the complexity of the border lines is the cause of the problem, we tried using a different alternative, a rectangle. Calculating the inclusion of a polyline inside a rectangle is a cheaper operation.

Figure 4 – Roads inside a rectangle

So we transformed this large, one step operation, where we test each road segment for inclusion in a border, into a series of smaller operations where we perform the following steps:

  1. Identify all the geohashes that are part of a certain area or belong to a certain border. In this process we include additional areas to make sure that we cover the entire surface inside the border.
  2. For each road segment, we identify the list of geohashes that it belongs to. A road, depending on its length or depending on its shape, might belong to multiple geohashes.

In Figure 5, we identify that the road belongs to two geohashes and that the two geohashes are part of the border we use.

Figure 5 – Geohashes as proxy

Now, all we need to do is join the two data sets together. This kind of operation is a great candidate for a big data approach, as it allows us to run it in parallel and speed up the processing time.

Precision tradeoff

We mentioned earlier that, for the sake of argument, we replace precision with a decent approximation. Let’s now delve into the real tradeoff by adopting this approach.

The first thing that stands out with this approach is that we traded precision for cost. We are able to reduce the cost as this approach uses less hardware resources and computation time. However, this reduction in precision suffers, particularly for roads located near the borders as they might be wrongly classified.

Going back to the initial example, let’s take the case of the external road, on the left side of the area. As you can see in Figure 6, it is clear that the road does not belong to our border. But when we apply the geohash approach it gets included into the middle geohash.

Figure 6 – Wrong road localisation

Given that just a small part of the geohash falls inside the border, the entire geohash will be classified as belonging to that area, and, as a consequence, the road that belongs to that geohash will be wrongly localised and we’ll end up adding the wrong localisation information to that road. This is clearly a consequence of the precision tradeoff. So, how can we solve this?

Geohash precision

One option is to increase the geohash precision. By using smaller and smaller geohashes, we can better reflect the actual area. As we go deeper and we further split the geohash, we can accurately follow the border. However, a high geohash precision also equates to a computationally intensive operation bringing us back to our initial situation. Therefore, it is crucial to find the right balance between the geohash size and the complexity of operations.

Figure 7 – Geohash precision

Geohash coverage percentage

To find a balance between precision and data loss, we looked into calculating the geohash coverage percentage. For example, in Figure 8, the blue geohash is entirely within the border. Here we can say that it has a 100% geohash coverage.

Figure 8 – Geohash inside the border

However, take for example the geohash in Figure 9. It touches the border and has only around 80% of its surface inside the area. Given that most of its surface is within the border, we still can say that it belongs to the area.

Figure 9 – Geohash partially inside the border

Let’s look at another example. In Figure 10, only a small part of the geohash is within the border. We can say that the geohash coverage percentage here is around 5%. For these cases, it becomes difficult for us to determine whether the geohash does belong to the area. What would be a good tradeoff in this case?

Figure 10 – Geohash barely inside the border

Border shape

To go one step further, we can consider a mixed solution, where we use the border shape but only for the geohashes touching the border. This would still be an intensive computational operation but the number of roads located in these geohashes will be much smaller, so it is still a gain.

For the geohashes with full coverage inside the area, we’ll use the geohash for the localisation, the simpler operation. For the geohashes that are near the border, we’ll use a different approach. To increase the precision around the borders, we can cut the geohash following the border’s shape. Instead of having a rectangle, we’ll use a more complex shape which is still simpler than the initial border shape.

Figure 11 – Geohash following a border’s shape

Result

We began with a simple approach and we enhanced it to improve precision. This also increased the complexity of the operation. We then asked, what are the actual gains? Was it worthwhile to go through all this process? In this section, we put this to the test.

We first created a benchmark by taking a small sample of the data and ran the localisation process on a laptop. The sample comprised approximately 2% of the borders and 0.0014% of the roads. We ran the localisation process using two approaches.

  • With the first approach, we calculated the intersection between all the roads and borders. The entire operation took around 38 minutes.
  • For the second approach, we optimised the operation using geohashes. In this approach, the runtime was only 78 seconds (1.3 minutes).

However, it is important to note that this is not an apples-to-apples comparison. The operation that we measured was the localisation of the roads but we did not include the border filling operation where we fill the borders with geohashes. This is because this operation does not need to be run every time. It can be run once and reused multiple times.

Though not often required, it is still crucial to understand and consider the operation of precomputing areas and filling borders with geohashes. The precomputation process depends on several factors:

  • Number and shape of the borders – The more borders and the more complex the borders are, the longer the operation will take.
  • Geohash precision – How accurate do we need our localisation to be? The more accurate it needs to be, the longer it will take.
  • Hardware availability

Going back to our hypothesis, although this precomputation might be expensive, it is rarely run as the borders don’t change often and can be triggered only when needed. However, regular computation, where we find the area to which each road belongs to, is often run as the roads change constantly. In our system, we run this localisation for each map processing.

We can also further optimise this process by applying the opposite approach. Geohashes that have full coverage inside a border can be merged together into larger geohashes thus simplifying the computation inside the border. In the end, we can have a solution that is fully optimised for our needs with the best cost-to-performance ratio.

Figure 12 – Optimised geohashes

Conclusion

Although geohashes seem to be the right solution for this kind of problem, we also need to monitor their content. One consideration is the road density inside a geohash. For example, a geohash inside a city centre usually has a lot of roads while one in the countryside may have much less. We need to consider this aspect to have a balanced computation operation and take full advantage of the big data approach. In our case, we achieve this balance by considering the number of road kilometres within a geohash.

Figure 13 – Unbalanced data

Additionally, the resources that we choose also matter. To optimise time and cost, we need to find the right balance between the running time and resource cost. As shown in Figure 14, based on a sample data we ran, sometimes, we get the best result when using smaller machines.

Figure 14 – Cost vs runtime

The achievements and insights showcased in this article are indebted to the contributions made by Mihai Chintoanu. His expertise and collaborative efforts have profoundly enriched the content and findings presented herein.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Streaming SQL in Data Mesh

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/streaming-sql-in-data-mesh-0d83f5a00d08

Democratizing Stream Processing @ Netflix

By Guil Pires, Mark Cho, Mingliang Liu, Sujay Jain

Data powers much of what we do at Netflix. On the Data Platform team, we build the infrastructure used across the company to process data at scale.

In our last blog post, we introduced “Data Mesh” — A Data Movement and Processing Platform. When a user wants to leverage Data Mesh to move and transform data, they start by creating a new Data Mesh pipeline. The pipeline is composed of individual “Processors” that are connected by Kafka topics. The Processors themselves are implemented as Flink jobs that use the DataStream API.

Since then, we have seen many use cases (including Netflix Graph Search) adopt Data Mesh for stream processing. We were able to onboard many of these use cases by offering some commonly used Processors out of the box, such as Projection, Filtering, Unioning, and Field Renaming.

An example of a Data Mesh pipeline which moves and transforms data using Union, GraphQL Enrichment, and Column Rename Processor before writing to an Iceberg table.

By keeping the logic of individual Processors simple, it allowed them to be reusable so we could centrally manage and operate them at scale. It also allowed them to be composable, so users could combine the different Processors to express the logic they needed.

However, this design decision led to a different set of challenges.

Some teams found the provided building blocks were not expressive enough. For use cases which were not solvable using existing Processors, users had to express their business logic by building a custom Processor. To do this, they had to use the low-level DataStream API from Flink and the Data Mesh SDK, which came with a steep learning curve. After it was built, they also had to operate the custom Processors themselves.

Furthermore, many pipelines needed to be composed of multiple Processors. Since each Processor was implemented as a Flink Job connected by Kafka topics, it meant there was a relatively high runtime overhead cost for many pipelines.

We explored various options to solve these challenges, and eventually landed on building the Data Mesh SQL Processor that would provide additional flexibility for expressing users’ business logic.

The existing Data Mesh Processors have a lot of overlap with SQL. For example, filtering and projection can be expressed in SQL through SELECT and WHERE clauses. Additionally, instead of implementing business logic by composing multiple individual Processors together, users could express their logic in a single SQL query, avoiding the additional resource and latency overhead that came from multiple Flink jobs and Kafka topics. Furthermore, SQL can support User Defined Functions (UDFs) and custom connectors for lookup joins, which can be used to extend expressiveness.

Data Mesh SQL Processor

Since Data Mesh Processors are built on top of Flink, it made sense to consider using Flink SQL instead of continuing to build additional Processors for every transform operation we needed to support.

The Data Mesh SQL Processor is a platform-managed, parameterized Flink Job that takes schematized sources and a Flink SQL query that will be executed against those sources. By leveraging Flink SQL within a Data Mesh Processor, we were able to support the streaming SQL functionality without changing the architecture of Data Mesh.

Underneath the hood, the Data Mesh SQL Processor is implemented using Flink’s Table API, which provides a powerful abstraction to convert between DataStreams and Dynamic Tables. Based on the sources that the processor is connected to, the SQL Processor will automatically convert the upstream sources as tables within Flink’s SQL engine. User’s query is then registered with the SQL engine and translated into a Flink job graph consisting of physical operators that can be executed on a Flink cluster. Unlike the low-level DataStream API, users do not have to manually build a job graph using low-level operators, as this is all managed by Flink’s SQL engine.

SQL Experience on Data Mesh

The SQL Processor enables users to fully leverage the capabilities of the Data Mesh platform. This includes features such as autoscaling, the ability to manage pipelines declaratively via Infrastructure as Code, and a rich connector ecosystem.

In order to ensure a seamless user experience, we’ve enhanced the Data Mesh platform with SQL-centric features. These enhancements include an Interactive Query Mode, real-time query validation, and automated schema inference.

To understand how these features help the users be more productive, let’s take a look at a typical user workflow when using the Data Mesh SQL Processor.

  • Users start their journey by live sampling their upstream data sources using the Interactive Query Mode.
  • As the user iterate on their SQL query, the query validation service provides real-time feedback about the query.
  • With a valid query, users can leverage the Interactive Query Mode again to execute the query and get the live results streamed back to the UI within seconds.
  • For more efficient schema management and evolution, the platform will automatically infer the output schema based on the fields selected by the SQL query.
  • Once the user is done editing their query, it is saved to the Data Mesh Pipeline, which will then be deployed as a long running, streaming SQL job.
Overview of the SQL Processor workflow.

Users typically iterate on their SQL query multiple times before deploying it. Validating and analyzing queries at runtime after deployment will not only slow down their iteration, but also make it difficult to automate schema evolution in Data Mesh.

To address this challenge, we have implemented a query validation service that can verify a Flink SQL query and provide a meaningful error message for violations in real time. This enables users to have prompt validation feedback while they are editing the query. We leverage Apache Flink’s internal Planner classes to parse and transform SQL queries without creating a fully-fledged streaming table environment. This makes the query service lightweight, scalable, and execution agnostic.

To effectively operate thousands of use cases at the platform layer, we built opinionated guardrails to limit some functionalities of Flink SQL. We plan on gradually expanding the supported capabilities over time. We implemented the guardrails by recursively inspecting the Calcite tree constructed from user’s query. If the tree contains nodes that we currently don’t support, the query will be rejected from being deployed. Additionally, we translate Flink’s internal exceptions containing cryptic error messages into more meaningful error messages for our users. We plan on continuing our investments into improving the guardrails, as having proper guardrails help to improve the user experience. Some ideas for the future include rules to reject expensive and suboptimal queries.

To help Data Mesh users iterate quickly on their business logic, we have built the Interactive Query Mode as part of the platform. Users can start live sampling their streaming data by executing a simple `SELECT * FROM <table>` query. Using the Interactive Query Mode, Data Mesh platform will execute the Flink SQL query and display the results in the UI in seconds. Since this is a Flink SQL query on streaming data, new results will continue to be delivered to the user in real-time.

Users can continue to iterate and modify their Flink SQL query and once they’re satisfied with their query output, they can save the query as part of their stream processing pipeline.

To provide this interactive experience, we maintain an always-running Flink Session Cluster that can run concurrent parameterized queries. These queries will output their data to a Mantis sink in order to stream the results back to the user’s browser.

An animated gif showing the interactive query mode in action
Interactive Query mode in action

Learnings from our journey

In hindsight, we wish we had invested in enabling Flink SQL on the DataMesh platform much earlier. If we had the Data Mesh SQL Processor earlier, we would’ve been able to avoid spending engineering resources to build smaller building blocks such as the Union Processor, Column Rename Processor, Projection and Filtering Processor.

Since we’ve productionized Data Mesh SQL Processor, we’ve seen excitement and quick adoption from our Data Mesh users. Thanks to the flexibility of Flink SQL, users have a new way to express their streaming transformation logic other than writing a custom processor using the low-level DataStream API.

While Flink SQL is a powerful tool, we view the Data Mesh SQL Processor as a complimentary addition to our platform. It is not meant to be a replacement for custom processors and Flink jobs using low-level DataStream API. Since SQL is a higher-level abstraction, users no longer have control over low-level Flink operators and state. This means that if state evolution is critical to the user’s business logic, then having complete control over the state can only be done through low-level abstractions like the DataStream API. Even with this limitation, we have seen that there are many new use cases that are unlocked through the Data Mesh SQL Processor.

Our early investment in guardrails has helped set clear expectations with our users and keep the operational burden manageable. It has allowed us to productionize queries and patterns that we are confident about supporting, while providing a framework to introduce new capabilities gradually.

Future of SQL on Data Mesh

While introducing the SQL Processor to the Data Mesh platform was a great step forward, we still have much more work to do in order to unlock the power of stream processing at Netflix. We’ve been working with our partner teams to prioritize and build the next set of features to extend the SQL Processor. These include stream enrichment using Slowly-Changing-Dimension (SCD) tables, temporal joins, and windowed aggregations.

Stay tuned for more updates!


Streaming SQL in Data Mesh was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Building hyperlocal GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/building-hyperlocal-grabmaps

Introduction

Southeast Asia (SEA) is a dynamic market, very different from other parts of the world. When travelling on the road, you may experience fast-changing road restrictions, new roads appearing overnight, and high traffic congestion. To address these challenges, GrabMaps has adapted to the SEA market by leveraging big data solutions. One of the solutions is the integration of hyperlocal data in GrabMaps.

Hyperlocal information is oriented around very small geographical communities and obtained from the local knowledge that our map team gathers. The map team is spread across SEA, enabling us to define clear specifications (e.g. legal speed limits), and validate that our solutions are viable.

Figure 1 – Map showing detections from images and probe data, and hyperlocal data.

Hyperlocal inputs make our mapping data even more robust, adding to the details collected from our image and probe detection pipelines. Figure 1 shows how data from our detection pipeline is overlaid with hyperlocal data, and then mapped across the SEA region. If you are curious and would like to check out the data yourself, you can download it here.

Processing hyperlocal data

Now let’s go through the process of detecting hyperlocal data.

Download data

GrabMaps is based on OpenStreetMap (OSM). The first step in the process is to download the .pbf file for Asia from geofabrick.de. This .pbf file contains all the data that is available on OSM, such as details of places, trees, and roads. Take for example a park, the .pbf file would contain data on the park name, wheelchair accessibility, and many more.

For this article, we will focus on hyperlocal data related to the road network. For each road, you can obtain data such as the type of road (residential or motorway), direction of traffic (one-way or more), and road name.

Convert data

To take advantage of big data computing, the next step in the process is to convert the .pbf file into Parquet format using a Parquetizer. This will convert the binary data in the .pbf file into a table format. Each road in SEA is now displayed as a row in a table as shown in Figure 2.

Figure 2 – Road data in Parquet format.

Identify hyperlocal data

After the data is prepared, GrabMaps then identifies and inputs all of our hyperlocal data, and delivers a consolidated view to our downstream services. Our hyperlocal data is obtained from various sources, either by looking at geometry, or other attributes in OSM such as the direction of travel and speed limit. We also apply customised rules defined by our local map team, all in a fully automated manner. This enhances the map together with data obtained from our rides and deliveries GPS pings and from KartaView, Grab’s product for imagery collection.

Figure 3 – Architecture diagram showing how hyperlocal data is integrated into GrabMaps.

Benefit of our hyperlocal GrabMaps

GrabNav, a turn-by-turn navigation tool available on the Grab driver app, is one of our products that benefits from having hyperlocal data. Here are some hyperlocal data that are made available through our approach:

  • Localisation of roads: The country, state/county, or city the road is in
  • Language spoken, driving side, and speed limit
  • Region-specific default speed regulations
  • Consistent name usage using language inference
  • Complex attributes like intersection links

To further explain the benefits of this hyperlocal feature, we will use intersection links as an example. In the next section, we will explain how intersection links data is used and how it impacts our driver-partners and passengers.

An intersection link is when two or more roads meet. Figure 4 and 5 illustrates what an intersection link looks like in a GrabMaps mock and in OSM.

Figure 4 – Mock of an intersection link.
Figure 5 – Intersection link illustration from a real road network in OSM.

To locate intersection links in a road network, there are computations involved. We would first combine big data processing (which we do using Spark) with graphs. We use geohash as the unit of processing, and for each geohash, a bi-directional graph is created.

From such resulting graphs, we can determine intersection links if:

  • Road segments are parallel
  • The roads have the same name
  • The roads are one way roads
  • Angles and the shape of the road are in the intervals or requirements we seek

Each intersection link we identify is tagged in the map as intersection_links. Our downstream service teams can then identify them by searching for the tag.

Impact

The impact we create with our intersection link can be explained through the following example.

Figure 6 – Longer route, without GrabMaps intersection link feature. The arrow indicates where the route should have suggested a U-turn.
Figure 7 – Shorter route using GrabMaps by taking a closer link between two main roads.

Figure 6 and Figure 7 show two different routes for the same origin and destination. However, you can see that Figure 7 has a shorter route and this is made available by taking an intersection link early on in the route. The highlighted road segment in Figure 7 is an intersection link, tagged by the process we described earlier. The route is now much shorter making GrabNav more efficient in its route suggestion.

There are numerous factors that can impact a driver-partner’s trip, and intersection links are just one example. There are many more features that GrabMaps offers across Grab’s services that allow us to “outserve” our partners.

Conclusion

GrabMaps and GrabNav deliver enriched experiences to our driver-partners. By integrating certain hyperlocal data features, we are also able to provide more accurate pricing for both our driver-partners and passengers. In our mission towards sustainable growth, this is an area that we will keep on improving by leveraging scalable tech solutions.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Let’s Architect! Architecting a data mesh

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-architecting-a-data-mesh/

Data architectures were mainly designed around technologies rather than business domains in the past. This changed in 2019, when Zhamak Dehghani introduced the data mesh. Data mesh is an application of the Domain-Driven-Design (DDD) principles to data architectures: Data is organized into data domains and the data is the product that the team owns and offers for consumption.

A data mesh architecture unites the disparate data sources within an organization through centrally managed data-sharing and governance guidelines. Business functions can maintain control over how shared data is accessed because data mesh also solves advanced data security challenges through distributed, decentralized ownership.

This edition of Let’s Architect! introduces data mesh, highlights the foundational concepts of data architectures, and covers the patterns for designing a data mesh in the AWS cloud with supporting resources.

Data lakes, lake houses and data mesh: what, why, and how?

Let’s explore a video introduction to data lakes, lake houses, and data mesh. This resource explains how to leverage those concepts to gain greater data insights across different business segments, with a special focus on best practices to build a well-architected, modern data architecture on AWS. It also gives an overview of the AWS cloud services that can be used to create such architectures and describes the fundamental pillars of designing them.

Take me to this intro to data lakes, lake houses, and data mesh video!

Data mesh is an architecture pattern where data are organized into domains and seen as products to expose for consumption

Data mesh is an architecture pattern where data are organized into domains and seen as products to expose for consumption

Building data mesh architectures on AWS

Knowing what a data mesh architecture is, here is a step-by-step video from re:Invent 2022 on designing one. It covers a use case on how GoDaddy considered and implemented data mesh, in addition to:

  • The fundamental pillars behind a well-architected data mesh in the cloud
  • Finding an approach to build a data mesh architecture using native AWS services
  • Reasons for considering a data mesh architecture where data lakes provide limitations in some scenarios
  • How data mesh can be applied in practice to overcome them
  • The mental models to apply during the data mesh design process

Take me to this re:Invent 2022 video!

In the data mesh architecture the producers expose their data for consumption to the consumers. Access is regulated through a centralized governance layer.

In the data mesh architecture the producers expose their data for consumption to the consumers. Access is regulated through a centralized governance layer.

Amazon DataZone: Democratize data with governance

Now let’s explore data accessibility as it relates to data mesh architectures.

Amazon DataZone is a new AWS business data catalog allowing you to unlock data across organizational boundaries with built-in governance. This service provides a unified environment where everyone in an organization—from data producers to data consumers—can access, share, and consume data in a governed manner.

Here is a video to learn how to apply AWS analytics services to discover, access, and share data across organizational boundaries within the context of a data mesh architecture.

Take me to this re:Invent 2022 video!

Amazon DataZone accelerates the adoption of the data mesh pattern by making it scalable to high number of producers and consumers.

Amazon DataZone accelerates the adoption of the data mesh pattern by making it scalable to high number of producers and consumers.

Build a data mesh on AWS

Feeling inspired to build? Hands-on experience is a great way to learn and see how the theoretical concepts apply in practice.

This workshop teaches you a data mesh architecture building approach on AWS. Many organizations are interested in implementing this architecture to:

  1. Move away from centralized data lakes to decentralized ownership
  2. Deliver analytics solutions across business units

Learn how a data mesh architecture can be implemented with AWS native services.

Take me to this workshop!

The diagrams shows how to separate the producers, consumers and governance components through a multi-account strategy.

The diagrams shows how to separate the producers, consumers and governance components through a multi-account strategy.

See you next time!

Thanks for exploring architecture tools and resources with us!

Next time we’ll talk about monitoring and observability.

To find all the posts from this series, check out the Let’s Architect! page of the AWS Architecture Blog.

AWS Local Zones and AWS Outposts, choosing the right technology for your edge workload

Post Syndicated from Sheila Busser original https://aws.amazon.com/blogs/compute/aws-local-zones-and-aws-outposts-choosing-the-right-technology-for-your-edge-workload/

This blog post is written by Joe Sacco, Senior Technical Account Manager.

The AWS Global Cloud Infrastructure includes 30 Launched Regions, 96 Availability Zones (AZs), 410+ Points of Presence with 400+ Edge Locations, and 13 Regional Edge Caches.  With over 200 AWS services, most customer workloads can run in the AWS Regions. However, for some location-sensitive workloads with low-latency or data residency requirements, and when an AWS Region isn’t close enough, AWS offers two additional infrastructure options: AWS Local Zones and AWS Outposts. Although Local Zones and Outposts solve for similar problems, we’ll review use cases as well as the services and features available that can help you decide which offering best suits your needs.

Let’s start with an overview of Local Zones and Outposts.

What are Local Zones?

Local Zones are a new type of infrastructure deployment that places AWS compute, storage, database, and other select AWS services in large metropolitan areas closer to end users. This gives you access to single-digit millisecond latency with the use of AWS Direct Connect and the ability to meet data residency requirements. Local Zones are also connected to their parent Region via AWS’s redundant and high bandwidth private network. This gives applications running in Local Zones fast, secure, and seamless access to a complete list of services in the parent Region.

Unlike Outposts, which you deploy within your datacenter or a co-location of your choice, Local Zones are owned, managed, and operated by AWS. Local Zones eliminate the need for you to manage power, connectivity, and capacity. Furthermore, you can provision workloads on a Local Zone from your AWS Management Console just as you would for AZs and Regions today.

AWS Local Zones how it worksWhat is Outposts?

Outposts is a family of fully managed solutions delivering AWS infrastructure and services to virtually any on-premises or edge location for a truly consistent hybrid experience. Outposts lets you run some AWS services locally and connect to a broad range of services available in the local AWS Region. Outposts comes in two types of offerings: Outposts rack and Outposts servers, with which you can run applications and workloads on-premises using the same AWS infrastructure, services, tools, and APIs as in AWS Regions.

The Outposts rack is available as an industry standard 42U form factor. It provides the same AWS infrastructure, services, tools, and APIs to your data center or co-location space  that you would find in an AWS Region.

Outposts Rack

The Outposts servers come in a 1U or 2U form factor and are designed for locations that have limited space or smaller capacity requirements. Both support different compute instances, as detailed in the Outposts servers feature page.

Outposts ServersCustomer use cases

Now that we have an overview of both Local Zones and Outposts service offerings, let’s dive into use cases, the differences between them, and how your business can leverage each to accomplish your workloads requirements.

Low latency

Customers today require low latency computing for workloads, such as medical imaging, transaction processing for Enterprise Resource Planning (ERP) applications, enterprise migration with hybrid architecture, real-time multiplayer gaming, telco network function virtualization, and regulated gaming workloads.

Outposts can meet ultra-low latency requirements. This is accomplished by bringing AWS services on premises and to the edge at Outpost Sites. An Outpost site is the physical location where your Outpost operates, and it can be local within one of your data centers or at a co-location facility of your choice.

When accessing from within the same metro, Local Zones will provide you with a low, single millisecond latency experience when communicating with your applications. Latency between Local Zones and AWS Regions or Local Zones and on-premises environments varies, and these will depend on how close the nearest Local Zone is as well as the type of modality used for the connection (Public Internet, VPN, and AWS Direct Connect). You should always choose the closest Local Zone location to achieve the lowest possible latency. For use cases such as mobile gaming, you can utilize Local Zones by deploying your applications to a Local Zone location nearest to your end users. Local Zones are generally available in 17 metros across the US, 4 outside the US, and we are continuing to launch Local Zones in 30 cities across 25 countries. Check out updates for more general availability of Local Zones.

Data residency

On occasion, data must remain in a specific geographic region for regulatory or information security reasons. Healthcare and other regulated industries, such as financial services or Oil & Gas, have specific data residency requirements.

Outposts helps meet a customer’s data residency requirements because it’s installed on premises and essentially brings AWS to where the data currently resides. This allows you to pick and control where your workloads run, and where your data will stay. Check out the full list of countries and territories where Outposts is available on the FAQs page of Outposts rack and the FAQs page of Outposts servers.

Local Zones bring AWS closer or within a customer’s geographic boundary in a fully AWS owned and operated mode. Although Local Zones can help meet data residency use cases in some scenarios, data residency requirements vary depending on the jurisdictions. Therefore, you should work closely with your compliance and information security teams when choosing the Local Zone location in which to deploy your regulated workloads.

Migration and modernization

When trying to migrate to the cloud and modernize your stack, some workloads can be challenging. Often there are on-premises applications which are difficult to move into Regions due to latency-sensitive system intermittencies between their various components. As dependencies arise, you may choose to segment these migrations into smaller pieces. Then this will require latency-sensitive connectivity between the various parts of the application.

Outposts and Local Zones both allow for a gradual migration and modernization of your stack. You can choose to migrate parts of their workloads while still maintaining latency-sensitive connectivity between components until the entirety is ready to move.

Factors in selecting Local Zones or Outposts

Choosing between Local Zones and Outposts will depend on the following factors, and you should examine all of them together when selecting a service for your use case.

  1. Latency requirements

Local Zones can achieve low single millisecond latency when accessing within the same metro. On the other hand, Outposts can achieve ultra-low latency requirements when deployed within your datacenter or at a co-location facility of your choice. When selecting one over the other, you must work backward from your goal and workload requirements.

If you’re conducting a migration and modernization strategy which requires ultra-low latency between a workloads application and database tiers that are difficult to migrate to the AWS Regions, then Outposts would be the right solution for you.

Alternatively, if your workload involves streaming live broadcasts to end users which requires low single millisecond latency, but your end users are located where an AWS Region isn’t available, then Local Zones distributed across various metros would work best to serve your content.

  1. Availability of services needed to support your workload

Local Zones and Outposts differ with their list of supported AWS services, and you must review your workload’s service requirements when determining the best fit for you. For example, if a customer has a computer vision workload that requires storing and retrieving large volumes of images locally using Amazon Simple Storage Service (Amazon S3), then Outposts and certain Local Zones meet this requirement while other Local Zones don’t. Learn how you can use Amazon S3 on Outposts for computer vision workloads.

Outposts rack and servers support different sets of AWS services locally. You can view comparisons between them, or visit the Outposts servers and Outposts rack feature sites for more details.

Local Zones’ features vary depending on the location in which you choose to deploy. You can view more details and a full list of supported features and services per location on our Local Zones features page.

  1. Investment and management of infrastructure on-premises

Management of the infrastructure and prerequisites are another factor when considering which AWS service best suits your needs.

Outposts is ordered through AWS, and it requires installation in a customer’s on-premises datacenter or co-location provider of their choice. Outposts rack installation is handled by AWS, while Outposts servers installation is done by the customer or a third-party of their choosing. There are power and redundant networking requirements for the Outpost Site, as well as a required subscription to AWS Enterprise Support or On-Ramp Support.

Local Zones infrastructure is fully-managed by AWS, including the power, networking, and capacity. This reduces operational management as well as the overhead cost for customers. An Enterprise support agreement isn’t required to utilize Local Zones.

You should always choose Regions or Local Zones if your use case allows, and use Outposts when a Region or Local Zone isn’t a good fit. If both Outposts and Local Zones fit a customer’s use case and requirements, then Local Zones will be the preferred choice.

  1. Regulations, compliance, and information security

If a Local Zone is either unavailable or unable to meet your residency requirements within your geographic boundary consider Outposts, which can be deployed to a data center or co-location facility of your choice. Data residency requirements can be a factor based on your industry and the regulations to which your workload must adhere. Furthermore, you should work closely with your compliance and information security teams when choosing between Local Zones or Outposts.

Conclusion

Whether you’re dealing with latency-sensitive applications, data residency requirements, or a migration and modernization strategy, AWS provides options and flexibility for you to leverage the same AWS infrastructure, services, APIs, and tools to metro areas and on-premises locations with Local Zones and Outposts.

The decision of which technology to use will depend on several factors that we discussed above. You must work across teams within your organization to make sure that the latency requirements (low single millisecond latency within a metro for Local Zones vs the ultra low latency of Outposts when deployed close to or within your datacenter), data reseidency needs, installation prerequisites, and availability of services to support your workload are met.

Once these factors are taken into account, and you have made a choice, visit our product pages for Outposts and Local Zones with information on how you can get started.

Let’s Architect! Modern data architectures

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-modern-data-architectures/

With the rapid growth in data coming from data platforms and applications, and the continuous improvements in state-of-the-art machine learning algorithms, data are becoming key assets for companies.

Modern data architectures include data mesh—a recent style that represents a paradigm shift, in which data is treated as a product and data architectures are designed around business domains. This type of approach supports the idea of distributed data, where each business domain focuses on the quality of the data it produces and exposes to the consumers.

In this edition of Let’s Architect!, we focus on data mesh and how it is designed on AWS, plus other approaches to adopt modern architectural patterns.

Design a data mesh architecture using AWS Lake Formation and AWS Glue

Domain Driven Design (DDD) is a software design approach where a solution is divided into domains aligned with business capabilities, software, and organizational boundaries. Unlike software architectures, most data architectures are often designed around technologies rather than business domains.

In this blog, you can learn about data mesh, an architectural pattern that applies the principles of DDD to data architectures. Data are organized into domains and considered the product that each team owns and offers for consumption.

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

A data mesh design organizes around data domains. Each domain owns multiple data products with their own data and technology stacks

Building Data Mesh Architectures on AWS

In this video, discover how to use the data mesh approach in AWS. Specifically, how to implement certain design patterns for building a data mesh architecture with AWS services in the cloud.

This is a pragmatic presentation to get a quick understanding of data mesh fundamentals, the benefits/challenges, and the AWS services that you can use to build it. This video provides additional context to the aforementioned blog post and includes several examples on the benefits of modern data architectures.

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

This diagram demonstrates the pattern for sharing data catalogs between producer domains and consumer domains

Build a modern data architecture on AWS with Amazon AppFlow, AWS Lake Formation, and Amazon Redshift

In this blog, you can learn how to build a modern data strategy using AWS managed services to ingest data from sources like Salesforce. Also discussed is how to automatically create metadata catalogs and share data seamlessly between the data lake and data warehouse, plus creating alerts in the event of an orchestrated data workflow failure.

The second part of the post explains how a data warehouse can be built by using an agile data modeling pattern, as well as how ELT jobs were quickly developed, orchestrated, and configured to perform automated data quality testing.

A data platform architecture and the subcomponents used to build it

A data platform architecture and the subcomponents used to build it

AWS Lake Formation Workshop

With a modern data architecture on AWS, architects and engineers can rapidly build scalable data lakes; use a broad and deep collection of purpose-built data services; and ensure compliance via unified data access, security, and governance. As data mesh is a modern architectural pattern, you can build it using a service like AWS Lake Formation.

Familiarize yourself with new technologies and services by not only learning how they work, but also to building prototypes and projects to gain hands-on experience. This workshop allows builders to become familiar with the features of AWS Lake Formation and its integrations with other AWS services.

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

A data catalog is a key component in a data mesh architecture. AWS Glue crawlers interact with data stores and other elements to populate the data catalog

See you next time!

Thanks for joining our discussion on data mesh! See you in a couple of weeks when we talk more about architectures and the challenges that we face every day while working with distributed systems.

Other posts in this series

Looking for more architecture content?

AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Interactively develop your AWS Glue streaming ETL jobs using AWS Glue Studio notebooks

Post Syndicated from Arun A K original https://aws.amazon.com/blogs/big-data/interactively-develop-your-aws-glue-streaming-etl-jobs-using-aws-glue-studio-notebooks/

Enterprise customers are modernizing their data warehouses and data lakes to provide real-time insights, because having the right insights at the right time is crucial for good business outcomes. To enable near-real-time decision-making, data pipelines need to process real-time or near-real-time data. This data is sourced from IoT devices, change data capture (CDC) services like AWS Data Migration Service (AWS DMS), and streaming services such as Amazon Kinesis, Apache Kafka, and others. These data pipelines need to be robust, able to scale, and able to process large data volumes in near-real time. AWS Glue streaming extract, transform, and load (ETL) jobs process data from data streams, including Kinesis and Apache Kafka, apply complex transformations in-flight, and load it into a target data stores for analytics and machine learning (ML).

Hundreds of customers are using AWS Glue streaming ETL for their near-real-time data processing requirements. These customers required an interactive capability to process streaming jobs. Previously, when developing and running a streaming job, you had to wait for the results to be available in the job logs or persisted into a target data warehouse or data lake to be able to view the results. With this approach, debugging and adjusting code is difficult, resulting in a longer development timeline.

Today, we are launching a new AWS Glue streaming ETL feature to interactively develop streaming ETL jobs in AWS Glue Studio notebooks and interactive sessions.

In this post, we provide a use case and step-by-step instructions to develop and debug your AWS Glue streaming ETL job using a notebook.

Solution overview

To demonstrate the streaming interactive sessions capability, we develop, test, and deploy an AWS Glue streaming ETL job to process Apache Webserver logs. The following high-level diagram represents the flow of events in our job.
BDB-2464 High Level Application Architecture
Apache Webserver logs are streamed to Amazon Kinesis Data Streams. An AWS Glue streaming ETL job consumes the data in near-real time and runs an aggregation that computes how many times a webpage has been unavailable (status code 500 and above) due to an internal error. The aggregate information is then published to a downstream Amazon DynamoDB table. As part of this post, we develop this job using AWS Glue Studio notebooks.

You can either work with the instructions provided in the notebook, which you download when instructed later in this post, or follow along with this post to author your first streaming interactive session job.

Prerequisites

To get started, click the Launch Stack button below, to run an AWS CloudFormation template on your AWS environment.

BDB-2063-launch-cloudformation-stack

The template provisions a Kinesis data stream, DynamoDB table, AWS Glue job to generate simulated log data, and the necessary AWS Identity and Access Management (IAM) role and polices. After you deploy your resources, you can review the Resources tab on the AWS CloudFormation console for detailed information.

Set up the AWS Glue streaming interactive session job

To set up your AWS Glue streaming job, complete the following steps:

  1. Download the notebook file and save it to a local directory on your computer.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose Create job.
  4. Select Jupyter Notebook.
  5. Under Options, select Upload and edit an existing notebook.
  6. Choose Choose file and browse to the notebook file you downloaded.
  7. Choose Create.
BDB-2464 Create Job
  1. For Job name¸ enter a name for the job.
  2. For IAM Role, use the role glue-iss-role-0v8glq, which is provisioned as part of the CloudFormation template.
  3. Choose Start notebook job.
BDB-2464 Start Notebook

You can see that the notebook is loaded into the UI. There are markdown cells with instructions as well as code blocks that you can run sequentially. You can either run the instructions on the notebook or follow along with this post to continue with the job development.

BDB-2464 Explore Notebook

Run notebook cells

Let’s run the code block that has the magics. The notebook has notes on what each magic does.

  1. Run the first cell.
BDB-2464 Run First Cell

After running the cell, you can see in the output section that the defaults have been reconfigured.

BDB-2464 Configurations Set

In the context of streaming interactive sessions, an important configuration is job type, which is set to streaming. Additionally, to minimize costs, the number of workers is set to 2 (default 5), which is sufficient for our use case that deals with a low-volume simulated dataset.

Our next step is to initialize an AWS Glue streaming session.

  1. Run the next code cell.
BDB-2464 Initiate Session

After we run this cell, we can see that a session has been initialized and a session ID is created.

A Kinesis data stream and AWS Glue data generator job that feeds into this stream have already been provisioned and triggered by the CloudFormation template. With the next cell, we consume this data as an Apache Spark DataFrame.

  1. Run the next cell.
BDB-2464 Fetch From Kinesis

Because there are no print statements, the cells don’t show any output. You can proceed to run the following cells.

Explore the data stream

To help enhance the interactive experience in AWS Glue interactive sessions, GlueContext provides the method getSampleStreamingDynamicFrame. It provides a snapshot of the stream in a static DynamicFrame. It takes three arguments:

  • The Spark streaming DataFrame
  • An options map
  • A writeStreamFunction to apply a function to every sampled record

Available options are as follows:

  • windowSize – Also known as the micro-batch duration, this parameter determines how long a streaming query will wait after the previous batch was triggered.
  • pollingTimeInMs – This is the total length of time the method will run. It starts at least one micro-batch to obtain sample records from the input stream. The time unit is milliseconds, and the value should be greater than the windowSize.
  • recordPollingLimit – This is defaulted to 100, and helps you set an upper bound on the number of records that is retrieved from the stream.

Run the next code cell and explore the output.

BDB-2464 Sample Data

We see that the sample consists of 100 records (the default record limit), and we have successfully displayed the first 10 records from the sample.

Work with the data

Now that we know what our data looks like, we can write the logic to clean and format it for our analytics.

Run the code cell containing the reformat function.

Note that Python UDFs aren’t the recommended way to handle data transformations in a Spark application. We use reformat() to exemplify troubleshooting. When working with a real-world production application, we recommend using native APIs wherever possible.

BDB-2464 Run The UDF

We see that the code cell failed to run. The failure was on purpose. We deliberately created a division by zero exception in our parser.

BDB-2464 Error Running The Code

Failure and recovery

In case of a regular AWS Glue job, for any error, the whole application exits, and you have to make code changes and resubmit the application. However, in case of interactive sessions, the coding context and definitions are fully preserved and the session is still operational. There is no need to bootstrap a new cluster and rerun all the preceding transformation. This allows you to focus on quickly iterating your batch function implementation to obtain the desired outcome. You can fix the defects and run them in a matter of seconds.

To test this out, go back to the code and comment or delete the erroneous line error_line=1/0 and rerun the cell.

BDB-2464 Error Corrected

Implement business logic

Now that we have successfully tested our parsing logic on the sample stream, let’s implement the actual business logic. The logics are implemented in the processBatch method within the next code cell. In this method, we do the following:

  • Pass the streaming DataFrame in micro-batches
  • Parse the input stream
  • Filter messages with status code >=500
  • Over a 1-minute interval, get the count of failures per webpage
  • Persist the preceding metric to a DynamoDB table (glue-iss-ddbtbl-0v8glq)
  1. Run the next code cell to trigger the stream processing.
BDB-2464 Trigger DDB Write
  1. Wait a few minutes for the cell to complete.
  2. On the DynamoDB console, navigate to the Items page and select the glue-iss-ddbtbl-0v8glq table.
BDB-2464 Explore DDB

The page displays the aggregated results that have been written by our interactive session job.

Deploy the streaming job

So far, we have been developing and testing our application using the streaming interactive sessions. Now that we’re confident of the job, let’s convert this into an AWS Glue job. We have seen that the majority of code cells are doing exploratory analysis and sampling, and aren’t required to be a part of the main job.

A commented code cell that represents the whole application is provided to you. You can uncomment the cell and delete all other cells. Another option would be to not use the commented cell, but delete just the two cells from the notebook that do the sampling or debugging and print statements.

To delete a cell, choose the cell and then choose the delete icon.

BDB-2464 Delete a Cell

Now that you have the final application code ready, save and deploy the AWS Glue job by choosing Save.

BDB-2464 Save Job

A banner message appears when the job is updated.

BDB-2464 Save Job Banner

Explore the AWS Glue job

After you save the notebook, you should be able to access the job like any regular AWS Glue job on the Jobs page of the AWS Glue console.

BDB-2464 Job Page

Additionally, you can look at the Job details tab to confirm the initial configurations, such as number of workers, have taken effect after deploying the job.

BDB-2464 Job Details Page

Run the AWS Glue job

If needed, you can choose Run to run the job as an AWS Glue streaming job.

BDB-2464 Job Run

To track progress, you can access the run details on the Runs tab.

BDB-2464 Job Run Details

Clean up

To avoid incurring additional charges to your account, stop the streaming job that you started as part of the instructions. Also, on the AWS CloudFormation console, select the stack that you provisioned and delete it.

Conclusion

In this post, we demonstrated how to do the following:

  • Author a job using notebooks
  • Preview incoming data streams
  • Code and fix issues without having to publish AWS Glue jobs
  • Review the end-to-end working code, remove any debugging, and print statements or cells from the notebook
  • Publish the code as an AWS Glue job

We did all of this via a notebook interface.

With these improvements in the overall development timelines of AWS Glue jobs, it’s easier to author jobs using the streaming interactive sessions. We encourage you to use the prescribed use case, CloudFormation stack, and notebook to jumpstart your individual use cases to adopt AWS Glue streaming workloads.

The goal of this post was to give you hands-on experience working with AWS Glue streaming and interactive sessions. When onboarding a productionized workload onto your AWS environment, based on the data sensitivity and security requirements, ensure you implement and enforce tighter security controls.


About the authors

Arun A K is a Big Data Solutions Architect with AWS. He works with customers to provide architectural guidance for running analytics solutions on the cloud. In his free time, Arun loves to enjoy quality time with his family.

Linan Zheng is a Software Development Engineer at AWS Glue Streaming Team, helping building the serverless data platform. His works involve large scale optimization engine for transactional data formats and streaming interactive sessions.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.

Shiv Narayanan is a Senior Technical Product Manager on the AWS Glue team. He works with AWS customers across the globe to strategize, build, develop, and deploy modern data platforms.

From centralized architecture to decentralized architecture: How data sharing fine-tunes Amazon Redshift workloads

Post Syndicated from Jingbin Ma original https://aws.amazon.com/blogs/big-data/from-centralized-architecture-to-decentralized-architecture-how-data-sharing-fine-tunes-amazon-redshift-workloads/

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Today, Amazon Redshift has become the most widely used cloud data warehouse.

With the significant growth of data for big data analytics over the years, some customers have asked how they should optimize Amazon Redshift workloads. In this post, we explore how to optimize workloads on Amazon Redshift clusters using Amazon Redshift RA3 nodes, data sharing, and pausing and resuming clusters. For more cost-optimization methods, refer to Getting the most out of your analytics stack with Amazon Redshift.

Key features of Amazon Redshift

First, let’s review some key features:

  • RA3 nodes – Amazon Redshift RA3 nodes are backed by a new managed storage model that gives you the power to separately optimize your compute power and your storage. They bring a few very important features, one of which is data sharing. RA3 nodes also support the ability to pause and resume, which allows you to easily suspend on-demand billing while the cluster is not being used.
  • Data sharing – Amazon Redshift data sharing offers you to extend the ease of use, performance, and cost benefits of Amazon Redshift in a single cluster to multi-cluster deployments while being able to share data. Data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy or move it. You can securely share live data with Amazon Redshift clusters in the same or different AWS accounts, and across regions. You can share data at many levels, including schemas, tables, views, and user-defined functions. You can also share the most up-to-date and consistent information as it’s updated in Amazon Redshift Serverless. It also provides fine-grained access controls that you can tailor for different users and businesses that all need access to the data. However, data sharing in Amazon Redshift has a few limitations.

Solution overview

In this use case, our customer is heavily using Amazon Redshift as their data warehouse for their analytics workloads, and they have been enjoying the possibility and convenience that Amazon Redshift brought to their business. They mainly use Amazon Redshift to store and process user behavioral data for BI purposes. The data has increased by hundreds of gigabytes daily in recent months, and employees from departments continuously run queries against the Amazon Redshift cluster on their BI platform during business hours.

The company runs four major analytics workloads on a single Amazon Redshift cluster, because some data is used by all workloads:

  • Queries from the BI platform – Various queries run mainly during business hours.
  • Hourly ETL – This extract, transform, and load (ETL) job runs in the first few minutes of each hour. It generally takes about 40 minutes.
  • Daily ETL – This job runs twice a day during business hours, because the operation team needs to get daily reports before the end of the day. Each job normally takes between 1.5–3 hours. It’s the second-most resource-heavy workload.
  • Weekly ETL – This job runs in the early morning every Sunday. It’s the most resource-heavy workload. The job normally takes 3–4 hours.

The analytics team has migrated to the RA3 family and increased the number of nodes of the Amazon Redshift cluster to 12 over the years to keep the average runtime of queries from their BI tool within an acceptable time due to the data size, especially when other workloads are running.

However, they have noticed that performance is reduced while running ETL tasks, and the duration of ETL tasks is long. Therefore, the analytics team wants to explore solutions to optimize their Amazon Redshift cluster.

Because CPU utilization spikes appear while the ETL tasks are running, the AWS team’s first thought was to separate workloads and relevant data into multiple Amazon Redshift clusters with different cluster sizes. By reducing the total number of nodes, we hoped to reduce the cost of Amazon Redshift.

After a series of conversations, the AWS team found that one of the reasons that the customer keeps all workloads on the 12-node Amazon Redshift cluster is to manage the performance of queries from their BI platform, especially while running ETL workloads, which have a big impact on the performance of all workloads on the Amazon Redshift cluster. The obstacle is that many tables in the data warehouse are required to be read and written by multiple workloads, and only the producer of a data share can update the shared data.

The challenge of dividing the Amazon Redshift cluster into multiple clusters is data consistency. Some tables need to be read by ETL workloads and written by BI workloads, and some tables are the opposite. Therefore, if we duplicate data into two Amazon Redshift clusters or only create a data share from the BI cluster to the reporting cluster, the customer will have to develop a data synchronization process to keep the data consistent between all Amazon Redshift clusters, and this process could be very complicated and unmaintainable.

After more analysis to gain an in-depth understanding of the customer’s workloads, the AWS team found that we could put tables into four groups, and proposed a multi-cluster, two-way data sharing solution. The purpose of the solution is to divide the workloads into separate Amazon Redshift clusters so that we can use Amazon Redshift to pause and resume clusters for periodic workloads to reduce the Amazon Redshift running costs, because clusters can still access a single copy of data that is required for workloads. The solution should meet the data consistency requirements without building a complicated data synchronization process.

The following diagram illustrates the old architecture (left) compared to the new multi-cluster solution (right).

Improve the old architecture (left) to the new multi-cluster solution (right)

Dividing workloads and data

Due to the characteristics of the four major workloads, we categorized workloads into two categories: long-running workloads and periodic-running workloads.

The long-running workloads are for the BI platform and hourly ETL jobs. Because the hourly ETL workload requires about 40 minutes to run, the gain is small even if we migrate it to an isolated Amazon Redshift cluster and pause and resume it every hour. Therefore, we leave it with the BI platform.

The periodic-running workloads are the daily and weekly ETL jobs. The daily job generally takes about 1 hour and 40 minutes to 3 hours, and the weekly job generally takes 3–4 hours.

Data sharing plan

The next step is identifying all data (tables) access patterns of each category. We identified four types of tables:

  • Type 1 – Tables are only read and written by long-running workloads
  • Type 2 – Tables are read and written by long-running workloads, and are also read by periodic-running workloads
  • Type 3 – Tables are read and written by periodic-running workloads, and are also read by long-running workloads
  • Type 4 – Tables are only read and written by periodic-running workloads

Fortunately, there is no table that is required to be written by all workloads. Therefore, we can separate the Amazon Redshift cluster into two Amazon Redshift clusters: one for the long-running workloads, and the other for periodic-running workloads with 20 RA3 nodes.

We created a two-way data share between the long-running cluster and the periodic-running cluster. For type 2 tables, we created a data share on the long-running cluster as the producer and the periodic-running cluster as the consumer. For type 3 tables, we created a data share on the periodic-running cluster as the producer and the long-running cluster as the consumer.

The following diagram illustrates this data sharing configuration.

The long-running cluster (producer) shares type 2 tables to the periodic-running cluster (consumer). The periodic-running cluster (producer’) shares type 3 tables to the long-running cluster (consumer’)

Build two-way data share across Amazon Redshift clusters

In this section, we walk through the steps to build a two-way data share across Amazon Redshift clusters. First, let’s take a snapshot of the original Amazon Redshift cluster, which became the long-running cluster later.

Take a snapshot of the long-running-cluster from the Amazon Redshift console

Now, let’s create a new Amazon Redshift cluster with 20 RA3 nodes for periodic-running workloads. Then we migrate the type 3 and type 4 tables to the periodic-running cluster. Make sure you choose the ra3 node type. (Amazon Redshift Serverless supports data sharing too, and it becomes generally available in July 2022, so it is also an option now.)

Create the periodic-running-cluster. Make sure you select the ra3 node type.

Create the long-to-periodic data share

The next step is to create the long-to-periodic data share. Complete the following steps:

  1. On the periodic-running cluster, get the namespace by running the following query:
SELECT current_namespace;

Make sure record the namespace.

  1. On the long-running cluster, we run queries similar to the following:
CREATE DATASHARE ltop_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ltop_share ADD SCHEMA public_long;
ALTER DATASHARE ltop_share ADD ALL TABLES IN SCHEMA public_long;
GRANT USAGE ON DATASHARE ltop_share TO NAMESPACE '[periodic-running-cluster-namespace]';
  1. We can validate the long-to-periodic data share using the following command:
SHOW datashares;
  1. After we validate the data share, we get the long-running cluster namespace with the following query:
SELECT current-namespace;

Make sure record the namespace.

  1. On the periodic-running cluster, run the following command to load the data from the long-to-periodic data share with the long-running cluster namespace:
CREATE DATABASE ltop FROM DATASHARE ltop_share OF NAMESPACE '[long-running-cluster-namespace]';
  1. Confirm that we have read access to tables in the long-to-periodic data share.

Create the periodic-to-long data share

The next step is to create the periodic-to-long data share. We use the namespaces of the long-running cluster and the periodic-running cluster that we collected in the previous step.

  1. On the periodic-running cluster, run queries similar to the following to create the periodic-to-long data share:
CREATE DATASHARE ptol_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ptol_share ADD SCHEMA public_periodic;
ALTER DATASHARE ptol_share ADD ALL TABLES IN SCHEMA public_periodic;
GRANT USAGE ON DATASHARE ptol_share TO NAMESPACE '[long-running-cluster-namespace]';
  1. Validate the data share using the following command:
SHOW datashares;
  1. On the long-running cluster, run the following command to load the data from the periodic-to-long data using the periodic-running cluster namespace:
CREATE DATABASE ptol FROM DATASHARE ptol_share OF NAMESPACE '[periodic-running-cluster-namespace]';
  1. Check that we have read access to the tables in the periodic-to-long data share.

At this stage, we have separated workloads into two Amazon Redshift clusters and built a two-way data share across two Amazon Redshift clusters.

The next step is updating the code of different workloads to use the correct endpoints of two Amazon Redshift clusters and perform consolidated tests.

Pause and resume the periodic-running Amazon Redshift cluster

Let’s update the crontab scripts, which run periodic-running workloads. We make two updates.

  1. When the scripts start, call the Amazon Redshift check and resume cluster APIs to resume the periodic-running Amazon Redshift cluster when the cluster is paused:
    aws redshift resume-cluster --cluster-identifier [periodic-running-cluster-id]

  2. After the workloads are finished, call the Amazon Redshift pause cluster API with the cluster ID to pause the cluster:
    aws redshift pause-cluster --cluster-identifier [periodic-running-cluster-id]

Results

After we migrated the workloads to the new architecture, the company’s analytics team ran some tests to verify the results.

According to tests, the performance of all workloads improved:

  • The BI workload is about 100% faster during the ETL workload running periods
  • The hourly ETL workload is about 50% faster
  • The daily workload duration reduced to approximately 40 minutes, from a maximum of 3 hours
  • The weekly workload duration reduced to approximately 1.5 hours, from a maximum of 4 hours

All functionalities work properly, and cost of the new architecture only increased approximately 13%, while over 10% of new data had been added during the testing period.

Learnings and limitations

After we separated the workloads into different Amazon Redshift clusters, we discovered a few things:

  • The performance of the BI workloads was 100% faster because there was no resource competition with daily and weekly ETL workloads anymore
  • The duration of ETL workloads on the periodic-running cluster was reduced significantly because there were more nodes and no resource competition from the BI and hourly ETL workloads
  • Even when over 10% new data was added, the overall cost of the Amazon Redshift clusters only increased by 13%, due to using the cluster pause and resume function of the Amazon Redshift RA3 family

As a result, we saw a 70% price-performance improvement of the Amazon Redshift cluster.

However, there are some limitations of the solution:

  • To use the Amazon Redshift pause and resume function, the code for calling the Amazon Redshift pause and resume APIs must be added to all scheduled scripts that run ETL workloads on the periodic-running cluster
  • Amazon Redshift clusters require several minutes to finish pausing and resuming, although you’re not charged during these processes
  • The size of Amazon Redshift clusters can’t automatically scale in and out depending on workloads

Next steps

After improving performance significantly, we can explore the possibility of reducing the number of nodes of the long-running cluster to reduce Amazon Redshift costs.

Another possible optimization is using Amazon Redshift Spectrum to reduce the cost of Amazon Redshift on cluster storage. With Redshift Spectrum, multiple Amazon Redshift clusters can concurrently query and retrieve the same structured and semistructured dataset in Amazon Simple Storage Service (Amazon S3) without the need to make copies of the data for each cluster or having to load the data into Amazon Redshift tables.

Amazon Redshift Serverless was announced for preview in AWS re:Invent 2021 and became generally available in July 2022. Redshift Serverless automatically provisions and intelligently scales your data warehouse capacity to deliver best-in-class performance for all your analytics. You only pay for the compute used for the duration of the workloads on a per-second basis. You can benefit from this simplicity without making any changes to your existing analytics and BI applications. You can also share data for read purposes across different Amazon Redshift Serverless instances within or across AWS accounts.

Therefore, we can explore the possibility of removing the need to script for pausing and resuming the periodic-running cluster by using Redshift Serverless to make the management easier. We can also explore the possibility of improving the granularity of workloads.

Conclusion

In this post, we discussed how to optimize workloads on Amazon Redshift clusters using RA3 nodes, data sharing, and pausing and resuming clusters. We also explored a use case implementing a multi-cluster two-way data share solution to improve workload performance with a minimum code change. If you have any questions or feedback, please leave them in the comments section.


About the authors

Jingbin Ma

Jingbin Ma is a Sr. Solutions Architect at Amazon Web Services. He helps customers build well-architected applications using AWS services. He has many years of experience working in the internet industry, and his last role was CTO of a New Zealand IT company before joining AWS. He is passionate about serverless and infrastructure as code.

Chao PanChao Pan is a Data Analytics Solutions Architect at Amazon Web Services. He’s responsible for the consultation and design of customers’ big data solution architectures. He has extensive experience in open-source big data. Outside of work, he enjoys hiking.

Bias in the machine: How can we address gender bias in AI?

Post Syndicated from Sue Sentance original https://www.raspberrypi.org/blog/gender-bias-in-ai-machine-learning-biased-data/

At the Raspberry Pi Foundation, we’ve been thinking about questions relating to artificial intelligence (AI) education and data science education for several months now, inviting experts to share their perspectives in a series of very well-attended seminars. At the same time, we’ve been running a programme of research trials to find out what interventions in school might successfully improve gender balance in computing. We’re learning a lot, and one primary lesson is that these topics are not discrete: there are relationships between them.

We can’t talk about AI education — or computer science education more generally — without considering the context in which we deliver it, and the societal issues surrounding computing, AI, and data. For this International Women’s Day, I’m writing about the intersection of AI and gender, particularly with respect to gender bias in machine learning.

The quest for gender equality

Gender inequality is everywhere, and researchers, activists, and initiatives, and governments themselves, have struggled since the 1960s to tackle it. As women and girls around the world continue to suffer from discrimination, the United Nations has pledged, in its Sustainable Development Goals, to achieve gender equality and to empower all women and girls.

While progress has been made, new developments in technology may be threatening to undo this. As Susan Leahy, a machine learning researcher from the Insight Centre for Data Analytics, puts it:

Artificial intelligence is increasingly influencing the opinions and behaviour of people in everyday life. However, the over-representation of men in the design of these technologies could quietly undo decades of advances in gender equality.

Susan Leavy, 2018 [1]

Gender-biased data

In her 2019 award-winning book Invisible Women: Exploring Data Bias in a World Designed for Men [2], Caroline Ceriado Perez discusses the effects of gender-biased data. She describes, for example, how the designs of cities, workplaces, smartphones, and even crash test dummies are all based on data gathered from men. She also discusses that medical research has historically been conducted by men, on male bodies.

Looking at this problem from a different angle, researcher Mayra Buvinic and her colleagues highlight that in most countries of the world, there are no sources of data that capture the differences between male and female participation in civil society organisations, or in local advisory or decision making bodies [3]. A lack of data about girls and women will surely impact decision making negatively. 

Bias in machine learning

Machine learning (ML) is a type of artificial intelligence technology that relies on vast datasets for training. ML is currently being use in various systems for automated decision making. Bias in datasets for training ML models can be caused in several ways. For example, datasets can be biased because they are incomplete or skewed (as is the case in datasets which lack data about women). Another example is that datasets can be biased because of the use of incorrect labels by people who annotate the data. Annotating data is necessary for supervised learning, where machine learning models are trained to categorise data into categories decided upon by people (e.g. pineapples and mangoes).

A banana, a glass flask, and a potted plant on a white surface. Each object is surrounded by a white rectangular frame with a label identifying the object.
Max Gruber / Better Images of AI / Banana / Plant / Flask / CC-BY 4.0

In order for a machine learning model to categorise new data appropriately, it needs to be trained with data that is gathered from everyone, and is, in the case of supervised learning, annotated without bias. Failing to do this creates a biased ML model. Bias has been demonstrated in different types of AI systems that have been released as products. For example:

Facial recognition: AI researcher Joy Buolamwini discovered that existing AI facial recognition systems do not identify dark-skinned and female faces accurately. Her discovery, and her work to push for the first-ever piece of legislation in the USA to govern against bias in the algorithms that impact our lives, is narrated in the 2020 documentary Coded Bias

Natural language processing: Imagine an AI system that is tasked with filling in the missing word in “Man is to king as woman is to X” comes up with “queen”. But what if the system completes “Man is to software developer as woman is to X” with “secretary” or some other word that reflects stereotypical views of gender and careers? AI models called word embeddings learn by identifying patterns in huge collections of texts. In addition to the structural patterns of the text language, word embeddings learn human biases expressed in the texts. You can read more about this issue in this Brookings Institute report

Not noticing

There is much debate about the level of bias in systems using artificial intelligence, and some AI researchers worry that this will cause distrust in machine learning systems. Thus, some scientists are keen to emphasise the breadth of their training data across the genders. However, other researchers point out that despite all good intentions, gender disparities are so entrenched in society that we literally are not aware of all of them. White and male dominance in our society may be so unconsciously prevalent that we don’t notice all its effects.

Three women discuss something while looking at a laptop screen.

As sociologist Pierre Bourdieu famously asserted in 1977: “What is essential goes without saying because it comes without saying: the tradition is silent, not least about itself as a tradition.” [4]. This view holds that people’s experiences are deeply, or completely, shaped by social conventions, even those conventions that are biased. That means we cannot be sure we have accounted for all disparities when collecting data.

What is being done in the AI sector to address bias?

Developers and researchers of AI systems have been trying to establish rules for how to avoid bias in AI models. An example rule set is given in an article in the Harvard Business Review, which describes the fact that speech recognition systems originally performed poorly for female speakers as opposed to male ones, because systems analysed and modelled speech for taller speakers with longer vocal cords and lower-pitched voices (typically men).

A women looks at a computer screen.

The article recommends four ways for people who work in machine learning to try to avoid gender bias:

  • Ensure diversity in the training data (in the example from the article, including as many female audio samples as male ones)
  • Ensure that a diverse group of people labels the training data
  • Measure the accuracy of a ML model separately for different demographic categories to check whether the model is biased against some demographic categories
  • Establish techniques to encourage ML models towards unbiased results

What can everybody else do?

The above points can help people in the AI industry, which is of course important — but what about the rest of us? It’s important to raise awareness of the issues around gender data bias and AI lest we find out too late that we are reintroducing gender inequalities we have fought so hard to remove. Awareness is a good start, and some other suggestions, drawn out from others’ work in this area are:

Improve the gender balance in the AI workforce

Having more women in AI and data science, particularly in both technical and leadership roles, will help to reduce gender bias. A 2020 report by the World Economic Forum (WEF) on gender parity found that women account for only 26% of data and AI positions in the workforce. The WEF suggests five ways in which the AI workforce gender balance could be addressed:

  1. Support STEM education
  2. Showcase female AI trailblazers
  3. Mentor women for leadership roles
  4. Create equal opportunities
  5. Ensure a gender-equal reward system

Ensure the collection of and access to high-quality and up-to-date gender data

We need high-quality dataset on women and girls, with good coverage, including country coverage. Data needs to be comparable across countries in terms of concepts, definitions, and measures. Data should have both complexity and granularity, so it can be cross-tabulated and disaggregated, following the recommendations from the Data2x project on mapping gender data gaps.

A woman works at a multi-screen computer setup on a desk.

Educate young people about AI

At the Raspberry Pi Foundation we believe that introducing some of the potential (positive and negative) impacts of AI systems to young people through their school education may help to build awareness and understanding at a young age. The jury is out on what exactly to teach in AI education, and how to teach it. But we think educating young people about new and future technologies can help them to see AI-related work opportunities as being open to all, and to develop critical and ethical thinking.

Three teenage girls at a laptop

In our AI education seminars we heard a number of perspectives on this topic, and you can revisit the videos, presentation slides, and blog posts. We’ve also been curating a list of resources that can help to further AI education — although there is a long way to go until we understand this area fully. 

We’d love to hear your thoughts on this topic.


References

[1] Leavy, S. (2018). Gender bias in artificial intelligence: The need for diversity and gender theory in machine learning. Proceedings of the 1st International Workshop on Gender Equality in Software Engineering, 14–16.

[2] Perez, C. C. (2019). Invisible Women: Exploring Data Bias in a World Designed for Men. Random House.

[3] Buvinic M., Levine R. (2016). Closing the gender data gap. Significance 13(2):34–37 

[4] Bourdieu, P. (1977). Outline of a Theory of Practice (No. 16). Cambridge University Press. (p.167)

The post Bias in the machine: How can we address gender bias in AI? appeared first on Raspberry Pi.