
Migrate a petabyte-scale data warehouse from Actian Vectorwise to Amazon Redshift

Post Syndicated from Krishna Gogineni original https://aws.amazon.com/blogs/big-data/migrate-a-petabyte-scale-data-warehouse-from-actian-vectorwise-to-amazon-redshift/

Amazon Redshift is a fast, scalable, and fully managed cloud data warehouse that allows you to process and run your complex SQL analytics workloads on structured and semi-structured data. It also helps you securely access your data in operational databases, data lakes, or third-party datasets with minimal movement or copying of data. Tens of thousands of customers use Amazon Redshift to process large amounts of data, modernize their data analytics workloads, and provide insights for their business users.

In this post, we discuss how a financial services industry customer achieved scalability, resiliency, and availability by migrating from an on-premises Actian Vectorwise data warehouse to Amazon Redshift.

Challenges

The customer’s use case required a high-performing, highly available, and scalable data warehouse to process queries against large datasets in a low-latency environment. Their Actian Vectorwise system was originally designed to replace Excel plugins and stock screeners, but it eventually evolved into a much larger and more ambitious portfolio analysis solution that ran multiple API clusters on premises and served some of the largest financial services firms worldwide. The success of the customer’s products drove a 30% year-over-year increase in usage, and this growing demand required higher performance and scalability. The customer needed to keep up with the increasing volume of read requests, but couldn’t do so without deploying additional hardware in the data center. There was also a mandate within the customer’s organization that business-critical products must move their hardware to cloud-based solutions or be deemed on the path to obsolescence. In addition, the business started moving customers onto a new commercial model under which each new project would need to provision a new cluster, so improved performance, scalability, and availability became essential.

They faced the following challenges:

  • Scalability – The customer understood that infrastructure maintenance was a growing issue and, although operations were a consideration, the existing implementation didn’t offer a scalable and efficient way to meet the advanced sharding requirements for query, reporting, and analysis. Over-provisioning data warehouse capacity to meet unpredictable workloads left that capacity 30% underutilized during normal operations.
  • Availability and resiliency – Because the customer ran business-critical analytical workloads, it required the highest levels of availability and resiliency, which was a concern with the on-premises data warehouse solution.
  • Performance – Some queries needed to be processed with priority, and as usage of the solution grew, users started to experience performance degradation and longer query runtimes.

If these concerns weren’t addressed, the customer wouldn’t be able to grow its user base. The need for a scalable and efficient solution to manage customer demand, address infrastructure maintenance concerns, replace legacy tooling, and tackle availability led the customer to choose Amazon Redshift as the future-state solution.

Legacy architecture

The customer’s platform was the main source for one-time, batch, and content processing. It served many enterprise use cases across API feeds, content mastering, and analytics interfaces. It was also the single strategic platform within the company for entity screening, on-the-fly aggregation, and other one-time, complex request workflows.

The following diagram illustrates the legacy architecture.

The architecture consists of many layers:

  • Rules engine – The rules engine intercepted every incoming request and, based on the nature of the request and its response time requirement, routed it to the API cluster that could process it optimally.
  • API – Scalability was one of the primary challenges with the existing on-premises system. It wasn’t possible to quickly scale up and down API service capacity to meet growing business demand. Both the API and data store had to support a highly volatile workload pattern. This included simple data retrieval requests that had to be processed within a few milliseconds vs. power user-style batch requests with complex analytics-based workloads that could take several seconds and significant compute resources to process. To separate these different workload patterns, the API and data store infrastructure was split into multiple isolated physical clusters. This made sure each workload group was provisioned with sufficient reserved capacity to meet the respective response time expectations. However, this model of reserving capacity for each workload type resulted in suboptimal usage of compute resources because each cluster would only process a specific workload type.
  • Data store – The data store used a custom data model that had been highly optimized to meet low-latency query response requirements. The current on-premises data store wasn’t horizontally scalable, and there was no built-in replication or data sharding capability. Due to this limitation, multiple database instances were created to meet concurrent scalability and availability requirements because the schema wasn’t generic per dataset. This model caused operational maintenance overhead and wasn’t easily expandable.
  • Data ingestion – Pentaho was used to ingest data sourced from multiple data publishers into the data store. The ingestion framework itself didn’t have any major challenges. However, the primary bottleneck was due to scalability issues associated with the data store. Because the data store didn’t support sharding or replication, data ingestion had to explicitly ingest the same data concurrently across multiple database nodes within a single transaction to provide data consistency. This significantly impacted overall ingestion speed.
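The rules engine’s routing decision can be illustrated with a minimal Python sketch. It assumes each request carries a type and a response-time requirement; the request types, the 100 ms threshold, and the cluster names are hypothetical, because the post doesn’t describe the customer’s actual rules.

```python
from dataclasses import dataclass

# Hypothetical cluster names; the post does not name the real clusters.
LOW_LATENCY_CLUSTER = "api-cluster-low-latency"
BATCH_CLUSTER = "api-cluster-batch"


@dataclass(frozen=True)
class Request:
    request_type: str     # hypothetical types such as "lookup" or "screening"
    max_response_ms: int  # response-time requirement attached to the request


def route(request: Request, low_latency_threshold_ms: int = 100) -> str:
    """Send a request to the isolated cluster reserved for its workload class."""
    # Simple data retrievals with tight response-time requirements go to the
    # cluster provisioned for millisecond-level lookups.
    if request.request_type == "lookup" and request.max_response_ms <= low_latency_threshold_ms:
        return LOW_LATENCY_CLUSTER
    # Power-user batch and complex analytics requests go to the cluster
    # sized for multi-second, compute-heavy work.
    return BATCH_CLUSTER


print(route(Request("lookup", 20)))       # api-cluster-low-latency
print(route(Request("screening", 5000)))  # api-cluster-batch
```

Because each cluster only ever receives one class of request, its reserved capacity sits idle whenever that class is quiet, which is the utilization problem the API layer suffered from.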

Overall, the legacy architecture didn’t support workload prioritization, so physical resources were reserved for each workload type instead, which led to over-provisioning. The system was also integrated with legacy backend services that were all hosted on premises.

Solution overview

Amazon Redshift is an industry-leading cloud data warehouse. Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes using AWS-designed hardware and machine learning (ML) to deliver the best price-performance at any scale.

Amazon Redshift is designed for high-performance data warehousing, providing fast query processing and scalable storage to handle large volumes of data efficiently. Its columnar storage format minimizes I/O and improves query performance by reading only the columns each query needs, resulting in faster data retrieval. You can also integrate Amazon Redshift with data lakes built on Amazon Simple Storage Service (Amazon S3), combining structured and semi-structured data for comprehensive analytics.

The following diagram illustrates the architecture of the new solution.

In the following sections, we discuss the features of this solution and how it addresses the challenges of the legacy architecture.

Rules engine and API

Amazon API Gateway is a fully managed service that helps developers deliver secure, robust, API-driven application backends at any scale. To address the scalability and availability requirements of the rules and routing layer, we introduced API Gateway to route client requests to different integration paths using routes and parameter mappings. Using API Gateway as the entry point let the customer stop designing, testing, and maintaining their own rules engine. In the legacy environment, handling fluctuating amounts of traffic posed a significant challenge; API Gateway addressed this by acting as a proxy that automatically scales to accommodate varying traffic demands, providing consistent performance and reliability.

Data storage and processing

Amazon Redshift allowed the customer to meet their scalability and performance requirements. Amazon Redshift features such as workload management (WLM), massively parallel processing (MPP) architecture, concurrency scaling, and parameter groups helped address the requirements:

  • WLM provided the ability to prioritize queries and manage resources effectively
  • The MPP architecture model provided horizontal scalability
  • Concurrency scaling added additional cluster capacity to handle unpredictable and spiky workloads
  • Parameter groups defined configuration parameters that control database behavior

Together, these capabilities allowed them to meet their scalability and performance requirements in a managed fashion.

Data distribution

The legacy data center architecture was unable to partition the data without deploying additional hardware in the data center, and it couldn’t handle read workloads efficiently.

The MPP architecture of Amazon Redshift distributes data efficiently across all the compute nodes, so the nodes can process it in parallel. This helped run heavy workloads and lowered response times. The architecture also separates compute and storage for efficient scaling and performance.
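To illustrate why the choice of distribution key matters, the following Python sketch assigns rows to slices by hashing a distribution-key value, similar in spirit to KEY distribution. It is a simplified model: the SHA-256 hash, the slice count, and the key values are assumptions, and Amazon Redshift uses its own internal hash function.

```python
import hashlib
from collections import Counter


def slice_for(dist_key: str, num_slices: int) -> int:
    """Map a distribution-key value to a slice with a stable hash.

    Simplified model: Redshift uses its own internal hash, not SHA-256.
    """
    digest = hashlib.sha256(dist_key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_slices


def rows_per_slice(keys, num_slices: int) -> Counter:
    """Count how many rows each slice would store, and later scan."""
    return Counter(slice_for(k, num_slices) for k in keys)


# High-cardinality key: 10,000 hypothetical security IDs spread over 8 slices.
even = rows_per_slice([f"SEC{i:05d}" for i in range(10_000)], num_slices=8)
# Low-cardinality key: every row carries the same currency code.
skewed = rows_per_slice(["USD"] * 10_000, num_slices=8)

print(sorted(even.items()))    # eight slices with similar counts
print(sorted(skewed.items()))  # a single slice holding all 10,000 rows
```

A high-cardinality key spreads rows so every slice scans a similar share in parallel; a low-cardinality key concentrates them on one slice, which undoes the benefit of MPP.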

Operational efficiency and hygiene

Infrastructure maintenance and operational efficiency were concerns for the customer in their legacy architecture. Amazon Redshift is a fully managed service that takes care of data warehouse management tasks such as hardware provisioning, software patching, setup, configuration, monitoring nodes and drives to recover from failures, and backups. Amazon Redshift periodically performs maintenance to apply fixes, enhancements, and new features to your Redshift data warehouse. As a result, the customer’s operational costs were reduced by 500%, and the team can now spend more time innovating and building mission-critical applications.

Workload management

Amazon Redshift WLM resolved an issue in the legacy architecture where longer-running queries consumed all the resources, slowing down other queries and impacting performance SLAs. With automatic WLM, the customer created separate WLM queues with different priorities, which let them manage the priorities of critical, SLA-bound workloads separately from non-critical workloads. With short query acceleration (SQA) enabled, selected short-running queries were prioritized ahead of longer-running ones. The customer also used query monitoring rules in WLM to set performance boundaries for poorly designed queries and to take action when a query goes beyond those boundaries. To learn more about WLM, refer to Implementing workload management.
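As a rough illustration, automatic WLM with prioritized queues, SQA, and a query monitoring rule can be expressed as the JSON value of the wlm_json_configuration parameter in a cluster parameter group. The Python sketch below builds such a document; the user group names, priorities, and the 300-second threshold are hypothetical, and the key names reflect our understanding of the format, so check them against the Amazon Redshift documentation before applying the output.

```python
import json

# Hypothetical Redshift user groups; substitute the groups defined in your cluster.
SLA_GROUP = "sla_critical_users"
ADHOC_GROUP = "adhoc_users"


def build_wlm_config(max_exec_seconds: int = 300) -> str:
    """Return an automatic-WLM configuration as a JSON string."""
    sla_queue = {
        "user_group": [SLA_GROUP],
        "query_group": [],
        "queue_type": "auto",
        "auto_wlm": True,
        "priority": "highest",  # SLA-bound queries get resources first
    }
    adhoc_queue = {
        "user_group": [ADHOC_GROUP],
        "query_group": [],
        "queue_type": "auto",
        "auto_wlm": True,
        "priority": "low",
        # Query monitoring rule: abort ad hoc queries that run too long
        "rules": [
            {
                "rule_name": "abort_long_adhoc_queries",
                "predicate": [
                    {
                        "metric_name": "query_execution_time",  # seconds
                        "operator": ">",
                        "value": max_exec_seconds,
                    }
                ],
                "action": "abort",
            }
        ],
    }
    # Queue with no user groups, listed last, for all other queries
    other_queue = {"queue_type": "auto", "auto_wlm": True, "priority": "normal"}
    # Turn on short query acceleration (SQA)
    sqa = {"short_query_queue": True}
    return json.dumps([sla_queue, adhoc_queue, other_queue, sqa], indent=2)


print(build_wlm_config())
```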

Workload isolation

In the legacy architecture, all the workloads—extract, transform, and load (ETL); business intelligence (BI); and one-time workloads—were running on the same on-premises data warehouse, leading to the noisy neighbor problem and performance issues with the increase in users and workloads.

With the new solution architecture, this issue is remediated using data sharing in Amazon Redshift. With data sharing, the customer can securely and easily share live data for read purposes across Redshift clusters, AWS accounts, or AWS Regions, without copying any data.

Data sharing improved the agility of the customer’s organization by giving them instant, granular, and high-performance access to data across Redshift clusters without copying or moving it manually. Because users have live access to the data, they see the most up-to-date and consistent information as it’s updated in Redshift clusters. Data sharing also provides workload isolation: ETL workloads run in their own Redshift cluster and share data with the BI and analytical workloads running in their respective Redshift clusters.
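The producer and consumer sides of this isolation can be sketched with the Amazon Redshift data sharing SQL statements. The Python helpers below only generate the statements; the datashare, schema, and database names are hypothetical, and the namespace GUIDs are placeholders for the values shown for each cluster. The helpers interpolate identifiers directly, so they should only be used with trusted names.

```python
def producer_statements(share: str, schema: str, consumer_namespace: str) -> list[str]:
    """SQL run on the producer (ETL) cluster to publish a schema."""
    return [
        f"CREATE DATASHARE {share};",
        f"ALTER DATASHARE {share} ADD SCHEMA {schema};",
        f"ALTER DATASHARE {share} ADD ALL TABLES IN SCHEMA {schema};",
        # Allow the consumer cluster's namespace to use the share
        f"GRANT USAGE ON DATASHARE {share} TO NAMESPACE '{consumer_namespace}';",
    ]


def consumer_statements(database: str, share: str, producer_namespace: str) -> list[str]:
    """SQL run on each consumer (BI or analytics) cluster to mount the share."""
    return [
        f"CREATE DATABASE {database} FROM DATASHARE {share} OF NAMESPACE '{producer_namespace}';",
    ]


for stmt in producer_statements("reporting_share", "reporting", "<consumer-namespace-guid>"):
    print(stmt)
for stmt in consumer_statements("reporting_db", "reporting_share", "<producer-namespace-guid>"):
    print(stmt)
```

Consumers then query the shared tables with three-part names, for example reporting_db.reporting.positions (a hypothetical table), while the ETL load runs only on the producer cluster.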

Scalability

With the legacy architecture, the customer faced scalability challenges during large events, both in handling unpredictable, spiky workloads and in over-provisioning database capacity. Concurrency scaling and elastic resize allowed the customer to meet their scalability requirements and handle unpredictable and spiky workloads.

Data migration to Amazon Redshift

The customer used a home-grown process to extract the data from Actian Vectorwise and store it as CSV files in Amazon S3. The data was then ingested from Amazon S3 into Amazon Redshift.

The loading process used the COPY command to ingest the data from Amazon S3 quickly and efficiently. Using COPY is a best practice for loading data into Amazon Redshift: it’s the most efficient way to load a table because it uses the Amazon Redshift MPP architecture to read and load data in parallel from a file or multiple files in an S3 bucket.
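Parallel loading works best when the extract is split into files of similar size whose count is a multiple of the number of slices in the cluster. The Python sketch below picks such a file count and builds a matching COPY statement. It assumes gzip-compressed CSV files with one header row; the 256 MiB target size, table name, bucket, and IAM role ARN are hypothetical.

```python
import math

MiB = 1024 ** 2


def file_count(total_bytes: int, num_slices: int, target_file_bytes: int = 256 * MiB) -> int:
    """Number of files to split an extract into before running COPY.

    Rounds up to a multiple of the slice count so every slice gets work.
    """
    if total_bytes <= 0 or num_slices <= 0:
        raise ValueError("total_bytes and num_slices must be positive")
    raw = math.ceil(total_bytes / target_file_bytes)
    return math.ceil(raw / num_slices) * num_slices


def copy_sql(table: str, s3_prefix: str, iam_role_arn: str) -> str:
    """COPY every gzip-compressed CSV file under s3_prefix into table."""
    return (
        f"COPY {table}\n"
        f"FROM '{s3_prefix}'\n"
        f"IAM_ROLE '{iam_role_arn}'\n"
        "FORMAT AS CSV\n"
        "IGNOREHEADER 1\n"
        "GZIP;"
    )


print(file_count(10 * 1024 ** 3, num_slices=16))  # 48
print(copy_sql("staging.positions",
               "s3://example-bucket/vectorwise-extract/positions/part_",
               "arn:aws:iam::123456789012:role/ExampleRedshiftLoadRole"))
```

COPY loads every object whose key begins with the FROM prefix, so all of the split files are picked up by a single command.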

To learn about the best practices for source data files to load using the COPY command, see Loading data files.

After the data is ingested into Redshift staging tables from Amazon S3, transformation jobs are run from Pentaho to apply the incremental changes to the final reporting tables.
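The post doesn’t show the Pentaho transformations, but a common way to apply a staged batch in Amazon Redshift is to delete the target rows that have a newer version in staging and then insert the staged rows, inside one transaction. The following Python sketch generates those statements. It assumes a single-column key, at most one staged row per key, and staging and target tables with identical column order; all table and column names are hypothetical.

```python
def apply_incremental_sql(target: str, staging: str, key: str) -> list[str]:
    """Statements that replace changed rows in target with rows from staging."""
    return [
        "BEGIN;",
        # Remove the old versions of rows that arrived in this batch
        f"DELETE FROM {target} USING {staging} WHERE {target}.{key} = {staging}.{key};",
        # Insert new and changed rows (assumes identical column order)
        f"INSERT INTO {target} SELECT * FROM {staging};",
        "END;",
        # Empty the staging table for the next load; TRUNCATE commits on its own
        f"TRUNCATE {staging};",
    ]


for stmt in apply_incremental_sql("reporting.positions", "staging.positions", "position_id"):
    print(stmt)
```

Amazon Redshift also supports MERGE, which can express the same upsert in a single statement.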

The following diagram illustrates this workflow.

Key considerations for the migration

There are three ways of migrating an on-premises data warehouse to Amazon Redshift: one-step, two-step, and wave-based migration. To minimize the risk of migrating over 20 databases that vary in complexity, we decided on the wave-based approach. The fundamental concept behind wave-based migration involves dividing the migration program into projects based on factors such as complexity and business outcomes. The implementation then migrates each project individually or by combining certain projects into a wave. Subsequent waves follow, which may or may not be dependent on the results of the preceding wave.
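Wave planning can be sketched as a dependency-ordering problem: a project moves only after the projects it depends on, and simpler projects go first within each step. The Python sketch below uses the standard library’s graphlib; the project names, complexity scores, and per-wave cap are hypothetical, and the real program also weighed business outcomes, which the sketch doesn’t model.

```python
from graphlib import TopologicalSorter


def plan_waves(projects: dict[str, tuple[int, list[str]]], max_per_wave: int) -> list[list[str]]:
    """Group projects into migration waves.

    projects maps name -> (complexity score, names of projects it depends on).
    """
    unknown = {d for _, deps in projects.values() for d in deps} - set(projects)
    if unknown:
        raise ValueError(f"unknown dependencies: {sorted(unknown)}")
    sorter = TopologicalSorter({name: set(deps) for name, (_, deps) in projects.items()})
    sorter.prepare()  # raises graphlib.CycleError on circular dependencies
    waves: list[list[str]] = []
    while sorter.is_active():
        # Projects whose dependencies are all migrated, simplest first
        ready = sorted(sorter.get_ready(), key=lambda n: (projects[n][0], n))
        for i in range(0, len(ready), max_per_wave):
            waves.append(ready[i:i + max_per_wave])
        sorter.done(*ready)
    return waves


example = {
    "refdata": (1, []),
    "pricing": (3, ["refdata"]),
    "screening": (5, ["refdata"]),
    "portfolio": (8, ["pricing", "screening"]),
}
print(plan_waves(example, max_per_wave=2))
# [['refdata'], ['pricing', 'screening'], ['portfolio']]
```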

This strategy requires both the legacy data warehouse and Amazon Redshift to operate concurrently until the migration and validation of all workloads are successfully complete. This provides a smooth transition while making sure the on-premises infrastructure can be retired only after thorough migration and validation have taken place.

In addition, within each wave, we followed a set of phases to make sure that each wave was successful:

  • Assess and plan
  • Design the Amazon Redshift environment
  • Migrate the data
  • Test and validate
  • Perform cutover and optimizations

In the process, we didn’t want to rewrite the legacy code for each migration. Instead, we migrated the data to Amazon Redshift with minimal code changes, because SQL compatibility was very important given the existing knowledge within the organization and downstream application consumption. After the data was ingested into the Redshift cluster, we tuned the tables for best performance.

One of the main benefits we realized as part of the migration was the option to later integrate data in Amazon Redshift with other business groups that use AWS Data Exchange, without significant effort.

We performed blue/green deployments to make sure that end users didn’t encounter any latency degradation while retrieving data. We migrated end users in phases so we could measure the impact and adjust the cluster configuration as needed.
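Phased user migration during a blue/green deployment is often implemented by hashing a stable user identifier into a bucket and moving buckets to the new environment as confidence grows. The post doesn’t describe how users were assigned to phases, so the following Python sketch is an assumption: it maps each user ID to one of 100 buckets and sends users below the rollout percentage to green (Amazon Redshift) and the rest to blue (the existing system).

```python
import hashlib


def rollout_bucket(user_id: str) -> int:
    """Stable bucket in [0, 100); the same user always lands in the same bucket."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % 100


def target_environment(user_id: str, green_percent: int) -> str:
    """Route a user to the new (green) or existing (blue) environment."""
    if not 0 <= green_percent <= 100:
        raise ValueError("green_percent must be between 0 and 100")
    return "green" if rollout_bucket(user_id) < green_percent else "blue"


# Hypothetical phases: 10%, then 50%, then everyone.
for phase in (10, 50, 100):
    print(phase, target_environment("analyst-42", phase))
```

Because a user’s bucket never changes, raising the percentage only moves additional users to green; no user flips back and forth between environments.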

Results

The customer’s decision to use Amazon Redshift for their solution was further reinforced by the platform’s ability to handle both structured and semi-structured data seamlessly. Amazon Redshift allows the customer to efficiently analyze and derive valuable insights from their diverse range of datasets, including equities and institutional data, all while using standard SQL commands that teams are already comfortable with.

Through rigorous testing, Amazon Redshift consistently met the customer’s stringent SLAs and delivered subsecond query response times. With the AWS migration, the customer achieved a 5% improvement in query performance. The clusters can now be scaled in minutes, compared to 6 months in the data center. Operational cost was reduced by 500% due to the simplicity of Redshift cluster operations in AWS. Stability of the clusters improved by 100%, and upgrade and patching cycle time improved by 200%. Overall, the improved operational posture and lower footprint costs have resulted in significant savings for the team and the platform in general. In addition, the ability to scale the overall architecture based on market data trends in a resilient and highly available way not only met customer demand in terms of time to market, but also significantly reduced operational costs and total cost of ownership.

Conclusion

In this post, we covered how a large financial services customer improved performance and scalability, and reduced their operational costs by migrating to Amazon Redshift. This enabled the customer to grow and onboard new workloads into Amazon Redshift for their business-critical applications.



About the Authors

Krishna Gogineni is a Principal Solutions Architect at AWS helping financial services customers. Krishna is a cloud-native architecture evangelist helping customers transform the way they build software. Krishna works with customers to learn their unique business goals, and then super-charge their ability to meet these goals through software delivery that uses industry best practices and tools such as DevOps, data lakes, data analytics, microservices, containers, and continuous integration/continuous delivery.

Dayananda Shenoy is a Senior Solution Architect with over 20 years of experience designing and architecting backend services for financial services products. Currently, he leads the design and architecture of distributed, high-performance, low-latency analytics services for a data provider. He is passionate about solving scalability and performance challenges in distributed systems using emerging technologies that improve existing tech stacks and add value to the business by enhancing the customer experience.

Vishal Balani is a Sr. Customer Solutions Manager based out of New York. He works closely with financial services customers to help them use the cloud for business agility, innovation, and resiliency. He has extensive experience leading large-scale cloud migration programs. Outside of work, he enjoys spending time with family, tinkering with a new project, or riding his bike.

Ranjan Burman is a Sr. PostgreSQL Database Specialist SA. He specializes in RDS & Aurora PostgreSQL. He has more than 18 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with the use of cloud solutions.

Muthuvelan Swaminathan is an Enterprise Solutions Architect based out of New York. He works with enterprise customers providing architectural guidance in building resilient, cost-effective and innovative solutions that address business needs.

Introducing support for Apache Kafka on Raft mode (KRaft) with Amazon MSK clusters

Post Syndicated from Kalyan Janaki original https://aws.amazon.com/blogs/big-data/introducing-support-for-apache-kafka-on-raft-mode-kraft-with-amazon-msk-clusters/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK helps you build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overhead associated with setting up and running Apache Kafka on your own. Since its inception, Apache Kafka has depended on Apache ZooKeeper for storing and replicating the metadata of Kafka brokers and topics. Starting with Apache Kafka version 3.3, the Kafka community has adopted KRaft (Apache Kafka on Raft), a consensus protocol, to replace Kafka’s dependency on ZooKeeper for metadata management. In the future, the Apache Kafka community plans to remove the ZooKeeper mode entirely.

Today, we’re excited to launch support for KRaft on new Amazon MSK clusters, starting with version 3.7. In this post, we explain how KRaft mode improves on the ZooKeeper approach. We also guide you through creating MSK clusters with KRaft mode and connecting your applications to them.

Why was ZooKeeper replaced with KRaft mode?

The traditional Kafka architecture relies on ZooKeeper as the authoritative source for cluster metadata. Read and write access to metadata in ZooKeeper is funneled through a single Kafka controller. For clusters with a large number of partitions, this single-controller design can become a bottleneck during scenarios such as an uncontrolled broker shutdown or controller failover.

KRaft mode addresses these limitations by managing metadata within the Kafka cluster itself. Instead of relying on a separate ZooKeeper cluster, KRaft mode stores and replicates the cluster metadata across multiple Kafka controller nodes, which form a metadata quorum: a Raft quorum that manages the Kafka metadata log. By distributing metadata management responsibilities across multiple controller nodes, KRaft mode improves recovery time for scenarios such as uncontrolled broker shutdown or controller failover. For more details on KRaft mode and its implementation, refer to KIP-500: Replace ZooKeeper with a Self-Managed Metadata Quorum.
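The quorum arithmetic explains the recovery benefit: a Raft leader treats a metadata record as committed once a majority of the voting controllers has replicated it, so a quorum of n controllers keeps working with up to (n - 1) // 2 of them unavailable. The Python sketch below models only that majority rule; it omits the current-term check that Raft also applies, is not Kafka’s implementation, and uses hypothetical offsets.

```python
def majority(voters: int) -> int:
    """Smallest number of voters that forms a majority."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """How many voters can be unavailable while a majority remains."""
    return (voters - 1) // 2


def commit_offset(replicated_offsets: list[int]) -> int:
    """Highest metadata-log offset that a majority of controllers has replicated."""
    ordered = sorted(replicated_offsets, reverse=True)
    return ordered[majority(len(ordered)) - 1]


print(majority(3), tolerated_failures(3))  # 2 1
print(commit_offset([10, 8, 5]))           # 8: present on two of three controllers
```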

The following figure compares the three-node MSK cluster architecture with ZooKeeper vs. KRaft mode.

Amazon MSK with KRaft mode

Until now, Amazon MSK has supported Kafka clusters that rely on ZooKeeper for metadata management. One of the key benefits of Amazon MSK is that it handles the complexity of setting up and managing the ZooKeeper cluster at no additional cost. Many organizations use Amazon MSK to run large, business-critical streaming applications that require splitting their traffic across thousands of partitions. As the size of a Kafka cluster grows, the amount of metadata generated within the cluster increases proportionally to the number of partitions.

Two key properties govern the number of partitions a Kafka cluster can support: the per-node partition count limit and the cluster-wide partition limit. As mentioned earlier, metadata management based on ZooKeeper was a bottleneck that constrained the cluster-wide partition limit in Apache Kafka. With the introduction of KRaft mode in Amazon MSK starting with version 3.7, Amazon MSK now enables the creation of clusters with up to 60 brokers, compared to the default quota of 30 brokers in ZooKeeper mode. Kafka’s scalability still fundamentally relies on expanding the cluster by adding more nodes to increase overall capacity. Consequently, the cluster-wide partition limit continues to define the upper bounds of scalability within the Kafka system, because it determines the maximum number of partitions that can be distributed across the available nodes. Amazon MSK manages the KRaft controller nodes at no additional cost.
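The interplay between the two limits is simple arithmetic: a cluster can hold at most the smaller of (brokers × per-node limit) and the cluster-wide limit. The limit values in this Python sketch are hypothetical placeholders rather than Amazon MSK quotas; check the Amazon MSK documentation for the values that apply to your broker type.

```python
def partition_capacity(brokers: int, per_broker_limit: int, cluster_wide_limit: int) -> int:
    """Upper bound on partitions: summed per-broker capacity, capped cluster-wide."""
    return min(brokers * per_broker_limit, cluster_wide_limit)


# Hypothetical limits for illustration only.
PER_BROKER = 4000
CLUSTER_WIDE = 200_000

for brokers in (30, 60):
    print(brokers, partition_capacity(brokers, PER_BROKER, CLUSTER_WIDE))
# 30 120000
# 60 200000
```

With these placeholder numbers, doubling the broker count from 30 to 60 raises the bound only until the cluster-wide limit takes over.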

Create and access an MSK cluster with KRaft mode

Complete the following steps to configure an MSK cluster with KRaft mode:

  1. On the Amazon MSK console, choose Clusters in the navigation pane.
  2. Choose Create cluster.
  3. For Cluster creation method, select Custom create.
  4. For Cluster name, enter a name.
  5. For Cluster type, select Provisioned.
  6. For Apache Kafka version, choose 3.7.x.
  7. For Metadata mode, select KRaft.
  8. Leave the other settings as default and choose Create cluster.

After the cluster is created, navigate to it and choose View client integration information, which provides details about the cluster bootstrap servers.
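The same bootstrap information is available programmatically through the Amazon MSK GetBootstrapBrokers API. In this Python sketch, fetch_bootstrap_servers calls boto3’s get_bootstrap_brokers, and pick_bootstrap_string selects a broker string from the response. The preference order (IAM, then TLS, then plaintext) is an assumption and should match the authentication you configured for the cluster; the ARN and Region are placeholders.

```python
# GetBootstrapBrokers response keys, in an assumed order of preference.
PREFERRED_KEYS = (
    "BootstrapBrokerStringSaslIam",  # IAM access control
    "BootstrapBrokerStringTls",      # TLS
    "BootstrapBrokerString",         # plaintext
)


def pick_bootstrap_string(response: dict, preferred=PREFERRED_KEYS) -> str:
    """Return the first non-empty broker string from a GetBootstrapBrokers response."""
    for key in preferred:
        value = response.get(key)
        if value:
            return value
    raise KeyError("no matching bootstrap broker string in response")


def fetch_bootstrap_servers(cluster_arn: str, region: str) -> str:
    """Call Amazon MSK and return a value usable as bootstrap.servers."""
    import boto3  # imported here so pick_bootstrap_string works without boto3

    client = boto3.client("kafka", region_name=region)
    return pick_bootstrap_string(client.get_bootstrap_brokers(ClusterArn=cluster_arn))


# Example call (requires AWS credentials; the ARN is a placeholder):
# fetch_bootstrap_servers("<your-cluster-arn>", "us-east-1")
```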

Adapt your client applications and tools for accessing MSK clusters with KRaft mode

With the adoption of KRaft mode in Amazon MSK, customers whose client applications and tools connect to ZooKeeper to interact with MSK clusters will need to update them to reflect the removal of ZooKeeper from the architecture. Starting with version 1.0, Kafka introduced the ability for admin tools to take the bootstrap servers (brokers) as input parameters instead of a ZooKeeper connection string, and it began deprecating ZooKeeper connection strings in version 2.5. This change was part of the effort to decouple Kafka from ZooKeeper and pave the way for its eventual replacement with KRaft mode for metadata management. Instead of specifying a ZooKeeper connection string, clients need to use the bootstrap.servers configuration option to connect directly to the Kafka brokers. The following table summarizes these changes.

Component | With ZooKeeper | With KRaft
Clients and services | bootstrap.servers=broker:<port> or zookeeper.connect=zookeeper:2181 (deprecated) | bootstrap.servers=broker:<port>
Admin tools | kafka-topics --zookeeper zookeeper:2181 (deprecated) or kafka-topics --bootstrap-server broker:<port> … --command-config | kafka-topics --bootstrap-server broker:<port> … --command-config
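For teams with many scripts and property files, the change from ZooKeeper connection strings to bootstrap servers can be applied mechanically. The following Python sketch is a hypothetical helper, not part of Kafka or Amazon MSK: it drops zookeeper.connect from client properties in favor of bootstrap.servers, and rewrites a --zookeeper argument pair into --bootstrap-server. It handles only the space-separated form of the flag, and authentication settings still need to be supplied through --command-config.

```python
def migrate_client_properties(props: dict, bootstrap_servers: str) -> dict:
    """Replace a ZooKeeper connection with a direct broker bootstrap list."""
    migrated = {k: v for k, v in props.items() if k != "zookeeper.connect"}
    migrated["bootstrap.servers"] = bootstrap_servers
    return migrated


def migrate_admin_args(args: list, bootstrap_servers: str) -> list:
    """Rewrite '--zookeeper <connect-string>' into '--bootstrap-server <brokers>'."""
    out = []
    i = 0
    while i < len(args):
        if args[i] == "--zookeeper":
            if i + 1 >= len(args):
                raise ValueError("--zookeeper is missing its connection string")
            out += ["--bootstrap-server", bootstrap_servers]
            i += 2  # skip the flag and the ZooKeeper connection string
        else:
            out.append(args[i])
            i += 1
    return out


# Hypothetical broker list; use the bootstrap servers shown for your cluster.
brokers = "b-1.example:9098,b-2.example:9098"
print(migrate_client_properties({"zookeeper.connect": "zk:2181", "group.id": "reports"}, brokers))
print(migrate_admin_args(["kafka-topics", "--zookeeper", "zk:2181", "--list"], brokers))
```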

Summary

In this post, we discussed how Amazon MSK has launched support for KRaft mode for metadata management. We also described how KRaft works and how it’s different from ZooKeeper.

To get started, create a new cluster with KRaft mode using the AWS Management Console, and refer to the Amazon MSK Developer Guide for more information.


About the author

Kalyan Janaki is a Senior Big Data & Analytics Specialist with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

AWS analytics services streamline user access to data, permissions setting, and auditing

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-analytics-services-streamline-user-access-to-data-permissions-setting-and-auditing/

I am pleased to announce a new use case based on trusted identity propagation, a recently introduced capability of AWS IAM Identity Center.

Tableau, a commonly used business intelligence (BI) application, can now propagate end-user identity down to Amazon Redshift. This has a triple benefit. It simplifies the sign-in experience for end users. It allows data owners to define access based on real end-user identity. It allows auditors to verify data access by users.

Trusted identity propagation allows applications that consume data (such as Tableau, Amazon QuickSight, Amazon Redshift Query Editor, Amazon EMR Studio, and others) to propagate the user’s identity and group memberships to the services that store and manage access to the data, such as Amazon Redshift, Amazon Athena, Amazon Simple Storage Service (Amazon S3), Amazon EMR, and others. Trusted identity propagation is a capability of IAM Identity Center that improves the sign-in experience across multiple analytics applications, simplifies data access management, and simplifies audit. End users benefit from single sign-on and do not have to specify the IAM roles they want to assume to connect to the system.

Before diving into more details, let’s agree on terminology.

I use the term “identity providers” to refer to the systems that hold user identities and group memberships. These are the systems that prompt the user for credentials and perform the authentication, for example, Azure Active Directory, Okta, Ping Identity, and more. Check the full list of identity providers we support.

I use the term “user-facing applications” to designate the applications that consume data, such as Tableau, Microsoft Power BI, QuickSight, Amazon Redshift Query Editor, and others.

And finally, when I write “downstream services”, I refer to the analytics engines and storage services that process, store, or manage access to your data: Amazon Redshift, Athena, S3, EMR, and others.

Figure: Trusted identity propagation, high-level diagram

To understand the benefit of trusted identity propagation, let’s briefly look at how data access has been granted until now. When a user-facing application accesses data from a downstream service, it either uses generic credentials (such as “tableau_user”) or assumes an IAM role to authenticate against the downstream service. This is the source of two challenges.

First, it makes it difficult for the downstream service administrator to define access policies that are fine-tuned for the actual user making the request. As seen from the downstream service, all requests originate from that common user or IAM role. If Jeff and Jane are both mapped to the BusinessAnalytics IAM role, then it is not possible to give them different levels of access, for example, read-only and read-write. Furthermore, if Jeff is also in the Finance group, he needs to choose a role in which to operate; he cannot access data from both groups in the same session.

Secondly, the task of associating a data-access event to an end user involves some undifferentiated heavy lifting. If the request originates from an IAM role called BusinessAnalytics, then additional work is required to figure out which user was behind that action.

Well, this particular example might look very simple, but in real life, organizations have hundreds of users and thousands of groups to match to hundreds of datasets. There was an opportunity for us to Invent and Simplify.

Once configured, the new trusted identity propagation provides a technical mechanism for user-facing applications to access data on behalf of the actual user behind the keyboard. Knowing the actual user identity offers three main advantages.

First, it allows downstream service administrators to create and manage access policies based on actual user identities, the groups they belong to, or a combination of the two. Downstream service administrators can now assign access in terms of users, groups, and datasets. This is the way most of our customers naturally think about access to data—intermediate mappings to IAM roles are no longer necessary to achieve these patterns.
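The difference between role-based and identity-based access can be illustrated with a small model. In this Python sketch, grants are attached to users or groups per dataset, and a user’s effective access is the highest level granted directly or through any of their groups. The users, groups, datasets, and levels mirror the Jeff and Jane example and are hypothetical; Amazon Redshift and S3 Access Grants have their own policy models.

```python
from dataclasses import dataclass, field

# Ordered access levels; a higher number grants more.
LEVELS = {"none": 0, "read-only": 1, "read-write": 2}


@dataclass
class Grants:
    by_user: dict = field(default_factory=dict)   # (user, dataset) -> level
    by_group: dict = field(default_factory=dict)  # (group, dataset) -> level


def effective_access(user: str, groups: list, dataset: str, grants: Grants) -> str:
    """Highest level granted to the user directly or through any of their groups."""
    candidates = [grants.by_user.get((user, dataset), "none")]
    candidates += [grants.by_group.get((g, dataset), "none") for g in groups]
    return max(candidates, key=LEVELS.__getitem__)


grants = Grants(
    by_user={("jane", "sales"): "read-write"},
    by_group={("BusinessAnalytics", "sales"): "read-only",
              ("Finance", "ledger"): "read-write"},
)
# Jeff belongs to two groups and gets both in the same session.
print(effective_access("jeff", ["BusinessAnalytics", "Finance"], "sales", grants))   # read-only
print(effective_access("jeff", ["BusinessAnalytics", "Finance"], "ledger", grants))  # read-write
# Jane shares Jeff's group but has an additional direct grant.
print(effective_access("jane", ["BusinessAnalytics"], "sales", grants))              # read-write
```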

Second, auditors now have access to the original user identity in system logs and can verify that policies are implemented correctly and follow all requirements of the company or industry-level policies.

Third, users of BI applications can benefit from single sign-on between applications. Your end users no longer need to understand your company’s AWS accounts and IAM roles. Instead, they can sign in to EMR Studio (for example) using their corporate single sign-on that they’re used to for so many other things they do at work.

How does trusted identity propagation work?
Trusted identity propagation relies on standard mechanisms from our industry: OAuth2 and JWT. OAuth2 is an open standard for access delegation that allows users to grant third-party user-facing applications access to data on other services (downstream services) without exposing their credentials. JWT (JSON Web Token) is a compact, URL-safe means of representing identities and claims to be transferred between two parties. JWTs are signed, which means their integrity and authenticity can be verified.
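To make the JWT structure concrete: a signed JWT is three base64url-encoded segments (header, claims, signature) joined by dots. The Python sketch below decodes the first two with the standard library. It deliberately does not verify the signature, so it is only suitable for inspecting a token; a downstream service must verify the signature with the issuer’s public keys before trusting any claim. The example claims are hypothetical.

```python
import base64
import json


def _b64url_decode(segment: str) -> bytes:
    """Decode base64url, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def decode_jwt_unverified(token: str) -> tuple[dict, dict]:
    """Return (header, claims) WITHOUT checking the signature."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("a signed JWT has exactly three dot-separated segments")
    return json.loads(_b64url_decode(parts[0])), json.loads(_b64url_decode(parts[1]))


def _b64url_encode(data: bytes) -> str:
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


# Example with a hand-built token and a fake signature (hypothetical claims).
example = ".".join([
    _b64url_encode(json.dumps({"alg": "RS256", "typ": "JWT"}).encode()),
    _b64url_encode(json.dumps({"sub": "jeff@example.com", "aud": "tableau-redshift"}).encode()),
    _b64url_encode(b"not-a-real-signature"),
])
header, claims = decode_jwt_unverified(example)
print(header["alg"], claims["sub"])  # RS256 jeff@example.com
```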

How to configure trusted identity propagation
Configuring trusted identity propagation requires setup in IAM Identity Center, at the user-facing application, and at the downstream service because each of these needs to be told to work with end-user identities. Although the particulars will be different for each application, they will all follow this pattern:

  1. Configure an identity source in AWS IAM Identity Center. AWS recommends enabling automated provisioning if your identity provider supports it, as most do. Automated provisioning uses the SCIM standard to synchronize your directory users and groups into IAM Identity Center. If you already use IAM Identity Center to federate your workforce into the AWS Management Console, you have probably configured this already. This is a one-time configuration; you don't have to repeat it for each user-facing application.
  2. Configure your user-facing application to authenticate its users with your identity provider. For example, configure Tableau to use Okta.
  3. Configure the connection between the user-facing application and the downstream service. For example, configure Tableau to access Amazon Redshift. In some cases, this requires using the Amazon Redshift ODBC or JDBC driver.

Then comes the configuration specific to trusted identity propagation. For example, imagine your organization has developed a user-facing web application that authenticates the users with your identity provider, and that you want to access data in AWS on behalf of the current authenticated user. For this use case, you would create a trusted token issuer in IAM Identity Center. This powerful new construct gives you a way to map your application’s authenticated users to the users in your IAM Identity Center directory so that it can make use of trusted identity propagation. My colleague Becky wrote a blog post to show you how to develop such an application. This additional configuration is required only when using third-party applications, such as Tableau, or a customer-developed application, that authenticate outside of AWS. When using user-facing applications managed by AWS, such as Amazon QuickSight, no further setup is required.
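Conceptually, a trusted token issuer tells IAM Identity Center which external issuer to trust and which claim in that issuer's tokens identifies the user. The following Python sketch models that mapping with plain dictionaries. The issuer URL, the use of the `email` claim, the audience value, and the in-memory directory are all hypothetical; in practice, you configure these settings in IAM Identity Center, which validates the token signature and performs the lookup for you.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TrustedTokenIssuer:
    issuer_url: str           # expected "iss" claim of the external IdP's tokens
    claim_attribute: str      # token claim that identifies the user, e.g. "email"
    directory_attribute: str  # directory attribute that must equal that claim


def resolve_user(claims: dict, issuer: TrustedTokenIssuer, audience: str, directory: list) -> dict:
    """Map the claims of an already signature-verified token to exactly one directory user."""
    if claims.get("iss") != issuer.issuer_url:
        raise PermissionError("token was not issued by the trusted issuer")
    aud = claims.get("aud")
    # The "aud" claim may be a single string or a list of audiences.
    audiences = aud if isinstance(aud, list) else [aud]
    if audience not in audiences:
        raise PermissionError("token audience does not match this application")
    value = claims.get(issuer.claim_attribute)
    if value is None:
        raise LookupError(f"token has no {issuer.claim_attribute!r} claim")
    matches = [user for user in directory if user.get(issuer.directory_attribute) == value]
    if len(matches) != 1:
        raise LookupError(f"expected one directory user for {value!r}, found {len(matches)}")
    return matches[0]


if __name__ == "__main__":
    directory = [{"user_name": "alice", "email": "alice@example.com", "groups": ["analysts"]}]
    issuer = TrustedTokenIssuer("https://idp.example.com", "email", "email")
    claims = {"iss": "https://idp.example.com", "aud": "my-analytics-app", "email": "alice@example.com"}
    print(resolve_user(claims, issuer, "my-analytics-app", directory)["user_name"])  # alice
```

Requiring exactly one match is a deliberate choice in this sketch: an ambiguous mapping should fail closed rather than pick an arbitrary user.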


Finally, downstream service administrators must configure the access policies based on the user identity and group memberships. The exact configuration varies from one downstream service to the other. If the application reads or writes data in Amazon S3, the data owner may use S3 Access Grants in the Amazon S3 console to grant access for users and groups to prefixes in Amazon S3. If the application makes queries to an Amazon Redshift data warehouse, the data owner must configure IAM Identity Center trusted connection in the Amazon Redshift console and match the audience claim (aud) from the identity provider.
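To illustrate what granting access to users and groups (rather than to IAM roles) looks like, here is a simplified Python model of prefix-based grants, loosely inspired by how S3 Access Grants scopes access to S3 prefixes. It is not the S3 Access Grants API. The `Grant` type and the matching rule (a caller is allowed if a grant for that user, or for one of their groups, covers the requested location with a sufficient permission) are assumptions made for illustration.

```python
from dataclasses import dataclass

# Actions allowed by each permission level (names mirror S3 Access Grants' READ/WRITE/READWRITE).
ALLOWED_ACTIONS = {"READ": {"read"}, "WRITE": {"write"}, "READWRITE": {"read", "write"}}


@dataclass(frozen=True)
class Grant:
    grantee_type: str  # "USER" or "GROUP"
    grantee: str       # user or group name from the identity directory
    prefix: str        # S3 location the grant covers, e.g. "s3://example-bucket/sales/"
    permission: str    # "READ", "WRITE", or "READWRITE"


def is_allowed(user: str, groups: set, s3_uri: str, action: str, grants: list) -> bool:
    """True if any grant for the user, or for one of their groups, covers s3_uri for action."""
    for grant in grants:
        applies_to_caller = (grant.grantee_type == "USER" and grant.grantee == user) or (
            grant.grantee_type == "GROUP" and grant.grantee in groups
        )
        if applies_to_caller and s3_uri.startswith(grant.prefix) and action in ALLOWED_ACTIONS[grant.permission]:
            return True
    return False


if __name__ == "__main__":
    grants = [
        Grant("GROUP", "analysts", "s3://example-bucket/sales/", "READ"),
        Grant("USER", "alice", "s3://example-bucket/scratch/alice/", "READWRITE"),
    ]
    print(is_allowed("alice", {"analysts"}, "s3://example-bucket/sales/2024/q1.parquet", "read", grants))   # True
    print(is_allowed("alice", {"analysts"}, "s3://example-bucket/sales/2024/q1.parquet", "write", grants))  # False
```

Because the policy is written in terms of users, groups, and data locations, an auditor can read it directly, with no intermediate IAM role mapping to decode.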

Now that you have a high-level overview of the configuration, let’s dive into the most important part: the user experience.

The end-user experience
Although the precise end-user experience differs from one application to another, in all cases it is simpler and more familiar to workforce users than before. The interaction begins with a redirect-based single sign-on flow that takes users to their identity provider, where they sign in with their credentials, multi-factor authentication, and so on.

Let’s look at the details of how an end user might interact with Okta and Tableau when trusted identity propagation has been configured.

Here is an illustration of the flow and the main interactions between systems and services.

Trusted Identity Propagation flow

Here’s how it goes.

1. As a user, I attempt to sign in to Tableau.

2. Tableau initiates a browser-based flow and redirects me to the Okta sign-in page, where I enter my credentials. On successful authentication, Okta issues tokens (an ID token and an access token) to Tableau.

3. Tableau initiates a JDBC connection with Amazon Redshift and includes the access token in the connection request. The Amazon Redshift JDBC driver makes a call to Amazon Redshift. Because your Amazon Redshift administrator enabled IAM Identity Center, Amazon Redshift forwards the access token to IAM Identity Center.

4. IAM Identity Center verifies and validates the access token and exchanges it for a token issued by IAM Identity Center.

5. Amazon Redshift resolves the IAM Identity Center token to determine the corresponding IAM Identity Center user and authorizes access to the resource. Upon successful authorization, I can connect from Tableau to Amazon Redshift.

Once authenticated, I can start to use Tableau as usual.


And when I connect to Amazon Redshift Query Editor, I can query the sys_query_history table to see which user ran each query. It correctly reports awsidc:<email address>, the Okta email address I used when I connected from Tableau.


You can read Tableau’s documentation for more details about this configuration.

Pricing and availability
Trusted identity propagation is provided at no additional cost in the 26 AWS Regions where AWS IAM Identity Center is available today.

Here are more details about trusted identity propagation and downstream service configurations.

Happy reading!

With trusted identity propagation, you can now configure analytics systems to propagate the actual user identity, group membership, and attributes to AWS services such as Amazon Redshift, Amazon Athena, or Amazon S3. It simplifies the management of access policies on these services. It also allows auditors to verify your organization’s compliance posture to know the real identity of users accessing data.

Get started now and configure your Tableau integration with Amazon Redshift.

— seb

PS: Writing a blog post at AWS is always a team effort, even when you see only one name under the post title. In this case, I want to thank Eva Mineva, Laura Reith, and Roberto Migli for their much-appreciated help in understanding the many subtleties and technical details of trusted identity propagation.

Introducing Amazon EMR on EKS with Apache Flink: A scalable, reliable, and efficient data processing platform

Post Syndicated from Kinnar Kumar Sen original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-with-apache-flink-a-scalable-reliable-and-efficient-data-processing-platform/

AWS recently announced that Apache Flink is generally available for Amazon EMR on Amazon Elastic Kubernetes Service (EKS). Apache Flink is a scalable, reliable, and efficient data processing framework that handles real-time streaming and batch workloads (but is most commonly used for real-time streaming). Amazon EMR on EKS is a deployment option for Amazon EMR that allows you to run open source big data frameworks such as Apache Spark and Flink on Amazon Elastic Kubernetes Service (Amazon EKS) clusters with the EMR runtime. With the addition of Flink support in EMR on EKS, you can now run your Flink applications on Amazon EKS using the EMR runtime and benefit from both services to deploy, scale, and operate Flink applications more efficiently and securely.

In this post, we introduce the features of EMR on EKS with Apache Flink, discuss their benefits, and highlight how to get started.

EMR on EKS for data workloads

AWS customers deploying large-scale data workloads are adopting the EMR runtime with Amazon EKS as the underlying orchestrator to benefit from the complementary features of both. This also enables multi-tenancy: data engineers and data scientists can focus on building data applications, while the platform engineering and site reliability engineering (SRE) teams manage the infrastructure. Some key benefits of Amazon EKS for these customers are:

  • The AWS-managed control plane, which improves resiliency and removes undifferentiated heavy lifting
  • Features like multi-tenancy and role-based access control (RBAC), which allow you to build cost-efficient platforms and enforce organization-wide governance policies
  • The extensibility of Kubernetes, which allows you to install open source add-ons (observability, security, notebooks) to meet your specific needs

The EMR runtime offers the following benefits:

  • Takes care of the undifferentiated heavy lifting of managing installations, configuration, patching, and backups
  • Simplifies scaling
  • Optimizes performance and cost
  • Implements security and compliance by integrating with other AWS services and tools

Benefits of EMR on EKS with Apache Flink

The flexibility to choose instance types, pricing options, and AWS Regions and Availability Zones according to the workload specification is often the main driver of reliability, availability, and cost-optimization. Amazon EMR on EKS natively integrates tools and functionality that provide this flexibility, and more.

Integration with existing tools and processes, such as continuous integration and continuous delivery (CI/CD), observability, and governance policies, helps unify the tools used and decreases the time to launch new services. Many customers already have these tools and processes for their Amazon EKS infrastructure, and you can now extend them to your Flink applications running on EMR on EKS. If you're interested in building your Kubernetes and Amazon EKS capabilities, we recommend EKS Blueprints, which provides a starting point for composing complete EKS clusters that are bootstrapped with the operational software needed to deploy and operate workloads.

Another benefit of running Flink applications with Amazon EMR on EKS is improved scalability. The volume and complexity of data processed by Flink apps can vary significantly based on factors like the time of day, day of the week, seasonality, or a specific marketing campaign or other activity. This volatility forces customers to trade off between over-provisioning, which leads to inefficient resource usage and higher costs, and under-provisioning, where you risk missing latency and throughput SLAs or even service outages. When running Flink applications with Amazon EMR on EKS, the Flink autoscaler increases the applications' parallelism based on the data being ingested, and Amazon EKS auto scaling with Karpenter or Cluster Autoscaler scales the underlying capacity required to meet that demand. In addition to scaling up, your applications can also scale down when the resources aren't needed, which makes your Flink apps more cost-efficient.
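A simplified way to think about parallelism-based autoscaling is as a sizing formula. The following Python sketch is our own illustration, not the Flink autoscaler's actual algorithm: it assumes you know an operator's observed input rate and the rate one parallel subtask can sustain when fully busy, and it sizes parallelism so that subtasks run at a target utilization, clamped between a minimum and a maximum. The function and parameter names are hypothetical.

```python
import math


def target_parallelism(
    input_records_per_sec: float,
    records_per_sec_per_subtask: float,
    target_utilization: float = 0.7,
    min_parallelism: int = 1,
    max_parallelism: int = 128,
) -> int:
    """Parallelism needed to keep each subtask busy roughly target_utilization of the time.

    records_per_sec_per_subtask is the rate one subtask can sustain when 100% busy.
    """
    if records_per_sec_per_subtask <= 0 or not 0 < target_utilization <= 1:
        raise ValueError("capacity must be positive and utilization must be in (0, 1]")
    usable_rate = records_per_sec_per_subtask * target_utilization
    needed = math.ceil(input_records_per_sec / usable_rate)
    # Never go below the floor (idle periods) or above the job's maximum parallelism.
    return max(min_parallelism, min(max_parallelism, needed))


if __name__ == "__main__":
    print(target_parallelism(50_000, 10_000))  # 8
    print(target_parallelism(1_000, 10_000))   # 1 (scaled down during a quiet period)
```

Leaving headroom below 100% utilization is the design choice that lets the job absorb short spikes without rescaling immediately; the cluster autoscaler then only has to provide enough nodes for the resulting task slots.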

Running EMR on EKS with Flink allows you to run multiple versions of Flink on the same cluster. With traditional Amazon Elastic Compute Cloud (Amazon EC2) instances, each version of Flink needs to run on its own virtual machine to avoid challenges with resource management or conflicting dependencies and environment variables. However, containerizing Flink applications allows you to isolate versions and avoid conflicting dependencies, and running them on Amazon EKS allows you to use Kubernetes as the unified resource manager. This means that you have the flexibility to choose which version of Flink is best suited for each job, and also improves your agility to upgrade a single job to the next version of Flink rather than having to upgrade an entire cluster, or spin up a dedicated EC2 instance for a different Flink version, which would increase your costs.

Key EMR on EKS differentiations

In this section, we discuss the key EMR on EKS differentiations.

Faster restart of the Flink job during scaling or failure recovery

This is enabled by task local recovery via Amazon Elastic Block Store (Amazon EBS) volumes and fine-grained recovery support in Adaptive Scheduler.

Task local recovery via EBS volumes for TaskManager pods is available with Amazon EMR 6.15.0 and higher. The default overlay mount provides 10 GB, which is sufficient for jobs with smaller state. Jobs with large state can enable the automatic EBS volume mount option. The EBS volumes for TaskManager pods are automatically created and mounted during pod creation and removed during pod deletion.

Fine-grained recovery support in the Adaptive Scheduler is available with Amazon EMR 6.15.0 and higher. When a task fails during its run, fine-grained recovery restarts only the pipelined region (the pipeline-connected component) that contains the failed task. Without it, the entire graph is reset and a complete rerun is triggered from the last completed checkpoint, which is more expensive than rerunning only the affected tasks. To enable fine-grained recovery, set the following configurations in your Flink configuration:

jobmanager.execution.failover-strategy: region
restart-strategy: exponential-delay  # or: fixed-delay
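The notion of a pipelined region is what makes region failover cheaper than a full restart. The following Python sketch is a simplified model under stated assumptions: the job graph is given as a list of edges labeled "pipelined" or "blocking", regions are the connected components over pipelined edges (computed with union-find), and a failure restarts every task in the failed task's region. Flink's real failover logic also accounts for result partitions across blocking edges, which this sketch omits, and the task names are hypothetical.

```python
def find(parent: dict, task: str) -> str:
    # Union-find lookup with path halving.
    while parent[task] != task:
        parent[task] = parent[parent[task]]
        task = parent[task]
    return task


def tasks_to_restart(tasks: list, edges: list, failed: str) -> set:
    """Return the tasks in the same pipelined region as `failed`.

    edges are (upstream, downstream, kind) tuples where kind is "pipelined" or "blocking".
    """
    parent = {t: t for t in tasks}
    for upstream, downstream, kind in edges:
        if kind == "pipelined":
            root_up, root_down = find(parent, upstream), find(parent, downstream)
            if root_up != root_down:
                parent[root_up] = root_down  # merge the two regions
    region = find(parent, failed)
    return {t for t in tasks if find(parent, t) == region}


if __name__ == "__main__":
    # Two independent forward pipelines, as in an embarrassingly parallel streaming job.
    tasks = ["src0", "map0", "sink0", "src1", "map1", "sink1"]
    forward = [("src0", "map0", "pipelined"), ("map0", "sink0", "pipelined"),
               ("src1", "map1", "pipelined"), ("map1", "sink1", "pipelined")]
    print(sorted(tasks_to_restart(tasks, forward, "map1")))  # ['map1', 'sink1', 'src1']
```

Note the consequence for streaming jobs: all edges are pipelined there, so an all-to-all shuffle such as a keyBy connects every task into one region, and only jobs with independent pipelines get a smaller restart scope.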

Logging and monitoring support with customer managed keys

Monitoring and observability are key constructs of the AWS Well-Architected Framework because they help you learn, measure, and adapt to operational changes. You can enable monitoring of Flink jobs launched with EMR on EKS with Apache Flink. If you enable it when installing the Flink operator, Amazon Managed Service for Prometheus is set up automatically and helps you analyze the Prometheus metrics emitted by the Flink operator, jobs, and TaskManagers.

You can use the Flink UI to monitor the health and performance of Flink jobs in a browser through port forwarding. We have also enabled collection and archival of operator and application logs to Amazon Simple Storage Service (Amazon S3) or Amazon CloudWatch using a FluentD sidecar. You can enable this through a monitoringConfiguration block in the deployment custom resource definition (CRD):

monitoringConfiguration:
  s3MonitoringConfiguration:
    logUri: <S3-BUCKET>
    encryptionKeyArn: <CMK-ARN-FOR-S3-BUCKET-ENCRYPTION>
  cloudWatchMonitoringConfiguration:
    logGroupName: <LOG-GROUP-NAME>
    logStreamNamePrefix: <LOG-STREAM-NAME-PREFIX>
  sideCarResources:
    limits:
      cpuLimit: 500m
      memoryLimit: 250Mi
  containerLogRotationConfiguration:
    rotationSize: 2Gb
    maxFilesToKeep: 10

Cost-optimization using Amazon EC2 Spot Instances

Amazon EC2 Spot Instances are an Amazon EC2 pricing option that provides steep discounts of up to 90% over On-Demand prices. Spot Instances are a popular choice for big data workloads because they help improve throughput and optimize Amazon EC2 spend. Spot Instances use spare EC2 capacity and can be interrupted, with a two-minute notification, when Amazon EC2 needs the capacity back. Flink streaming jobs running on EMR on EKS can now respond to a Spot Instance interruption by performing a just-in-time (JIT) checkpoint of the running jobs and preventing further tasks from being scheduled on the affected Spot Instances. When the job restarts, it not only resumes from the checkpoint, but a combined restart mechanism also makes a best effort to restart the job either once it reaches the target resource parallelism or at the end of the currently configured window. This helps prevent consecutive job restarts caused by several Spot Instances stopping within a short interval, which reduces cost and improves performance.

To minimize the impact of Spot Instance interruptions, follow Spot Instance best practices. The combined restart mechanism and JIT checkpointing are offered only with the Adaptive Scheduler.
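The combined restart behavior can be thought of as a simple decision rule: after an interruption, wait until enough task slots are available to reach the target parallelism, but never longer than the configured window. The following Python sketch is our own simplified model of that rule, not the EMR on EKS implementation; the event format, the time units (seconds), and all names are hypothetical.

```python
def restart_time(
    interruption_at: float,
    slot_events: list,
    available_slots: int,
    target_slots: int,
    window_seconds: float,
) -> tuple:
    """Return (time, slots) at which the job restarts after an interruption.

    slot_events are (timestamp, delta) changes in available task slots, for example
    a replacement node joining (+2) or another Spot node being reclaimed (-2).
    The job restarts as soon as target_slots are available, or at the end of the
    window, whichever comes first, with whatever slots exist at that moment.
    """
    if available_slots >= target_slots:
        return interruption_at, available_slots
    deadline = interruption_at + window_seconds
    for timestamp, delta in sorted(slot_events):
        if timestamp < interruption_at:
            continue
        if timestamp > deadline:
            break
        available_slots += delta
        if available_slots >= target_slots:
            return timestamp, available_slots
    return deadline, available_slots


if __name__ == "__main__":
    # 4 slots left after the interruption; replacement capacity trickles in.
    print(restart_time(0, [(30, 2), (60, -2), (90, 4)], 4, 8, 120))  # (90, 8)
```

In this model the job restarts once instead of once per capacity change, which is the point of combining the two conditions: the target parallelism avoids repeated restarts, and the window bounds how long the job can stay down.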

Integration with the AWS Glue Data Catalog as a metadata store for Flink applications

The AWS Glue Data Catalog is a centralized metadata repository for data assets across various data sources, and provides a unified interface to store and query information about data formats, schemas, and sources. Amazon EMR on EKS with Apache Flink releases 6.15.0 and higher support using the Data Catalog as a metadata store for streaming and batch SQL workflows. Sharing table definitions through a central catalog helps teams discover and understand their data and keeps schemas consistent across the jobs that read and transform it.

Integration with Amazon S3, enabling resiliency and operational efficiency

Amazon S3 is the preferred cloud object store for AWS customers to store not only data but also application JARs and scripts. EMR on EKS with Apache Flink can fetch application JARs and scripts (PyFlink) through the deployment specification, which eliminates the need to build custom images in Flink's Application Mode. When checkpointing to Amazon S3 is enabled, managed state is persisted to provide consistent recovery in case of failures. Flink reads and writes files in Amazon S3 through two different S3 file system implementations. We recommend using Presto S3 (s3p) for checkpointing and s3 or s3a for reading and writing files, including JARs and scripts. See the following code:

...
spec:
  flinkConfiguration:
    taskmanager.numberOfTaskSlots: "2"
    # Presto S3 file system (s3p) for checkpoints
    state.checkpoints.dir: s3p://<BUCKET-NAME>/flink-checkpoint/
  ...
  job:
    jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process
    entryClass: "org.apache.flink.client.python.PythonDriver"
...

Role-based access control using IRSA

IAM Roles for Service Accounts (IRSA) is the recommended way to implement role-based access control (RBAC) for deploying and running applications on Amazon EKS. EMR on EKS with Apache Flink creates two IRSA roles by default, one for the Flink operator and one for Flink jobs. The operator role is used for the JobManager and Flink services, and the job role is used for TaskManagers and ConfigMaps. This limits the scope of AWS Identity and Access Management (IAM) permissions to a service account, helps with credential isolation, and improves auditability.
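IRSA works by letting a Kubernetes service account assume an IAM role through the cluster's OIDC provider. EMR on EKS creates the operator and job roles for you, but it can help to see the shape of the trust policy behind an IRSA role. The following Python sketch builds a typical IRSA trust policy; the account ID, OIDC provider ID, namespace, and service account name are placeholders, not the names EMR on EKS actually uses.

```python
import json


def irsa_trust_policy(account_id: str, oidc_provider: str, namespace: str, service_account: str) -> dict:
    """Trust policy that lets exactly one Kubernetes service account assume the role.

    oidc_provider is the cluster's OIDC issuer without the "https://" prefix,
    for example "oidc.eks.us-west-2.amazonaws.com/id/EXAMPLE".
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Federated": f"arn:aws:iam::{account_id}:oidc-provider/{oidc_provider}"},
                "Action": "sts:AssumeRoleWithWebIdentity",
                "Condition": {
                    "StringEquals": {
                        # Scope the role to one namespace/service account pair.
                        f"{oidc_provider}:sub": f"system:serviceaccount:{namespace}:{service_account}",
                        f"{oidc_provider}:aud": "sts.amazonaws.com",
                    }
                },
            }
        ],
    }


if __name__ == "__main__":
    policy = irsa_trust_policy(
        "111122223333", "oidc.eks.us-west-2.amazonaws.com/id/EXAMPLE", "flink-jobs", "flink-job-sa"
    )
    print(json.dumps(policy, indent=2))
```

The `sub` condition is what gives IRSA its credential isolation: pods running under any other service account cannot assume the role, even in the same cluster.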

Get started with EMR on EKS with Apache Flink

If you want to run a Flink application on the recently launched EMR on EKS with Apache Flink, refer to Running Flink jobs with Amazon EMR on EKS, which provides step-by-step guidance to deploy, run, and monitor Flink jobs.

We have also created an infrastructure as code (IaC) template for EMR on EKS with Flink streaming as part of Data on EKS (DoEKS), an open source project aimed at streamlining and accelerating the process of building, deploying, and scaling data and ML workloads on Amazon Elastic Kubernetes Service (Amazon EKS). This template helps you provision an EMR on EKS with Flink cluster and evaluate the features described in this post. Because the template has best practices built in, you can use it as a foundation for deploying EMR on EKS with Flink in your own environment.

Conclusion

In this post, we explored the features of the recently launched EMR on EKS with Flink to help you understand how you might run Flink workloads on a managed, scalable, resilient, and cost-optimized EMR on EKS cluster. If you are planning to run or explore Flink workloads on Kubernetes, consider running them on EMR on EKS with Apache Flink. Contact your AWS Solutions Architects, who can help you along your innovation journey.


About the Authors

Kinnar Kumar Sen is a Sr. Solutions Architect at Amazon Web Services (AWS) focusing on Flexible Compute. As a part of the EC2 Flexible Compute team, he works with customers to guide them to the most elastic and efficient compute options that are suitable for their workload running on AWS. Kinnar has more than 15 years of industry experience working in research, consultancy, engineering, and architecture.

Alex Lines is a Principal Containers Specialist at AWS helping customers modernize their Data and ML applications on Amazon EKS.

Mengfei Wang is a Software Development Engineer specializing in building large-scale, robust software infrastructure to support big data demands on containers and Kubernetes within the EMR on EKS team. Beyond work, Mengfei is an enthusiastic snowboarder and a passionate home cook.

Jerry Zhang is a Software Development Manager in AWS EMR on EKS. His team focuses on helping AWS customers to solve their business problems using cutting-edge data analytics technology on AWS infrastructure.

Build Spark Structured Streaming applications with the open source connector for Amazon Kinesis Data Streams

Post Syndicated from Idan Maizlits original https://aws.amazon.com/blogs/big-data/build-spark-structured-streaming-applications-with-the-open-source-connector-for-amazon-kinesis-data-streams/

Apache Spark is a powerful big data engine used for large-scale data analytics. Its in-memory computing makes it great for iterative algorithms and interactive queries. You can use Apache Spark to process streaming data from a variety of streaming sources, including Amazon Kinesis Data Streams for use cases like clickstream analysis, fraud detection, and more. Kinesis Data Streams is a serverless streaming data service that makes it straightforward to capture, process, and store data streams at any scale.

With the new open source Amazon Kinesis Data Streams Connector for Spark Structured Streaming, you can use the newer Spark Data Sources API. It also supports enhanced fan-out for dedicated read throughput and faster stream processing. In this post, we deep dive into the internal details of the connector and show you how to use it to consume and produce records from and to Kinesis Data Streams using Amazon EMR.

Introducing the Kinesis Data Streams connector for Spark Structured Streaming

The Kinesis Data Streams connector for Spark Structured Streaming is an open source connector that supports both the provisioned and On-Demand capacity modes offered by Kinesis Data Streams. The connector is built on the latest Spark Data Sources API V2, so it can take advantage of Spark optimizations. Starting with Amazon EMR 7.1, the connector comes pre-packaged on Amazon EMR on Amazon EKS, Amazon EMR on Amazon EC2, and Amazon EMR Serverless, so you don't need to build or download any packages. For use with other Apache Spark platforms, the connector is available as a public JAR file that you can reference directly when submitting a Spark Structured Streaming job. Additionally, you can download and build the connector from the GitHub repo.

Kinesis Data Streams supports two types of consumers: shared throughput and dedicated throughput. With shared throughput, 2 MB per second of read throughput per shard is shared across all consumers. With dedicated throughput, also known as enhanced fan-out, each consumer gets its own 2 MB per second of read throughput per shard. The connector supports both consumer types out of the box without any additional coding, giving you the flexibility to consume records from your streams based on your requirements. By default, the connector uses a shared throughput consumer, but you can configure it to use enhanced fan-out in the configuration properties.
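The difference between the two consumer types is easiest to see with some arithmetic. The following Python sketch computes the read throughput available to each consumer across a stream, assuming the 2 MB per second per-shard read limit described above and assuming that shared-throughput consumers split that limit evenly. In practice, shared consumers compete for it and are also subject to the per-shard limit on GetRecords calls.

```python
SHARD_READ_MB_PER_SEC = 2.0  # per-shard read limit described in the text


def read_mb_per_sec_per_consumer(shards: int, consumers: int, enhanced_fan_out: bool) -> float:
    """Approximate read throughput each consumer gets across the whole stream."""
    if shards < 1 or consumers < 1:
        raise ValueError("shards and consumers must be positive")
    # A stream's capacity is the sum of the capacities of its shards.
    stream_read = shards * SHARD_READ_MB_PER_SEC
    if enhanced_fan_out:
        return stream_read  # each registered consumer gets its own 2 MB/s per shard
    return stream_read / consumers  # shared consumers split the same 2 MB/s per shard


if __name__ == "__main__":
    print(f"{read_mb_per_sec_per_consumer(4, 3, False):.2f}")  # 2.67
    print(f"{read_mb_per_sec_per_consumer(4, 3, True):.2f}")   # 8.00
```

With three applications reading a four-shard stream, each shared consumer gets roughly a third of the stream's read capacity, while each enhanced fan-out consumer gets the full 8 MB per second.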

You can also use the connector as a sink to produce records to a Kinesis data stream. The configuration parameters for using the connector as a source and as a sink differ; for more information, see Kinesis Source Configuration. The connector also supports multiple storage options for checkpoints, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), and HDFS, to provide continuity.

For scenarios where a Kinesis data stream is deployed in an AWS producer account and the Spark Structured Streaming application runs in a different AWS consumer account, you can use the connector for cross-account processing. This requires additional AWS Identity and Access Management (IAM) trust policies that allow the Spark Structured Streaming application in the consumer account to assume a role in the producer account.
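As a minimal sketch of the producer-side trust policy, assume the Spark application in the consumer account runs under a role (called `spark-consumer-role` here, a hypothetical name) that should be allowed to assume a role in the producer account. The account ID is a placeholder. The producer-account role also needs a permissions policy granting the Kinesis read actions the connector uses, and the consumer role needs permission to call sts:AssumeRole on it; neither is shown here.

```python
import json


def cross_account_trust_policy(consumer_account_id: str, consumer_role_name: str) -> dict:
    """Trust policy for the producer-account role: only the named consumer role may assume it."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # Trusting a specific role, rather than the whole consumer account, narrows access.
                "Principal": {"AWS": f"arn:aws:iam::{consumer_account_id}:role/{consumer_role_name}"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


if __name__ == "__main__":
    print(json.dumps(cross_account_trust_policy("444455556666", "spark-consumer-role"), indent=2))
```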

You should also consider reviewing the security configuration with your security teams based on your data security requirements.

How the connector works

Consuming records from Kinesis Data Streams using the connector involves multiple steps. The following architecture diagram shows the internal details of how the connector works. A Spark Structured Streaming application consumes records from a Kinesis data stream source and produces records to another Kinesis data stream.

A Kinesis data stream is composed of a set of shards. A shard is a uniquely identified sequence of data records in a stream and provides a fixed unit of capacity. The total capacity of the stream is the sum of the capacities of its shards.

A Spark application consists of a driver and a set of executor processes. The Spark driver acts as a coordinator, and the tasks running in executors are responsible for producing and consuming records to and from shards.

The solution workflow includes the following steps:

  1. Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs. At the beginning of a micro-batch run, the driver uses the Kinesis Data Streams ListShards API to determine the latest description of all available shards. The connector exposes a parameter (kinesis.describeShardInterval) to configure the interval between two successive ListShards API calls.
  2. The driver then determines the starting position in each shard. If the application is a new job, the starting position of each shard is determined by kinesis.startingPosition. If it's a restart of an existing job, the starting position is read from the last record metadata checkpointed in storage (for this post, DynamoDB), and kinesis.startingPosition is ignored.
  3. Each shard is mapped to one task in an executor, which is responsible for reading that shard's data. The Spark application automatically creates one task per shard and distributes the tasks across the executors.
  4. The tasks in an executor use either polling mode (shared) or push mode (enhanced fan-out) to get data records from the starting position for a shard.
  5. Spark tasks running in the executors write the processed data to the data sink. In this architecture, we use the Kinesis Data Streams sink to illustrate how the connector writes back to the stream. Executors can write to more than one Kinesis Data Streams output shard.
  6. At the end of each task, the corresponding executor process saves the metadata (checkpoint) about the last record read for each shard in the offset storage (for this post, DynamoDB). This information is used by the driver in the construction of the next micro-batch.
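The driver-side bookkeeping in steps 1, 2, 3, and 6 can be modeled in a few lines. The following Python sketch is a toy model, not the connector's code: shards are plain IDs, the checkpoint store is an in-memory dict standing in for DynamoDB, and `plan_micro_batch` and `commit` are hypothetical names. It shows how a shard without a checkpoint uses the configured starting position, while a checkpointed shard resumes after its last sequence number (AFTER_SEQUENCE_NUMBER is one of the Kinesis shard iterator types).

```python
from dataclasses import dataclass


@dataclass
class ShardTask:
    shard_id: str
    start: str  # "LATEST", "TRIM_HORIZON", or "AFTER_SEQUENCE_NUMBER:<n>"


def plan_micro_batch(shard_ids: list, checkpoints: dict, starting_position: str) -> list:
    """Create one read task per shard (steps 1 to 3).

    Shards with a checkpoint resume after their last sequence number;
    shards without one use starting_position.
    """
    tasks = []
    for shard_id in sorted(shard_ids):
        if shard_id in checkpoints:
            tasks.append(ShardTask(shard_id, f"AFTER_SEQUENCE_NUMBER:{checkpoints[shard_id]}"))
        else:
            tasks.append(ShardTask(shard_id, starting_position))
    return tasks


def commit(checkpoints: dict, last_read: dict) -> None:
    """Persist the last sequence number read from each shard (step 6)."""
    checkpoints.update(last_read)


if __name__ == "__main__":
    store = {}  # stands in for the DynamoDB checkpoint table
    shards = ["shardId-000000000000", "shardId-000000000001"]
    print(plan_micro_batch(shards, store, "LATEST"))  # both shards start at LATEST
    commit(store, {"shardId-000000000000": "1001"})
    for task in plan_micro_batch(shards, store, "LATEST"):
        print(task.shard_id, task.start)
```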

Solution overview

The following diagram shows an example architecture of how to use the connector to read from one Kinesis data stream and write to another.

In this architecture, we use the Amazon Kinesis Data Generator (KDG) to generate sample streaming data (random events per country) to a Kinesis Data Streams source. We start an interactive Spark Structured Streaming session and consume data from the Kinesis data stream, and then write to another Kinesis data stream.

We use Spark Structured Streaming to count, in each micro-batch window, the events for each country consumed from Kinesis Data Streams. After the count, we can see the results.
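In PySpark, a per-country count over time windows is typically written with `pyspark.sql.functions.window`, for example `events.groupBy(window("date", "1 minute"), "data").count()`. To show what such a tumbling-window count computes without needing a cluster, here is a plain-Python sketch. It assumes events shaped like the KDG template used later in this post (`data` holds the country and `date` the event time) and a hypothetical 60-second window; it is an illustration of the semantics, not Spark code.

```python
from collections import Counter
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def tumbling_window_counts(events: list, window_seconds: int = 60) -> Counter:
    """Count events per (window_start, country) using fixed, non-overlapping windows."""
    counts = Counter()
    for event in events:
        ts = datetime.strptime(event["date"], "%Y-%m-%d %H:%M:%S")
        offset = int((ts - EPOCH).total_seconds())
        # Align each event to the start of the window that contains it.
        window_start = EPOCH + timedelta(seconds=offset - offset % window_seconds)
        counts[(window_start, event["data"])] += 1
    return counts


if __name__ == "__main__":
    sample = [
        {"id": 1, "data": "Spain", "date": "2024-01-01 12:00:05"},
        {"id": 2, "data": "Spain", "date": "2024-01-01 12:00:50"},
        {"id": 3, "data": "France", "date": "2024-01-01 12:00:59"},
        {"id": 4, "data": "Spain", "date": "2024-01-01 12:01:00"},
    ]
    for (start, country), n in sorted(tumbling_window_counts(sample).items()):
        print(start, country, n)
```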

Prerequisites

To get started, follow the instructions in the GitHub repo, which also lists the prerequisites.

After you deploy the solution using the AWS CDK, you will have the following resources:

  • An EMR cluster with the Kinesis Spark connector installed
  • A Kinesis Data Streams source
  • A Kinesis Data Streams sink

Create your Spark Structured Streaming application

After the deployment is complete, you can access the EMR primary node to start a Spark application and write your Spark Structured Streaming logic.

As we mentioned earlier, you use the new open source Kinesis Spark connector to consume data from Kinesis Data Streams on Amazon EMR. You can find the connector code in the GitHub repo, along with examples of how to build and set up the connector in Spark.

In this post, we use Amazon EMR 7.1, where the connector is natively available. If you're using an Amazon EMR release earlier than 7.1, you can install the connector by running the following commands:

cd /usr/lib/spark/jars 
sudo wget https://awslabs-code-us-east-1.s3.amazonaws.com/spark-sql-kinesis-connector/spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar
sudo chmod 755 spark-streaming-sql-kinesis-connector_2.12-1.2.1.jar

Complete the following steps:

  1. On the Amazon EMR console, navigate to the emr-spark-kinesis cluster.
  2. On the Instances tab, select the primary instance and choose the Amazon Elastic Compute Cloud (Amazon EC2) instance ID.

You’re redirected to the Amazon EC2 console.

  3. On the Amazon EC2 console, select the primary instance and choose Connect.
  4. Use Session Manager, a capability of AWS Systems Manager, to connect to the instance.
  5. Because Session Manager connects as ssm-user, switch to the hadoop user:
    sudo su hadoop

  6. Start a Spark shell in either Scala or Python to interactively build a Spark Structured Streaming application that consumes data from a Kinesis data stream.

For this post, we use Python, working in a PySpark shell on Amazon EMR.

  7. Start the PySpark shell by entering the command pyspark.

Because you already have the connector installed in the EMR cluster, you can now create the Kinesis source.

  8. Create the Kinesis source with the following code:
    kinesis = spark.readStream.format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .option("kinesis.streamName", "kinesis-source") \
        .option("kinesis.consumerType", "GetRecords") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("kinesis.startingposition", "LATEST") \
        .load()

To create the Kinesis source, we set the following parameters:

  • Name of the connector – We use the connector name aws-kinesis
  • kinesis.region – The AWS Region of the Kinesis data stream you are consuming
  • kinesis.streamName – The name of the Kinesis data stream you are consuming (kinesis-source in this post)
  • kinesis.consumerType – Use GetRecords (standard consumer) or SubscribeToShard (enhanced fan-out consumer)
  • kinesis.endpointUrl – The Regional Kinesis endpoint (for more details, see Service endpoints)
  • kinesis.startingposition – Choose LATEST, TRIM_HORIZON, or AT_TIMESTAMP (refer to ShardIteratorType)

For using an enhanced fan-out consumer, additional parameters are needed, such as the consumer name. The additional configuration can be found in the connector’s GitHub repo.

kinesis_efo = spark \
    .readStream \
    .format("aws-kinesis") \
    .option("kinesis.region", "<aws-region>") \
    .option("kinesis.streamName", "kinesis-source") \
    .option("kinesis.consumerType", "SubscribeToShard") \
    .option("kinesis.consumerName", "efo-consumer") \
    .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
    .option("kinesis.startingposition", "LATEST") \
    .load()

Deploy the Kinesis Data Generator

Complete the following steps to deploy the KDG and start generating data:

  1. Choose Launch Stack to deploy the KDG CloudFormation template.

You might need to change your Region when deploying. Make sure that the KDG is launched in the same Region where you deployed the solution.

  2. For the parameters Username and Password, enter values of your choice. Note these values; you need them later to log in to the KDG.
  3. When the template has finished deploying, go to the Outputs tab of the stack and locate the KDG URL.
  4. Log in to the KDG using the credentials you set when launching the CloudFormation template.
  5. Specify your Region and the source data stream name (kinesis-source), and use the following template to generate test data:
    {
        "id": {{random.number(100)}},
        "data": "{{random.arrayElement(
            ["Spain","Portugal","Finland","France"]
        )}}",
        "date": "{{date.now("YYYY-MM-DD HH:mm:ss")}}"
    }

  6. Return to your Session Manager session to continue working with the Spark application.
  7. To apply transformations based on the fields of the events, first define the schema for the events:
    from pyspark.sql.types import LongType, StringType, StructType, TimestampType

    # Schema of the JSON events produced by the KDG template
    pythonSchema = StructType() \
        .add("id", LongType()) \
        .add("data", StringType()) \
        .add("date", TimestampType())

  8. Run the following command to parse the records consumed from Kinesis Data Streams:
    from pyspark.sql.functions import from_json

    # The record payload arrives in the binary "data" column: cast it to a JSON string,
    # parse it with the schema, and flatten the resulting struct into columns.
    events = kinesis \
        .selectExpr("CAST(data AS STRING) AS jsonData") \
        .select(from_json("jsonData", pythonSchema).alias("events")) \
        .select("events.*")

  9. Use the following code for the Kinesis Spark connector sink:
    events \
        .selectExpr("CAST(id AS STRING) as partitionKey","data","date") \
        .writeStream \
        .format("aws-kinesis") \
        .option("kinesis.region", "<aws-region>") \
        .outputMode("append") \
        .option("kinesis.streamName", "kinesis-sink") \
        .option("kinesis.endpointUrl", "https://kinesis.<aws-region>.amazonaws.com") \
        .option("checkpointLocation", "/kinesisCheckpoint") \
        .start() \
        .awaitTermination()

You can view the data in the Kinesis Data Streams console.

  1. On the Kinesis Data Streams console, navigate to kinesis-sink.
  2. On the Data viewer tab, choose a shard and a starting position (for this post, we use Latest) and choose Get records.

You can see the data sent, as shown in the following screenshot. Kinesis Data Streams uses base64 encoding by default, so you might see text with unreadable characters.
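If you retrieve records outside the console, for example with the AWS CLI aws kinesis get-records command, the Data field is also returned base64-encoded. The following minimal Python sketch uses only the standard library to turn such a payload back into text. It can also turn the payload into a JSON object for records produced by the KDG template. The function names and the sample payload are illustrative and are not part of the connector.

```python
import base64
import json


def decode_record_data(b64_data: str) -> str:
    """Decode a base64-encoded Kinesis record payload into UTF-8 text."""
    return base64.b64decode(b64_data).decode("utf-8")


def decode_json_record(b64_data: str) -> dict:
    """Decode a base64 payload that holds a JSON document (for example, a KDG event)."""
    return json.loads(decode_record_data(b64_data))


# Hypothetical payload: the text "Spain" as it looks after base64 encoding.
encoded = base64.b64encode(b"Spain").decode("ascii")
print(decode_record_data(encoded))  # Spain
```

Records in kinesis-sink carry the data column value, so decode_record_data fits them. Records generated by the KDG template in the source stream are full JSON events, so decode_json_record fits those.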

Clean up

Delete the following CloudFormation stacks created during this deployment to delete all the provisioned resources:

  • EmrSparkKinesisStack
  • Kinesis-Data-Generator-Cognito-User-SparkEFO-Blog

If you created any additional resources during this deployment, delete them manually.

Conclusion

In this post, we discussed the open source Kinesis Data Streams connector for Spark Structured Streaming. It supports the newer Spark Data Source API V2 for building streaming applications with Spark Structured Streaming. The connector also enables high-throughput consumption from Kinesis Data Streams with enhanced fan-out, which gives each consumer dedicated throughput of up to 2 MB/second per shard. With this connector, you can now build high-throughput streaming applications with Spark Structured Streaming.
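The following Python sketch puts the enhanced fan-out numbers in context by estimating the read throughput available to each consumer. It assumes the per-shard read limit of 2 MB/second. In shared-throughput mode, all consumers share that limit. With enhanced fan-out, each registered consumer gets the full limit to itself. The sketch ignores other limits, such as GetRecords call rates, so treat the result as an upper bound. The function name is illustrative.

```python
SHARD_READ_MB_PER_S = 2.0  # per-shard read throughput limit, in MB/second


def per_consumer_read_mb_s(shards: int, consumers: int, enhanced_fan_out: bool) -> float:
    """Upper bound on read throughput (MB/second) available to each consumer."""
    if shards < 1 or consumers < 1:
        raise ValueError("shards and consumers must be positive")
    stream_read_capacity = shards * SHARD_READ_MB_PER_S
    if enhanced_fan_out:
        # Each registered enhanced fan-out consumer gets its own 2 MB/second per shard.
        return stream_read_capacity
    # Shared-throughput consumers split the same 2 MB/second per shard.
    return stream_read_capacity / consumers


print(per_consumer_read_mb_s(4, 3, enhanced_fan_out=True))             # 8.0
print(round(per_consumer_read_mb_s(4, 3, enhanced_fan_out=False), 2))  # 2.67
```

With 4 shards and 3 consumers, each enhanced fan-out consumer can read up to 8 MB/second. In shared mode, the three consumers split that same 8 MB/second between them.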

The Kinesis Spark connector is open source under the Apache 2.0 license on GitHub. To get started, visit the GitHub repo.


About the Authors


Idan Maizlits is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. Idan loves engaging with customers to learn about their challenges with real-time data and to help them achieve their business goals. Outside of work, he enjoys spending time with his family exploring the outdoors and cooking.


Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objectives. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon MSK and AWS’s managed offering for Apache Flink.

Umesh Chaudhari is a Streaming Solutions Architect at AWS. He works with customers to design and build real-time data processing systems. He has extensive working experience in software engineering, including architecting, designing, and developing data analytics systems. Outside of work, he enjoys traveling, reading, and watching movies.

Get started with AWS Glue Data Quality dynamic rules for ETL pipelines

Post Syndicated from Prasad Nadig original https://aws.amazon.com/blogs/big-data/get-started-with-aws-glue-data-quality-dynamic-rules-for-etl-pipelines/

Hundreds of thousands of organizations build data integration pipelines to extract and transform data. They establish data quality rules to ensure the extracted data is of high quality for accurate business decisions. These rules assess the data based on fixed criteria reflecting current business states. However, when the business environment changes, data properties shift, the fixed criteria become outdated, and the rules can no longer reliably detect poor-quality data.

For example, a data engineer at a retail company established a rule that validates daily sales must exceed a 1-million-dollar threshold. After a few months, daily sales surpassed 2 million dollars, rendering the threshold obsolete. The data engineer couldn’t update the rules to reflect the latest thresholds due to a lack of notifications and the effort required to manually analyze and update the rules. Later in the month, business users noticed a 25% drop in their sales. After hours of investigation, the data engineers discovered that an extract, transform, and load (ETL) pipeline responsible for extracting data from some stores had failed without generating errors. Because the reduced daily sales still exceeded 1 million dollars, the rule with the outdated threshold kept passing and never detected this issue. The ordering system that used the sales data placed incorrect orders, causing low inventory for future weeks. What if the data engineer had the ability to set up dynamic thresholds that automatically adjusted as business properties changed?

We are excited to talk about how to use dynamic rules, a new capability of AWS Glue Data Quality. Now, you can define dynamic rules and not worry about updating static rules on a regular basis to adapt to varying data trends. This feature enables you to author dynamic rules to compare current metrics produced by your rules with their historical values. These historical comparisons are enabled by using the last(k) operator in expressions. For example, instead of writing a static rule like RowCount > 1000, which might become obsolete as data volume grows over time, you can replace it with a dynamic rule like RowCount > min(last(3)). This dynamic rule succeeds when the number of rows in the current run is greater than the minimum row count from the most recent three runs for the same dataset.
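The following Python sketch contrasts the two approaches using the retail scenario described earlier. It is a plain-Python approximation, not how AWS Glue Data Quality evaluates rules. The history values are hypothetical daily sales ordered from most recent to oldest, and the 0.8 tolerance factor is an illustrative choice.

```python
from typing import Sequence


def static_rule(daily_sales: float, threshold: float = 1_000_000) -> bool:
    """Static check: daily sales must exceed a fixed threshold."""
    return daily_sales > threshold


def dynamic_rule(daily_sales: float, history: Sequence[float],
                 k: int = 3, factor: float = 0.8) -> bool:
    """Dynamic check: sales must exceed factor * the minimum of the k most recent runs.

    history is ordered from most recent to oldest; an empty history falls back
    to 0.0, mirroring the default that last(k) returns.
    """
    recent = list(history[:k]) or [0.0]
    return daily_sales > min(recent) * factor


# Hypothetical history: daily sales have grown to about 2 million dollars.
history = [2_050_000, 2_100_000, 2_000_000]
today = 1_500_000  # a 25% drop from 2 million dollars

print(static_rule(today))            # True: the outdated threshold misses the drop
print(dynamic_rule(today, history))  # False: 1.5M is below 0.8 * 2.0M = 1.6M
```

Because the dynamic threshold follows recent values, it moves up as sales grow, and the 25% drop falls below it.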

This is part 7 of a seven-part series of posts to explain how AWS Glue Data Quality works. Check out the other posts in the series:

Previous posts explain how to author static data quality rules. In this post, we show how to create an AWS Glue job that measures and monitors the data quality of a data pipeline using dynamic rules. We also show how to take action based on the data quality results.

Solution overview

Let’s consider an example data quality pipeline where a data engineer ingests data from a raw zone and loads it into a curated zone in a data lake. The data engineer is tasked with not only extracting, transforming, and loading data, but also identifying anomalies compared against data quality statistics from historical runs.

In this post, you’ll learn how to author dynamic rules in your AWS Glue job in order to take appropriate actions based on the outcome.

The data used in this post is sourced from NYC yellow taxi trip data. The yellow taxi trip records include fields capturing pickup and dropoff dates and times, pickup and dropoff locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The following screenshot shows an example of the data.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon Simple Storage Service (Amazon S3) bucket (gluedataqualitydynamicrules-*)
  • An AWS Lambda function that creates the following folder structure in the S3 bucket:
    • raw-src/
    • landing/nytaxi/
    • processed/nytaxi/
    • dqresults/nytaxi/
  • AWS Identity and Access Management (IAM) users, roles, and policies. The IAM role GlueDataQuality-* has AWS Glue run permission as well as read and write permission on the S3 bucket.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources.
  4. Choose Create stack and wait for the stack creation step to complete.

Upload sample data

  1. Download the dataset to your local machine.
  2. Unzip the file and extract the Parquet files into a local folder.
  3. Upload the Parquet files under the prefix raw-src/ in the S3 bucket (gluedataqualitydynamicrules-*).
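If you prefer to script the upload instead of using the Amazon S3 console, the following Python sketch uploads every Parquet file in a local folder under raw-src/. It assumes a boto3 S3 client, whose upload_file(Filename, Bucket, Key) method performs the transfer. The function name, the local folder, and the bucket name are placeholders to replace with your own values.

```python
from pathlib import Path
from typing import List


def upload_parquet_files(s3_client, bucket: str, local_dir: str,
                         prefix: str = "raw-src/") -> List[str]:
    """Upload every .parquet file in local_dir to s3://<bucket>/<prefix>.

    s3_client is expected to be a boto3 S3 client, e.g. boto3.client("s3").
    Returns the object keys that were uploaded, in sorted file-name order.
    """
    uploaded_keys = []
    for path in sorted(Path(local_dir).glob("*.parquet")):
        key = f"{prefix}{path.name}"
        # S3.Client.upload_file(Filename, Bucket, Key) uses managed (multipart) transfers.
        s3_client.upload_file(str(path), bucket, key)
        uploaded_keys.append(key)
    return uploaded_keys
```

For example, after import boto3, call upload_parquet_files(boto3.client("s3"), "<your-bucket-name>", "./nytaxi-data").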

Implement the solution

To start configuring your solution, complete the following steps:

  1. On the AWS Glue Studio console, choose ETL Jobs in the navigation pane and choose Visual ETL.
  2. Navigate to the Job details tab to configure the job.
  3. For Name, enter GlueDataQualityDynamicRules
  4. For IAM Role, choose the role starting with GlueDataQuality-*.
  5. For Job bookmark, choose Enable.

This allows you to run this job incrementally. To learn more about job bookmarks, refer to Tracking processed data using job bookmarks.

  1. Leave all the other settings as their default values.
  2. Choose Save.
  3. After the job is saved, navigate to the Visual tab and on the Sources menu, choose Amazon S3.
  4. In the Data source properties – S3 pane, for S3 source type, select S3 location.
  5. Choose Browse S3 and navigate to the prefix /landing/nytaxi/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  6. For Data format, choose Parquet and choose Infer schema.

  1. On the Transforms menu, choose Evaluate Data Quality.

You now implement validation logic in your process to identify potential data quality problems originating from the source data.

  1. To accomplish this, specify the following DQDL rules on the Ruleset editor tab, inside the Rules = [ ] list:
    CustomSql "select vendorid from primary where passenger_count > 0" with threshold > 0.9,
    Mean "trip_distance" < max(last(3)) * 1.50,
    Sum "total_amount" between min(last(3)) * 0.8 and max(last(3)) * 1.2,
    RowCount between min(last(3)) * 0.9 and max(last(3)) * 1.2,
    Completeness "fare_amount" >= avg(last(3)) * 0.9,
    DistinctValuesCount "ratecodeid" between avg(last(3))-1 and avg(last(3))+2,
    DistinctValuesCount "pulocationid" > avg(last(3)) * 0.8,
    ColumnCount = max(last(2))

  1. Select Original data to output the original input data from the source and add a new node below the Evaluate Data Quality node.
  2. Choose Add new columns to indicate data quality errors to add four new columns to the output schema.
  3. Select Data quality results to capture the status of each rule configured and add a new node below the Evaluate Data Quality node.

  1. With the rowLevelOutcomes node selected, choose Amazon S3 on the Targets menu.
  2. Configure the S3 target location to /processed/nytaxi/ under the bucket name starting with gluedataqualitydynamicrules-* and set the output format to Parquet and compression type to Snappy.

  1. With the ruleOutcomes node selected, choose Amazon S3 on the Targets menu.
  2. Configure the S3 target location to /dqresults/ under the bucket name starting with gluedataqualitydynamicrules-*.
  3. Set the output format to Parquet and compression type to Snappy.
  4. Choose Save.

Up to this point, you have set up an AWS Glue job, specified dynamic rules for the pipeline, and configured the target location for both the original source data and AWS Glue Data Quality results to be written on Amazon S3. Next, let’s examine dynamic rules and how they function, and provide an explanation of each rule we used in our job.

Dynamic rules

You can now author dynamic rules to compare current metrics produced by your rules with their historical values. These historical comparisons are enabled by using the last() operator in expressions. For example, the rule RowCount > max(last(1)) succeeds when the number of rows in the current run is greater than the most recent prior row count for the same dataset. last() takes an optional natural number argument describing how many prior metrics to consider; last(k), where k >= 1, references the last k metrics. The operator behaves as follows:

  • If no data points are available, last(k) will return the default value 0.0
  • If fewer than k metrics are available, last(k) will return all prior metrics

For example, if the values from previous runs, listed from most recent to oldest, are (5, 3, 2, 1, 4), max(last(3)) returns 5.
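The following Python sketch models the last(k) behavior described earlier. It approximates the behavior for illustration and is not the AWS Glue implementation. It assumes the history is ordered from most recent to oldest and excludes the current run. It also assumes that last() without an argument behaves like last(1).

```python
from statistics import mean
from typing import List, Sequence


def last(history: Sequence[float], k: int = 1) -> List[float]:
    """Approximate last(k): the k most recent prior metrics.

    history is ordered from most recent to oldest and excludes the current run.
    - No prior data points: returns [0.0], so min, max, and avg evaluate to 0.0.
    - Fewer than k data points: returns all of them.
    """
    if k < 1:
        raise ValueError("k must be a natural number (k >= 1)")
    if not history:
        return [0.0]
    return list(history[:k])


previous_runs = [5, 3, 2, 1, 4]  # most recent first
print(max(last(previous_runs, 3)))  # 5
print(min(last(previous_runs, 3)))  # 2
print(mean(last([], 3)))            # 0.0
print(last([7.0], 3))               # [7.0]
```

The empty-history case explains why many dynamic rules fail on a job's first run: every aggregate over last(k) collapses to 0.0.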

AWS Glue supports over 15 types of dynamic rules, providing a robust set of data quality validation capabilities. For more information, refer to Dynamic rules. This section demonstrates several rule types to showcase the functionality and enable you to apply these features in your own use cases.

CustomSQL

The CustomSQL rule provides the capability to run a custom SQL statement against a dataset and check the return value against a given expression.

The following example rule uses a SQL statement that selects a column and filters it with a condition, which produces row-level results. The threshold expression defines the fraction of records that must satisfy the condition for the entire rule to pass. In this example, more than 90% of records must have a passenger_count greater than 0 for the rule to pass:

CustomSql "select vendorid from primary where passenger_count > 0" with threshold > 0.9
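The following Python sketch approximates how the threshold applies in this rule. Each row passes when passenger_count is greater than 0, and a null value fails, as it would in the SQL WHERE clause. The rule passes when the fraction of passing rows is strictly greater than 0.9. The function name is illustrative, and treating an empty dataset as a failure is an assumption.

```python
from typing import Iterable, Mapping


def custom_sql_threshold_passes(rows: Iterable[Mapping], threshold: float = 0.9) -> bool:
    """Approximate: select ... where passenger_count > 0 with threshold > 0.9.

    A row passes when passenger_count is greater than 0 (a null value fails).
    The rule passes when the fraction of passing rows is strictly greater
    than threshold.
    """
    rows = list(rows)
    if not rows:
        return False  # assumption: an empty dataset does not pass
    passing = sum(
        1 for row in rows
        if row.get("passenger_count") is not None and row["passenger_count"] > 0
    )
    return passing / len(rows) > threshold


sample = [{"passenger_count": 1}] * 19 + [{"passenger_count": 0}]
print(custom_sql_threshold_passes(sample))  # True: 19/20 = 0.95 > 0.9
```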

Note: CustomSql also supports dynamic rules. The following example shows how to use one in your job:

CustomSql "select count(*) from primary" between min(last(3)) * 0.9 and max(last(3)) * 1.2

Mean

The Mean rule checks whether the mean (average) of all the values in a column matches a given expression.

The following example rule checks that the mean of trip_distance is less than 1.5 times the maximum of the trip_distance means recorded over the last three runs:

Mean "trip_distance" < max(last(3)) * 1.50

Sum

The Sum rule checks the sum of all the values in a column against a given expression.

The following example rule checks that the sum of total_amount is between 80% of the minimum of the last three runs and 120% of the maximum of the last three runs:

Sum "total_amount" between min(last(3)) * 0.8 and max(last(3)) * 1.2

RowCount

The RowCount rule checks the row count of a dataset against a given expression. In the expression, you can specify the number of rows or a range of rows using operators like > and <.

The following example rule checks if the row count is between 90% of the minimum of the last three runs and 120% of the maximum of the last three runs (excluding the current run). This rule applies to the entire dataset.

RowCount between min(last(3)) * 0.9 and max(last(3)) * 1.2
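The following Python sketch approximates this RowCount rule. The history holds row counts from previous runs, ordered most recent first, and excludes the current run. In the example, 72,405 and 58,421 are the maximum and minimum prior row counts from the fourth run described later in this post, and 65,000 is a hypothetical middle value. This post doesn't say whether DQDL treats the between bounds as inclusive or exclusive, so the sketch's exclusive bounds are an assumption.

```python
from typing import Sequence


def row_count_in_range(row_count: int, history: Sequence[float], k: int = 3,
                       low_factor: float = 0.9, high_factor: float = 1.2) -> bool:
    """Approximate: RowCount between min(last(3)) * 0.9 and max(last(3)) * 1.2.

    history holds row counts of previous runs, most recent first, and excludes
    the current run. Exclusive bounds are an assumption of this sketch.
    """
    recent = list(history[:k]) or [0.0]  # last(k) default when no history exists
    lower = min(recent) * low_factor
    upper = max(recent) * high_factor
    return lower < row_count < upper


# History of three previous runs, most recent first (65,000 is hypothetical).
print(row_count_in_range(60_000, [72_405, 65_000, 58_421]))  # True
print(row_count_in_range(44_999, [72_405, 65_000, 58_421]))  # False
```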

Completeness

The Completeness rule checks the percentage of complete (non-null) values in a column against a given expression.

The following example rule checks if the completeness of the fare_amount column is greater than or equal to 90% of the average of the last three runs:

Completeness "fare_amount" >= avg(last(3)) * 0.9
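The Completeness metric is the fraction of non-null values in the column. The following Python sketch computes that fraction and applies the dynamic threshold from this rule. The helper names and the sample values are hypothetical. The history holds the completeness scores of previous runs, ordered most recent first.

```python
from statistics import mean
from typing import Optional, Sequence


def completeness(values: Sequence[Optional[float]]) -> float:
    """Fraction of non-null values in a column (1.0 means no nulls)."""
    if not values:
        return 0.0  # assumption for an empty column
    return sum(v is not None for v in values) / len(values)


def completeness_rule_passes(values: Sequence[Optional[float]],
                             history: Sequence[float],
                             k: int = 3, factor: float = 0.9) -> bool:
    """Approximate: Completeness "fare_amount" >= avg(last(3)) * 0.9.

    history holds completeness scores of previous runs, most recent first.
    """
    recent = list(history[:k]) or [0.0]
    return completeness(values) >= mean(recent) * factor


fares = [10.0, None, 12.5, 8.0, None]  # hypothetical fare_amount values
print(completeness(fares))                               # 0.6
print(completeness_rule_passes(fares, [1.0, 1.0, 1.0]))  # False: 0.6 < 0.9
```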

DistinctValuesCount

The DistinctValuesCount rule checks the number of distinct values in a column against a given expression.

The following example rules check two conditions:

  • If the distinct count for the ratecodeid column is between the average of the last three runs minus 1 and the average of the last three runs plus 2
  • If the distinct count for the pulocationid column is greater than 80% of the average of the last three runs
    DistinctValuesCount "ratecodeid" between avg(last(3))-1 and avg(last(3))+2,
    DistinctValuesCount "pulocationid" > avg(last(3)) * 0.8
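The following Python sketch approximates the ratecodeid rule. It counts the distinct values in a column and checks that count against bounds derived from the average of previous counts. Excluding nulls from the distinct count and using exclusive bounds are assumptions made for illustration. The function names and the sample column are hypothetical.

```python
from statistics import mean
from typing import Hashable, Optional, Sequence


def distinct_values_count(values: Sequence[Optional[Hashable]]) -> int:
    """Number of distinct non-null values (excluding nulls is an assumption)."""
    return len({v for v in values if v is not None})


def ratecodeid_rule_passes(values: Sequence[Optional[Hashable]],
                           history: Sequence[float], k: int = 3) -> bool:
    """Approximate: DistinctValuesCount "ratecodeid" between avg(last(3))-1 and avg(last(3))+2."""
    recent = list(history[:k]) or [0.0]
    avg = mean(recent)
    return avg - 1 < distinct_values_count(values) < avg + 2


# Hypothetical ratecodeid column with 6 distinct values.
rate_codes = [1, 2, 3, 4, 5, 6, 1, None]
print(distinct_values_count(rate_codes))              # 6
print(ratecodeid_rule_passes(rate_codes, [8, 8, 7]))  # False: 6 < 23/3 - 1
```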

ColumnCount

The ColumnCount rule checks the column count of the primary dataset against a given expression. In the expression, you can specify the number of columns or a range of columns using operators like > and <.

The following example rule checks if the column count is equal to the maximum of the last two runs:

ColumnCount = max(last(2))

Run the job

Now that the job setup is complete, you're ready to run it. As noted earlier, dynamic rules use the last(k) operator, and most rules in this job set k to 3 (the ColumnCount rule uses last(2)). This means the data quality rules are evaluated against metrics from up to the previous three runs. To evaluate these rules meaningfully, the job must run at least k+1 times, which means a total of four runs. In this example, we simulate an ETL job with data quality rules, starting with an initial run followed by three incremental runs.

First job (initial)

Complete the following steps for the initial run:

  1. Navigate to the source data files made available under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the initial run, copy the day one file 20220101.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
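You can also stage each day's file with a short script instead of the console. The following Python sketch performs a server-side copy with the copy_object method of the boto3 S3 client. The function name and the bucket name are placeholders. The same call works for the later runs if you pass 20220102.parquet, 20220103.parquet, and 20220104.parquet.

```python
def stage_day_file(s3_client, bucket: str, file_name: str,
                   source_prefix: str = "raw-src/",
                   target_prefix: str = "landing/nytaxi/") -> str:
    """Copy one day's Parquet file from raw-src/ to landing/nytaxi/ in the same bucket.

    s3_client is expected to be a boto3 S3 client; copy_object performs a
    server-side copy, so the file is not downloaded locally.
    Returns the destination object key.
    """
    destination_key = f"{target_prefix}{file_name}"
    s3_client.copy_object(
        Bucket=bucket,
        Key=destination_key,
        CopySource={"Bucket": bucket, "Key": f"{source_prefix}{file_name}"},
    )
    return destination_key
```

For example, after import boto3, call stage_day_file(boto3.client("s3"), "<your-bucket-name>", "20220101.parquet").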

  1. On the AWS Glue Studio console, choose ETL Jobs in the navigation pane.
  2. Choose GlueDataQualityDynamicRules under Your jobs to open it.
  3. Choose Run to run the job.

You can view the job run details on the Runs tab. It will take a few minutes for the job to complete.

  1. After the job completes successfully, navigate to the Data quality tab.

You can observe the Data Quality rules, rule status, and evaluated metrics for each rule that you set in the job. The following screenshot shows the results.

The rule details are as follows:

  • CustomSql – The rule passes the data quality check because 95% of records have a passenger_count greater than 0, which exceeds the set threshold of 90%.
  • Mean – The rule fails because there are no previous runs, so last(3) returns the default value of 0.0. The rule therefore requires the mean to be less than 0.0 * 1.50 = 0.0, but the overall mean is 5.94.
  • Sum – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
  • RowCount – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
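  • Completeness – The rule passes because 100% of records are complete, meaning there are no null values for the fare_amount column. With no previous runs, avg(last(3)) * 0.9 evaluates to 0.0, so any completeness value meets the threshold.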
  • DistinctValuesCount “ratecodeid” – The rule fails for the same reason as the mean rule, with last(3) resulting in a default value of 0.0.
  • DistinctValuesCount “pulocationid” – The rule passes because the distinct count of 205 for the pulocationid column is greater than the threshold of 0.0. With no previous runs, avg(last(3)) * 0.8 evaluates to 0.0.
  • ColumnCount – The rule fails for the same reason as the mean rule, with last(2) resulting in a default value of 0.0.

Second job (first incremental)

Now that you have successfully completed the initial run and observed the data quality results, you are ready for the first incremental run to process the file from day two. Complete the following steps:

  1. Navigate to the source data files made available under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the first incremental run, copy the day two file 20220102.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, open and run the GlueDataQualityDynamicRules job as you did for the initial run, and then validate the data quality results on the Data quality tab.

The following screenshot shows the data quality results.

On the second run, all rules passed because each rule’s threshold has been met:

  • CustomSql – The rule passed because 96% of records have a passenger_count greater than 0, exceeding the set threshold of 90%.
  • Mean – The rule passed because the mean of 6.21 is less than 8.91. Only the initial run is available, so max(last(3)) is that run's mean of 5.94, and 5.94 * 1.5 = 8.91.
  • Sum – The rule passed because the sum of the total amount, 1,329,446.47, is between 80% of the minimum of the last three runs, 1,063,557.176 (1,329,446.47 * 0.8), and 120% of the maximum of the last three runs, 1,595,335.764 (1,329,446.47 * 1.2).
  • RowCount – The rule passed because the row count of 58,421 is between 90% of the minimum of the last three runs, 52,578.9 (58,421 * 0.9), and 120% of the maximum of the last three runs, 70,105.2 (58,421 * 1.2).
  • Completeness – The rule passed because 100% of the records have non-null values for the fare amount column, exceeding the set threshold of the average of the last three runs times 90%.
  • DistinctValuesCount “ratecodeid” – The rule passed because the distinct count of 8 for the ratecodeid column is between 6 and 9. Only the initial run is available, so avg(last(3)) is 7. The lower bound is 7 – 1 = 6, and the upper bound is 7 + 2 = 9.
  • DistinctValuesCount “pulocationid” – The rule passed because the distinct count of 201 for the pulocationid column is greater than 80% of the average of the previous runs. Only the initial run's count of 205 is available, so the threshold is 205 * 0.8 = 164.
  • ColumnCount – The rule passed because the number of columns, 19, is equal to the maximum of the last two runs.

Third job (second incremental)

After the successful completion of the first incremental run, you are ready for the second incremental run to process the file from day three. Complete the following steps:

  1. Navigate to the source data files under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the second incremental run, copy the day three file 20220103.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, open and run the GlueDataQualityDynamicRules job as you did for the initial run, and then validate the data quality results on the Data quality tab.

The following screenshot shows the data quality results.

Similar to the second run, the data file from the source didn’t contain any data quality issues. As a result, all of the defined data validation rules were within the set thresholds and passed successfully.

Fourth job (third incremental)

Now that you have successfully completed the first three runs and observed the data quality results, you are ready for the final incremental run for this exercise, to process the file from day four. Complete the following steps:

  1. Navigate to the source data files under the prefix /raw-src/ in the S3 bucket starting with gluedataqualitydynamicrules-*.
  2. To simulate the third incremental run, copy the day four file 20220104.parquet under /raw-src/ to the /landing/nytaxi/ folder in the same S3 bucket.
  3. On the AWS Glue Studio console, open and run the GlueDataQualityDynamicRules job as you did for the initial run, and then validate the data quality results on the Data quality tab.

The following screenshot shows the data quality results.

In this run, there are some data quality issues from the source that were caught by the AWS Glue job, causing the rules to fail. Let’s examine each failed rule to understand the specific data quality issues that were detected:

  • CustomSql – The rule failed because only 80% of the records have a passenger_count greater than 0, which is lower than the set threshold of 90%.
  • Mean – The rule failed because the mean of trip_distance is 71.74, which is greater than 11.55, or 1.5 times the maximum of the last three runs (7.70 * 1.5).
  • Sum – The rule passed because the sum of total_amount is 1,165,023.73, which is between 80% of the minimum of the last three runs, 1,063,557.176 (1,329,446.47 * 0.8), and 120% of the maximum of the last three runs, 1,816,645.464 (1,513,871.22 * 1.2).
  • RowCount – The rule failed because the row count of 44,999 is not between 90% of the minimum of the last three runs, 52,578.9 (58,421 * 0.9), and 120% of the maximum of the last three runs, 86,886 (72,405 * 1.2).
  • Completeness – The rule failed because only 82% of the records have non-null values for the fare_amount column, which is lower than the set threshold of the average of the last three runs times 90%.
  • DistinctValuesCount “ratecodeid” – The rule failed because the distinct count of 6 for the ratecodeid column is not between 6.67 and 9.67. The average of the last three runs is (8+8+7)/3 ≈ 7.67. The lower bound is that average minus 1 (6.67), and the upper bound is that average plus 2 (9.67).
  • DistinctValuesCount “pulocationid” – The rule passed because the distinct count of 205 for the pulocationid column is greater than 165.87, which is 80% of the average of the last three runs ((216+201+205)/3 ≈ 207.33).
  • ColumnCount – The rule passed because the number of columns, 19, is equal to the maximum of the last two runs.
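As a check on the arithmetic, the following Python snippet recomputes the fourth-run thresholds from the prior-run values quoted in the bullets above. Those values are the maximum mean, the minimum and maximum sums and row counts, and the three previous distinct counts for each location column. The snippet only reproduces the calculations and does not query AWS Glue.

```python
# Prior-run values quoted in the fourth-run bullets above.
mean_max = 7.70
sum_min, sum_max = 1_329_446.47, 1_513_871.22
rows_min, rows_max = 58_421, 72_405
ratecode_counts = [8, 8, 7]
pulocation_counts = [216, 201, 205]

thresholds = {
    "Mean upper bound": mean_max * 1.5,
    "Sum lower bound": sum_min * 0.8,
    "Sum upper bound": sum_max * 1.2,
    "RowCount lower bound": rows_min * 0.9,
    "RowCount upper bound": rows_max * 1.2,
    "ratecodeid lower bound": sum(ratecode_counts) / 3 - 1,
    "ratecodeid upper bound": sum(ratecode_counts) / 3 + 2,
    "pulocationid lower bound": sum(pulocation_counts) / 3 * 0.8,
}

for name, value in thresholds.items():
    print(f"{name}: {value:,.2f}")
```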

To summarize the outcome of the fourth run: the rules for Sum and DistinctValuesCount for pulocationid, as well as the ColumnCount rule, passed successfully. However, the rules for CustomSql, Mean, RowCount, Completeness, and DistinctValuesCount for ratecodeid failed to meet the criteria.

Upon examining the Data Quality evaluation results, further investigation is necessary to identify the root cause of these data quality issues. For instance, in the case of the failed RowCount rule, it’s imperative to ascertain why there was a decrease in record count. This investigation should delve into whether the drop aligns with actual business trends or if it stems from issues within the source system, data ingestion process, or other factors. Appropriate actions must be taken to rectify these data quality issues or update the rules to accommodate natural business trends.

You can expand this solution by implementing and configuring alerts and notifications to promptly address any data quality issues that arise. For more details, refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality (Part 4 in this series).

Clean up

To clean up your resources, complete the following steps:

  1. Delete the AWS Glue job.
  2. Delete the CloudFormation stack.

Conclusion

AWS Glue Data Quality offers a straightforward way to measure and monitor the data quality of your ETL pipeline. In this post, you learned about authoring a Data Quality job with dynamic rules, and how these rules eliminate the need to update static rules with ever-evolving source data in order to keep the rules current. Data Quality dynamic rules enable the detection of potential data quality issues early in the data ingestion process, before downstream propagation into data lakes, warehouses, and analytical engines. By catching errors upfront, organizations can ingest cleaner data and take advantage of advanced data quality capabilities. The rules provide a robust framework to identify anomalies, validate integrity, and help ensure accuracy as data enters the analytics pipeline. Overall, AWS Glue dynamic rules empower organizations to take control of data quality at scale and build trust in analytical outputs.

To learn more about AWS Glue Data Quality, refer to the following:


About the Authors

Prasad Nadig is an Analytics Specialist Solutions Architect at AWS. He guides customers in architecting optimal data and analytical platforms that take advantage of the scalability and agility of the cloud. He is passionate about understanding emerging challenges and guiding customers to build modern solutions. Outside of work, Prasad indulges his creative curiosity through photography, while also staying up-to-date on the latest technology innovations and trends.

Mahammadali Saheb is a Data Architect at AWS Professional Services, specializing in Data Analytics. He is passionate about helping customers drive business outcomes via data analytics solutions on AWS Cloud.

Tyler McDaniel is a software development engineer on the AWS Glue team with diverse technical interests including high-performance computing and optimization, distributed systems, and machine learning operations. He has eight years of experience in software and research roles.

Rahul Sharma is a Senior Software Development Engineer at AWS Glue. He focuses on building distributed systems to support features in AWS Glue. He has a passion for helping customers build data management solutions on the AWS Cloud. In his spare time, he enjoys playing the piano and gardening.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Introducing blueprint discovery and other UI enhancements for Amazon OpenSearch Ingestion

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/introducing-blueprint-discovery-and-other-ui-enhancements-for-amazon-opensearch-ingestion/

Amazon OpenSearch Ingestion is a fully managed serverless pipeline that allows you to ingest, filter, transform, enrich, and route data to an Amazon OpenSearch Service domain or Amazon OpenSearch Serverless collection. OpenSearch Ingestion is capable of ingesting data from a wide variety of sources and has a rich ecosystem of built-in processors to take care of your most complex data transformation needs.

In this post, we walk you through the new UI enhancements and blueprint discovery features that are now available with OpenSearch Ingestion for a richer user experience.

Blueprint discovery

Rather than create a pipeline definition from scratch, you can use configuration blueprints, which are preconfigured templates for common ingestion scenarios such as trace analytics or Apache logs.

With the older interface, you had to scroll through a dropdown menu of all the available blueprints and couldn’t filter the blueprints using search. OpenSearch Ingestion now offers a new UI that enables searching for blueprints using full-text search on the AWS Management Console, helping you discover all the sources that you can ingest data from into OpenSearch Service.

When you create a new pipeline on the OpenSearch Service console, you’re presented with a new catalog page. You now have a top-down view of all the sources and sinks supported by OpenSearch Ingestion. The blueprints are listed as tiles with icons and are grouped together thematically.

The new catalog has a navigation menu that lists the blueprints, grouped by use case or the service that you want to ingest data from. You can either search for a blueprint or choose the shortcuts in the navigation pane. For example, if you’re building a pipeline to perform a zero-ETL integration with Amazon DynamoDB, you can either search for DynamoDB, choose ZeroETL under Use case in the navigation pane, or choose DynamoDB under Service.

Customized getting started guide

After you choose your blueprint, you’re directed to the Pipeline settings and configuration page. The new UI for blueprints offers a customized getting started guide for each source, detailing key steps in setting up a successful end-to-end integration. The guide on this page will change depending on the blueprint you chose for your pipeline.

The pipeline configuration is pre-filled with a template that you can modify with your specific sources, AWS Identity and Access Management (IAM) roles, and sinks. For example, if you chose the Zero-ETL with DocumentDB blueprint, the getting started guide will provide information on setting up a DocumentDB collection, permissions, and destination.

Support for JSON pipeline configuration

As part of the visual overhaul, OpenSearch Ingestion now supports specifying the pipeline configuration in JSON format on the console, in addition to the existing YAML support. You can view the configuration in either format and edit it in place.

Although YAML is more human readable, it’s whitespace sensitive and is often challenging when copy/pasting from text editors. The JSON pipeline configuration allows you to confidently copy and paste configurations from your text or code editors without having to worry about formatting errors due to inconsistent whitespace. In addition, you can now make any edits in place with JSON configuration and the changes will be propagated to the YAML automatically.
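The following Python snippet shows why JSON avoids whitespace problems. Two differently formatted copies of the same configuration parse to identical structures. The pipeline is a hypothetical, minimal example with a placeholder endpoint. It is not a complete or deployable OpenSearch Ingestion configuration.

```python
import json

# Hypothetical, minimal pipeline configuration with a placeholder endpoint.
pretty = """
{
  "version": "2",
  "log-pipeline": {
    "source": {"http": {"path": "/log/ingest"}},
    "sink": [{"opensearch": {"hosts": ["https://<your-domain-endpoint>"], "index": "logs"}}]
  }
}
"""

# The same configuration with all optional whitespace removed, as it might
# look after a careless copy and paste.
compact = ('{"version":"2","log-pipeline":{"source":{"http":{"path":"/log/ingest"}},'
           '"sink":[{"opensearch":{"hosts":["https://<your-domain-endpoint>"],"index":"logs"}}]}}')

# Whitespace between JSON tokens is insignificant, so both parse identically.
print(json.loads(pretty) == json.loads(compact))  # True
print(json.dumps(json.loads(compact), indent=2))  # re-indented for readability
```

The equivalent YAML would depend on consistent indentation to express the same nesting, and that indentation is exactly what copy-and-paste tends to break.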

Availability and pricing

These features are available in all commercial AWS Regions where OpenSearch Ingestion is currently available.

Ingestion-OCUs remain at the same price of $0.24 per hour. OCUs are billed on an hourly basis with per-minute granularity. You can control the costs OCUs incur by configuring the maximum number of OCUs that a pipeline is allowed to scale to.
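As a rough sizing aid, the following Python sketch estimates compute cost from OCU usage at the quoted $0.24 per OCU-hour. Rounding partial minutes up to whole minutes and using 730 hours as an average month are assumptions. The function names are illustrative. Check the OpenSearch Ingestion pricing page for your Region before relying on these numbers.

```python
import math


def estimate_ingestion_cost(ocus: float, minutes: float,
                            price_per_ocu_hour: float = 0.24) -> float:
    """Estimate cost (USD) for running a pipeline at a constant OCU count.

    Assumes per-minute metering with partial minutes rounded up.
    """
    billed_minutes = math.ceil(minutes)
    ocu_hours = ocus * billed_minutes / 60
    return round(ocu_hours * price_per_ocu_hour, 4)


def max_monthly_cost(max_ocus: int, hours_in_month: float = 730,
                     price_per_ocu_hour: float = 0.24) -> float:
    """Upper bound (USD) if the pipeline ran at its maximum OCU setting all month."""
    return round(max_ocus * hours_in_month * price_per_ocu_hour, 2)


print(estimate_ingestion_cost(2, 90))  # 0.72 (2 OCUs x 1.5 hours x $0.24)
print(max_monthly_cost(4))             # 700.8 (4 OCUs x 730 hours x $0.24)
```

Because the maximum OCU setting caps scaling, max_monthly_cost gives a ceiling on monthly compute spend for a pipeline that runs continuously.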

Conclusion

In this post, we showed you the new blueprint discovery and other UI enhancements for OpenSearch Ingestion. Refer to Amazon OpenSearch Ingestion to learn about other capabilities provided by OpenSearch Ingestion to build scalable pipelines for your OpenSearch data ingestion needs.


About the author

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.


Sam Selvan is a Principal Specialist Solution Architect with Amazon OpenSearch Service.

Use AWS Data Exchange to seamlessly share Apache Hudi datasets

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/use-aws-data-exchange-to-seamlessly-share-apache-hudi-datasets/

Apache Hudi was originally developed by Uber in 2016 to bring to life a transactional data lake that could quickly and reliably absorb updates to support the massive growth of the company's ride-sharing platform. Apache Hudi is now widely used across the industry to build very large-scale data lakes. Today, Hudi is the most active and high-performing open source data lakehouse project, known for fast incremental updates and a robust services layer.

Apache Hudi serves as an important data management tool because it allows you to bring full online transaction processing (OLTP) database functionality to data stored in your data lake. As a result, Hudi users can store massive amounts of data with the data scaling costs of a cloud object store, rather than the more expensive scaling costs of a data warehouse or database. It also provides data lineage, integration with leading access control and governance mechanisms, and incremental ingestion of data for near real-time performance. AWS, along with its partners in the open source community, has embraced Apache Hudi in several services, offering Hudi compatibility in Amazon EMR, Amazon Athena, Amazon Redshift, and more.
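The core of that transactional behavior is the upsert: each incoming record is matched to an existing one by its record key, and when two versions collide, Hudi's precombine field decides which one wins. The following pure-Python sketch models that idea conceptually. It is not Hudi's implementation or API; the field names `id` and `ts` are hypothetical stand-ins for a table's record key and precombine field.

```python
from typing import Any

Record = dict[str, Any]


def upsert(table: dict[str, Record], incoming: list[Record],
           record_key: str = "id", precombine_field: str = "ts") -> dict[str, Record]:
    """Return a new snapshot with incoming records inserted or updated by key.

    When a key already exists, the version with the larger precombine value
    wins (ties go to the incoming record). The previous snapshot is left
    untouched, loosely like a copy-on-write commit.
    """
    snapshot = dict(table)
    for record in incoming:
        key = record[record_key]
        current = snapshot.get(key)
        if current is None or record[precombine_field] >= current[precombine_field]:
            snapshot[key] = record
    return snapshot


def delete(table: dict[str, Record], keys: list[str]) -> dict[str, Record]:
    """Return a new snapshot without the given record keys (record-level delete)."""
    doomed = set(keys)
    return {k: v for k, v in table.items() if k not in doomed}
```

Because late-arriving data with an older precombine value does not overwrite newer data, the table converges on the latest version of each record regardless of arrival order.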

AWS Data Exchange is an AWS service that enables you to find, subscribe to, and use third-party datasets in the AWS Cloud. A dataset in AWS Data Exchange is a collection of data that can be changed or updated over time. AWS Data Exchange also provides a platform through which data providers can make their data available to subscribers.

In this post, we show how you can take advantage of the data sharing capabilities in AWS Data Exchange on top of Apache Hudi.

Benefits of AWS Data Exchange

AWS Data Exchange offers a series of benefits to both parties. For subscribers, it provides a convenient way to access and use third-party data without the need to build and maintain data delivery, entitlement, or billing technology. Subscribers can find and subscribe to thousands of products from qualified AWS Data Exchange providers and use them with AWS services. For providers, AWS Data Exchange offers a secure, transparent, and reliable channel to reach AWS customers. It eliminates the need to build and maintain data delivery, entitlement, and billing technology, allowing providers to focus on creating and managing their datasets.

Becoming a provider on AWS Data Exchange involves a few steps that determine eligibility. Providers need to register as a provider, make sure their data meets the legal eligibility requirements, and create datasets, revisions, and import assets. Providers can define public offers for their data products, including prices, durations, data subscription agreements, refund policies, and custom offers. You can manage datasets and assets with the AWS Data Exchange API and the AWS Data Exchange console.

Overall, AWS Data Exchange simplifies the process of data sharing in the AWS Cloud by providing a platform for customers to find and subscribe to third-party data, and for providers to publish and manage their data products. It offers benefits for both subscribers and providers by eliminating the need for complex data delivery and entitlement technology and providing a secure and reliable channel for data exchange.

Solution overview

Combining the scale and operational capabilities of Apache Hudi with the secure data sharing features of AWS Data Exchange enables you to maintain a single source of truth for your transactional data. Simultaneously, it enables automatic business value generation by allowing other stakeholders to use the insights that the data can provide. This post shows how to set up such a system in your AWS environment using Amazon Simple Storage Service (Amazon S3), Amazon EMR, Amazon Athena, and AWS Data Exchange. The following diagram illustrates the solution architecture.

Set up your environment for data sharing

You need to register as a data provider before you create datasets and list them in AWS Data Exchange as data products. Complete the following steps to register as a data provider:

  1. Sign in to the AWS account that you want to use to list and manage products on AWS Data Exchange.
    As a provider, you are responsible for complying with these guidelines and the Terms and Conditions for AWS Marketplace Sellers and the AWS Customer Agreement. AWS may update these guidelines. AWS removes any product that breaches these guidelines and may suspend the provider from future use of the service. AWS Data Exchange may have some AWS Regional requirements; refer to Service endpoints for more information.
  2. Open the AWS Marketplace Management Portal registration page and enter the relevant information about how you will use AWS Data Exchange.
  3. For Legal business name, enter the name that your customers see when subscribing to your data.
  4. Review the terms and conditions and select I have read and agree to the AWS Marketplace Seller Terms and Conditions.
  5. Select the information related to the types of products you will be creating as a data provider.
  6. Choose Register & Sign into Management Portal.

If you want to submit paid products to AWS Marketplace or AWS Data Exchange, you must provide your tax and banking information. You can add this information on the Settings page:

  1. Choose the Payment information tab.
  2. Choose Complete tax information and complete the form.
  3. Choose Complete banking information and complete the form.
  4. Choose the Public profile tab and update your public profile.
  5. Choose the Notifications tab and configure an additional email address to receive notifications.

You’re now ready to configure seamless data sharing with AWS Data Exchange.

Upload Apache Hudi datasets to AWS Data Exchange

After you create your Hudi datasets and register as a data provider, complete the following steps to create the datasets in AWS Data Exchange:

  1. Sign in to the AWS account that you want to use to list and manage products on AWS Data Exchange.
  2. On the AWS Data Exchange console, choose Owned data sets in the navigation pane.
  3. Choose Create data set.
  4. Select the dataset type you want to create (for this post, we select Amazon S3 data access).
  5. Choose Choose Amazon S3 locations.
  6. Choose the Amazon S3 location where you have your Hudi datasets.

After you add the Amazon S3 location to register in AWS Data Exchange, a bucket policy is generated.

  1. Copy the JSON file and update the bucket policy in Amazon S3.
  2. After you update the bucket policy, choose Next.
  3. Wait for the CREATE_S3_DATA_ACCESS_FROM_S3_BUCKET job to show as Completed, then choose Finalize data set.
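If you prefer to script these console steps, the AWS Data Exchange API exposes the same workflow. The following Python sketch assumes a boto3 `dataexchange` client and that the generated bucket policy is already in place; the dataset name, bucket, and prefix are hypothetical, and finalizing the revision is assumed here to correspond to the console's Finalize data set step.

```python
import time


def build_s3_access_job_details(data_set_id, revision_id, bucket, key_prefixes):
    """Details payload for a CREATE_S3_DATA_ACCESS_FROM_S3_BUCKET job."""
    return {
        "CreateS3DataAccessFromS3Bucket": {
            "AssetSource": {"Bucket": bucket, "KeyPrefixes": list(key_prefixes)},
            "DataSetId": data_set_id,
            "RevisionId": revision_id,
        }
    }


def register_hudi_location(dx, name, bucket, key_prefixes, poll_seconds=5.0):
    """Create an S3 data access dataset for Hudi tables and finalize its revision.

    dx is a boto3 'dataexchange' client, or any stub with the same methods.
    Returns (data_set_id, revision_id).
    """
    data_set = dx.create_data_set(
        AssetType="S3_DATA_ACCESS",
        Name=name,
        Description="Apache Hudi tables shared through Amazon S3 data access",
    )
    revision = dx.create_revision(DataSetId=data_set["Id"])
    job = dx.create_job(
        Type="CREATE_S3_DATA_ACCESS_FROM_S3_BUCKET",
        Details=build_s3_access_job_details(
            data_set["Id"], revision["Id"], bucket, key_prefixes),
    )
    dx.start_job(JobId=job["Id"])

    # Poll until the job reaches a terminal state.
    while True:
        state = dx.get_job(JobId=job["Id"])["State"]
        if state == "COMPLETED":
            break
        if state in ("ERROR", "CANCELLED", "TIMED_OUT"):
            raise RuntimeError(f"job {job['Id']} ended in state {state}")
        time.sleep(poll_seconds)

    dx.update_revision(DataSetId=data_set["Id"], RevisionId=revision["Id"], Finalized=True)
    return data_set["Id"], revision["Id"]
```

Usage might look like `register_hudi_location(boto3.client("dataexchange"), "hudi-trades", "my-hudi-bucket", ["hudi/trades/"])`, where every argument value is illustrative.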

Publish a product using the registered Hudi dataset

Complete the following steps to publish a product using the Hudi dataset:

  1. On the AWS Data Exchange console, choose Products in the navigation pane.
    Make sure you’re in the Region where you want to create the product.
  2. Choose Publish new product to start the workflow to create a new product.
  3. Choose which product visibility you want to have: public (it will be publicly available in the AWS Data Exchange catalog as well as the AWS Marketplace websites) or private (only the AWS accounts you share with will have access to it).
  4. Select the sensitive information category of the data you are publishing.
  5. Choose Next.
  6. Select the dataset that you want to add to the product, then choose Add selected to add the dataset to the new product.
  7. Define access to your dataset revisions based on time. For more information, see Revision access rules.
  8. Choose Next.
  9. Provide the information for a new product, including a short description.
    One of the required fields is the product logo, which must be in a supported image format (PNG, JPG, or JPEG) and the file size must be 100 KB or less.
  10. Optionally, in the Define product section, under Data dictionaries and samples, select a dataset and choose Edit to upload a data dictionary to the product.
  11. For Long description, enter the description to display to your customers when they look at your product. Markdown formatting is supported.
  12. Choose Next.
  13. Based on your choice of product visibility, configure the offer, renewal, and data subscription agreement.
  14. Choose Next.
  15. Review all the product and offer information, then choose Publish to create the new product.
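Because a rejected logo only surfaces partway through the wizard, it can help to check the file beforehand. The following Python sketch encodes the two constraints from step 9 (PNG, JPG, or JPEG, and 100 KB or less). The helper names are hypothetical, and it conservatively assumes 100 KB means 100,000 bytes.

```python
from pathlib import Path

ALLOWED_LOGO_SUFFIXES = {".png", ".jpg", ".jpeg"}
# Assumption: "100 KB" read conservatively as 100,000 bytes.
MAX_LOGO_BYTES = 100_000


def logo_problems(filename: str, size_bytes: int) -> list[str]:
    """Return a list of reasons the logo would be rejected (empty if it looks fine)."""
    problems = []
    suffix = Path(filename).suffix.lower()
    if suffix not in ALLOWED_LOGO_SUFFIXES:
        problems.append(f"unsupported format {suffix or '(none)'}; use PNG, JPG, or JPEG")
    if size_bytes > MAX_LOGO_BYTES:
        problems.append(f"file is {size_bytes} bytes; the limit is {MAX_LOGO_BYTES}")
    return problems


def check_logo_file(path: str) -> list[str]:
    """Convenience wrapper that reads the size of a file on disk."""
    return logo_problems(path, Path(path).stat().st_size)
```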

Manage permissions and access controls for shared datasets

Datasets that are published on AWS Data Exchange can only be used after customers subscribe to the product. Complete the following steps to subscribe to the data:

  1. On the AWS Data Exchange console, choose Browse catalog in the navigation pane.
  2. In the search bar, enter the name of the product you want to subscribe to and press Enter.
  3. Choose the product to view its detail page.
  4. On the product detail page, choose Continue to Subscribe.
  5. Choose your preferred price and duration combination, choose whether to enable auto-renewal for the subscription, and review the offer details, including the data subscription agreement (DSA).
    The dataset is available in the US East (N. Virginia) Region.
  6. Review the pricing information, choose the pricing offer and, if you and your organization agree to the DSA, pricing, and support information, choose Subscribe.

After the subscription has gone through, you will be able to see the product on the Subscriptions page.
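Subscribers can also confirm their entitlements programmatically. The following Python sketch assumes a boto3 `dataexchange` client and pages through `list_data_sets` with `Origin="ENTITLED"`, which returns the datasets you are entitled to through subscriptions rather than the ones you own; the helper name is hypothetical.

```python
def list_entitled_data_sets(dx):
    """Return (id, name, asset_type) for every dataset this account is entitled to.

    dx is a boto3 'dataexchange' client, or a stub with a list_data_sets method.
    """
    data_sets, token = [], None
    while True:
        kwargs = {"Origin": "ENTITLED"}
        if token:
            kwargs["NextToken"] = token
        page = dx.list_data_sets(**kwargs)
        data_sets.extend(page.get("DataSets", []))
        token = page.get("NextToken")
        if not token:
            return [(d["Id"], d["Name"], d["AssetType"]) for d in data_sets]
```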

Create a table in Athena using an Amazon S3 access point

Complete the following steps to create a table in Athena:

  1. Open the Athena console.
  2. If this is the first time using Athena, choose Explore Query Editor and set up the S3 bucket where query results will be written:
    Athena displays the results of your query on the Athena console, or sends them through your ODBC/JDBC driver if you use one. The results are also written to the query result S3 bucket.

    1. Choose View settings.
    2. Choose Manage.
    3. Under Query result location and encryption, choose Browse Amazon S3.
    4. Choose a bucket and folder you want to automatically write the query results to.
    5. Choose Save.
  3. Complete the following steps to create a workgroup:
    1. In the navigation pane, choose Workgroups.
    2. Choose Create workgroup.
    3. Enter a name for your workgroup (for this post, data_exchange), select your analytics engine (Athena SQL), and select Turn on queries on requester pay buckets in Amazon S3.
      This is important to access third-party datasets.
    4. In the Athena query editor, choose the workgroup you created.
    5. Run the following DDL to create the table:
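For illustration only, the following Python sketch shows how the workgroup and a table DDL could be submitted with a boto3 `athena` client. The DDL builder uses the standard Athena pattern for a Hudi copy-on-write table (Parquet SerDe with `HoodieParquetInputFormat`); the database, table, columns, and the access point alias in the `LOCATION` are all hypothetical, so adapt them to the schema of the dataset you subscribed to.

```python
def hudi_cow_table_ddl(database, table, columns, location):
    """Build a CREATE EXTERNAL TABLE statement for a Hudi copy-on-write table.

    columns is a list of (name, athena_type) pairs; location can use an
    S3 access point alias in place of a bucket name.
    """
    column_sql = ",\n  ".join(f"`{name}` {sql_type}" for name, sql_type in columns)
    return (
        f"CREATE EXTERNAL TABLE `{database}`.`{table}` (\n  {column_sql}\n)\n"
        "ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'\n"
        "STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'\n"
        "OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'\n"
        f"LOCATION '{location}'"
    )


def create_workgroup_and_table(athena, workgroup, results_s3_uri, ddl):
    """athena is a boto3 'athena' client, or a stub with the same methods."""
    athena.create_work_group(
        Name=workgroup,
        Configuration={
            "ResultConfiguration": {"OutputLocation": results_s3_uri},
            # Equivalent of "Turn on queries on requester pay buckets in Amazon S3".
            "RequesterPaysEnabled": True,
        },
    )
    response = athena.start_query_execution(QueryString=ddl, WorkGroup=workgroup)
    return response["QueryExecutionId"]
```

Unpartitioned tables keep the sketch short; a partitioned Hudi table would also need a `PARTITIONED BY` clause and partition registration.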

Now you can run your analytical queries using Athena SQL statements. The following screenshot shows an example of the query results.

Enhanced customer collaboration and experience with AWS Data Exchange and Apache Hudi

AWS Data Exchange provides a secure and simple interface to access high-quality data. With access to over 3,500 datasets, you can use leading high-quality data in your analytics and data science. Additionally, the ability to add Hudi datasets as shown in this post enables deeper integration with lakehouse use cases. There are several potential use cases where integrating Apache Hudi datasets into AWS Data Exchange can accelerate business outcomes, such as the following:

  • Near real-time updated datasets – One of Apache Hudi's defining features is the ability to provide near real-time incremental data processing. As new data flows in, Hudi allows that data to be ingested in near real time, providing a central source of up-to-date truth. AWS Data Exchange supports dynamically updated datasets, which can keep up with these incremental updates. For downstream customers that rely on the most up-to-date information for their use cases, the combination of Apache Hudi and AWS Data Exchange means that they can subscribe to a dataset in AWS Data Exchange and know that they're getting incrementally updated data.
  • Incremental pipelines and processing – Hudi supports incremental processing and updates to data in the data lake. This is especially valuable because it lets you update or process only the data that has changed, and keep the materialized views that matter for your business use case up to date.
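Incremental consumption works by remembering a checkpoint (a commit instant) and reading only rows committed after it. In Spark, Hudi exposes this through the `hoodie.datasource.query.type=incremental` and `hoodie.datasource.read.begin.instanttime` read options. The following pure-Python sketch models only the checkpointing logic over rows that carry Hudi's `_hoodie_commit_time` metadata column; it is a conceptual illustration, not Hudi's implementation, and the helper name is hypothetical.

```python
def incremental_pull(rows, begin_instant, commit_field="_hoodie_commit_time"):
    """Return rows committed strictly after begin_instant, plus the next checkpoint.

    Assumes fixed-width 'yyyyMMddHHmmssSSS' instant strings, so plain string
    comparison orders them chronologically. If nothing changed, the
    checkpoint stays where it was.
    """
    changed = [row for row in rows if row[commit_field] > begin_instant]
    checkpoint = max((row[commit_field] for row in changed), default=begin_instant)
    return changed, checkpoint
```

A downstream job would persist the returned checkpoint and pass it in on the next run, so each run touches only new commits.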

Best practices and recommendations

We recommend the following best practices for security and compliance:

  • Enable AWS Lake Formation or other data governance systems as part of creating the source data lake
  • To maintain compliance, you can use the guides provided by AWS Artifact

For monitoring and management, you can enable Amazon CloudWatch logs on your EMR clusters along with CloudWatch alarms to maintain pipeline health.
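As one example of such an alarm, the following Python sketch builds the arguments for CloudWatch `put_metric_alarm` on the EMR `MRUnhealthyNodes` metric in the `AWS/ElasticMapReduce` namespace, keyed by the cluster's `JobFlowId`. The choice of metric, thresholds, and the SNS topic are assumptions; pick the metrics that reflect your own pipeline's health.

```python
def emr_unhealthy_nodes_alarm(cluster_id: str, sns_topic_arn: str) -> dict:
    """Keyword arguments for CloudWatch put_metric_alarm on one EMR cluster."""
    return {
        "AlarmName": f"emr-{cluster_id}-unhealthy-nodes",
        "Namespace": "AWS/ElasticMapReduce",
        "MetricName": "MRUnhealthyNodes",
        "Dimensions": [{"Name": "JobFlowId", "Value": cluster_id}],
        "Statistic": "Maximum",
        "Period": 300,            # EMR publishes metrics at 5-minute intervals
        "EvaluationPeriods": 2,   # require two consecutive bad periods before alarming
        "Threshold": 0,
        "ComparisonOperator": "GreaterThanThreshold",
        "TreatMissingData": "notBreaching",
        "AlarmActions": [sns_topic_arn],
    }


def create_alarm(cloudwatch, cluster_id: str, sns_topic_arn: str) -> None:
    """cloudwatch is a boto3 'cloudwatch' client, or a stub with put_metric_alarm."""
    cloudwatch.put_metric_alarm(**emr_unhealthy_nodes_alarm(cluster_id, sns_topic_arn))
```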

Conclusion

Apache Hudi enables you to bring to life massive amounts of data stored in Amazon S3 for analytics. It provides full OLTP capabilities and incremental processing and querying, while maintaining the ability to run deletes so you can remain GDPR compliant. Combining this with the secure, reliable, and user-friendly data sharing capabilities of AWS Data Exchange means that the business value unlocked by a Hudi lakehouse doesn't need to remain limited to the producer that generates this data.

For more use cases about using AWS Data Exchange, see Learning Resources for Using Third-Party Data in the Cloud. To learn more about creating Apache Hudi data lakes, refer to Build your Apache Hudi data lake on AWS using Amazon EMR – Part 1. You can also consider using a fully managed lakehouse product such as Onehouse.


About the Authors

Saurabh Bhutyani is a Principal Analytics Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running generative AI use cases, scalable analytics solutions and data mesh architectures using AWS services like Amazon Bedrock, Amazon SageMaker, Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Ankith Ede is a Data & Machine Learning Engineer at Amazon Web Services, based in New York City. He has years of experience building Machine Learning, Artificial Intelligence, and Analytics based solutions for large enterprise clients across various industries. He is passionate about helping customers build scalable and secure cloud based solutions at the cutting edge of technology innovation.

Chandra Krishnan is a Solutions Engineer at Onehouse, based in New York City. He works on helping Onehouse customers build business value from their data lakehouse deployments and enjoys solving exciting challenges on behalf of his customers. Prior to Onehouse, Chandra worked at AWS as a Data and ML Engineer, helping large enterprise clients build cutting edge systems to drive innovation in their organizations.

Safely remove Kafka brokers from Amazon MSK provisioned clusters

Post Syndicated from Vidhi Taneja original https://aws.amazon.com/blogs/big-data/safely-remove-kafka-brokers-from-amazon-msk-provisioned-clusters/

Today, we are announcing broker removal capability for Amazon Managed Streaming for Apache Kafka (Amazon MSK) provisioned clusters, which lets you remove multiple brokers from your provisioned clusters. You can now reduce your cluster’s storage and compute capacity by removing sets of brokers, with no availability impact, data durability risk, or disruption to your data streaming applications. Amazon MSK is a fully managed Apache Kafka service that makes it easy for developers to build and run highly available, secure, and scalable streaming applications. Administrators can optimize the costs of their Amazon MSK clusters by reducing broker count and adapting the cluster capacity to the changes in the streaming data demand, without affecting their clusters’ performance, availability, or data durability.

You can use Amazon MSK as a core foundation to build a variety of real-time streaming applications and high-performance event-driven architectures. As business needs and traffic patterns change, cluster capacity is often adjusted to optimize costs. Amazon MSK provides flexibility and elasticity for administrators to right-size MSK clusters. You can increase the broker count or the broker size to manage traffic surges during peak events, or decrease the instance size of the cluster's brokers to reduce capacity. Previously, however, reducing the broker count required an effort-intensive migration to another cluster.

With the broker removal capability, you can now remove multiple brokers from your provisioned clusters to meet the varying needs of your streaming workloads. During and after broker removal, the cluster continues to handle read and write requests from the client applications. MSK performs the necessary validations to safeguard against data durability risks and gracefully removes the brokers from the cluster. By using the broker removal capability, you can precisely adjust MSK cluster capacity, eliminating the need to change the instance size of every broker in the cluster or to migrate to another cluster to reduce the broker count.

How the broker removal feature works

Before you execute the broker removal operation, you must make some brokers eligible for removal by moving all partitions off of them. You can use Kafka admin APIs or Cruise Control to move partitions to other brokers that you intend to retain in the cluster.

You choose which brokers to remove and use Kafka tools to move the partitions from those brokers to other brokers. Alternatively, you may already have brokers that are not hosting any partitions. Then use the Edit number of brokers feature on the AWS Management Console, or the Amazon MSK UpdateBrokerCount API. Here are details on how you can use this new feature:

  • You can remove a maximum of one broker per Availability Zone (AZ) in a single broker removal operation. To remove more brokers, you can call multiple broker removal operations consecutively after the prior operation has been completed. You must retain at least one broker per AZ in your MSK cluster.
  • The target number of broker nodes in the cluster must be a multiple of the number of AZs in the client subnets parameter. For example, a cluster with subnets in two AZs must have a target number of nodes that is a multiple of two.
  • If the brokers you removed were present in the bootstrap broker string, MSK will perform the necessary routing so that the client's connectivity to the cluster is not disrupted. You don't need to change the bootstrap strings in your clients.
  • You can add brokers back to your cluster anytime using the AWS Management Console or the UpdateBrokerCount API.
  • Broker removal is supported on Kafka versions 2.8.1 and above. If you have clusters in lower versions, you must first upgrade to version 2.8.1 or above and then remove brokers.
  • Broker removal doesn’t support the t3.small instance type.
  • You will stop incurring costs for the removed brokers once the broker removal operation is completed successfully.
  • When brokers are removed from a cluster, their associated local storage is removed as well.
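Taken together, the first two rules imply that, starting from a valid broker count, each operation removes exactly one broker per AZ (removing fewer would leave a count that isn't a multiple of the AZ count). The following Python sketch derives the sequence of consecutive targets from that reasoning and applies them with a boto3 `kafka` client through `describe_cluster` and `update_broker_count`. The helper names and polling approach are assumptions, and the sketch does not check partition placement, so make the brokers eligible first.

```python
import time


def plan_broker_removal(current: int, target: int, az_count: int) -> list[int]:
    """Return the UpdateBrokerCount targets needed to go from current to target."""
    if az_count < 1:
        raise ValueError("az_count must be at least 1")
    if current % az_count or target % az_count:
        raise ValueError("broker counts must be multiples of the AZ count")
    if target < az_count:
        raise ValueError("at least one broker per AZ must remain")
    if target >= current:
        raise ValueError("target must be lower than the current broker count")
    # One broker per AZ per operation.
    return list(range(current - az_count, target - 1, -az_count))


def apply_plan(kafka, cluster_arn: str, steps: list[int], poll_seconds: float = 30.0) -> None:
    """Run each removal step and wait for the cluster to settle before the next one."""
    for target in steps:
        info = kafka.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
        kafka.update_broker_count(
            ClusterArn=cluster_arn,
            CurrentVersion=info["CurrentVersion"],
            TargetNumberOfBrokerNodes=target,
        )
        while True:
            info = kafka.describe_cluster(ClusterArn=cluster_arn)["ClusterInfo"]
            if info["State"] == "FAILED":
                raise RuntimeError(f"cluster failed while moving to {target} brokers")
            if info["State"] == "ACTIVE" and info["NumberOfBrokerNodes"] == target:
                break
            time.sleep(poll_seconds)
```

For example, `plan_broker_removal(9, 3, 3)` yields the two consecutive operations `[6, 3]`.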

Considerations before removing brokers

Removing brokers from an existing Apache Kafka cluster is a critical operation that needs careful planning to avoid service disruption. When deciding how many brokers you should remove from the cluster, determine your cluster’s minimum broker count by considering your requirements around availability, durability, local data retention, and partition count. Here are a few things you should consider:

  • Check the Amazon CloudWatch BytesInPerSec and BytesOutPerSec metrics for your cluster. Look for the peak load over a period of 1 month. Use this data with the MSK sizing Excel file to identify how many brokers you need to handle your peak load. If the number of brokers listed in the Excel file is higher than the number of brokers that would remain after removing brokers, do not proceed with this operation. This indicates that removing brokers would result in too few brokers for the cluster, which can lead to availability impact for your cluster or applications.
  • Check the UserPartitionExists metric to verify that you have at least one empty broker per AZ in your cluster. If not, move the partitions off at least one broker per AZ before invoking the operation.
  • If you have more than one broker per AZ with no user partitions on them, MSK will randomly pick one of those during the removal operation.
  • Check the PartitionCount metric to know the number of partitions that exist on your cluster, and check the per-broker partition limit. The broker removal feature will not allow the removal of brokers if the service detects that any brokers in the cluster have breached the partition limit. In that case, check whether any unused topics could be removed instead to free up broker resources.
  • Check if the estimated storage in the Excel file exceeds the currently provisioned storage for the cluster. In that case, first provision additional storage on that cluster. If you are hitting per-broker storage limits, consider approaches like using MSK tiered storage or removing unused topics. Otherwise, avoid moving partitions to just a few brokers as that may lead to a disk full issue.
  • If the brokers you are planning to remove host partitions, make sure those partitions are reassigned to other brokers in the cluster. Use the kafka-reassign-partitions.sh tool or Cruise Control to initiate partition reassignment. Monitor the progress of reassignment to completion. Disregard the __amazon_msk_canary and __amazon_msk_canary_state internal topics, because they are managed by the service and will be automatically removed by MSK while executing the operation.
  • Verify that the cluster status is Active before starting the removal process.
  • Check the performance of the workload on your production environment after you move those partitions. We recommend monitoring this for a week before you remove the brokers to make sure that the other brokers in your cluster can safely handle your traffic patterns.
  • If you experience any impact on your applications or cluster availability after removing brokers, you can add the same number of brokers that you removed earlier by using the UpdateBrokerCount API, and then reassign partitions to the newly added brokers.
  • We recommend you test the entire process in a non-production environment, to identify and resolve any issues before making changes in the production environment.
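To prepare the reassignment described above, `kafka-reassign-partitions.sh` accepts a JSON file in the form `{"version": 1, "partitions": [{"topic": ..., "partition": ..., "replicas": [...]}]}`. The following Python sketch generates such a plan from a current assignment (which you would gather yourself, for example with `kafka-topics.sh --describe`). It replaces each departing broker in place, so leader order is preserved, and spreads replicas round-robin across the remaining brokers. It skips the MSK canary topics and is not rack- or AZ-aware, which is a simplifying assumption; Cruise Control handles balancing more thoroughly.

```python
import json
from itertools import cycle

# Internal topics that MSK manages and removes itself.
INTERNAL_TOPICS = {"__amazon_msk_canary", "__amazon_msk_canary_state"}


def reassignment_plan(assignment, brokers_to_remove, remaining_brokers):
    """assignment maps (topic, partition) -> list of replica broker IDs."""
    remove = set(brokers_to_remove)
    targets = [b for b in remaining_brokers if b not in remove]
    round_robin = cycle(targets)
    partitions = []
    for (topic, partition), replicas in sorted(assignment.items()):
        if topic in INTERNAL_TOPICS or not remove.intersection(replicas):
            continue  # nothing to move for this partition
        new_replicas = []
        for broker in replicas:
            if broker not in remove:
                new_replicas.append(broker)
                continue
            # Take the next remaining broker that doesn't already hold this partition.
            for _ in range(len(targets)):
                candidate = next(round_robin)
                if candidate not in replicas and candidate not in new_replicas:
                    break
            else:
                raise ValueError(f"not enough brokers to re-home {topic}-{partition}")
            new_replicas.append(candidate)
        partitions.append({"topic": topic, "partition": partition, "replicas": new_replicas})
    return {"version": 1, "partitions": partitions}
```

You could write the result with `json.dumps(plan)` to a file and pass it to `kafka-reassign-partitions.sh --bootstrap-server <brokers> --reassignment-json-file plan.json --execute`, then rerun with `--verify` to track completion.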

Conclusion

Amazon MSK’s new broker removal capability provides a safe way to reduce the capacity of your provisioned Apache Kafka clusters. By allowing you to remove brokers without impacting availability, data durability, or disrupting your streaming applications, this feature enables you to optimize costs and right-size your MSK clusters based on changing business needs and traffic patterns. With careful planning and by following the recommended best practices, you can confidently use this capability to manage your MSK resources more efficiently.

Start taking advantage of the broker removal feature in Amazon MSK today. Review the documentation and follow the step-by-step guide to test the process in a non-production environment. Once you are comfortable with the workflow, plan and execute broker removal in your production MSK clusters to optimize costs and align your streaming infrastructure with your evolving workload requirements.


About the Authors


Vidhi Taneja is a Principal Product Manager for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She is passionate about helping customers build streaming applications at scale and derive value from real-time data. Before joining AWS, Vidhi worked at Apple, Goldman Sachs and Nutanix in product management and engineering roles. She holds an MS degree from Carnegie Mellon University.


Anusha Dasarakothapalli is a Principal Software Engineer for Amazon Managed Streaming for Apache Kafka (Amazon MSK) at AWS. She started her software engineering career with Amazon in 2015 and worked on products such as S3-Glacier and S3 Glacier Deep Archive, before transitioning to MSK in 2022. Her primary areas of focus lie in streaming technology, distributed systems, and storage.


Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Breaking barriers in geospatial: Amazon Redshift, CARTO, and H3

Post Syndicated from Ravi Animi original https://aws.amazon.com/blogs/big-data/breaking-barriers-in-geospatial-amazon-redshift-carto-and-h3/

This post is co-written with Javier de la Torre from CARTO.

In this post, we discuss how Amazon Redshift spatial index functions for the Hexagonal Hierarchical Geospatial Indexing System (H3) can be used to represent spatial data with H3 indexes for fast spatial lookups at scale. Navigating the vast landscape of data-driven insights has always been an exciting endeavor. As technology continues to evolve, one specific facet of this journey is reaching unprecedented proportions: geospatial data. In our increasingly interconnected world, where every step we take, every location we visit, and every event we encounter leaves a digital footprint, the volume and complexity of geospatial data are expanding at an astonishing pace. From GPS-enabled smartphones to remote sensing satellites, the sources of geospatial information are multiplying, generating an immense gold mine of location-based insights.

However, visualizing and analyzing large-scale geospatial data presents a formidable challenge due to the sheer volume and intricacy of information. This often overwhelms traditional visualization tools and methods. The need to balance detail and context while maintaining real-time interactivity can lead to issues of scalability and rendering complexity.

Because of this, many organizations are turning to novel ways of approaching geospatial data, such as spatial indexes like H3.

Figure 1 – Map built with CARTO Builder and the native support to visualize H3 indexes


What are spatial indexes?

Spatial indexes are global grid systems that exist at multiple resolutions. But what makes them special? Traditionally, spatial data is represented through a geography or geometry in which features are geolocated on the earth by a long reference string describing the coordinates of every vertex. Unlike geometries, spatial indexes are georeferenced by a short ID string. This makes them far smaller to store and lightning fast to process! Because of this, many organizations are utilizing them as a support geography, aggregating their data to these grids to optimize both their storage and analysis.

Figure 2 shows some of the possible types of savings with spatial indexes. To learn more details about their benefits, see Introduction to Spatial Indexes.

Figure 2 – Comparison of performance between geometries and spatial indexes. Learn more about these differences in CARTO’s free ebook Spatial Indexes


Benefits of H3

One of the flagship examples of spatial indexes is H3, which is a hexagonal spatial index. Originally developed by Uber, it is now used far beyond the ridesharing industry. Unlike square-based grids, H3’s well-structured hexagons accurately represent intricate geographic features like rivers and roads, enabling precise depiction of nonperpendicular shapes. The hexagonal geometry excels at capturing gradual spatial changes and movement, and its consistent distance between one centroid and neighboring centroids eliminates outliers. This ensures robust data representation in all directions. Learn more about the benefits of using hexagons for location intelligence at Hexagons for Location Intelligence.
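The equal-distance property is easy to check on an idealized flat grid. In the following Python sketch, all six neighbors of a hexagon have centers at the same distance, while a square cell's eight neighbors sit at two different distances (1 and √2). It uses standard axial hex coordinates on a plane, which is an assumption: real H3 cells lie on a sphere, vary slightly in size and shape, and include 12 pentagons, so the equality there is approximate.

```python
import math

# Axial offsets of the six neighbors of a pointy-top hexagon.
HEX_DIRECTIONS = [(1, 0), (1, -1), (0, -1), (-1, 0), (-1, 1), (0, 1)]
# All eight neighbors (edge and corner) of a square cell.
SQUARE_DIRECTIONS = [(dx, dy) for dx in (-1, 0, 1) for dy in (-1, 0, 1) if (dx, dy) != (0, 0)]


def axial_to_xy(q: int, r: int, size: float = 1.0) -> tuple[float, float]:
    """Center of the hex at axial (q, r) for a pointy-top layout with the given size."""
    return size * math.sqrt(3) * (q + r / 2), size * 1.5 * r


def distinct_neighbor_distances_hex(size: float = 1.0) -> list[float]:
    return sorted({round(math.hypot(*axial_to_xy(q, r, size)), 9) for q, r in HEX_DIRECTIONS})


def distinct_neighbor_distances_square(side: float = 1.0) -> list[float]:
    return sorted({round(side * math.hypot(dx, dy), 9) for dx, dy in SQUARE_DIRECTIONS})
```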

Figure 3 – H3: the relationships between different resolutions


H3 available now in Amazon Redshift

Given the immense benefits of H3 for spatial analysis, we’re very excited to announce the availability of H3 in Amazon Redshift. Seamlessly accessible through the powerful infrastructure of Amazon Redshift, H3 unlocks a new realm of possibilities for visualizing, analyzing, and deriving insights from geospatial data.

Amazon Redshift support for H3 offers an easy way to index spatial coordinates into a hexagonal grid, down to a square meter resolution. Indexed data can be quickly joined across different datasets and aggregated at different levels of precision. H3 enables several spatial algorithms and optimizations based on the hexagonal grid, including nearest neighbors, shortest path, gradient smoothing, and more. H3 indexes refer to cells that can be either hexagons or pentagons. The space is subdivided hierarchically by resolution: H3 supports 16 resolutions from 0–15, inclusive, with 0 being the coarsest and 15 being the finest. H3 indexing and related H3 spatial functions are now available for Amazon Redshift spatial analytics.
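The hierarchy has aperture 7: each step to a finer resolution multiplies the number of cells by roughly seven. The H3 documentation gives the total number of cells at resolution r as 2 + 120 · 7^r, of which exactly 12 are pentagons. The following Python sketch simply evaluates that formula; the function names are hypothetical.

```python
PENTAGONS_PER_RESOLUTION = 12


def h3_cell_count(resolution: int) -> int:
    """Total H3 cells (hexagons plus the 12 pentagons) at a given resolution."""
    if not 0 <= resolution <= 15:
        raise ValueError("H3 resolutions run from 0 to 15")
    return 2 + 120 * 7 ** resolution


def h3_hexagon_count(resolution: int) -> int:
    return h3_cell_count(resolution) - PENTAGONS_PER_RESOLUTION
```

At resolution 0 this gives 122 cells (110 hexagons and 12 pentagons), and at resolution 15 roughly 570 trillion cells, which is what makes square-meter-scale indexing possible.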

Support for three new H3 indexing spatial functions, H3_FromLongLat, H3_FromPoint, and H3_PolyFill, is now available in all commercial AWS Regions. For more information or to get started with Amazon Redshift spatial analytics, see the documentation for querying spatial data, spatial functions, and the spatial tutorial.

Examples of H3 functions in Amazon Redshift:

To create or access the indexed values of the hexagonal tiles, you use whichever of the three H3 indexing functions matches the spatial object you want to index: a polygon (a series of Cartesian X Y points that forms a closed 2D object), a point (a single Cartesian X Y value), or a point given as a longitude, latitude pair. For example, if you already have a spatial polygon, you would use the H3_PolyFill function to get the index values of the hexagonal tiles that fill the polygon. Imagine you have a polygon with the following Cartesian (X Y) coordinates:

(0 0, 0 1, 1 1, 1 0, 0 0), which is just a 1 x 1 unit square. You would then invoke the H3_PolyFill() function by converting the well-known text (WKT) POLYGON representation of those coordinates into a polygon object of GEOMETRY data type with the ST_GeomFromText() function, and passing it to H3_PolyFill along with the resolution. This is what you would call:

SELECT H3_Polyfill(ST_GeomFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))'), 4);

The function returns the index values of the individual hexagonal tiles that cover the 1 x 1 polygon. You can define polygons of any shape in the same way, by listing the vertices of the enclosing 2D polygon as a GEOMETRY object. For the preceding example, the H3 tile index values, returned as an Amazon Redshift SUPER array, are:

h3_polyfill
_____________________________________________________________________
[596538848238895103,596538805289222143,596538856828829695,596538813879156735,596537920525959167,596538685030137855,596538693620072447,596538839648960511]
_____________________________________________________________________

At resolution 4, H3_PolyFill returns eight hexagonal tiles for this polygon.
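The returned values are ordinary 64-bit integers whose bits encode each cell's resolution and position in the hierarchy. The following Python sketch decodes those header fields using the bit layout from the H3 index specification; the function and type names are our own. Applied to the eight values above, it confirms that every cell is at resolution 4 and shares the same base cell.

```python
from typing import NamedTuple, Tuple

# Bit layout of an H3 cell index (from the H3 index specification):
#   bit 63      reserved (0)
#   bits 59-62  index mode (1 = cell)
#   bits 56-58  reserved for cells
#   bits 52-55  resolution (0-15)
#   bits 45-51  base cell (0-121)
#   bits 0-44   fifteen 3-bit child digits; digits past the resolution are 7


class H3Header(NamedTuple):
    mode: int
    resolution: int
    base_cell: int
    digits: Tuple[int, ...]


def decode_h3(index: int) -> H3Header:
    mode = (index >> 59) & 0xF
    resolution = (index >> 52) & 0xF
    base_cell = (index >> 45) & 0x7F
    # The digit for resolution r sits at bit offset 3 * (15 - r).
    digits = tuple((index >> (3 * (15 - r))) & 0x7 for r in range(1, resolution + 1))
    return H3Header(mode, resolution, base_cell, digits)


polyfill_result = [
    596538848238895103, 596538805289222143, 596538856828829695,
    596538813879156735, 596537920525959167, 596538685030137855,
    596538693620072447, 596538839648960511,
]
for cell in polyfill_result:
    print(cell, decode_h3(cell))
```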

Similarly, the following SQL returns the H3 cell ID from longitude 0, latitude 0, and resolution 10.

SELECT H3_FromLongLat(0, 0, 10);

 h3_fromlonglat
______________________________________________________________
 623560421467684863
______________________________________________________________

The following SQL returns the same H3 cell ID from the point (0 0) at resolution 10:

SELECT H3_FromPoint(ST_GeomFromText('POINT(0 0)'), 10);

 h3_frompoint
_____________________________________________________________________________________
 623560421467684863
_____________________________________________________________________________________
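Because the hierarchy is encoded in the index itself, you can coarsen a cell to any of its ancestors with bit operations alone. The following Python sketch mirrors the parent lookup that the H3 library exposes as cellToParent; it's our own illustration rather than an Amazon Redshift function, and it assumes the input is already a valid cell index.

```python
H3_RES_OFFSET = 52
H3_RES_MASK = 0xF << H3_RES_OFFSET
H3_DIGIT_BITS = 3
H3_MAX_RES = 15


def get_resolution(index: int) -> int:
    """Read the 4-bit resolution field (bits 52-55)."""
    return (index >> H3_RES_OFFSET) & 0xF


def cell_to_parent(index: int, parent_res: int) -> int:
    """Return the ancestor of an H3 cell at a coarser (or equal) resolution.

    Sets the resolution field to parent_res and marks the child digits for
    resolutions parent_res + 1 .. child_res as unused (binary 111).
    """
    child_res = get_resolution(index)
    if not 0 <= parent_res <= child_res:
        raise ValueError("parent_res must be between 0 and the cell's resolution")
    parent = (index & ~H3_RES_MASK) | (parent_res << H3_RES_OFFSET)
    for r in range(parent_res + 1, child_res + 1):
        parent |= 0x7 << (H3_DIGIT_BITS * (H3_MAX_RES - r))
    return parent


cell = 623560421467684863  # result of H3_FromLongLat(0, 0, 10) shown above
for res in (9, 7, 4, 0):
    print(res, cell_to_parent(cell, res))
```

Rolling cells up this way is what lets data indexed at a fine resolution be aggregated at coarser levels of precision.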

Data visualization and analysis made easy with H3 and CARTO

To illustrate H3 in action, let’s turn to CARTO. As an AWS Partner, CARTO offers a software solution in AWS Marketplace, a curated digital catalog, that integrates spatial visualization, analysis, and app development capabilities directly within the AWS data warehouse environment. What sets CARTO apart from some GIS platforms is its approach to query optimization: it uses the data warehouse itself, running analytical tasks and computations inside Amazon Redshift through user-defined functions (UDFs).

Figure 4 – Basic workflow built with CARTO to polyfill a set of polygons into H3 indexes

Amazon Redshift includes a variety of built-in spatial functions, and CARTO extends this foundation with additional spatial functions in its Analytics Toolbox for Amazon Redshift, further expanding the range of possible analyses. Let’s look at a use case to see how this can solve an example spatial analysis problem.

Unveiling H3 spatial indexes in logistics

Logistics operations, particularly last-mile delivery, benefit substantially from using H3 spatial indexes in operational analytics, because H3 makes it efficient to analyze and manage extensive geospatial datasets.

H3 divides the Earth’s surface into hexagonal cells at multiple hierarchy levels, with smaller cells at each finer resolution. This lets you represent locations at whatever scale an analysis needs, from neighborhoods up to entire cities, while still managing very large datasets efficiently.

H3-based analytics help you process and understand delivery data patterns, such as peak times, popular destinations, and high-demand areas. This insight helps predict future demand and informs operational decisions. H3 can also help create location-based profiling features for predictive machine learning (ML) models, such as risk-mitigation models. Further use cases include adjusting inventory, strategically placing permanent or temporary distribution centers, and refining pricing strategies to make them more effective and adaptive.
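As a minimal illustration of this kind of demand analysis, the following Python sketch counts deliveries per H3 cell and per hour of day. The records are made up, and reusing cell IDs from the earlier H3_PolyFill example is purely for illustration; in Amazon Redshift you would typically express the same aggregation as a GROUP BY over an H3 cell column.

```python
from collections import Counter
from datetime import datetime

# Hypothetical sample records: (H3 cell ID of the drop-off, delivery time).
# In practice the cell IDs could come from H3_FromLongLat over delivery coordinates.
CELL_A = 596538848238895103
CELL_B = 596538805289222143
CELL_C = 596538856828829695

deliveries = [
    (CELL_A, datetime(2024, 3, 1, 9, 15)),
    (CELL_A, datetime(2024, 3, 1, 9, 40)),
    (CELL_B, datetime(2024, 3, 1, 9, 20)),
    (CELL_B, datetime(2024, 3, 1, 12, 30)),
    (CELL_A, datetime(2024, 3, 1, 17, 5)),
    (CELL_C, datetime(2024, 3, 1, 17, 45)),
]


def demand_by_cell_and_hour(records):
    """Count deliveries per (H3 cell, hour of day)."""
    return Counter((cell, ts.hour) for cell, ts in records)


def high_demand_cells(records, n=3):
    """Rank cells by total delivery volume."""
    return Counter(cell for cell, _ in records).most_common(n)


def peak_hours(records, n=1):
    """Rank hours of the day by delivery volume across all cells."""
    return Counter(ts.hour for _, ts in records).most_common(n)


print(high_demand_cells(deliveries))
print(peak_hours(deliveries))
```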

Because H3 cells at a given resolution have roughly uniform size and shape, unlike ZIP codes, whose areas vary widely, H3 is a good structure for organizing location data and can replace traditional ZIP codes as the unit of analysis in day-to-day operations.

In essence, insights derived from H3-based analytics empower businesses to make informed decisions, swiftly adapt to market changes, and elevate customer satisfaction through efficient deliveries.

The feature is eagerly anticipated by Amazon Redshift and CARTO customers. “The prospect of leveraging H3’s advanced spatial capabilities within the robust framework of Amazon Redshift has us excited about the new insights and efficiencies we can unlock for our geospatial analysis. This partnership truly aligns with our vision for smarter, data-driven decision-making,” says the Data Science Team at Aramex.

Figure 5 – Diagram illustrating the process of using H3-powered analytics for strategic decision-making

Let’s talk about your use case

You can experience the future of location intelligence firsthand by requesting a demo from CARTO today. Discover how H3’s hexagonal spatial index, integrated with Amazon Redshift, can help your organization handle large-scale geospatial data efficiently.

About Amazon Redshift

Thousands of customers rely on Amazon Redshift to analyze data from terabytes to petabytes and run complex analytical queries.

With Amazon Redshift, you can get real-time insights and predictive analytics on all of your data across your operational databases, data lake, data warehouse, and third-party datasets. It delivers this at a price performance that’s up to three times better than other cloud data warehouses out of the box, helping you keep your costs predictable.

Amazon Redshift provides capabilities like Amazon Redshift spatial analytics, Amazon Redshift streaming analytics, Amazon Redshift ML, and Amazon Redshift Serverless that simplify application building and make it easier and faster for independent software vendors (ISVs) to embed rich data analytics capabilities in their applications.

With Amazon Redshift Serverless, ISVs can run and scale analytics quickly without setting up and managing data warehouse infrastructure. Developers, data analysts, business professionals, and data scientists can go from data to insights in seconds by simply loading and querying data in the data warehouse.

To request a demo of Amazon Redshift, visit Amazon Redshift free trial. To get started on your own, visit Getting started with Amazon Redshift.

About CARTO

From smartphones to connected cars, location data is changing the way we live and the way we run businesses. Everything happens somewhere, but visualizing data to see where things are isn’t the same as understanding why they happen there. CARTO is the world’s leading cloud-based location intelligence platform, enabling organizations to use spatial data and analysis for more efficient delivery routes, better behavioral marketing, strategic store placements, and much more.

Data scientists, developers, and analysts use CARTO to optimize business processes and predict future outcomes through the power of spatial data science. To learn more, visit CARTO.


About the authors

Ravi Animi is a senior product leader in the Amazon Redshift team and manages several functional areas of the Amazon Redshift cloud data warehouse service, including spatial analytics, streaming analytics, query performance, Spark integration, and analytics business strategy. He has experience with relational databases, multidimensional databases, IoT technologies, storage and compute infrastructure services, and more recently, as a startup founder in the areas of artificial intelligence (AI) and deep learning, computer vision, and robotics.

Ioanna Tsalouchidou is a software development engineer in the Amazon Redshift team focusing on spatial analytics and query processing. She holds a PhD in graph algorithms from UPF Spain and a Masters in distributed systems and computing from KTH Sweden and UPC Spain.

Hinnerk Gildhoff is a senior engineering leader in the Amazon Redshift team leading query processing, spatial analytics, materialized views, autonomics, query languages and more. Prior to joining Amazon, Hinnerk spent over a decade as both an engineer and a manager in the field of in-memory and cluster computing, specializing in building databases and distributed systems.

Javier de la Torre, founder and Chief Strategy Officer of CARTO, has been instrumental in advancing the geospatial industry. At CARTO, he has led innovations in location intelligence. He also serves on the Open Geospatial Consortium board, helping develop standards like GeoParquet. Javier’s commitment extends to environmental causes through his work with Tierra Pura, which focuses on climate change and conservation, reflecting his dedication to using data for global betterment.

Analyze Elastic IP usage history using Amazon Athena and AWS CloudTrail

Post Syndicated from Aidin Khosrowshahi original https://aws.amazon.com/blogs/big-data/analyze-elastic-ip-usage-history-using-amazon-athena-and-aws-cloudtrail/

An AWS Elastic IP (EIP) address is a static, public, and unique IPv4 address. Allocated exclusively to your AWS account, the EIP remains under your control until you release it. You can associate it with an Amazon Elastic Compute Cloud (Amazon EC2) instance or with other AWS resources, such as load balancers.

EIP addresses suit dynamic cloud computing because you can remap them to another instance to mask a disruption. EIPs are also used by applications that make external requests to services that require a consistent source address on their allowlist for inbound connections. As your application usage varies, an EIP might go unused for weeks or even months, and these unused EIPs can accumulate and inadvertently inflate your AWS costs.

In this post, we show you how to analyze EIP usage history using AWS CloudTrail and Amazon Athena to gain better insight into the EIP usage patterns in your AWS account. You can run this solution regularly as part of your cost-optimization efforts to identify unused EIPs that you can safely release to reduce costs.

Solution overview

This solution uses CloudTrail activity logs and Athena to analyze the historical EIP attachment activity in your AWS account. CloudTrail logs API activity within an AWS account.

Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries you run.

By extracting detailed information from CloudTrail and querying it using Athena, this solution streamlines the process of data collection, analysis, and reporting of EIP usage within an AWS account.

To report on EIP usage, this solution takes a snapshot of your current EIPs and looks up their attachments within a lookback period (3 months, which you can customize). It counts how often each EIP was attached to a resource during that period and records its most recent attachment. An attachment count greater than zero suggests that the EIP is actively in use. In contrast, an attachment count of zero indicates that the EIP was idle during the period and may be a candidate for release, which helps you identify opportunities for cost reduction.
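The counting logic can be sketched in a few lines of Python. This is our own illustration, not code from the solution: it assumes you already have the allocation IDs of your current EIPs and a list of (allocation ID, event time) pairs taken from AssociateAddress events, and it approximates the 3-month lookback as 90 days. The allocation IDs in the usage example are hypothetical.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, Optional, Tuple

# Assumption: "3 months" approximated as 90 days; the deployed solution may differ.
DEFAULT_LOOKBACK = timedelta(days=90)


@dataclass(frozen=True)
class AttachmentSummary:
    allocation_id: str
    attachment_count: int
    latest_attachment: Optional[datetime]

    @property
    def status(self) -> str:
        # Above zero suggests the EIP is in use; zero means idle for the whole window.
        return "active" if self.attachment_count > 0 else "idle"


def summarize_attachments(
    allocation_ids: Iterable[str],
    events: Iterable[Tuple[str, datetime]],
    now: datetime,
    lookback: timedelta = DEFAULT_LOOKBACK,
) -> List[AttachmentSummary]:
    """Count AssociateAddress events per current EIP within [now - lookback, now]."""
    window_start = now - lookback
    ids = list(allocation_ids)
    counts = {a: 0 for a in ids}
    latest = {a: None for a in ids}
    for allocation_id, event_time in events:
        # Skip events for EIPs that no longer exist and events outside the window.
        if allocation_id not in counts or not window_start <= event_time <= now:
            continue
        counts[allocation_id] += 1
        if latest[allocation_id] is None or event_time > latest[allocation_id]:
            latest[allocation_id] = event_time
    return [AttachmentSummary(a, counts[a], latest[a]) for a in ids]


now = datetime(2024, 6, 1, tzinfo=timezone.utc)
sample_events = [
    ("eipalloc-0aaa", now - timedelta(days=10)),
    ("eipalloc-0aaa", now - timedelta(days=40)),
    ("eipalloc-0bbb", now - timedelta(days=200)),  # outside the window
]
for s in summarize_attachments(["eipalloc-0aaa", "eipalloc-0bbb"], sample_events, now):
    print(s.allocation_id, s.attachment_count, s.status)
```

A count of zero only says that no attachment happened inside the window. An EIP that was attached before the window and is still attached would also show zero, so check its current association before releasing it.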

In the following sections, we show you how to deploy the solution using AWS CloudFormation and then run an analysis.

Prerequisites

Complete the following prerequisite steps:

  1. If your account doesn’t have CloudTrail enabled, create a trail, and note the name of its S3 bucket for use in the implementation steps.
  2. Download the CloudFormation template (template.yaml) from the repository. You use this file in the implementation steps.

Deploy the solution

In this section, you use AWS CloudFormation to create the required resources. AWS CloudFormation is a service that helps you model and set up your AWS resources so that you can spend less time managing those resources and more time focusing on your applications that run in AWS.

The CloudFormation template creates Athena views and a table to search past AssociateAddress events in CloudTrail, an AWS Lambda function to collect snapshots of existing EIPs, and an S3 bucket to store the analysis results.
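The post doesn’t show the code of the snapshot function. The following Python sketch is a hypothetical stand-in: it calls the EC2 DescribeAddresses API through a client you pass in (for example, boto3.client("ec2")) and flattens each address into a row with the columns that the analysis query reads. The lowercase column names and the JSON Lines output format are assumptions, not necessarily what the deployed Lambda function writes.

```python
import json


def snapshot_eips(ec2_client, region, account_id):
    """Return one row per Elastic IP from an EC2 DescribeAddresses response."""
    response = ec2_client.describe_addresses()
    rows = []
    for address in response.get("Addresses", []):
        rows.append({
            "publicip": address.get("PublicIp"),
            "allocationid": address.get("AllocationId"),
            "region": region,
            "accountid": account_id,
            # AssociationId is absent when the EIP isn't attached to anything.
            "associationid": address.get("AssociationId", ""),
            "publicipv4pool": address.get("PublicIpv4Pool"),
        })
    return rows


def to_json_lines(rows):
    """Serialize rows as newline-delimited JSON, a format Athena can read."""
    return "\n".join(json.dumps(row, sort_keys=True) for row in rows)
```

In a Lambda function, you might call this with boto3.client("ec2", region_name=region) and take the account ID from boto3.client("sts").get_caller_identity()["Account"], then write the result to the snapshot bucket.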

Complete the following steps:

  1. On the AWS CloudFormation console, choose Create stack, and then choose With new resources (standard).
  2. In the Specify template section, choose an existing template and upload the template.yaml file you downloaded in the prerequisites.
  3. In the Specify stack details section, enter a stack name and the S3 location of your existing CloudTrail logs, and keep the default settings for the other parameters.
  4. At the bottom of the Review and create page, select the acknowledgement check box, then choose Submit.

Stack creation takes a few minutes. You can monitor its progress on the AWS CloudFormation console.

Run an analysis

The solution is now configured to run your EIP attachment analysis. If you’re using Athena for the first time in your account, first set up a query result location in Amazon S3. Then complete the following steps to analyze your EIP attachment history:

  1. On the Athena console, navigate to the query editor.
  2. For Database, choose default.
  3. Enter the following query and choose Run query:
SELECT
    eip.publicip,
    eip.allocationid,
    eip.region,
    eip.accountid,
    eip.associationid,
    eip.PublicIpv4Pool,
    -- Most recent AssociateAddress event for this EIP (NULL if there is none)
    max(associate_ip_event.eventtime) AS latest_attachment,
    -- COUNT(column) ignores NULLs, so EIPs with no matching events get 0
    count(associate_ip_event.associationid) AS attachmentCount
FROM eip
LEFT JOIN associate_ip_event
    ON associate_ip_event.allocationid = eip.allocationid
GROUP BY 1, 2, 3, 4, 5, 6
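To see why idle EIPs still appear in the report with a count of zero, it helps to spell out what the query does. The following Python sketch emulates the LEFT JOIN and aggregation on plain dictionaries; it’s an illustration only, and the sample records in the test are hypothetical. The eventtime values are treated as ISO 8601 strings, as CloudTrail records them, which sort chronologically as plain strings when they share the same format.

```python
from collections import defaultdict


def left_join_count(eips, events):
    """Emulate: FROM eip LEFT JOIN associate_ip_event ON allocationid,
    grouped per EIP, with MAX(eventtime) and COUNT(associationid)."""
    events_by_alloc = defaultdict(list)
    for event in events:
        events_by_alloc[event["allocationid"]].append(event)

    report = []
    for eip in eips:
        matches = events_by_alloc.get(eip["allocationid"], [])
        # A LEFT JOIN keeps an EIP without matching events as one row whose
        # event columns are NULL (None here).
        joined = matches or [{"eventtime": None, "associationid": None}]
        times = [row["eventtime"] for row in joined if row["eventtime"] is not None]
        report.append({
            **eip,
            # MAX over only NULLs is NULL.
            "latest_attachment": max(times) if times else None,
            # COUNT(column) skips NULLs, so an unmatched EIP counts as 0.
            "attachmentCount": sum(1 for row in joined if row["associationid"] is not None),
        })
    return report
```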

All the required tables are created under the default database.

This query looks back through the CloudTrail logs for past EIP attachments. By showing how often each EIP was previously attached to a resource, it gives you the insight you need to safely release idle EIPs and reduce costs.

The report provides the following information:

  • Public IP
  • Allocation ID (the ID that AWS assigns to represent the allocation of the EIP address for use with instances in a VPC)
  • Region
  • Account ID
  • Public IPv4 pool (the address pool the EIP was allocated from)
  • latest_attachment (the last time the EIP was attached to a resource)
  • attachmentCount (the number of attachments)
  • The association ID for the address (if this field is empty, the EIP is idle and not attached to any resource)

The following screenshot shows the query results.

Clean up

To avoid incurring ongoing costs, clean up the resources you deployed for this post by completing the following steps:

  1. Empty your S3 buckets (eip-analyzer-eipsnapshot-* and eip-analyzer-athenaresulteipanalyzer-*).
  2. Delete the S3 buckets.
  3. On the AWS CloudFormation console, delete the stack you created.

Conclusion

This post demonstrated how to analyze Elastic IP usage history with Athena and CloudTrail to gain better insight into EIP attachment patterns. Check out the GitHub repo to run this analysis regularly as part of your cost-optimization strategy, so you can identify and release inactive EIPs to reduce costs.

You can also use Athena to analyze logs from other AWS services; for more information, see Querying AWS service logs.

Additionally, you can analyze activity logs with AWS CloudTrail Lake and Amazon Athena. AWS CloudTrail Lake is a managed data lake that lets organizations aggregate, immutably store, and query events recorded by CloudTrail for auditing, security investigation, and operational troubleshooting. CloudTrail Lake supports collecting events from multiple AWS Regions and AWS accounts. With CloudTrail Lake, you pay for data ingestion, retention, and analysis; refer to the AWS CloudTrail Lake pricing page for details.


About the Author

Aidin Khosrowshahi is a Senior Technical Account Manager with Amazon Web Services based out of San Francisco. He focuses on reliability, optimization, and improving operational mechanisms with his customers.

Use AWS Glue Data Catalog views to analyze data

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/use-aws-glue-data-catalog-views-to-analyze-data/

In this post, we show you how to use the new views feature of the AWS Glue Data Catalog. SQL views are a powerful object used across relational databases. You can use views to decrease the time to insight by tailoring the data that is queried. Additionally, you can use the power of SQL in a view to express complex data boundaries across multiple tables that can’t be expressed with simpler permissions. Data lakes give customers the flexibility to derive useful insights from data across many sources and use cases. Data consumers can consume data wherever they need to across lines of business, increasing the velocity of insight generation.

Customers use many different processing engines in their data lakes, each of which has its own version of views with different capabilities. The AWS Glue Data Catalog and AWS Lake Formation provide a central location to manage your data across data lake engines.

AWS Glue has released a new feature, SQL views, which lets you manage a single view object in the Data Catalog that can be queried from multiple SQL engines. You can create a single view object with a different SQL dialect for each engine you want to query it from, such as Amazon Athena, Amazon Redshift, and Spark SQL on Amazon EMR. You can then manage access to these views using the same Lake Formation permissions that you use to control access to tables in the data lake.

Solution overview

For this post, we use the Women’s E-Commerce Clothing Reviews dataset. The objective is to create a view in the Data Catalog, a single common view schema and metadata object that you can use across engines (in this case, Athena). This lets you reuse the same views across your data lakes to fit your use case. We create a view that masks the customer_id column in this dataset, and then share the view with another user so that they can query the masked data.

Prerequisites

Before you can create a view in the AWS Glue Data Catalog, make sure that you have an AWS Identity and Access Management (IAM) role with the following configuration:

  • The following trust policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
               "glue.amazonaws.com",
               "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  • The following pass role policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "Stmt1",
          "Action": [
            "iam:PassRole"
          ],
          "Effect": "Allow",
          "Resource": "*",
          "Condition": {
            "StringEquals": {
              "iam:PassedToService": [
                "glue.amazonaws.com",
                "lakeformation.amazonaws.com"
              ]
            }
          }
        }
      ]
    }

  • Finally, you also need the following AWS Glue permissions:
    • glue:GetDatabase
    • glue:GetDatabases
    • glue:CreateTable
    • glue:GetTable
    • glue:UpdateTable
    • glue:DeleteTable
    • glue:GetTables
    • glue:SearchTables
    • glue:BatchGetPartition
    • glue:GetPartitions
    • glue:GetPartition
    • glue:GetTableVersion
    • glue:GetTableVersions
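If you manage these AWS Glue permissions as an IAM identity policy, the following Python sketch assembles one from the action list above and prints it as JSON. The Sid and the Resource value of "*" are our own placeholders; in practice you would scope Resource down to the specific Data Catalog, database, and table ARNs the role needs.

```python
import json

# The AWS Glue actions listed above. IAM action names are case-insensitive;
# the service prefix is conventionally written in lowercase.
GLUE_VIEW_ACTIONS = [
    "glue:GetDatabase",
    "glue:GetDatabases",
    "glue:CreateTable",
    "glue:GetTable",
    "glue:UpdateTable",
    "glue:DeleteTable",
    "glue:GetTables",
    "glue:SearchTables",
    "glue:BatchGetPartition",
    "glue:GetPartitions",
    "glue:GetPartition",
    "glue:GetTableVersion",
    "glue:GetTableVersions",
]


def build_glue_view_policy(resource_arns=("*",)):
    """Return an IAM policy document (as a dict) allowing the actions above."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "GlueDataCatalogViews",  # placeholder statement ID
                "Effect": "Allow",
                "Action": list(GLUE_VIEW_ACTIONS),
                "Resource": list(resource_arns),  # placeholder: scope this down
            }
        ],
    }


print(json.dumps(build_glue_view_policy(), indent=2))
```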

Run the AWS CloudFormation template

You can deploy the AWS CloudFormation template glueviewsblog.yaml to create the Lake Formation database and table. The dataset will be loaded into an Amazon Simple Storage Service (Amazon S3) bucket.

For step-by-step instructions, refer to Creating a stack on the AWS CloudFormation console.

When the stack is complete, you can see a table called clothing_parquet on the Lake Formation console, as shown in the following screenshot.

Create a view on the Athena console

Now that you have your Lake Formation managed table, you can open the Athena console and create a Data Catalog view. Complete the following steps:

  1. In the Athena query editor, run the following query on the Parquet dataset:
SELECT * FROM "clothing_reviews"."clothing_parquet" limit 10;

In the query results, the customer_id column is currently visible.

Next, you create a view called hidden_customerID and mask the customer_id column.

  2. Create a view called hidden_customerID:
CREATE PROTECTED MULTI DIALECT VIEW clothing_reviews.hidden_customerid SECURITY DEFINER AS 
SELECT * FROM clothing_reviews.clothing_parquet

In the following screenshot, you can see a view called hidden_customerID was successfully created.

  3. Run the following query to mask the first four characters of the customer_id column in the newly created view:
ALTER VIEW clothing_reviews.hidden_customerid UPDATE DIALECT AS
SELECT '****' || substring(customer_id, 5) AS customer_id, clothing_id, age, title, review_text, rating, recommend_ind, positive_feedback, division_name, department_name, class_name
FROM clothing_reviews.clothing_parquet

You can see in the following screenshot that the view hidden_customerID has the customer_id column’s first four characters masked.

The original table clothing_parquet remains unmasked.
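SQL string positions start at 1, so substring(customer_id, n + 1) is what drops exactly the first n characters; a start position of 4 hides only three characters and leaves the fourth visible after the asterisks. The following Python sketch emulates the two-argument substring to make the off-by-one visible; the customer ID value is hypothetical, and the sketch only handles start positions of 1 or more.

```python
def sql_substring(s: str, start: int) -> str:
    """Emulate SQL substring(s, start) for start >= 1.

    SQL positions are 1-based, and the two-argument form returns everything
    from position `start` to the end of the string.
    """
    if start < 1:
        raise ValueError("this sketch only handles start positions >= 1")
    return s[start - 1:]


def mask_prefix(value: str, n: int = 4) -> str:
    """Hide exactly the first n characters, like '****' || substring(value, 5)."""
    return "*" * n + sql_substring(value, n + 1)


customer_id = "AB12345678"  # hypothetical customer ID
print(mask_prefix(customer_id))                  # first four characters hidden
print("****" + sql_substring(customer_id, 4))    # 4th character stays visible
```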

Grant access of the view to another user to query

Data Catalog views let you use Lake Formation to control access. In this step, you grant another user, amazon_business_analyst, access to the view and then query it as that user.

  1. Sign in to the Lake Formation console as admin.
  2. In the navigation pane, choose Views.

As shown in the following screenshot, you can see the hidden_customerid view.

  3. Sign in as the amazon_business_analyst user and navigate to the Views page.

This user can’t see the view yet.

  4. As the data lake admin, grant the amazon_business_analyst user permission on the view.
  5. Sign in again as amazon_business_analyst and navigate to the Views page.

  6. On the Athena console, query the hidden_customerid view.

You have successfully shared the view with the user and queried it from the Athena console.
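The console grant can also be scripted. The following Python sketch builds the keyword arguments for the Lake Formation GrantPermissions API, which you would pass to boto3.client("lakeformation").grant_permissions. It assumes that a Data Catalog view is addressed as a Table resource (database name plus view name) and that SELECT is the permission you want to give; the account ID in the principal ARN is a placeholder.

```python
def build_view_grant(principal_arn, database, view, catalog_id=None):
    """Keyword arguments for Lake Formation GrantPermissions giving SELECT on a view.

    Assumption: the view is referenced as a Table resource.
    """
    table = {"DatabaseName": database, "Name": view}
    if catalog_id:
        table["CatalogId"] = catalog_id  # omit to use the caller's account catalog
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {"Table": table},
        "Permissions": ["SELECT"],
    }


request = build_view_grant(
    "arn:aws:iam::111122223333:user/amazon_business_analyst",  # placeholder ARN
    "clothing_reviews",
    "hidden_customerid",
)
print(request)
# To apply it: boto3.client("lakeformation").grant_permissions(**request)
```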

Clean up

To avoid incurring future charges, delete the CloudFormation stack. For instructions, refer to Deleting a stack on the AWS CloudFormation console.

Conclusion

In this post, we demonstrated how to use the AWS Glue Data Catalog to create views. We then showed how to alter the views and mask the data. You can share the view with different users to query using Athena. For more information about this new feature, refer to Using AWS Glue Data Catalog views.


About the Authors

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn

Michael Chess is a Product Manager on the AWS Lake Formation team based out of Palo Alto, CA. He specializes in permissions and data catalog features in the data lake.

Derek Liu is a Senior Solutions Architect based out of Vancouver, BC. He enjoys helping customers solve big data challenges through AWS analytic services.

Revolutionizing data querying: Amazon Redshift and Visual Studio Code integration

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/revolutionizing-data-querying-amazon-redshift-and-visual-studio-code-integration/

In today’s data-driven landscape, efficient and accessible querying tools play a crucial role in driving businesses forward. Amazon Redshift recently announced integration with Visual Studio Code (VS Code), which changes how data practitioners work with Amazon Redshift: you can now write and run queries against your data warehouse from the same editor you use for the rest of your development work.

Although the Amazon Redshift query editor v2 (QE v2) offers a smooth experience for data analysts and business users, many organizations have data engineers and developers who rely on VS Code as their primary development tool. Until now, they had to switch to QE v2 for their Amazon Redshift development tasks, which didn’t fit well into their workflow. This new feature resolves that issue by letting data engineers and developers do that work within VS Code, improving their workflow efficiency.

The VS Code integration simplifies access to database objects in Redshift data warehouses and gives you a familiar interface to run and troubleshoot your code.

Because the integration connects Amazon Redshift provisioned clusters and Amazon Redshift Serverless to VS Code, a popular and free editor, it can ease concerns about the cost of third-party tools. You can reduce or eliminate licensing expenses for query authoring and data visualization, because these capabilities are now available in VS Code.

The support for Amazon Redshift within VS Code marks a significant leap towards a more streamlined, cost-effective, and user-friendly data querying experience.

In this post, we explore how to kickstart your journey with Amazon Redshift using the AWS Toolkit for VS Code.

Solution overview

This post outlines how to create a secure, direct connection between your local VS Code environment and your Redshift cluster, so that you can work in the familiar VS Code interface while querying your Redshift database.

The following diagram illustrates the VS Code connection to Amazon Redshift deployed in a private VPC.

To connect to a data warehouse from the Toolkit in VS Code, you can choose from the following methods:

  • Use a database user name and password
  • Use AWS Secrets Manager
  • Use temporary credentials (this option is only available for Amazon Redshift provisioned clusters)

In the following sections, we show how to use the Toolkit to connect to a database on an existing provisioned cluster or Serverless data warehouse.

Prerequisites

Before you begin using Amazon Redshift provisioned clusters and Amazon Redshift Serverless with the AWS Toolkit for Visual Studio Code, make sure you’ve completed the following requirements:

  1. Connect to your AWS account using the Toolkit.
  2. Set up an Amazon Redshift provisioned cluster or an Amazon Redshift Serverless data warehouse.

Establish a connection to your data warehouse using user credentials

To connect using the database user name and password, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse (for example, US East (N. Virginia)).
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Database user name and password and provide the necessary information requested by the prompts.

After the Toolkit establishes the connection to your data warehouse, you will be able to view your available databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your data warehouse using Secrets Manager

To connect using Secrets Manager, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Secrets Manager and fill in the information requested at each prompt.

After the Toolkit establishes a successful connection to your data warehouse, you’ll gain visibility into your databases, tables, and schemas directly in the Toolkit explorer.

Establish a connection to your Amazon Redshift provisioned cluster using temporary credentials

To connect using temporary credentials, complete the following steps:

  1. Navigate through the Toolkit explorer, expanding the AWS Region housing your data warehouse.
  2. In the Toolkit, expand the Redshift section and choose your specific data warehouse.
  3. In the Select a Connection Type dialog, choose Temporary Credentials and fill in the information requested at each prompt.
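The post doesn’t describe how the Toolkit obtains temporary credentials internally. As a point of reference, Amazon Redshift offers the GetClusterCredentials API for provisioned clusters, which returns a short-lived database user name and password. The following Python sketch calls it through a client you pass in (for example, boto3.client("redshift")); the cluster, database, and user names in the test are hypothetical, and we don’t claim this is the call the Toolkit makes.

```python
def request_temporary_credentials(
    redshift_client, cluster_id, db_name, db_user, duration_seconds=900
):
    """Request short-lived database credentials for a provisioned cluster.

    Returns (db_user, db_password, expiration) from the GetClusterCredentials response.
    """
    # GetClusterCredentials accepts durations from 900 to 3600 seconds.
    if not 900 <= duration_seconds <= 3600:
        raise ValueError("duration_seconds must be between 900 and 3600")
    response = redshift_client.get_cluster_credentials(
        DbUser=db_user,
        DbName=db_name,
        ClusterIdentifier=cluster_id,
        DurationSeconds=duration_seconds,
        AutoCreate=False,  # don't create the database user if it doesn't exist
    )
    return response["DbUser"], response["DbPassword"], response["Expiration"]
```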

Run SQL statements

With the connection established, the next step is to run some SQL. The steps in this section show how to create and run SQL statements in your database using the Toolkit for Visual Studio Code.

  1. Navigate to the Toolkit explorer and expand Redshift, then choose the data warehouse that stores the desired database for querying.
  2. Choose Create Notebook and specify a file name and location for saving your notebook locally.
  3. Choose OK to open the notebook in your VS Code editor.
  4. Enter the following SQL statements into the notebook in the VS Code editor:
    create table promotion
    (
        p_promo_sk                integer               not null,
        p_promo_id                char(16)              not null,
        p_start_date_sk           integer                       ,
        p_end_date_sk             integer                       ,
        p_item_sk                 integer                       ,
        p_cost                    decimal(15,2)                 ,
        p_response_target         integer                       ,
        p_promo_name              char(50)                      ,
        p_channel_dmail           char(1)                       ,
        p_channel_email           char(1)                       ,
        p_channel_catalog         char(1)                       ,
        p_channel_tv              char(1)                       ,
        p_channel_radio           char(1)                       ,
        p_channel_press           char(1)                       ,
        p_channel_event           char(1)                       ,
        p_channel_demo            char(1)                       ,
        p_channel_details         varchar(100)                  ,
        p_purpose                 char(15)                      ,
        p_discount_active         char(1)                       ,
        primary key (p_promo_sk)
    ) diststyle all;
    
    create table reason
    (
        r_reason_sk               integer               not null,
        r_reason_id               char(16)              not null,
        r_reason_desc             char(100)                     ,
        primary key (r_reason_sk)
    ) diststyle all ;
    
    
    create table ship_mode
    (
        sm_ship_mode_sk           integer               not null,
        sm_ship_mode_id           char(16)              not null,
        sm_type                   char(30)                      ,
        sm_code                   char(10)                      ,
        sm_carrier                char(20)                      ,
        sm_contract               char(20)                      ,
        primary key (sm_ship_mode_sk)
    ) diststyle all;
    
    
    copy promotion from 's3://redshift-downloads/TPC-DS/2.13/1TB/promotion/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy reason from 's3://redshift-downloads/TPC-DS/2.13/1TB/reason/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    copy ship_mode from 's3://redshift-downloads/TPC-DS/2.13/1TB/ship_mode/' iam_role default gzip delimiter '|' EMPTYASNULL region 'us-east-1';
    
    
    select * from promotion limit 10;
    
    drop table promotion;
    drop table reason;
    drop table ship_mode;

  5. Choose Run All to run the SQL statements.

The output corresponding to your SQL statements will be visible below the entered statements within the editor.

Include markdown in a notebook

To include markdown in your notebook, complete the following steps:

  1. Access your notebook within the VS Code editor and choose Markdown to create a markdown cell.
  2. Enter your markdown content within the designated cell.
  3. Use the editing tools in the upper-right corner of the markdown cell to modify the markdown content as needed.

Congratulations, you have learned the art of using the VS Code editor to effectively interface with your Redshift environment.

Clean up

To remove the connection, complete the following steps:

  1. In the Toolkit explorer, expand Redshift, and choose the data warehouse containing your database.
  2. Right-click the database and choose Delete Connection.

Conclusion

In this post, we explored the process of using VS Code to establish a connection with Amazon Redshift, streamlining access to database objects within Redshift data warehouses.

To learn more about Amazon Redshift, see the Getting started with Amazon Redshift guide. To learn more about writing and running SQL queries directly in VS Code, see the new AWS Toolkit for VS Code integration.


About the Author

Navnit Shukla, an AWS Specialist Solution Architect specializing in Analytics, is passionate about helping clients uncover valuable insights from their data. Leveraging his expertise, he develops inventive solutions that empower businesses to make informed, data-driven decisions. Notably, Navnit Shukla is the accomplished author of the book “Data Wrangling on AWS,” showcasing his expertise in the field.

Detect and handle data skew on AWS Glue

Post Syndicated from Salim Tutuncu original https://aws.amazon.com/blogs/big-data/detect-and-handle-data-skew-on-aws-glue/

AWS Glue is a fully managed, serverless data integration service provided by Amazon Web Services (AWS) that uses Apache Spark as one of its backend processing engines (as of this writing, you can use Python Shell, Spark, or Ray).

Data skew occurs when the data being processed is not evenly distributed across the Spark cluster, causing some tasks to take significantly longer to complete than others. This can lead to inefficient resource utilization, longer processing times, and ultimately, slower performance. Data skew can arise from various factors, including uneven data distribution, skewed join keys, or uneven data processing patterns. The most visible symptom is often nodes running out of disk space during shuffling, which can make nodes fail one after another like dominoes and cause job failures. Just as often, however, data skew is hidden: monitoring tools might not flag an uneven distribution as a critical issue, and logs don’t always make it evident, so it can go undetected. As a result, a developer may observe that their AWS Glue jobs complete without apparent errors while the system operates far from its optimal efficiency. This hidden inefficiency not only increases operational costs due to longer runtimes, but can also lead to unpredictable performance issues that are difficult to diagnose without a deep dive into the data distribution and task run patterns.

For example, in a dataset of customer transactions, if one customer has significantly more transactions than the others, it can cause a skew in the data distribution.
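To see why one very active customer causes skew, the following pure-Python sketch models partitioning by key: every row with the same key goes to the same partition, so adding partitions cannot split a single hot key. It assumes integer customer IDs and uses `customer_id % num_partitions` as a simplified stand-in for Spark's hash partitioner (Spark itself uses a Murmur3-based hash). The function name and the transaction counts are illustrative.

```python
from collections import Counter


def partition_sizes(customer_ids, num_partitions):
    """Count rows per partition when rows are partitioned by customer ID.

    customer_id % num_partitions is a simplified stand-in for Spark's
    hash partitioner: like the real one, it sends every row with the
    same key to the same partition.
    """
    counts = Counter(customer_id % num_partitions for customer_id in customer_ids)
    return [counts[p] for p in range(num_partitions)]


# Illustrative data: customers 0-9 each have 10 transactions,
# and customer 3 has 990 extra transactions (1,000 in total)
transactions = [c for c in range(10) for _ in range(10)] + [3] * 990

print(partition_sizes(transactions, 4))  # [30, 30, 20, 1010]
```

Partition 3 holds customers 3 and 7, so it ends up with more than 30 times as many rows as any other partition. Increasing the partition count does not help, because all of customer 3's rows still land in one partition.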

Identifying and handling data skew issues is key to having good performance on Apache Spark and therefore on AWS Glue jobs that use Spark as a backend. In this post, we show how you can identify data skew and discuss the different techniques to mitigate data skew.

How to detect data skew

When an AWS Glue job has issues with local disks (for example, data spilling to disk), doesn’t scale with the number of workers, or has low CPU usage (you can enable Amazon CloudWatch metrics for your job to see this), you may have a data skew issue. You can detect data skew with data analysis or by using the Spark UI. In this section, we discuss how to use the Spark UI.

The Spark UI provides a comprehensive view of Spark applications, including the number of tasks, stages, and their duration. To use it, you need to enable Spark UI event logs for your job runs. This is enabled by default on the AWS Glue console. Once enabled, Spark event log files are created during the job run and stored in your S3 bucket. Those logs are then parsed, and you can use the AWS Glue serverless Spark UI to visualize them. For more details, refer to this blog post. For jobs where the AWS Glue serverless Spark UI does not work because of its 512 MB log limit, you can set up the Spark UI using an Amazon EC2 instance.
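If you create jobs programmatically rather than on the console, Spark UI event logs and CloudWatch metrics are turned on through AWS Glue job parameters. The following minimal sketch builds those parameters as a dictionary. `--enable-spark-ui`, `--spark-event-logs-path`, and `--enable-metrics` are AWS Glue special job parameters. The helper function and the S3 path are hypothetical.

```python
def spark_ui_job_arguments(event_logs_s3_path, enable_metrics=True):
    """Build AWS Glue job parameters that turn on Spark UI event logs.

    event_logs_s3_path is a placeholder S3 prefix; use your own bucket.
    """
    if not event_logs_s3_path.startswith("s3://"):
        raise ValueError("event_logs_s3_path must be an s3:// URI")
    args = {
        # Write Spark event logs so that the Spark UI can replay the job run
        "--enable-spark-ui": "true",
        "--spark-event-logs-path": event_logs_s3_path,
    }
    if enable_metrics:
        # Publish job metrics, such as CPU load, to Amazon CloudWatch
        args["--enable-metrics"] = "true"
    return args


print(spark_ui_job_arguments("s3://example-bucket/sparkui-logs/"))
```

You could pass the result as `DefaultArguments` when creating a job with boto3 (`glue.create_job`) or as `Arguments` for a single run (`glue.start_job_run`).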

You can use the Spark UI to identify which tasks take longer to complete than others, and whether the data distribution among partitions is balanced (remember that in Spark, one partition is mapped to one task). If there is data skew, you will see that some partitions have significantly more data than others. The following figure shows an example of this. We can see that one task takes a lot more time than the others, which can indicate data skew.

Another thing that you can use is the summary metrics for each stage. The following screenshot shows another example of data skew.

These metrics are percentiles of task-level metrics: each value is the one below which a given percentage of tasks fall. For example, the 75th percentile task duration indicates that 75% of tasks completed in less time than this value. When the tasks are evenly distributed, you will see similar numbers in all the percentiles. When there is data skew, you will see large gaps between the percentiles, especially between the 75th percentile and Max. In the preceding example, the tasks at Min, 25th percentile, Median, and 75th percentile didn’t write much shuffle data (less than 50 MiB). However, the task at Max wrote 460 MiB, about 10 times the 75th percentile. This means that at least one task (and up to 25% of tasks) wrote much more shuffle data than the rest of the tasks. You can also see that the task duration at Max is 46 seconds, whereas the Median is 2 seconds. These are all indicators that your dataset may have data skew.
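If you have the per-task values for a stage, such as the shuffle write size of each task, you can reproduce the same percentile check in a few lines. The following pure-Python sketch makes two assumptions. First, the `ratio=5.0` cutoff between Max and the 75th percentile is arbitrary and chosen only for illustration; it is not a Spark rule. Second, `statistics.quantiles` may interpolate slightly differently from the Spark UI.

```python
import statistics


def summary_metrics(values):
    """Return Min, 25th percentile, Median, 75th percentile, and Max of per-task values."""
    # quantiles(n=4) returns the three cut points: 25th, 50th, and 75th percentiles
    p25, median, p75 = statistics.quantiles(values, n=4, method="inclusive")
    return {"min": min(values), "p25": p25, "median": median, "p75": p75, "max": max(values)}


def looks_skewed(values, ratio=5.0):
    """Flag skew when Max is at least `ratio` times the 75th percentile (assumed cutoff)."""
    metrics = summary_metrics(values)
    return metrics["p75"] > 0 and metrics["max"] / metrics["p75"] >= ratio


# Illustrative shuffle write sizes in MiB: 30 small tasks and one large task
shuffle_write_mib = [40] * 30 + [460]
print(summary_metrics(shuffle_write_mib))
print(looks_skewed(shuffle_write_mib))  # True
```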

AWS Glue interactive sessions

You can use interactive sessions to load your data from the AWS Glue Data Catalog, or use Spark methods to load the files you want to analyze, such as Parquet or CSV. You can use a script similar to the following to detect data skew from the partition-size perspective. Note that the more important issue is usually data skew during shuffling, which this script does not detect:

from pyspark.sql.functions import spark_partition_id, asc, abs as spark_abs
# input_dataframe is the DataFrame that you want to check for data skew
partition_sizes_df = input_dataframe\
    .withColumn("partitionId", spark_partition_id())\
    .groupBy("partitionId")\
    .count()\
    .orderBy(asc("count"))\
    .withColumnRenamed("count", "partition_size")
# Calculate the average and standard deviation of the partition sizes
avg_size = partition_sizes_df.agg({"partition_size": "avg"}).collect()[0][0]
# stddev returns null when there is only one partition, so fall back to 0
std_dev_size = partition_sizes_df.agg({"partition_size": "stddev"}).collect()[0][0] or 0.0

# Compute the absolute difference between each value in the partition_size column and
# the average (avg_size), and mark a partition as skewed when that difference is
# greater than twice the standard deviation (std_dev_size)
skewed_partitions_df = partition_sizes_df.filter(spark_abs(partition_sizes_df["partition_size"] - avg_size) > 2 * std_dev_size)
if skewed_partitions_df.count() > 0:
    skewed_partitions = [row["partitionId"] for row in skewed_partitions_df.collect()]
    print(f"The following partitions have significantly different sizes: {skewed_partitions}")
else:
    print("No data skew detected.")

The script calculates the average and standard deviation of the partition sizes using the agg() function, identifies partitions with significantly different sizes using the filter() function, and prints their partition IDs if any skewed partitions are detected. Otherwise, it prints that no data skew was detected.

This code assumes that your data is structured, and you may need to modify it if your data is of a different type.
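You can try the same two-standard-deviation rule without a Spark cluster. The following pure-Python sketch takes a dictionary that maps partition IDs to row counts (for example, the rows collected from partition_sizes_df) and applies the same test. The function name is illustrative. `statistics.stdev` computes the sample standard deviation, which is what Spark's `stddev` aggregate returns.

```python
import statistics


def find_skewed_partitions(partition_sizes):
    """Return the IDs of partitions whose row count is more than two
    standard deviations away from the mean partition size.

    partition_sizes: dict mapping partition ID -> number of rows.
    """
    if len(partition_sizes) < 2:
        # A sample standard deviation needs at least two values
        return []
    sizes = list(partition_sizes.values())
    avg_size = statistics.mean(sizes)
    std_dev_size = statistics.stdev(sizes)  # sample standard deviation
    return sorted(
        pid
        for pid, size in partition_sizes.items()
        if abs(size - avg_size) > 2 * std_dev_size
    )


print(find_skewed_partitions({0: 100, 1: 102, 2: 98, 3: 101, 4: 99, 5: 100, 6: 1000}))  # [6]
```

One caveat follows from the math. With n partitions, no single value can lie more than (n - 1)/√n sample standard deviations from the mean, so with five or fewer partitions this rule can never flag a partition. For small partition counts, a check that compares each partition to the median may work better.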

How to handle data skew

You can use different techniques in AWS Glue to handle data skew; there is no single universal solution. The first thing to do is confirm that you’re using the latest AWS Glue version. For example, AWS Glue 4.0, which is based on Spark 3.3, enables some configurations by default, such as Adaptive Query Execution (AQE), that can help improve performance when data skew is present.

The following are some of the techniques that you can employ to handle data skew:

  • Filter and perform – If you know which keys are causing the skew, you can filter them out, perform your operations on the non-skewed data, and then handle the skewed keys separately.
  • Implementing incremental aggregation – If you are performing a large aggregation operation, you can break it up into smaller stages because in large datasets, a single aggregation operation (like sum, average, or count) can be resource-intensive. In those cases, you can perform intermediate actions. This could involve filtering, grouping, or additional aggregations. This can help distribute the workload across the nodes and reduce the size of intermediate data.
  • Using a custom partitioner – If your data has a specific structure or distribution, you can create a custom partitioner that partitions your data based on its characteristics. This can help make sure that data with similar characteristics is in the same partition and reduce the size of the largest partition.
  • Using broadcast join – If one side of your join is small but exceeds the spark.sql.autoBroadcastJoinThreshold value (default is 10 MB), you can either provide a hint to use a broadcast join or adjust the threshold value to accommodate that dataset. This can be an effective strategy to optimize join operations and mitigate data skew issues resulting from shuffling large amounts of data across nodes.
  • Salting – This involves adding a random prefix to the key of skewed data. By doing this, you distribute the data more evenly across the partitions. After processing, you can remove the prefix to get the original key values.

These are just a few techniques to handle data skew in PySpark; the best approach will depend on the characteristics of your data and the operations you are performing.
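Incremental aggregation and salting work well together for aggregations on a skewed key. The following pure-Python sketch (not Spark code) shows a two-stage salted sum. The first stage groups by (key, salt). The second stage removes the salt and combines the partial results. In PySpark, the same idea would be two successive groupBy().agg() calls, where the first groups on the key plus a salt column. The function name and data are illustrative.

```python
import random
from collections import defaultdict


def salted_sum(rows, num_salts, seed=42):
    """Sum values per key in two stages, salting keys in the first stage.

    rows: iterable of (key, value) pairs.
    Returns the per-key totals and the number of partial groups from stage 1.
    """
    rng = random.Random(seed)  # fixed seed so the salts are reproducible

    # Stage 1: aggregate per (key, salt); a hot key is spread over up to
    # num_salts partial groups instead of landing in a single group
    partial = defaultdict(int)
    for key, value in rows:
        partial[(key, rng.randrange(num_salts))] += value

    # Stage 2: drop the salt and combine the partial sums for each key
    totals = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        totals[key] += subtotal
    return dict(totals), len(partial)


rows = [("hot", 1)] * 1000 + [("cold", 1)] * 10
totals, num_partial_groups = salted_sum(rows, num_salts=4)
print(totals)  # {'hot': 1000, 'cold': 10}
```

The totals match a plain single-stage sum. The only difference is that the hot key's work is split across up to num_salts groups in the first stage.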

The following is an example of joining skewed data with the salting technique:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, floor, rand, concat, col

spark = SparkSession.builder.getOrCreate()

# Placeholders that you need to define for your own data:
#   skewed_data     - the large DataFrame with a skewed "key" column
#   non_skewed_data - the DataFrame that skewed_data is joined with on "key"
#   skew_threshold  - the row count above which a key is treated as skewed

# Define the number of salt values
num_salts = 3

# Function to identify skewed keys
def identify_skewed_keys(df, key_column, threshold):
    key_counts = df.groupBy(key_column).count()
    return key_counts.filter(key_counts["count"] > threshold).select(key_column)

# Identify skewed keys
skewed_keys = identify_skewed_keys(skewed_data, "key", skew_threshold)

# Split the dataset into rows with skewed keys and rows with regular keys
skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "inner")
non_skewed_data_subset = skewed_data.join(skewed_keys, ["key"], "left_anti")

# Apply salting to skewed data: a random salt in [0, num_salts), with a fixed seed
skewed_data_subset = skewed_data_subset.withColumn("salt", floor(rand(seed=42) * num_salts))
skewed_data_subset = skewed_data_subset.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))

# Replicate the rows of the other dataset that have skewed keys, once per salt value
def replicate_skewed_rows(df, keys, multiplier):
    replicated_df = df.join(keys, ["key"]).crossJoin(spark.range(multiplier).withColumnRenamed("id", "salt"))
    replicated_df = replicated_df.withColumn("salted_key", concat(col("key"), lit("_"), col("salt")))
    return replicated_df.drop("salt")

replicated_non_skewed_data = replicate_skewed_rows(non_skewed_data, skewed_keys, num_salts)

# Perform the JOIN operation on the salted keys for skewed data;
# drop the helper columns so that only one "key" column remains
result_skewed = skewed_data_subset.drop("key", "salt")\
    .join(replicated_non_skewed_data, "salted_key")\
    .drop("salted_key")

# Perform a regular join for the rows with non-skewed keys
result_non_skewed = non_skewed_data_subset.join(non_skewed_data, "key")

# Combine results; unionByName matches columns by name rather than by position
final_result = result_skewed.unionByName(result_non_skewed)

In this code, we first define the number of salt values (num_salts), identify the keys whose row count exceeds skew_threshold, and split the data into rows with skewed keys and rows with regular keys. We then add a salt column to the skewed rows using the withColumn() function, deriving the salt value from the rand() function with a fixed seed, and build a salted_key column from the original key and the salt. The function replicate_skewed_rows replicates each row of the other dataset (non_skewed_data) whose key is skewed num_salts times, once per salt value. This ensures that every salted key in the skewed data has a matching row. A join on the salted_key column between the two datasets is more balanced than a direct join on the original key, because salting and replication have mitigated the data skew. Finally, the rows with regular keys are joined on the original key, and the two results are combined.

The rand() function generates a random number in the range [0, 1) for each row. Use a fixed seed to get consistent results across different runs of the code. You can choose any fixed integer value for the seed.

The following figures illustrate the data distribution before (left) and after (right) salting. The heavily skewed key2 is identified and salted into key2_0, key2_1, and key2_2, which balances the data distribution and prevents any single node from being overloaded. After processing, the results can be aggregated back so that the final output is consistent with the unsalted key values.
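Salting should change only how rows are distributed, not the join result. To check that, the following pure-Python sketch models the same approach on in-memory lists. Rows with a skewed key on one side get a random salt, and the matching rows on the other side are replicated once per salt, the same way key2 is split into key2_0, key2_1, and key2_2. The sketch uses (key, salt) tuples instead of concatenated strings, which avoids accidental collisions with a real key that happens to look like "key2_0". The function and data are illustrative and are not part of AWS Glue.

```python
import random
from collections import Counter


def salted_join(left, right, skewed_keys, num_salts, seed=42):
    """Inner-join two lists of (key, value) pairs, salting the skewed keys.

    left: the skewed side; each row with a skewed key gets one random salt.
    right: the other side; each row with a skewed key is replicated once per salt.
    Returns the joined (key, left_value, right_value) rows and the number of
    left rows routed to each join key.
    """
    rng = random.Random(seed)  # fixed seed for reproducible salts

    # Index the right side by join key, replicating rows with skewed keys
    right_index = {}
    for key, value in right:
        salts = range(num_salts) if key in skewed_keys else [None]
        for salt in salts:
            right_index.setdefault((key, salt), []).append(value)

    joined, load = [], Counter()
    for key, value in left:
        salt = rng.randrange(num_salts) if key in skewed_keys else None
        load[(key, salt)] += 1
        for right_value in right_index.get((key, salt), []):
            joined.append((key, value, right_value))
    return joined, load


left = ([("key1", i) for i in range(5)]
        + [("key2", i) for i in range(300)]
        + [("key3", i) for i in range(5)])
right = [("key1", "a"), ("key2", "b"), ("key3", "c")]
joined, load = salted_join(left, right, skewed_keys={"key2"}, num_salts=3)
print(len(joined))  # 310
print(load)
```

The joined rows are the same as those of a plain join on key. The 300 key2 rows, however, are spread across three join keys instead of one.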

Other techniques to use on skewed data during the join operation

When you’re performing skewed joins, you can use salting or broadcasting techniques, or divide your data into skewed and regular parts before joining the regular data and broadcasting the skewed data.

If you are using Spark 3, Adaptive Query Execution (AQE) provides automatic optimizations for data skew in joins. You can tune these optimizations through dedicated Apache Spark configurations.
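For reference, the following sketch lists the Spark 3 configuration keys that control AQE skew-join handling, with the factor and threshold set to the documented defaults. It also includes a pure-Python model of the documented rule: a partition is treated as skewed when it is larger than skewedPartitionFactor times the median partition size and also larger than skewedPartitionThresholdInBytes. This is an approximation for experimenting with thresholds. Spark's internal median calculation may differ in small details, and the helper names are illustrative.

```python
# AQE skew-join settings in Spark 3; factor and threshold are the documented defaults
AQE_SKEW_JOIN_CONF = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.skewJoin.enabled": "true",
    "spark.sql.adaptive.skewJoin.skewedPartitionFactor": "5",
    "spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes": "256MB",
}

MIB = 1024 * 1024


def median_size(sizes):
    """Median partition size in bytes, never less than 1."""
    ordered = sorted(sizes)
    mid = len(ordered) // 2
    if len(ordered) % 2 == 0:
        median = (ordered[mid - 1] + ordered[mid]) // 2
    else:
        median = ordered[mid]
    return max(median, 1)


def skewed_partitions(sizes, factor=5, threshold_bytes=256 * MIB):
    """Indexes of partitions larger than factor times the median AND larger than the threshold."""
    median = median_size(sizes)
    return [i for i, size in enumerate(sizes) if size > median * factor and size > threshold_bytes]


print(skewed_partitions([64 * MIB] * 9 + [2048 * MIB]))  # [9]
print(skewed_partitions([1 * MIB] * 9 + [100 * MIB]))  # [] (below the 256 MB threshold)
```

The second case shows why both conditions matter. The 100 MiB partition is 100 times the median, but it is too small to be worth splitting. In a job, you could apply different values with spark.conf.set(key, value) before the join runs.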

Conclusion

This post provided details on how to detect data skew in your data integration jobs using AWS Glue and different techniques for handling it. Having a good data distribution is key to achieving the best performance on distributed processing systems like Apache Spark.

Although this post focused on AWS Glue, the same concepts apply to jobs you may be running on Amazon EMR using Apache Spark or Amazon Athena for Apache Spark.

As always, AWS welcomes your feedback. Please leave your comments and questions in the comments section.


About the Authors

Salim Tutuncu is a Sr. PSA Specialist on Data & AI, based in Amsterdam with a focus on the EMEA North and EMEA Central regions. With a rich background in the technology sector that spans roles as a Data Engineer, Data Scientist, and Machine Learning Engineer, Salim has built a formidable expertise in navigating the complex landscape of data and artificial intelligence. His current role involves working closely with partners to develop long-term, profitable businesses leveraging the AWS Platform, particularly in Data and AI use cases.

Angel Conde Manjon is a Sr. PSA Specialist on Data & AI, based in Madrid, and focuses on EMEA South and Israel. He has previously worked on research related to Data Analytics and Artificial Intelligence in diverse European research projects. In his current role, Angel helps partners develop businesses centered on Data and AI.

How Fujitsu implemented a global data mesh architecture and democratized data

Post Syndicated from Kanehito Miyake original https://aws.amazon.com/blogs/big-data/how-fujitsu-implemented-a-global-data-mesh-architecture-and-democratized-data/

This is a guest post co-authored with Kanehito Miyake, Engineer at Fujitsu Japan. 

Fujitsu Limited was established in Japan in 1935. Currently, we have approximately 120,000 employees worldwide (as of March 2023), including group companies. We develop business in various regions around the world, starting with Japan, and provide digital services globally. To provide a variety of products, services, and solutions that are better suited to customers and society in each region, we have built business processes and systems that are optimized for each region and its market.

However, in recent years, the IT market environment has changed drastically, and it has become difficult for the entire group to respond flexibly to the individual market situation. Moreover, we are challenged not only to revisit individual products, services, and solutions, but also to reinvent entire business processes and operations.

To transform Fujitsu from an IT company to a digital transformation (DX) company, and to become a world-leading DX partner, Fujitsu has declared a shift to data-driven management. We built the OneFujitsu program, which standardizes business projects and systems throughout the company, including the domestic and overseas group companies, and tackles the major transformation of the entire company under the program.

To achieve data-driven management, we built OneData, a data utilization platform used in the four global AWS Regions, which started operation in April 2022. As of November 2023, more than 200 projects and 37,000 users were onboarded. The platform consists of approximately 370 dashboards, 360 tables registered in the data catalog, and 40 linked systems. The data size stored in Amazon Simple Storage Service (Amazon S3) exceeds 100 TB, including data processed for use in each project.

In this post, we introduce our OneData initiative. We explain how Fujitsu worked to solve the aforementioned issues and introduce an overview of the OneData design concept and its implementation. We hope this post will provide some guidance for architects and engineers.

Challenges

Like many other companies struggling with data utilization, Fujitsu faced some challenges, which we discuss in this section.

Siloed data

In Fujitsu’s long history, we restructured organizations by merging affiliated companies into Fujitsu. Although organizational integration has progressed, there are still many systems and mechanisms customized for individual context. There are also many systems and mechanisms overlapping across different organizations. For this reason, it takes a lot of time and effort to discover, search, and integrate data when analyzing the entire company using a common standard. This situation makes it difficult for management to grasp business trends and make decisions in a timely manner.

Under these circumstances, the OneFujitsu program is designed to have one system per business globally. Core systems such as ERP and CRM are being integrated and unified to avoid silos. This will make it easier for users to utilize data across different organizations for specific business areas.

However, to spread a culture of data-driven decision-making not only in management but also in every organization, it is necessary to have a mechanism that enables users to easily discover various types of data in organizations, and then analyze the data quickly and flexibly when needed.

Excel-based data utilization

Microsoft Excel is available on almost everyone’s PC in the company, and it helps lower the hurdles when starting to utilize data. However, Excel is mainly designed for spreadsheets; it’s not designed for large-scale data analytics and automation. Excel files tend to contain a mixture of data and procedures (functions, macros), and many users casually copy files for one-time use cases. It introduces complexity to keep both data and procedures up to date. Furthermore, it tends to require domain-specific knowledge to manage the Excel files for individual context.

For those reasons, it was extremely difficult for Fujitsu to manage and utilize data at scale with Excel.

Solution overview

OneData defines three personas:

  • Publisher – This role includes the organizational and management team of systems that serve as data sources. Responsibilities include:
    • Load raw data from the data source system at the appropriate frequency.
    • Provide technical metadata for loaded data and keep it up to date.
    • Perform the cleansing process and format conversion of raw data as needed.
    • Grant access permissions to data based on the requests from data users.
  • Consumer – Consumers are organizations and projects that use the data. Responsibilities include:
    • Look for the data to be used from the technical data catalog and request access to the data.
    • Handle the process and conversion of data into a format suitable for their own use (such as fact-dimension) with granted referencing permissions.
    • Configure business intelligence (BI) dashboards to provide data-driven insights to end-users targeted by the consumer’s project.
    • Use the latest data published by the publisher to update data as needed.
    • Promote and expand the use of databases.
  • Foundation – This role encompasses the data steward and governance team. Responsibilities include:
    • Provide a preprocessed, generic dataset of data commonly used by many consumers.
    • Manage and guide metrics for the quality of data published by each publisher.

Each role has sub-roles. For example, the consumer role has the following sub-roles with different responsibilities:

  • Data engineer – Create data process for analysis
  • Dashboard developer – Create a BI dashboard
  • Dashboard viewer – Monitor the BI dashboard

The following diagram describes how OneData platform works with those roles.

Let’s look at the key components of this architecture in more detail.

Publisher and consumer

In the OneData platform, a publisher is defined per data source system, and a consumer is defined per data utilization project. OneData provides an AWS account for each.

This enables the publisher to cleanse data and the consumer to process and analyze data at scale. In addition, by properly separating data and processing, it becomes effortless for the teams and organizations to share, manage, and inherit processes that were traditionally confined to individual PCs.

Foundation

When the teams don’t have a robust enough skillset, modeling and processing data can take more time and result in longer latency and lower data quality. It can also contribute to lower utilization by end-users. To address this, the foundation role provides an already processed dataset as a generic data model for data that many consumers commonly use. This makes high-quality data available to each consumer. Here, the foundation role takes the lead in compiling the knowledge of domain experts and making data suitable for analysis. This approach also eliminates duplicated work across consumers. In addition, the foundation role monitors the state of the metadata, data quality indicators, data permissions, information classification labels, and so on, which is crucial for data governance and data management.

BI and visualization

Individual consumers have a dedicated space in a BI tool. In the past, if users wanted to go beyond simple data visualization using Excel, they had to build and maintain their own BI tools, which caused silos. By unifying these BI tools, OneData lowers the difficulty for consumers to use BI tools, and centralizes operation and maintenance, achieving optimization on a company-wide scale.

Additionally, to keep portability between BI tools, OneData recommends that users transform data within the consumer AWS account instead of in the BI tool. With this approach, the BI tool loads data from AWS Glue Data Catalog tables through an Amazon Athena JDBC/ODBC driver without any further transformations.

Deployment and operational excellence

To provide OneData as a common service for Fujitsu and group companies around the world, Regional OneData has been deployed in several locations. Regional OneData represents a unit of system configurations, and is designed to provide lower network latency for platform users, and be optimized for local languages, working hours for system operations and support, and region-specific legal restrictions, such as data residency and personal information protection.

The Regional Operations Unit (ROU), a virtual organization that brings together members from each region, is responsible for operating regional OneData in each of these regions. OneData HQ is responsible for supervising these ROUs, as well as planning and managing the entire OneData.

In addition, we have a specially positioned OneData called Global OneData for data utilization that spans all regions. Only properly cleansed and sanitized data is transferred between each Regional OneData and Global OneData.

Systems such as ERP and CRM are accumulating data as a publisher for Global OneData, and the dashboards for executives in various regions to monitor business conditions with global metrics are also acting as a consumer for Global OneData.

Technical concepts

In this section, we discuss some of the technical concepts of the solution.

Large scale multi-account

We have adopted a multi-account strategy to provide AWS accounts for each project. Many publishers and consumers are already onboarded into OneData, and the number is expected to increase in the future. With this strategy, future usage expansion at scale can be achieved without affecting the users.

Also, this strategy allowed us to have clear boundaries in security, costs, and service quotas for each AWS service.

All the AWS accounts are deployed and managed through AWS Organizations and AWS Control Tower.

Serverless

Although we provide independent AWS accounts for each publisher and consumer, both operational costs and resource costs would be enormous if we accommodated individual user requests, such as, “I want a virtual machine or RDBMS to run specific tools for data processing.” To avoid such continuous operational and resource costs, we have adopted AWS serverless services for all the computing resources necessary for our activities as a publisher and consumer.

We use AWS Glue to preprocess, cleanse, and enrich data. Optionally, AWS Lambda or Amazon Elastic Container Service (Amazon ECS) with AWS Fargate can also be used based on preferences. We allow users to set up AWS Step Functions for orchestration and Amazon CloudWatch for monitoring. In addition, we provide Amazon Aurora Serverless PostgreSQL as standard for consumers, to meet their needs for data processing with extract, load, and transform (ELT) jobs. With this approach, only the consumer who requires those services will incur charges based on usage. We are able to take advantage of lower operational and resource costs thanks to the unique benefit of serverless (or more accurately, pay-as-you-go) services.

AWS provides many serverless services, and OneData has integrated them to provide scalability that allows active users to quickly provide the required capability as needed, while minimizing the cost for non-frequent users.

Data ownership and access control

In OneData, we have adopted a data mesh architecture where each publisher maintains ownership of data in a distributed and decentralized manner. When the consumer discovers the data they want to use, they request access from the publisher. The publisher accepts the request and grants permissions only when the request meets their own criteria. With the AWS Glue Data Catalog and AWS Lake Formation, there is no need to update S3 bucket policies or AWS Identity and Access Management (IAM) policies every time we allow access for individual data on an S3 data lake, and we can effortlessly grant the necessary permissions for the databases, tables, columns, and rows when needed.
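As an illustration of the column-level grants described above, the following sketch builds a request for the Lake Formation GrantPermissions API, which boto3 exposes as `grant_permissions`. The request gives a consumer account SELECT on selected columns of a publisher's table. The account IDs, database, table, and column names are hypothetical, and this is not Fujitsu's actual tooling. Row-level access would also need a data cells filter, which this sketch does not cover.

```python
def build_column_grant(consumer_account_id, database_name, table_name, column_names, catalog_id=None):
    """Build keyword arguments for a Lake Formation GrantPermissions call.

    Grants SELECT on the listed columns of one Data Catalog table to a
    consumer, identified here by its AWS account ID.
    """
    table = {
        "DatabaseName": database_name,
        "Name": table_name,
        "ColumnNames": list(column_names),
    }
    if catalog_id is not None:
        # Data Catalog ID (account ID) of the publisher that owns the table
        table["CatalogId"] = catalog_id
    return {
        "Principal": {"DataLakePrincipalIdentifier": consumer_account_id},
        "Resource": {"TableWithColumns": table},
        "Permissions": ["SELECT"],
    }


# Hypothetical publisher (111111111111) and consumer (222222222222) accounts
request = build_column_grant(
    "222222222222", "sales_db", "orders", ["order_id", "order_date"], catalog_id="111111111111"
)
print(request)
```

With credentials for the publisher account, you could pass the request as `boto3.client("lakeformation").grant_permissions(**request)`. No S3 bucket policy or IAM policy changes are needed for this particular grant.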

Conclusion

Since the launch of OneData in April 2022, we have been persistently carrying out educational activities to expand the number of users and introducing success stories on our portal site. As a result, we have been promoting change management within the company and are actively utilizing data in each department. Regional OneData is being rolled out gradually, and we plan to further expand the scale of use in the future.

With its global expansion, the development of basic functions as a data utilization platform will reach a milestone. As we move forward, it will be important to make sure that the OneData platform is used effectively throughout Fujitsu, while incorporating new technologies related to data analysis as appropriate. For example, we are preparing to provide more advanced machine learning functions to OneData users using Amazon SageMaker Studio, and we are investigating the applicability of AWS Glue Data Quality to reduce manual quality monitoring efforts. Furthermore, we are currently in the process of implementing Amazon DataZone through various initiatives and efforts, such as verifying its functionality and examining how it can operate while bridging the gap between OneData’s existing processes and the ideal process we are aiming for.

We have had the opportunity to discuss data utilization with various partners and customers, and although individual challenges may differ in size and context, the issues that we are currently trying to solve with OneData are common to many of them.

This post describes only a small portion of how Fujitsu tackled challenges using the AWS Cloud, but we hope the post will give you some inspiration to solve your own challenges.


About the Authors


Kanehito Miyake is an engineer at Fujitsu Japan and in charge of OneData’s solution and cloud architecture. He spearheaded the architectural study of the OneData project and contributed greatly to promoting data utilization at Fujitsu with his expertise. He loves rockfish fishing.

Junpei Ozono is a Go-to-market Data & AI solutions architect at AWS in Japan. Junpei supports customers’ journeys on the AWS Cloud from Data & AI aspects and guides them to design and develop data-driven architectures powered by AWS services.

Introducing Amazon Q data integration in AWS Glue

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-amazon-q-data-integration-in-aws-glue/

Today, we’re excited to announce general availability of Amazon Q data integration in AWS Glue. Amazon Q data integration, a new generative AI-powered capability of Amazon Q Developer, enables you to build data integration pipelines using natural language. This reduces the time and effort you need to learn, build, and run data integration jobs using AWS Glue data integration engines.

Tell Amazon Q Developer what you need in English, and it will return a complete job for you. For example, you can ask Amazon Q Developer to generate a complete extract, transform, and load (ETL) script or code snippet for individual ETL operations. You can troubleshoot your jobs by asking Amazon Q Developer to explain errors and propose solutions. Amazon Q Developer provides detailed guidance throughout the entire data integration workflow. Amazon Q Developer helps you learn and build data integration jobs using AWS Glue efficiently by generating the required AWS Glue code based on your natural language descriptions. You can create jobs that extract, transform, and load data that is stored in Amazon Simple Storage Service (Amazon S3), Amazon Redshift, and Amazon DynamoDB. Amazon Q Developer can also help you connect to third-party, software as a service (SaaS), and custom sources.

With general availability, we added new capabilities for you to author jobs using natural language. Amazon Q Developer can now generate complex data integration jobs with multiple sources, destinations, and data transformations. It can generate data integration jobs for extracts and loads to S3 data lakes including file formats like CSV, JSON, and Parquet, and ingestion into open table formats like Apache Hudi, Delta, and Apache Iceberg. It generates jobs for connecting to over 20 data sources, including relational databases like PostgreSQL, MySQL and Oracle; data warehouses like Amazon Redshift, Snowflake, and Google BigQuery; NoSQL databases like DynamoDB, MongoDB and OpenSearch; tables defined in the AWS Glue Data Catalog; and custom user-supplied JDBC and Spark connectors. Generated jobs can use a variety of data transformations, including filter, project, union, join, and custom user-supplied SQL.

Amazon Q data integration in AWS Glue offers two experiences: the Amazon Q chat experience and the AWS Glue Studio notebook experience. This post walks through both end to end to demonstrate how Amazon Q data integration in AWS Glue simplifies your data integration and data engineering tasks.

Amazon Q chat experience

Amazon Q Developer provides a conversational Q&A capability and a code generation capability for data integration. To start using the conversational Q&A capability, choose the Amazon Q icon on the right side of the AWS Management Console.

For example, you can ask, “How do I use AWS Glue for my ETL workloads?” and Amazon Q provides concise explanations along with references you can use to follow up on your questions and validate the guidance.

To start using the AWS Glue code generation capability, use the same window. On the AWS Glue console, start authoring a new job, and ask Amazon Q, “Please provide a Glue script that reads from Snowflake, renames the fields, and writes to Redshift.”

Amazon Q generates the code. You can use the response to learn how to author AWS Glue code for your purpose. Copy the generated code into the script editor and configure the placeholders. After you configure an AWS Identity and Access Management (IAM) role and AWS Glue connections on the job, save and run the job. When the job is complete, you can start querying the table exported from Snowflake in Amazon Redshift.
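The rename step in such a script is typically expressed with the AWS Glue ApplyMapping transform, whose mappings argument is a list of (source field, source type, target field, target type) tuples. The following is a minimal sketch, assuming hypothetical Snowflake column names: the helper build_rename_mappings is not part of AWS Glue, and the Glue call appears only in a comment because it needs the AWS Glue runtime.

```python
from typing import Dict, List, Tuple

# One ApplyMapping entry: (source field, source type, target field, target type).
Mapping = Tuple[str, str, str, str]


def build_rename_mappings(schema: Dict[str, str], renames: Dict[str, str]) -> List[Mapping]:
    """Keep every field and its type, renaming only the fields listed in `renames`.

    schema:  source field name -> type name, for example "string" or "long"
    renames: source field name -> target field name
    """
    unknown = set(renames) - set(schema)
    if unknown:
        raise KeyError(f"Fields not in schema: {sorted(unknown)}")
    return [(field, dtype, renames.get(field, field), dtype) for field, dtype in schema.items()]


if __name__ == "__main__":
    # Hypothetical columns of a Snowflake table.
    schema = {"ID": "long", "PRODUCT_NAME": "string", "PRICE": "double"}
    mappings = build_rename_mappings(schema, {"PRODUCT_NAME": "product_name", "PRICE": "price"})
    print(mappings)
    # In an AWS Glue job, the list would be passed to the transform, for example:
    # renamed = ApplyMapping.apply(frame=snowflake_frame, mappings=mappings)
```

Raising on unknown field names is a design choice of this sketch: a typo in the rename dictionary fails loudly instead of silently producing an unchanged column.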

Let’s try another prompt that reads data from two different sources, selects fields from each, filters rows from one of them, unions the results, and writes the output to a third target. Ask Amazon Q: “I want to read data from S3 in Parquet format, and select some fields. I also want to read data from DynamoDB, select some fields, and filter some rows. I want to union these two datasets and write the results to OpenSearch.”

The code is generated. When the job is complete, your index is available in OpenSearch and can be used by your downstream workloads.
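To make the project, filter, and union steps of this prompt concrete, the following pure-Python sketch applies the same logic to lists of dictionaries. Each source is projected to a few fields, the DynamoDB-side rows are filtered, and the two results are stacked by column name with missing columns filled as None, similar in spirit to Spark's unionByName with allowMissingColumns=True. The field names, filter condition, and sample rows are assumptions for illustration; a generated AWS Glue job would perform these steps on DynamicFrames or Spark DataFrames instead.

```python
from typing import Any, Callable, Dict, Iterable, List

Row = Dict[str, Any]


def project(rows: Iterable[Row], fields: List[str]) -> List[Row]:
    """Keep only the listed fields; fields missing from a row become None."""
    return [{f: row.get(f) for f in fields} for row in rows]


def filter_rows(rows: Iterable[Row], predicate: Callable[[Row], bool]) -> List[Row]:
    """Keep only the rows for which the predicate returns True."""
    return [row for row in rows if predicate(row)]


def union_by_name(left: List[Row], right: List[Row]) -> List[Row]:
    """Stack two row lists, aligning on column names and filling gaps with None."""
    columns: List[str] = []
    for row in left + right:
        for col in row:
            if col not in columns:
                columns.append(col)
    return [{c: row.get(c) for c in columns} for row in left + right]


if __name__ == "__main__":
    parquet_rows = [{"id": 1, "name": "lamp", "price": 20.0, "sku": "A1"}]
    dynamodb_rows = [
        {"id": 2, "name": "desk", "status": "active", "warehouse": "W1"},
        {"id": 3, "name": "chair", "status": "retired", "warehouse": "W2"},
    ]
    s3_part = project(parquet_rows, ["id", "name", "price"])
    ddb_part = filter_rows(project(dynamodb_rows, ["id", "name", "status"]),
                           lambda r: r["status"] == "active")
    for doc in union_by_name(s3_part, ddb_part):
        print(doc)  # each dict would become one OpenSearch document
```

Unlike a join, a union does not match rows on a key: it simply appends the rows of one dataset to the other.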

AWS Glue Studio notebook experience

Amazon Q data integration in AWS Glue helps you author code in an AWS Glue notebook to speed up development of new data integration applications. In this section, we walk you through how to set up the notebook and run a notebook job.

Prerequisites

Before going forward with this tutorial, complete the following prerequisites:

  1. Set up AWS Glue Studio.
  2. Configure an IAM role to interact with Amazon Q. Attach the following policy to your IAM role for the AWS Glue Studio notebook:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CodeWhispererPermissions",
                "Effect": "Allow",
                "Action": [
                    "codewhisperer:GenerateRecommendations"
                ],
                "Resource": "*"
            }
        ]
    }

Create a new AWS Glue Studio notebook job

Create a new AWS Glue Studio notebook job by completing the following steps:

  1. On the AWS Glue console, choose Notebooks under ETL jobs in the navigation pane.
  2. Under Create job, choose Notebook.
  3. For Engine, select Spark (Python).
  4. For Options, select Start fresh.
  5. For IAM role, choose the IAM role you configured as a prerequisite.
  6. Choose Create notebook.

A new notebook is created with sample cells. Let’s try recommendations from Amazon Q data integration in AWS Glue to auto-generate code based on your intent. Amazon Q helps you with each step as you express an intent in a notebook cell.

Add a new cell and enter a comment that describes what you want to achieve. After you press Tab and Enter, the recommended code is shown. The first intent is to extract the data: “Give me code that reads a Glue Data Catalog table”, followed by “Give me code to apply a filter transform with star_rating>3” and “Give me code that writes the frame into S3 as Parquet”.

As in the Amazon Q chat experience, Amazon Q recommends code. Press Tab to accept the recommendation. You can learn more in User actions.

You can run each cell after filling in the appropriate options for your sources in the generated code. At any point, you can also preview a sample of your dataset by using the show() method.
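The filter intent above reduces to a row predicate. The AWS Glue Filter transform takes such a function through its f parameter (for example, Filter.apply(frame=frame, f=...)). The following stand-in applies a similar predicate to plain dictionaries so you can check the logic locally. The function name above_rating is hypothetical, and dropping rows that have no star_rating value, rather than failing, is an assumption of this sketch and not necessarily what Amazon Q generates.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]


def above_rating(row: Row, threshold: int = 3) -> bool:
    """Return True when the row's star_rating is strictly greater than threshold.

    Rows with a missing or null star_rating are dropped instead of raising an error.
    """
    rating: Optional[Any] = row.get("star_rating")
    return rating is not None and rating > threshold


if __name__ == "__main__":
    reviews: List[Row] = [
        {"review_id": "r1", "star_rating": 5},
        {"review_id": "r2", "star_rating": 3},
        {"review_id": "r3"},
    ]
    print([r["review_id"] for r in reviews if above_rating(r)])  # ['r1']
```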

Let’s now try to generate a full script with a single complex prompt: “I have JSON data in S3 and data in Oracle that needs combining. Please provide a Glue script that reads from both sources, does a join, and then writes results to Redshift.”

You may notice that, in the notebook, Amazon Q data integration in AWS Glue generated the same code snippet as the Amazon Q chat.
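The combining step in that script is a join on a shared key; in AWS Glue, the Join transform expresses this as Join.apply(frame1, frame2, keys1, keys2). The following pure-Python sketch shows the same inner-join semantics on lists of dictionaries using a hash join. The key name customer_id and the sample rows are hypothetical; a real job would join DynamicFrames read from Amazon S3 and Oracle.

```python
from collections import defaultdict
from typing import Any, Dict, List

Row = Dict[str, Any]


def inner_join(left: List[Row], right: List[Row], left_key: str, right_key: str) -> List[Row]:
    """Return merged rows for every left/right pair whose key values match.

    Indexes the right side by key, then probes it with each left row.
    On a column-name collision, the right-hand value wins.
    """
    index: Dict[Any, List[Row]] = defaultdict(list)
    for row in right:
        index[row.get(right_key)].append(row)
    joined: List[Row] = []
    for row in left:
        key = row.get(left_key)
        if key is None:
            continue  # null keys never match in an inner join
        for match in index.get(key, []):
            joined.append({**row, **match})
    return joined


if __name__ == "__main__":
    json_rows = [{"customer_id": 1, "event": "click"}, {"customer_id": 2, "event": "view"}]
    oracle_rows = [{"customer_id": 1, "segment": "gold"}]
    print(inner_join(json_rows, oracle_rows, "customer_id", "customer_id"))
    # [{'customer_id': 1, 'event': 'click', 'segment': 'gold'}]
```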

You can also run the notebook as a job, either by choosing Run or programmatically.

Conclusion

With Amazon Q data integration, you have an artificial intelligence (AI) expert by your side to integrate data efficiently without deep data engineering expertise. These capabilities simplify and accelerate data processing and integration on AWS. Amazon Q data integration in AWS Glue is available in every AWS Region where Amazon Q is available. To learn more, visit the product page, our documentation, and the Amazon Q pricing page.

A special thanks to everyone who contributed to the launch of Amazon Q data integration in AWS Glue: Alexandra Tello, Divya Gaitonde, Andrew Kim, Andrew King, Anshul Sharma, Anshi Shrivastava, Chuhan Liu, Daniel Obi, Hirva Patel, Henry Caballero Corzo, Jake Zych, Jeremy Samuel, Jessica Cheng, Keerthi Chadalavada, Layth Yassin, Maheedhar Reddy Chappidi, Maya Patwardhan, Neil Gupta, Raghavendhar Vidyasagar Thiruvoipadi, Rajendra Gujja, Rupak Ravi, Shaoying Dong, Vaibhav Naik, Wei Tang, William Jones, Daiyan Alamgir, Japson Jeyasekaran, Matt Sampson, Kartik Panjabi, Ranu Shah, Chuan Lei, Huzefa Rangwala, Jiani Zhang, Xiao Qin, Mukul Prasad, Alon Halevy, Brian Ross, Alona Nadler, Omer Zaki, Rick Sears, Bratin Saha, G2 Krishnamoorthy, Kinshuk Pahare, Nitin Bahadur, and Santosh Chandrachood.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.


Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers’ data integration needs. In his spare time, he enjoys spending time with family and friends.


Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions to address customers’ data analytic and processing needs with cloud-based, data-intensive technologies.


XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He works on building new features for AWS Glue to help customers. Outside of work, XiaoRun enjoys exploring new places in the Bay Area.


Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems & new interfaces for data integration and efficiently managing data lakes on AWS.


Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Orchestrate an end-to-end ETL pipeline using Amazon S3, AWS Glue, and Amazon Redshift Serverless with Amazon MWAA

Post Syndicated from Radhika Jakkula original https://aws.amazon.com/blogs/big-data/orchestrate-an-end-to-end-etl-pipeline-using-amazon-s3-aws-glue-and-amazon-redshift-serverless-with-amazon-mwaa/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that you can use to set up and operate data pipelines in the cloud at scale. Apache Airflow is an open source tool used to programmatically author, schedule, and monitor sequences of processes and tasks, referred to as workflows. With Amazon MWAA, you can use Apache Airflow and Python to create workflows without having to manage the underlying infrastructure for scalability, availability, and security.

By using multiple AWS accounts, organizations can effectively scale their workloads and manage their complexity as they grow. This approach provides a robust mechanism to mitigate the potential impact of disruptions or failures, making sure that critical workloads remain operational. Additionally, it enables cost optimization by aligning resources with specific use cases, making sure that expenses are well controlled. By isolating workloads with specific security requirements or compliance needs, organizations can maintain the highest levels of data privacy and security. Furthermore, the ability to organize multiple AWS accounts in a structured manner allows you to align your business processes and resources according to your unique operational, regulatory, and budgetary requirements. This approach promotes efficiency, flexibility, and scalability, enabling large enterprises to meet their evolving needs and achieve their goals.

This post demonstrates how to orchestrate an end-to-end extract, transform, and load (ETL) pipeline using Amazon Simple Storage Service (Amazon S3), AWS Glue, and Amazon Redshift Serverless with Amazon MWAA.

Solution overview

For this post, we consider a use case where a data engineering team wants to build an ETL process and give their end-users the best experience when they query the latest data after new raw files are added to Amazon S3 in the central account (Account A in the following architecture diagram). For increased security and control, the team wants to keep the raw data isolated from the account they work in (Account B in the diagram). They also want to perform the data processing and transformation work in their own account (Account B) to compartmentalize duties and prevent any unintended changes to the source raw data in the central account (Account A). With this approach, the team extracts the raw data from Account A and processes it in Account B, which is dedicated to data handling tasks. The raw and processed data can therefore be kept securely separated across multiple accounts, if required, for enhanced data governance and security.

Our solution uses an end-to-end ETL pipeline orchestrated by Amazon MWAA that looks for new incremental files in an Amazon S3 location in Account A, where the raw data is present. When new files arrive, the pipeline invokes AWS Glue ETL jobs that write to tables in a Redshift Serverless workgroup in Account B. The pipeline then runs stored procedures and SQL commands on Redshift Serverless. When the queries finish, an UNLOAD operation exports data from the Redshift data warehouse to the S3 bucket in Account A.

Because security is important, this post also covers how to configure an Airflow connection using AWS Secrets Manager to avoid storing database credentials within Airflow connections and variables.

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow.

The workflow consists of the following components:

  • The source and target S3 buckets are in a central account (Account A), whereas Amazon MWAA, AWS Glue, and Amazon Redshift are in a different account (Account B). Cross-account access has been set up between the S3 buckets in Account A and the resources in Account B so that data can be loaded and unloaded.
  • In the second account, Amazon MWAA is hosted in one VPC and Redshift Serverless in a different VPC, which are connected through VPC peering. A Redshift Serverless workgroup is secured inside private subnets across three Availability Zones.
  • Secrets like user name, password, DB port, and AWS Region for Redshift Serverless are stored in Secrets Manager.
  • VPC endpoints are created for Amazon S3 and Secrets Manager to interact with other resources.
  • Typically, data engineers create an Airflow directed acyclic graph (DAG) and commit their changes to GitHub. GitHub Actions then deploys the files to an S3 bucket in Account B (for this post, we upload the files to the S3 bucket directly). The S3 bucket stores Airflow-related files like DAG files, requirements.txt files, and plugins. AWS Glue ETL scripts and assets are stored in another S3 bucket. This separation keeps the two sets of assets organized.
  • The Airflow DAG uses various operators, sensors, connections, tasks, and rules to run the data pipeline as needed.
  • Airflow logs are sent to Amazon CloudWatch, where you can configure alerts for monitoring tasks. For more information, see Monitoring dashboards and alarms on Amazon MWAA.

Prerequisites

Because this solution centers around using Amazon MWAA to orchestrate the ETL pipeline, you need to set up certain foundational resources across accounts beforehand. Specifically, you need to create the S3 buckets and folders, AWS Glue resources, and Redshift Serverless resources in their respective accounts prior to implementing the full workflow integration using Amazon MWAA.

Deploy resources in Account A using AWS CloudFormation

In Account A, launch the provided AWS CloudFormation stack to create the following resources:

  • The source and target S3 buckets and folders. As a best practice, the input and output buckets are partitioned by date as s3://<bucket>/products/YYYY/MM/DD/.
  • A sample dataset called products.csv, which we use in this post.

Upload the AWS Glue job to Amazon S3 in Account B

In Account B, create an Amazon S3 location called aws-glue-assets-<account-id>-<region>/scripts (if not present). Replace the parameters for the account ID and Region in the sample_glue_job.py script and upload the AWS Glue job file to the Amazon S3 location.

Deploy resources in Account B using AWS CloudFormation

In Account B, launch the provided CloudFormation stack template to create the following resources:

  • The S3 bucket airflow-<username>-bucket to store Airflow-related files with the following structure:
    • dags – The folder for DAG files.
    • plugins – The folder for any custom or community Airflow plugins.
    • requirements – The requirements.txt file for any Python packages.
    • scripts – Any SQL scripts used in the DAG.
    • data – Any datasets used in the DAG.
  • A Redshift Serverless environment. The name of the workgroup and namespace are prefixed with sample.
  • An AWS Glue environment, which contains the following:
    • An AWS Glue crawler, which crawls the data from the S3 source bucket sample-inp-bucket-etl-<username> in Account A.
    • A database called products_db in the AWS Glue Data Catalog.
    • An ETL job called sample_glue_job. This job reads files from the products table in the Data Catalog and loads the data into the Redshift table products.
  • A VPC gateway endpoint to Amazon S3.
  • An Amazon MWAA environment. For detailed steps to create an Amazon MWAA environment using the Amazon MWAA console, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).


Create Amazon Redshift resources

Create two tables and a stored procedure on a Redshift Serverless workgroup using the products.sql file.

In this example, we create two tables called products and products_f. The name of the stored procedure is sp_products.

Configure Airflow permissions

After the Amazon MWAA environment is created successfully, the status will show as Available. Choose Open Airflow UI to view the Airflow UI. DAGs are automatically synced from the S3 bucket and visible in the UI. However, at this stage, there are no DAGs in the S3 folder.

Attach the AWS managed policy AmazonMWAAFullConsoleAccess, which grants Airflow users permissions to access AWS Identity and Access Management (IAM) resources, to the Amazon MWAA role. For more information, see Accessing an Amazon MWAA environment.

The policies attached to the Amazon MWAA role have full access and must only be used for testing purposes in a secure test environment. For production deployments, follow the least privilege principle.

Set up the environment

This section outlines the steps to configure the environment. The process involves the following high-level steps:

  1. Update any necessary providers.
  2. Set up cross-account access.
  3. Establish a VPC peering connection between the Amazon MWAA VPC and Amazon Redshift VPC.
  4. Configure Secrets Manager to integrate with Amazon MWAA.
  5. Define Airflow connections.

Update the providers

Follow the steps in this section if your Amazon MWAA environment runs an Apache Airflow version earlier than 2.8.1 (the latest version at the time of writing).

Providers are packages that are maintained by the community and include all the core operators, hooks, and sensors for a given service. The Amazon provider is used to interact with AWS services like Amazon S3, Amazon Redshift Serverless, AWS Glue, and more. There are over 200 modules within the Amazon provider.

If your environment runs Apache Airflow 2.6.3, it comes bundled with Amazon provider package version 8.2.0, but support for Amazon Redshift Serverless was not added until Amazon provider package version 8.4.0. Because the bundled provider predates Redshift Serverless support, you must upgrade the provider package to use that functionality.

The first step is to update the constraints file and requirements.txt file with the correct versions. Refer to Specifying newer provider packages for steps to update the Amazon provider package.

  1. Specify the requirements as follows:
    --constraint "/usr/local/airflow/dags/constraints-3.10-mod.txt"
    apache-airflow-providers-amazon==8.4.0

  2. Update the version in the constraints file to 8.4.0 or higher.
  3. Add the updated constraints-3.10-mod.txt file to the /dags folder.

Refer to Apache Airflow versions on Amazon Managed Workflows for Apache Airflow for correct versions of the constraints file depending on the Airflow version.
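Updating the constraints file amounts to changing one pinned line. The following sketch assumes the file uses plain package==version lines, as Airflow constraints files do, and rewrites the apache-airflow-providers-amazon pin while leaving every other line untouched. The helper name pin_package and the sample content are hypothetical; run it against a local copy of the constraints file before uploading it to the /dags folder.

```python
import re


def pin_package(constraints_text: str, package: str, version: str) -> str:
    """Return constraints_text with `package==<old>` replaced by `package==version`.

    Raises ValueError if the package is not pinned, so a silent no-op cannot slip through.
    """
    # Anchor on the full line so that, for example, "foo-bar==1.0" is not matched for "foo".
    pattern = re.compile(rf"^{re.escape(package)}==\S+$", re.MULTILINE)
    updated, count = pattern.subn(f"{package}=={version}", constraints_text)
    if count == 0:
        raise ValueError(f"{package} is not pinned in the constraints text")
    return updated


if __name__ == "__main__":
    sample = "apache-airflow-providers-amazon==8.2.0\nexample-package==1.0.0\n"
    print(pin_package(sample, "apache-airflow-providers-amazon", "8.4.0"), end="")
    # To edit a real file (the path is an example):
    # from pathlib import Path
    # p = Path("constraints-3.10-mod.txt")
    # p.write_text(pin_package(p.read_text(), "apache-airflow-providers-amazon", "8.4.0"))
```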

  1. Navigate to the Amazon MWAA environment and choose Edit.
  2. Under DAG code in Amazon S3, for Requirements file, choose the latest version.
  3. Choose Save.

This will update the environment and new providers will be in effect.

  4. To verify the provider version, go to Providers under the Admin menu.

The version for the Amazon provider package should be 8.4.0, as shown in the following screenshot. If not, there was an error while loading requirements.txt. To debug any errors, go to the CloudWatch console and open the requirements_install_ip log in Log streams, where errors are listed. Refer to Enabling logs on the Amazon MWAA console for more details.

Set up cross-account access

You need to set up cross-account policies and roles between Account A and Account B to access the S3 buckets to load and unload data. Complete the following steps:

  1. In Account A, configure the bucket policy for bucket sample-inp-bucket-etl-<username> to grant permissions to the AWS Glue and Amazon MWAA roles in Account B for objects in bucket sample-inp-bucket-etl-<username>:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": [
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<Glue-role>",
                        "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                    ]
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-inp-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  2. Similarly, configure the bucket policy for bucket sample-opt-bucket-etl-<username> to grant the Amazon MWAA role in Account B permissions to put objects in this bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>"
                },
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl",
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>"
                ]
            }
        ]
    }
    

  3. In Account A, create an IAM policy called policy_for_roleA, which allows necessary Amazon S3 actions on the output bucket:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "VisualEditor0",
                "Effect": "Allow",
                "Action": [
                    "kms:Decrypt",
                    "kms:Encrypt",
                    "kms:GenerateDataKey"
                ],
                "Resource": [
                    "<KMS_KEY_ARN_Used_for_S3_encryption>"
                ]
            },
            {
                "Sid": "VisualEditor1",
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:GetBucketAcl",
                    "s3:GetBucketCors",
                    "s3:GetEncryptionConfiguration",
                    "s3:GetBucketLocation",
                    "s3:ListAllMyBuckets",
                    "s3:ListBucket",
                    "s3:ListBucketMultipartUploads",
                    "s3:ListBucketVersions",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>",
                    "arn:aws:s3:::sample-opt-bucket-etl-<username>/*"
                ]
            }
        ]
    }

  4. Create a new IAM role called RoleA with Account B as the trusted entity, and attach this policy to the role. This allows Account B to assume RoleA to perform the necessary Amazon S3 actions on the output bucket.
  5. In Account B, create an IAM policy called s3-cross-account-access with permission to access objects in the bucket sample-inp-bucket-etl-<username>, which is in Account A.
  6. Add this policy to the AWS Glue role and Amazon MWAA role:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::sample-inp-bucket-etl-<username>/*"
            }
        ]
    }

  7. In Account B, create the IAM policy policy_for_roleB, which allows the role it is attached to to assume RoleA in Account A:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "CrossAccountPolicy",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account-id-of-AcctA>:role/RoleA"
            }
        ]
    }

  8. Create a new IAM role called RoleB with Amazon Redshift as the trusted entity type and add this policy to the role. This allows RoleB to assume RoleA in Account A and also to be assumable by Amazon Redshift.
  9. Attach RoleB to the Redshift Serverless namespace, so Amazon Redshift can write objects to the S3 output bucket in Account A.
  10. Attach the policy policy_for_roleB to the Amazon MWAA role, which allows Amazon MWAA to access the output bucket in Account A.
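A common mistake in bucket policies like the ones above is granting an object-level action such as s3:GetObject only on the bucket ARN, or s3:ListBucket only on the /* object ARN: bucket-level actions are evaluated against arn:aws:s3:::bucket and object-level actions against arn:aws:s3:::bucket/*. The following sketch is a hypothetical offline check, not an AWS tool. It parses a policy document and reports which required (action, resource) pairs a given principal is not granted. It understands only simple Allow statements with exact ARNs and ignores Deny statements, conditions, and wildcards, so treat it as a quick sanity check and use IAM Access Analyzer policy validation or the IAM policy simulator for authoritative answers.

```python
import json
from typing import List, Set, Tuple

# Bucket-level S3 actions are evaluated against the bucket ARN, not "bucket/*".
# This set is deliberately small and not exhaustive.
BUCKET_LEVEL_ACTIONS = {"s3:ListBucket", "s3:GetBucketLocation"}


def _as_list(value) -> List[str]:
    """IAM allows a single string or a list in Action, Resource, and Principal fields."""
    return [value] if isinstance(value, str) else list(value)


def missing_grants(policy_json: str, principal_arn: str, bucket: str,
                   actions: Set[str]) -> List[Tuple[str, str]]:
    """Return the (action, resource) pairs that no simple Allow statement grants."""
    bucket_arn = f"arn:aws:s3:::{bucket}"
    granted: Set[Tuple[str, str]] = set()
    for stmt in json.loads(policy_json).get("Statement", []):
        principal = stmt.get("Principal", {})
        # Only explicit {"AWS": ...} principals are considered; "*" and conditions are ignored.
        if stmt.get("Effect") != "Allow" or not isinstance(principal, dict):
            continue
        if principal_arn not in _as_list(principal.get("AWS", [])):
            continue
        for action in _as_list(stmt.get("Action", [])):
            for resource in _as_list(stmt.get("Resource", [])):
                granted.add((action, resource))
    missing = []
    for action in sorted(actions):
        resource = bucket_arn if action in BUCKET_LEVEL_ACTIONS else f"{bucket_arn}/*"
        if (action, resource) not in granted:
            missing.append((action, resource))
    return missing


if __name__ == "__main__":
    # Hypothetical account ID and role name.
    glue_role = "arn:aws:iam::111122223333:role/service-role/example-glue-role"
    policy = json.dumps({
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"AWS": [glue_role]},
            "Action": ["s3:GetObject", "s3:ListBucket"],
            "Resource": ["arn:aws:s3:::sample-inp-bucket-etl-demo/*"],
        }],
    })
    # s3:ListBucket is granted only on the object ARN, so it is reported as missing.
    print(missing_grants(policy, glue_role, "sample-inp-bucket-etl-demo",
                         {"s3:GetObject", "s3:ListBucket"}))
```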

Refer to How do I provide cross-account access to objects that are in Amazon S3 buckets? for more details on setting up cross-account access to objects in Amazon S3 from AWS Glue and Amazon MWAA. Refer to How do I COPY or UNLOAD data from Amazon Redshift to an Amazon S3 bucket in another account? for more details on setting up roles to unload data from Amazon Redshift to Amazon S3 from Amazon MWAA.
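For the UNLOAD into Account A, Amazon Redshift supports role chaining: the IAM_ROLE clause accepts a comma-separated list of role ARNs with no spaces, and Redshift assumes each role in turn, so RoleB (attached to the namespace) can assume RoleA in Account A. The following sketch builds such a statement. The helper build_unload_sql, the query (including its status column), and the output prefix are hypothetical, and you should confirm options such as FORMAT AS PARQUET against the UNLOAD documentation for your use case.

```python
from typing import Sequence


def build_unload_sql(query: str, s3_prefix: str, role_chain: Sequence[str]) -> str:
    """Build an UNLOAD statement that writes Parquet files using chained IAM roles.

    role_chain is ordered: the first role is attached to the Redshift namespace,
    and each following role is assumed by the one before it.
    """
    if not role_chain:
        raise ValueError("At least one IAM role ARN is required")
    if not s3_prefix.startswith("s3://"):
        raise ValueError("s3_prefix must start with s3://")
    escaped_query = query.replace("'", "''")  # quotes inside the UNLOAD query are doubled
    roles = ",".join(arn.strip() for arn in role_chain)  # no spaces between chained ARNs
    return (
        f"UNLOAD ('{escaped_query}') "
        f"TO '{s3_prefix}' "
        f"IAM_ROLE '{roles}' "
        "FORMAT AS PARQUET"
    )


if __name__ == "__main__":
    print(build_unload_sql(
        "SELECT * FROM products_f WHERE status = 'active'",
        "s3://sample-opt-bucket-etl-<username>/products/",
        [
            "arn:aws:iam::<account-id-of-AcctB>:role/RoleB",
            "arn:aws:iam::<account-id-of-AcctA>:role/RoleA",
        ],
    ))
```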

Set up VPC peering between the Amazon MWAA and Amazon Redshift VPCs

Because Amazon MWAA and Amazon Redshift are in two separate VPCs, you need to set up VPC peering between them. You must add a route to the route tables associated with the subnets for both services. Refer to Work with VPC peering connections for details on VPC peering.

Make sure that the CIDR range of the Amazon MWAA VPC is allowed in the Redshift security group and the CIDR range of the Amazon Redshift VPC is allowed in the Amazon MWAA security group, as shown in the following screenshot.

If any of the preceding steps are configured incorrectly, you are likely to encounter a “Connection Timeout” error in the DAG run.
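VPC peering also requires that the two VPCs have non-overlapping IPv4 CIDR blocks, and each security group rule must cover the peer VPC's range. The following standard-library sketch checks both conditions before you create the peering connection. The helper name check_peering_cidrs and the CIDR values are hypothetical, and it handles IPv4 ranges only.

```python
import ipaddress
from typing import List


def check_peering_cidrs(mwaa_vpc: str, redshift_vpc: str,
                        redshift_sg_sources: List[str], mwaa_sg_sources: List[str]) -> List[str]:
    """Return a list of problems; an empty list means the CIDR setup looks consistent."""
    mwaa = ipaddress.ip_network(mwaa_vpc)
    redshift = ipaddress.ip_network(redshift_vpc)
    problems: List[str] = []
    if mwaa.overlaps(redshift):
        problems.append(f"VPC CIDRs overlap: {mwaa} and {redshift}")
    # Each security group must allow a source range that contains the peer VPC's CIDR.
    if not any(mwaa.subnet_of(ipaddress.ip_network(src)) for src in redshift_sg_sources):
        problems.append(f"Redshift security group does not allow {mwaa}")
    if not any(redshift.subnet_of(ipaddress.ip_network(src)) for src in mwaa_sg_sources):
        problems.append(f"MWAA security group does not allow {redshift}")
    return problems


if __name__ == "__main__":
    print(check_peering_cidrs("10.0.0.0/16", "10.1.0.0/16",
                              ["10.0.0.0/16"], ["10.1.0.0/16"]))  # []
```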

Configure the Amazon MWAA connection with Secrets Manager

When the Amazon MWAA pipeline is configured to use Secrets Manager, it will first look for connections and variables in an alternate backend (like Secrets Manager). If the alternate backend contains the needed value, it is returned. Otherwise, it will check the metadata database for the value and return that instead. For more details, refer to Configuring an Apache Airflow connection using an AWS Secrets Manager secret.
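The lookup can be modeled in a few lines. With the configuration used in this post, the Secrets Manager backend builds the secret name by joining the configured connections_prefix and the connection ID with a slash, so a DAG refers to a connection by its ID (for example, secrets_redshift_connection) rather than by the full secret name. The following simplified sketch uses in-memory dictionaries in place of Secrets Manager and the metadata database. It is not Airflow's implementation, and it leaves out the environment-variable lookup that Airflow also performs between the two.

```python
from typing import Dict, Optional


def secret_name(prefix: str, conn_id: str, sep: str = "/") -> str:
    """Derive a secret name from the configured prefix and a connection ID."""
    return f"{prefix}{sep}{conn_id}"


def resolve_connection(conn_id: str,
                       secrets_manager: Dict[str, str],
                       metadata_db: Dict[str, str],
                       prefix: str = "airflow/connections") -> Optional[str]:
    """Return the connection URI, checking Secrets Manager first, then the metadata DB."""
    value = secrets_manager.get(secret_name(prefix, conn_id))
    if value is not None:
        return value
    return metadata_db.get(conn_id)  # None if neither store has the connection


if __name__ == "__main__":
    sm = {"airflow/connections/secrets_redshift_connection": "redshift://admin:***@host:5439"}
    db = {"redshift-conn-id": "redshift://admin:***@other-host:5439"}
    print(resolve_connection("secrets_redshift_connection", sm, db))  # found in Secrets Manager
    print(resolve_connection("redshift-conn-id", sm, db))  # falls back to the metadata DB
```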

Complete the following steps:

  1. Configure a VPC endpoint to link Amazon MWAA and Secrets Manager (com.amazonaws.us-east-1.secretsmanager).

This allows Amazon MWAA to access credentials stored in Secrets Manager.

  2. To give Amazon MWAA permission to access Secrets Manager secrets, attach the SecretsManagerReadWrite policy to the environment's IAM role.
  3. To configure Secrets Manager as the secrets backend, go to the Airflow configuration options, add the following key-value pairs, and save your settings.

This configures Airflow to look for connection strings and variables at the airflow/connections/* and airflow/variables/* paths:

secrets.backend: airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs: {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}

  4. To generate an Airflow connection URI string, open AWS CloudShell and start a Python shell.
  5. Run the following code to generate the connection URI string:
    from urllib.parse import quote

    conn_type = 'redshift'
    host = 'sample-workgroup.<account-id-of-AcctB>.us-east-1.redshift-serverless.amazonaws.com'  # Amazon Redshift workgroup endpoint
    port = '5439'
    login = 'admin'  # User name for authentication with Amazon Redshift
    password = '<password>'  # Password for authentication with Amazon Redshift
    role_arn = 'arn:aws:iam::<account-id-of-AcctB>:role/service-role/<MWAA-role>'
    database = 'dev'
    region = 'us-east-1'  # Your AWS Region

    # Percent-encode the user name, password, and role ARN so that characters such as
    # '@', ':', or '/' don't break the URI; Airflow decodes them when parsing it.
    conn_string = (
        f"{conn_type}://{quote(login, safe='')}:{quote(password, safe='')}@{host}:{port}"
        f"?role_arn={quote(role_arn, safe='')}&database={database}&region={region}"
    )
    print(conn_string)

The connection string should be generated as follows:

redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=<region>
  6. Add the connection in Secrets Manager using the following AWS Command Line Interface (AWS CLI) command.

You can also do this from the Secrets Manager console. The connection string is stored in Secrets Manager as a plaintext secret.

aws secretsmanager create-secret --name airflow/connections/secrets_redshift_connection --description "Apache Airflow to Redshift Cluster" --secret-string "redshift://admin:<password>@sample-workgroup.<account_id>.us-east-1.redshift-serverless.amazonaws.com:5439?role_arn=<MWAA role ARN>&database=dev&region=us-east-1" --region=us-east-1

In the DAG, refer to this connection by its connection ID, secrets_redshift_connection. When the DAG runs, the Secrets Manager backend adds the airflow/connections prefix, looks up the secret airflow/connections/secrets_redshift_connection, and retrieves the connection details. For RedshiftDataOperator, pass the secret's ARN as the secret_arn parameter instead of a connection name.

You can also add secrets using the Secrets Manager console as key-value pairs.

  7. Add another secret in Secrets Manager and save it as airflow/connections/redshift_conn_test.

Create an Airflow connection through the metadata database

You can also create connections in the UI. In this case, the connection details will be stored in an Airflow metadata database. If the Amazon MWAA environment is not configured to use the Secrets Manager backend, it will check the metadata database for the value and return that. You can create an Airflow connection using the UI, AWS CLI, or API. In this section, we show how to create a connection using the Airflow UI.

  1. For Connection Id, enter a name for the connection.
  2. For Connection Type, choose Amazon Redshift.
  3. For Host, enter the Redshift endpoint (without port and database) for Redshift Serverless.
  4. For Database, enter dev.
  5. For User, enter your admin user name.
  6. For Password, enter your password.
  7. For Port, use port 5439.
  8. For Extra, set the region and timeout parameters.
  9. Test the connection, then save your settings.

Create and run a DAG

In this section, we describe how to create a DAG using various components. After you create and run the DAG, you can verify the results by querying Redshift tables and checking the target S3 buckets.

Create a DAG

In Airflow, data pipelines are defined in Python code as DAGs. We create a DAG that consists of various operators, sensors, connections, tasks, and rules:

  • The DAG starts by using S3KeySensor to look for the current day's source files in the S3 bucket sample-inp-bucket-etl-<username> in Account A. S3KeySensor waits for one or more keys to be present in an S3 bucket.
    • For example, our S3 bucket is partitioned as s3://bucket/products/YYYY/MM/DD/, so our sensor should check for folders with the current date. We derived the current date in the DAG and passed this to S3KeySensor, which looks for any new files in the current day folder.
    • We also set wildcard_match to True, so that bucket_key is interpreted as a Unix wildcard pattern. We set mode to reschedule so that the sensor task frees its worker slot when the criteria aren't met and is rescheduled for a later time. As a best practice, use this mode when poke_interval is more than 1 minute to avoid putting too much load on the scheduler.
  • After the file is available in the S3 bucket, the AWS Glue crawler runs using GlueCrawlerOperator to crawl the S3 source bucket sample-inp-bucket-etl-<username> under Account A and updates the table metadata under the products_db database in the Data Catalog. The crawler uses the AWS Glue role and Data Catalog database that were created in the previous steps.
  • The DAG uses GlueCrawlerSensor to wait for the crawler to complete.
  • When the crawler job is complete, GlueJobOperator runs the AWS Glue job. The AWS Glue script name (along with its location) is passed to the operator, together with the AWS Glue IAM role. Other parameters, such as GlueVersion, NumberOfWorkers, and WorkerType, are passed using the create_job_kwargs parameter.
  • The DAG uses GlueJobSensor to wait for the AWS Glue job to complete. When it’s complete, the Redshift staging table products will be loaded with data from the S3 file.
  • You can connect to Amazon Redshift from Airflow using three different operators:
    • PythonOperator.
    • SQLExecuteQueryOperator, which uses a PostgreSQL connection and redshift_default as the default connection.
    • RedshiftDataOperator, which uses the Redshift Data API and aws_default as the default connection.
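
As a minimal sketch of the sensor's matching step, the following Python snippet builds the same Hive-style key pattern that get_s3_loc produces in the DAG and filters candidate keys with fnmatch, the Unix shell-style wildcard matcher from the Python standard library. The helper names and sample keys are hypothetical; in the DAG itself, S3KeySensor performs the S3 listing and wildcard matching for you.

```python
from datetime import date
from fnmatch import fnmatch


def partition_pattern(prefix: str, day: date) -> str:
    """Build a Hive-style daily key pattern, mirroring get_s3_loc in the DAG."""
    return f"{prefix}/year={day:%Y}/month={day:%m}/day={day:%d}/*.csv"


def matching_keys(keys: list[str], pattern: str) -> list[str]:
    """Return the keys that match a Unix shell-style wildcard pattern.

    Note: in fnmatch, '*' also matches '/', so CSV files in deeper
    subfolders of the day folder would match as well.
    """
    return [key for key in keys if fnmatch(key, pattern)]


if __name__ == "__main__":
    pattern = partition_pattern("products", date(2024, 5, 1))
    keys = [
        "products/year=2024/month=05/day=01/part-0000.csv",
        "products/year=2024/month=05/day=02/part-0000.csv",
        "products/year=2024/month=05/day=01/_SUCCESS",
    ]
    print(pattern)
    print(matching_keys(keys, pattern))
```

Only the first key matches: the second is in a different day folder, and the third doesn't end in .csv.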

In our DAG, we use SQLExecuteQueryOperator and RedshiftDataOperator to show how to use these operators. The Redshift stored procedures are run using RedshiftDataOperator. The DAG also runs SQL commands in Amazon Redshift to delete the data from the staging table using SQLExecuteQueryOperator.

Because we configured our Amazon MWAA environment to look for connections in Secrets Manager, when the DAG runs, it retrieves the Redshift connection details like user name, password, host, port, and Region from Secrets Manager. If the connection is not found in Secrets Manager, the values are retrieved from the default connections.

In SQLExecuteQueryOperator, we pass the connection name that we created in Secrets Manager. It looks for airflow/connections/secrets_redshift_connection and retrieves the secrets from Secrets Manager. If Secrets Manager is not set up, the connection created manually (for example, redshift-conn-id) can be passed.

In RedshiftDataOperator, we pass the secret_arn of the airflow/connections/redshift_conn_test connection created in Secrets Manager as a parameter.

  • As the final task, RedshiftToS3Operator is used to unload data from the Redshift table to an S3 bucket sample-opt-bucket-etl in Account B. airflow/connections/redshift_conn_test from Secrets Manager is used for unloading the data.
  • TriggerRule is set to ALL_DONE, which lets a task run after all of its upstream tasks have finished, whether or not they succeeded.
  • The dependency of tasks is defined using the chain() function, which allows for parallel runs of tasks if needed. In our case, we want all tasks to run in sequence.
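
To make the trigger-rule and chain() behavior concrete, the following is a simplified Python model, not Airflow's implementation. Under the default all_success rule, a task runs only if every upstream task succeeded; under all_done, it only requires every upstream task to have finished in some terminal state. The State enum and the can_run and chain_edges helpers are hypothetical names introduced for illustration.

```python
from enum import Enum


class State(Enum):
    SUCCESS = "success"
    FAILED = "failed"
    UPSTREAM_FAILED = "upstream_failed"
    SKIPPED = "skipped"
    RUNNING = "running"


# Terminal states: the task has finished, successfully or not.
FINISHED = {State.SUCCESS, State.FAILED, State.UPSTREAM_FAILED, State.SKIPPED}


def can_run(rule: str, upstream: list[State]) -> bool:
    """Simplified model of two Airflow trigger rules."""
    if rule == "all_success":
        return all(s is State.SUCCESS for s in upstream)
    if rule == "all_done":
        return all(s in FINISHED for s in upstream)
    raise ValueError(f"unsupported rule: {rule}")


def chain_edges(*tasks: str) -> list[tuple[str, str]]:
    """Sequential dependencies, like chain(a, b, c) meaning a >> b >> c."""
    return list(zip(tasks, tasks[1:]))


if __name__ == "__main__":
    print(can_run("all_success", [State.FAILED]))
    print(can_run("all_done", [State.FAILED]))
    print(chain_edges("sensor_key", "crawl_s3", "wait_for_crawl"))
```

This is why delete_from_table and transfer_redshift_to_s3 still run if an earlier Redshift step fails: with ALL_DONE they only wait for their upstream task to finish.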

The following is the complete DAG code. The dag_id should match the DAG script name; otherwise, it won’t be synced into the Airflow UI.

from datetime import datetime
from airflow import DAG 
from airflow.decorators import task
from airflow.models.baseoperator import chain
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
from airflow.providers.amazon.aws.operators.glue_crawler import GlueCrawlerOperator
from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor
from airflow.providers.amazon.aws.sensors.glue_crawler import GlueCrawlerSensor
from airflow.providers.amazon.aws.operators.redshift_data import RedshiftDataOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator
from airflow.utils.trigger_rule import TriggerRule


dag_id = "data_pipeline"
vYear = datetime.today().strftime("%Y")
vMonth = datetime.today().strftime("%m")
vDay = datetime.today().strftime("%d")
src_bucket_name = "sample-inp-bucket-etl-<username>"
tgt_bucket_name = "sample-opt-bucket-etl-<username>"
s3_folder="products"
# Replace with the ARN of your AWS Glue IAM role
glue_role_arn_key = "arn:aws:iam::<account_id>:role/<Glue-role>"
glue_crawler_name = "products"
glue_db_name = "products_db"
glue_job_name = "sample_glue_job"
glue_script_location="s3://aws-glue-assets-<account_id>-<region>/scripts/sample_glue_job.py"
workgroup_name = "sample-workgroup"
redshift_table = "products_f"
redshift_conn_id_name="secrets_redshift_connection"
db_name = "dev"
secret_arn="arn:aws:secretsmanager:us-east-1:<account_id>:secret:airflow/connections/redshift_conn_test-xxxx"
poll_interval = 10

@task
def get_role_name(arn: str) -> str:
    return arn.split("/")[-1]

@task
def get_s3_loc(s3_folder: str) -> str:
    s3_loc  = s3_folder + "/year=" + vYear + "/month=" + vMonth + "/day=" + vDay + "/*.csv"
    return s3_loc

with DAG(
    dag_id=dag_id,
    schedule="@once",
    start_date=datetime(2021, 1, 1),
    tags=["example"],
    catchup=False,
) as dag:
    role_arn = glue_role_arn_key
    glue_role_name = get_role_name(role_arn)
    s3_loc = get_s3_loc(s3_folder)


    # Check for new incremental files in S3 source/input bucket
    sensor_key = S3KeySensor(
        task_id="sensor_key",
        bucket_key=s3_loc,
        bucket_name=src_bucket_name,
        wildcard_match=True,
        #timeout=18*60*60,
        #poke_interval=120,
        timeout=60,
        poke_interval=30,
        mode="reschedule"
    )

    # Run Glue crawler
    glue_crawler_config = {
        "Name": glue_crawler_name,
        "Role": role_arn,
        "DatabaseName": glue_db_name,
    }

    crawl_s3 = GlueCrawlerOperator(
        task_id="crawl_s3",
        config=glue_crawler_config,
    )

    # GlueCrawlerOperator waits by default, setting as False to test the Sensor below.
    crawl_s3.wait_for_completion = False

    # Wait for Glue crawler to complete
    wait_for_crawl = GlueCrawlerSensor(
        task_id="wait_for_crawl",
        crawler_name=glue_crawler_name,
    )

    # Run Glue Job
    submit_glue_job = GlueJobOperator(
        task_id="submit_glue_job",
        job_name=glue_job_name,
        script_location=glue_script_location,
        iam_role_name=glue_role_name,
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 10, "WorkerType": "G.1X"},
    )

    # GlueJobOperator waits by default, setting as False to test the Sensor below.
    submit_glue_job.wait_for_completion = False

    # Wait for Glue Job to complete
    wait_for_job = GlueJobSensor(
        task_id="wait_for_job",
        job_name=glue_job_name,
        # Job ID extracted from previous Glue Job Operator task
        run_id=submit_glue_job.output,
        verbose=True,  # prints glue job logs in airflow logs
    )

    wait_for_job.poke_interval = 5

    # Execute the Stored Procedure in Redshift Serverless using Data Operator
    execute_redshift_stored_proc = RedshiftDataOperator(
        task_id="execute_redshift_stored_proc",
        database=db_name,
        workgroup_name=workgroup_name,
        secret_arn=secret_arn,
        sql="""CALL sp_products();""",
        poll_interval=poll_interval,
        wait_for_completion=True,
    )

    # Delete the data from the Redshift staging table using SQL Operator
    delete_from_table = SQLExecuteQueryOperator(
        task_id="delete_from_table",
        conn_id=redshift_conn_id_name,
        sql="DELETE FROM products;",
        trigger_rule=TriggerRule.ALL_DONE,
    )

    # Unload the data from Redshift table to S3
    transfer_redshift_to_s3 = RedshiftToS3Operator(
        task_id="transfer_redshift_to_s3",
        s3_bucket=tgt_bucket_name,
        s3_key=s3_loc,
        schema="PUBLIC",
        table=redshift_table,
        redshift_conn_id=redshift_conn_id_name,
    )

    transfer_redshift_to_s3.trigger_rule = TriggerRule.ALL_DONE

    #Chain the tasks to be executed
    chain(
        sensor_key,
        crawl_s3,
        wait_for_crawl,
        submit_glue_job,
        wait_for_job,
        execute_redshift_stored_proc,
        delete_from_table,
        transfer_redshift_to_s3
    )
    

Verify the DAG run

After you create the DAG file (replace the variables in the DAG script) and upload it to the s3://sample-airflow-instance/dags folder, it will be automatically synced with the Airflow UI. All DAGs appear on the DAGs tab. Toggle the ON option to make the DAG runnable. Because our DAG is set to schedule="@once", you need to manually run the job by choosing the run icon under Actions. When the DAG is complete, the status is updated in green, as shown in the following screenshot.

In the Links section, there are options to view the code, graph, grid, log, and more. Choose Graph to visualize the DAG in a graph format. As shown in the following screenshot, each color of the node denotes a specific operator, and the color of the node outline denotes a specific status.

Verify the results

On the Amazon Redshift console, navigate to the Query Editor v2 and select the data in the products_f table. The table should be loaded and have the same number of records as the source S3 files.

On the Amazon S3 console, navigate to the S3 bucket s3://sample-opt-bucket-etl in Account B. The product_f files should be created under the folder structure s3://sample-opt-bucket-etl/products/YYYY/MM/DD/.

Clean up

Clean up the resources created as part of this post to avoid incurring ongoing charges:

  1. Delete the CloudFormation stacks and S3 bucket that you created as prerequisites.
  2. Delete the VPCs and VPC peering connections, cross-account policies and roles, and secrets in Secrets Manager.

Conclusion

With Amazon MWAA, you can build complex workflows using Airflow and Python without managing clusters, nodes, or any other operational overhead typically associated with deploying and scaling Airflow in production. In this post, we showed how Amazon MWAA provides an automated way to ingest, transform, analyze, and distribute data between different accounts and services within AWS. For more examples of other AWS operators, refer to the following GitHub repository; we encourage you to learn more by trying out some of these examples.


About the Authors


Radhika Jakkula is a Big Data Prototyping Solutions Architect at AWS. She helps customers build prototypes using AWS analytics services and purpose-built databases. She is a specialist in assessing a wide range of requirements and applying relevant AWS services, big data tools, and frameworks to create a robust architecture.

Sidhanth Muralidhar is a Principal Technical Account Manager at AWS. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them architect workloads for costs, reliability, performance, and operational excellence at scale in their cloud journey. He has a keen interest in data analytics as well.

How Salesforce optimized their detection and response platform using AWS managed services

Post Syndicated from Atul Khare original https://aws.amazon.com/blogs/big-data/how-salesforce-optimized-their-detection-and-response-platform-using-aws-managed-services/

This is a guest blog post co-authored with Atul Khare and Bhupender Panwar from Salesforce.

Headquartered in San Francisco, Salesforce, Inc. is a cloud-based customer relationship management (CRM) software company building artificial intelligence (AI)-powered business applications that allow businesses to connect with their customers in new and personalized ways.

The Salesforce Trust Intelligence Platform (TIP) log platform team is responsible for data pipeline and data lake infrastructure, providing log ingestion, normalization, persistence, search, and detection capability to ensure Salesforce is safe from threat actors. It runs miscellaneous services to facilitate investigation, mitigation, and containment for security operations. The TIP team is critical to securing Salesforce’s infrastructure, detecting malicious threat activities, and providing timely responses to security events. This is achieved by collecting and inspecting petabytes of security logs across dozens of organizations, some with thousands of accounts.

In this post, we discuss how the Salesforce TIP team optimized their architecture using Amazon Web Services (AWS) managed services to achieve better scalability, cost, and operational efficiency.

TIP existing architecture bird’s eye view and scale of the platform

The main key performance indicator (KPI) for the TIP platform is its capability to ingest a high volume of security logs from a variety of Salesforce internal systems in real time and process them with high velocity. The platform ingests more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. The platform ingests log files in JSON, text, and Common Event Format (CEF) formats.

The message bus in TIP’s existing architecture mainly uses Apache Kafka for ingesting different log types coming from the upstream systems. Kafka had a single topic for all the log types before they were consumed by different downstream applications including Splunk, Streaming Search, and Log Normalizer. The Normalized Parquet Logs are stored in an Amazon Simple Storage Service (Amazon S3) data lake and cataloged into Hive Metastore (HMS) on an Amazon Relational Database Service (Amazon RDS) instance based on S3 event notifications. The data lake consumers then use Apache Presto running on an Amazon EMR cluster to perform one-time queries. Other teams including the Data Science and Machine Learning teams use the platform to detect, analyze, and control security threats.

Challenges with the existing TIP log platform architecture

Some of the main challenges that TIP’s existing architecture was facing include:

  • Heavy operational overhead and maintenance cost managing the Kafka cluster
  • High cost to serve (CTS) to meet growing business needs
  • Compute threads limited by partitions’ numbers
  • Difficult to scale out when traffic increases
  • Weekly patching creates lags
  • Challenges with HMS scalability
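
The "compute threads limited by partitions' numbers" constraint most likely stems from Kafka's consumer-group model: each partition of a topic is consumed by at most one consumer in a group, so adding consumer threads beyond the partition count adds no parallelism. The following simplified Python model illustrates this with a round-robin assignment; assign_partitions and active_consumers are hypothetical helpers, not Kafka's actual assignors.

```python
def assign_partitions(num_partitions: int, consumers: list[str]) -> dict[str, list[int]]:
    """Round-robin assignment within one consumer group (simplified model).

    Each partition goes to exactly one consumer, so at most num_partitions
    consumers receive work; any extra consumers sit idle.
    """
    assignment: dict[str, list[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


def active_consumers(num_partitions: int, num_consumers: int) -> int:
    """Upper bound on consumers in one group that can do useful work."""
    return min(num_partitions, num_consumers)


if __name__ == "__main__":
    # Five consumers on a three-partition topic: two of them get nothing.
    print(assign_partitions(3, ["c0", "c1", "c2", "c3", "c4"]))
```

Scaling out therefore means repartitioning the topic as well as adding consumers, which is one reason the new design, with no mapping of partitions to compute threads, is easier to scale.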

All these challenges motivated the TIP team to embark on a journey to create a more optimized platform that’s easier to scale with less operational overhead and lower CTS.

New TIP log platform architecture

The Salesforce TIP log platform engineering team, in collaboration with AWS, started building the new architecture to replace the Kafka-based message bus solution with the fully managed AWS messaging and notification solutions Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Notification Service (Amazon SNS). In the new design, the upstream systems send their logs to a central Amazon S3 storage location, which invokes a process to partition the logs and store them in an S3 data lake. Consumer applications such as Splunk get the messages delivered to their system using Amazon SQS. Similarly, Amazon SQS events for the partitioned log data initiate a log normalization process that delivers the normalized log data to open source Delta Lake tables on an S3 data lake. One of the major changes in the new architecture is the use of an AWS Glue Data Catalog to replace the previous Hive Metastore. The one-time analysis applications use Apache Trino on an Amazon EMR cluster to query the Delta tables cataloged in AWS Glue. Other consumer applications also read the data from S3 data lake files stored in Delta table format. More details on some of the important processes are as follows:

Log partitioner (Spark structured stream)

This service ingests logs from the central Amazon S3 store through Amazon SNS and Amazon SQS notifications and writes them to S3 partitioned by log type, for further downstream consumption through an Amazon SNS and Amazon SQS subscription. This is the bronze layer of the TIP data lake.
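
The production service is a Spark structured streaming job; as a plain-Python sketch of just its grouping step, the following assumes JSON records with a hypothetical log_type field and writes each record under a hypothetical log_type=<value>/ prefix. In Spark, the equivalent is typically expressed with partitionBy on the DataFrame or stream writer.

```python
import json
from collections import defaultdict


def partition_by_log_type(lines: list[str], prefix: str = "partitioned") -> dict[str, list[dict]]:
    """Group JSON log records under one S3-style prefix per log type.

    Assumes each record carries a 'log_type' field (hypothetical); records
    without one land in an 'unknown' partition.
    """
    partitions: dict[str, list[dict]] = defaultdict(list)
    for line in lines:
        record = json.loads(line)
        log_type = record.get("log_type", "unknown")
        partitions[f"{prefix}/log_type={log_type}/"].append(record)
    return dict(partitions)


if __name__ == "__main__":
    raw = [
        '{"log_type": "vpc", "msg": "a"}',
        '{"log_type": "auth", "msg": "b"}',
        '{"log_type": "vpc", "msg": "c"}',
    ]
    for key, records in partition_by_log_type(raw).items():
        print(key, len(records))
```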

Log normalizer (Spark structured stream)

One of the downstream consumers of the log partitioner (the Splunk ingestor is another), the log normalizer ingests the data from Partitioned Output S3 using Amazon SNS and Amazon SQS notifications, and enriches it using Salesforce custom parsers and tags. Finally, this enriched data lands in the data lake on S3. This is the silver layer of the TIP data lake.
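
Salesforce's custom parsers aren't described in the post, so as a stand-in, the following Python sketch parses one of the formats the platform ingests, Common Event Format (CEF), and then enriches the record with a log type and tags. The field names, the normalize helper, and the simplifications (no escaped pipes in the header, no spaces inside extension values, both of which real CEF allows) are assumptions for illustration.

```python
CEF_HEADER_FIELDS = [
    "version", "device_vendor", "device_product", "device_version",
    "signature_id", "name", "severity",
]


def parse_cef(line: str) -> dict:
    """Parse a simplified CEF record: 7 pipe-delimited header fields + extension."""
    if not line.startswith("CEF:"):
        raise ValueError("not a CEF record")
    parts = line[len("CEF:"):].split("|", 7)
    if len(parts) != 8:
        raise ValueError("expected 7 header fields plus an extension")
    record = dict(zip(CEF_HEADER_FIELDS, parts[:7]))
    extension = {}
    for token in parts[7].split():
        key, sep, value = token.partition("=")
        if sep:
            extension[key] = value
    record["extension"] = extension
    return record


def normalize(line: str, log_type: str, tags: dict[str, str]) -> dict:
    """Hypothetical normalizer: parse the raw line, then enrich it."""
    record = parse_cef(line)
    record["log_type"] = log_type
    record["tags"] = dict(tags)
    return record


if __name__ == "__main__":
    raw = "CEF:0|Acme|Firewall|1.0|100|Blocked connection|5|src=10.0.0.1 dst=10.0.0.2"
    print(normalize(raw, "firewall", {"env": "prod"}))
```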

Machine learning and other data analytics consumers (Trino, Flink, and Spark Jobs)

These consumers consume from the silver layer of the TIP data lake and run analytics for security detection use cases. The earlier Kafka interface is now converted to Delta streams ingestion, which completes the removal of the Kafka bus from the TIP data pipeline.

Advantages of the new TIP log platform architecture

The main advantages realized by the Salesforce TIP team based on this new architecture using Amazon S3, Amazon SNS, and Amazon SQS include:

  • Cost savings of approximately $400 thousand per month
  • Auto scaling to meet growing business needs
  • Zero DevOps maintenance overhead
  • No mapping of partitions to compute threads
  • Compute resources can be scaled up and down independently
  • Fully managed Data Catalog to reduce the operational overhead of managing HMS

Summary

In this blog post, we discussed how the Salesforce Trust Intelligence Platform (TIP) optimized their data pipeline by replacing the Kafka-based message bus solution with fully managed AWS messaging and notification solutions using Amazon SQS and Amazon SNS. Salesforce and AWS teams worked together to make sure this new platform seamlessly scales to ingest more than 1 PB of data per day, more than 10 million events per second, and more than 200 different log types. Reach out to your AWS account team if you have similar use cases and you need help architecting your platform to achieve operational efficiencies and scale.


About the authors

Atul Khare is a Director of Engineering at Salesforce Security, where he spearheads the Security Log Platform and Data Lakehouse initiatives. He supports diverse security customers by building robust big data ETL pipelines that are elastic, resilient, and easy to use, providing uniform and consistent security datasets for threat detection and response operations, AI, forensic analysis, analytics, and compliance needs across all Salesforce clouds. Beyond his professional endeavors, Atul enjoys performing music with his band to raise funds for local charities.

Bhupender Panwar is a Big Data Architect at Salesforce and seasoned advocate for big data and cloud computing. His background encompasses the development of data-intensive applications and pipelines, solving intricate architectural and scalability challenges, and extracting valuable insights from extensive datasets within the technology industry. Outside of his big data work, Bhupender loves to hike, bike, and travel, and is a great foodie.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Vikas Panghal is the Principal Product Manager leading the product management team for Amazon SNS and Amazon SQS. He has deep expertise in event-driven and messaging applications and brings a wealth of knowledge and experience to his role, shaping the future of messaging services. He is passionate about helping customers build highly scalable, fault-tolerant, and loosely coupled systems. Outside of work, he enjoys spending time with his family outdoors, playing chess, and running.

Amazon OpenSearch Service Under the Hood: OpenSearch Optimized Instances (OR1)

Post Syndicated from Bukhtawar Khan original https://aws.amazon.com/blogs/big-data/amazon-opensearch-service-under-the-hood-opensearch-optimized-instancesor1/

Amazon OpenSearch Service recently introduced the OpenSearch Optimized Instance family (OR1), which delivers up to 30% price-performance improvement over existing memory optimized instances in internal benchmarks, and uses Amazon Simple Storage Service (Amazon S3) to provide 11 9s of durability. With this new instance family, OpenSearch Service uses OpenSearch innovation and AWS technologies to reimagine how data is indexed and stored in the cloud.

Today, customers widely use OpenSearch Service for operational analytics because of its ability to ingest high volumes of data while also providing rich and interactive analytics. In order to provide these benefits, OpenSearch is designed as a high-scale distributed system with multiple independent instances indexing data and processing requests. As the velocity and volume of your operational analytics data grow, bottlenecks may emerge. To sustainably support high indexing volume and provide durability, we built the OR1 instance family.

In this post, we discuss how the reimagined data flow works with OR1 instances and how it can provide high indexing throughput and durability using a new physical replication protocol. We also dive deep into some of the challenges we solved to maintain correctness and data integrity.

Designing for high throughput with 11 9s of durability

OpenSearch Service manages tens of thousands of OpenSearch clusters. We’ve gained insights into typical cluster configurations that customers use to meet high throughput and durability goals. To achieve higher throughput, customers often choose to drop replica copies to save on the replication latency; however, this configuration results in sacrificing availability and durability. Other customers require high durability and as a result need to maintain multiple replica copies, resulting in higher operating costs for them.

The OpenSearch Optimized Instance family provides additional durability while also keeping costs lower by storing a copy of the data on Amazon S3. With OR1 instances, you can configure multiple replica copies for high read availability while maintaining indexing throughput.
The following diagram illustrates an indexing flow involving a metadata update in OR1.

Indexing Request Flow in OR1

During indexing operations, individual documents are indexed into Lucene and also appended to a write-ahead log, also known as a translog. Before sending back an acknowledgement to the client, all translog operations are persisted to the remote data store backed by Amazon S3. If any replica copies are configured, the primary copy performs checks to detect the possibility of multiple writers (control flow) on all replica copies for correctness reasons.
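
The ordering that matters here is that the acknowledgement waits for the remote translog write. The following Python sketch models that ordering with in-memory stand-ins; RemoteTranslog, PrimaryShard, and bulk are hypothetical names rather than OpenSearch classes, and the real engine also maintains a local translog and coordinates with replicas.

```python
class RemoteTranslog:
    """In-memory stand-in for the S3-backed remote translog (hypothetical)."""

    def __init__(self, fail: bool = False) -> None:
        self.ops: list[tuple[int, dict]] = []
        self.fail = fail  # simulate the remote store being unreachable

    def persist(self, ops: list[tuple[int, dict]]) -> None:
        if self.fail:
            raise OSError("remote store unavailable")
        self.ops.extend(ops)


class PrimaryShard:
    def __init__(self, remote: RemoteTranslog) -> None:
        self.remote = remote
        self.index: dict[int, dict] = {}  # stands in for the Lucene index
        self.next_seq_no = 0

    def bulk(self, docs: list[dict]) -> bool:
        """Index docs, durably persist their translog ops, then acknowledge.

        Returns True (acknowledged) only if the remote write succeeded.
        """
        ops = []
        for doc in docs:
            self.index[self.next_seq_no] = doc
            ops.append((self.next_seq_no, doc))
            self.next_seq_no += 1
        try:
            self.remote.persist(ops)
        except OSError:
            return False  # not acknowledged; the client is expected to retry
        return True
```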
The following diagram illustrates the segment generation and replication flow in OR1 instances.

Replication Flow in OR1

Periodically, as new segment files are created, the OR1 instances copy those segments to Amazon S3. When the transfer is complete, the primary publishes new checkpoints to all replica copies, notifying them of a new segment being available for download. The replica copies subsequently download newer segments and make them searchable. This model decouples the data flow that happens using Amazon S3 and the control flow (checkpoint publication and term validation) that happens over inter-node transport communication.
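
The following Python sketch illustrates this split between data flow and control flow, using in-memory stand-ins for Amazon S3 and the transport layer. SegmentStore, Replica, Primary, and the segment file names are hypothetical; the point is the ordering (upload first, then publish a checkpoint) and that each replica fetches only the segments it doesn't already hold.

```python
class SegmentStore:
    """In-memory stand-in for the S3-backed remote segment store (hypothetical)."""

    def __init__(self) -> None:
        self.files: dict[str, bytes] = {}


class Replica:
    def __init__(self, name: str) -> None:
        self.name = name
        self.local: dict[str, bytes] = {}
        self.checkpoint = 0
        self.downloads: list[list[str]] = []  # segment names fetched per checkpoint

    def on_checkpoint(self, checkpoint: int, segment_names: set[str], store: SegmentStore) -> None:
        # Fetch only the segments this replica doesn't hold yet (incremental copy).
        missing = sorted(segment_names - set(self.local))
        for name in missing:
            self.local[name] = store.files[name]
        self.downloads.append(missing)
        self.checkpoint = checkpoint


class Primary:
    def __init__(self, store: SegmentStore, replicas: list[Replica]) -> None:
        self.store = store
        self.replicas = replicas
        self.segments: dict[str, bytes] = {}
        self.checkpoint = 0

    def refresh(self, new_segments: dict[str, bytes]) -> None:
        self.segments.update(new_segments)
        # Data flow: upload the new segment files to the remote store first.
        self.store.files.update(new_segments)
        # Control flow: only after the upload, publish a checkpoint to replicas.
        self.checkpoint += 1
        for replica in self.replicas:
            replica.on_checkpoint(self.checkpoint, set(self.segments), self.store)
```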

The following diagram illustrates the recovery flow in OR1 instances.

Recovery Flow in OR1

OR1 instances persist not only the data, but the cluster metadata like index mappings, templates, and settings in Amazon S3. This makes sure that in the event of a cluster-manager quorum loss, which is a common failure mode in non-dedicated cluster-manager setups, OpenSearch can reliably recover the last acknowledged metadata.

In the event of an infrastructure failure, an OpenSearch domain can end up losing one or more nodes. In such an event, the new instance family guarantees recovery of both the cluster metadata and the index data up to the latest acknowledged operation. As new replacement nodes join the cluster, the internal cluster recovery mechanism bootstraps the new set of nodes and then recovers the latest cluster metadata from the remote cluster metadata store. After the cluster metadata is recovered, the recovery mechanism starts to hydrate the missing segment data and translog from Amazon S3. Then all uncommitted translog operations, up to the last acknowledged operation, are replayed to reinstate the lost copy.
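
The three recovery steps can be sketched in Python as follows, assuming a hypothetical RemoteState record that holds the cluster metadata, the documents already captured in uploaded segments, the highest sequence number those segments cover, and the remote translog. Translog operations at or below that sequence number are already in the segments, so only the later ones are replayed.

```python
from dataclasses import dataclass, field


@dataclass
class RemoteState:
    """Hypothetical view of what the remote store holds for one shard."""
    cluster_metadata: dict
    committed_docs: dict[int, dict]  # docs already in uploaded segments
    max_committed_seq_no: int
    translog: list[tuple[int, dict]] = field(default_factory=list)


def recover(remote: RemoteState) -> tuple[dict, dict[int, dict]]:
    # 1. Restore the latest cluster metadata (mappings, templates, settings).
    metadata = dict(remote.cluster_metadata)
    # 2. Hydrate the committed segment data.
    docs = dict(remote.committed_docs)
    # 3. Replay acknowledged operations not yet captured in committed segments.
    for seq_no, doc in remote.translog:
        if seq_no > remote.max_committed_seq_no:
            docs[seq_no] = doc
    return metadata, docs
```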

The new design doesn’t modify the way searches work. Queries are processed normally by either the primary or replica shard for each shard in the index. You may see longer delays (in the 10-second range) before all copies are consistent to a particular point in time because the data replication is using Amazon S3.

A key advantage of this architecture is that it serves as a foundational building block for future innovations, like separation of readers and writers, and helps segregate compute and storage layers.

How redefining the replication strategy boosts the indexing throughput

OpenSearch supports two replication strategies: logical (document) and physical (segment) replication. In the case of logical replication, the data is indexed on all the copies independently, leading to redundant computation on the cluster. The OR1 instances use the new physical replication model, where data is indexed only on the primary copy and additional copies are created by copying data from the primary. With a high number of replica copies, the node hosting the primary copy requires significant network bandwidth, replicating the segment to all the copies. The new OR1 instances solve this problem by durably persisting the segment to Amazon S3, which is configured as a remote storage option. They also help with scaling replicas without bottlenecking on the primary.

After the segments are uploaded to Amazon S3, the primary sends out a checkpoint request, notifying all replicas to download the new segments. The replica copies then need to download only the incremental segments. Because this process frees up the compute that replicas would otherwise spend redundantly indexing data, and removes the network overhead that primaries would otherwise incur replicating data, the cluster can sustain higher throughput. If replicas aren’t able to process the newly created segments, due to overload or slow network paths, replicas that fall behind beyond a certain point are marked as failed to prevent them from returning stale results.
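
As a simplified illustration of the two points above, the following Python sketch counts indexing work under each replication strategy and flags replicas whose checkpoint lags too far behind the primary. The functions and the max_lag threshold are hypothetical; the post doesn't state the actual limit OpenSearch uses.

```python
def indexing_work(num_docs: int, num_replicas: int, strategy: str) -> int:
    """Documents indexed across all copies of a shard (simplified model)."""
    if strategy == "logical":  # every copy indexes every document
        return num_docs * (1 + num_replicas)
    if strategy == "physical":  # only the primary indexes; replicas copy segments
        return num_docs
    raise ValueError(f"unknown strategy: {strategy}")


def replicas_to_fail(primary_checkpoint: int, replica_checkpoints: dict[str, int], max_lag: int) -> list[str]:
    """Replicas lagging the primary by more than max_lag checkpoints."""
    return sorted(
        name
        for name, checkpoint in replica_checkpoints.items()
        if primary_checkpoint - checkpoint > max_lag
    )


if __name__ == "__main__":
    print(indexing_work(1000, 2, "logical"), indexing_work(1000, 2, "physical"))
    print(replicas_to_fail(10, {"r1": 10, "r2": 7, "r3": 2}, max_lag=4))
```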

Why high durability is a good idea, but hard to do well

Although all committed segments are durably persisted to Amazon S3 whenever they get created, one of the key challenges in achieving high durability is synchronously writing all uncommitted operations to a write-ahead log on Amazon S3 before acknowledging the request to the client, without sacrificing throughput. The new semantics introduce additional network latency for individual requests, but we made sure there is no impact to throughput by batching and draining requests on a single thread for up to a specified interval, while other threads continue to index requests. As a result, you can drive higher throughput with more concurrent client connections by optimally batching your bulk payloads.
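
The batching described above is a form of group commit. The following Python sketch shows the idea under simplified assumptions: a batch opens at the first pending request and closes after a hypothetical interval_ms, and every request that arrived in that window shares a single remote upload. The function name and timing model are illustrative, not OpenSearch's implementation.

```python
def group_commit(arrival_times_ms: list[float], interval_ms: float) -> list[list[int]]:
    """Group request indexes into shared translog uploads.

    Assumes arrival_times_ms is sorted. A batch opens at the first pending
    request and closes interval_ms later; each batch is one remote upload.
    """
    batches: list[list[int]] = []
    window_end = None
    for i, t in enumerate(arrival_times_ms):
        if window_end is None or t > window_end:
            batches.append([])  # open a new batch (a new upload)
            window_end = t + interval_ms
        batches[-1].append(i)
    return batches


if __name__ == "__main__":
    batches = group_commit([0, 1, 3, 10, 12, 30], interval_ms=5)
    print(batches)
    print(f"{len(batches)} uploads for 6 requests")
```

Each request still waits for its batch's upload, which is the added per-request latency, while the number of remote writes grows with the number of windows rather than the number of requests. More concurrent requests per window mean more work per upload.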

Other challenges in designing a highly durable system include enforcing data integrity and correctness at all times. Although some events like network partitions are rare, they can break the correctness of the system and therefore the system needs to be prepared to deal with these failure modes. Therefore, while switching to the new segment replication protocol, we also introduced a few other protocol changes, like detecting multiple writers on each replica. The protocol makes sure that an isolated writer can’t acknowledge a write request, while another newly promoted primary, based on the cluster-manager quorum, is concurrently accepting newer writes.
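
One common way to enforce this, and the idea behind the term validation mentioned earlier, is a monotonically increasing primary term. The following Python sketch models it under simplified assumptions: each replica remembers the highest term it has seen, and a writer can acknowledge only if every replica still accepts its term. ReplicaCopy and can_acknowledge are hypothetical names, not OpenSearch classes.

```python
class ReplicaCopy:
    def __init__(self) -> None:
        self.primary_term = 1  # highest primary term this copy has seen

    def on_new_primary(self, term: int) -> None:
        # A newly promoted primary (chosen via cluster-manager quorum) bumps the term.
        self.primary_term = max(self.primary_term, term)

    def validate(self, writer_term: int) -> bool:
        """Accept only a writer whose term is at least the highest term seen."""
        return writer_term >= self.primary_term


def can_acknowledge(writer_term: int, replicas: list[ReplicaCopy]) -> bool:
    """A possibly isolated primary acknowledges only if no replica has moved on."""
    return all(r.validate(writer_term) for r in replicas)


if __name__ == "__main__":
    replicas = [ReplicaCopy(), ReplicaCopy()]
    print(can_acknowledge(1, replicas))
    replicas[0].on_new_primary(2)  # a new primary was promoted elsewhere
    print(can_acknowledge(1, replicas))  # the old, isolated primary can't ack
```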

The new instance family automatically detects the loss of a primary shard while recovering data, and performs extensive checks on network reachability before the data can be re-hydrated from Amazon S3 and the cluster is brought back to a healthy state.

For data integrity, all files are extensively checksummed so that we can detect and prevent network or file system corruption that may result in data being unreadable. Furthermore, all files, including metadata, are designed to be immutable, which provides additional safety against corruption, and are versioned to prevent accidental mutating changes.
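
The following Python sketch shows both properties on a dictionary standing in for the remote store: each file is stored with a checksum that is verified on download, and an existing file name can't be overwritten. The use of CRC32 via zlib.crc32 and the upload and download helpers are assumptions for illustration; the post doesn't specify the checksum algorithm.

```python
import zlib


def checksum(data: bytes) -> int:
    return zlib.crc32(data) & 0xFFFFFFFF


def upload(name: str, data: bytes, store: dict[str, tuple[bytes, int]]) -> None:
    """Store the file with its checksum; refuse to overwrite (immutable files)."""
    if name in store:
        raise FileExistsError(name)
    store[name] = (data, checksum(data))


def download(name: str, store: dict[str, tuple[bytes, int]]) -> bytes:
    """Return the file contents only if they still match the stored checksum."""
    data, expected = store[name]
    if checksum(data) != expected:
        raise ValueError(f"checksum mismatch for {name}")
    return data
```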

Reimagining how data flows

The OR1 instances hydrate copies directly from Amazon S3 in order to perform recovery of lost shards during an infrastructure failure. By using Amazon S3, we are able to free up the primary node’s network bandwidth, disk throughput, and compute, and therefore provide a more seamless in-place scaling and blue/green deployment experience by orchestrating the entire process with minimal primary node coordination.

OpenSearch Service provides automatic data backups called snapshots at hourly intervals, which means in case of accidental modifications to data, you have the option to go back to a previous point in time state. However, with the new OpenSearch instance family, we’ve discussed that the data is already durably persisted on Amazon S3. So how do snapshots work when we already have the data present on Amazon S3?

With the new instance family, snapshots serve as checkpoints, referencing the already present segment data as it exists at a point in time. This makes snapshots more lightweight and faster because they don’t need to re-upload any additional data. Instead, they upload metadata files that capture the view of the segments at that point in time, which we call shallow snapshots. The benefit of shallow snapshots extends to all operations, namely creation, deletion, and cloning of snapshots. You still have the option to snapshot an independent copy with manual snapshots for other administrative operations.
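
A shallow snapshot can be modeled as nothing more than a set of references to segment files that already exist in the remote store. The following Python sketch, with a hypothetical ShallowSnapshots class, shows why create, clone, and delete are cheap metadata operations, and how the set of files still referenced by any snapshot determines what must be retained.

```python
class ShallowSnapshots:
    """Snapshots that reference remote segment files instead of copying them."""

    def __init__(self) -> None:
        self.snapshots: dict[str, frozenset[str]] = {}

    def create(self, name: str, live_segments: set[str]) -> None:
        # Only metadata (the list of segment names) is written; no data is uploaded.
        self.snapshots[name] = frozenset(live_segments)

    def clone(self, source: str, target: str) -> None:
        self.snapshots[target] = self.snapshots[source]

    def delete(self, name: str) -> None:
        del self.snapshots[name]

    def referenced(self) -> set[str]:
        """Segment files that must be retained because some snapshot points at them."""
        refs: set[str] = set()
        for segments in self.snapshots.values():
            refs |= segments
        return refs
```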

Summary

OpenSearch is open source, community-driven software. Most of the foundational changes, including the replication model, remote-backed storage, and remote cluster metadata, have been contributed to open source; in fact, we follow an open source first development model.

Efforts to improve throughput and reliability are a never-ending cycle as we continue to learn and improve. The new OpenSearch optimized instances serve as a foundational building block, paving the way for future innovations. We are excited to continue our efforts in improving reliability and performance and to see what new and existing solutions builders can create using OpenSearch Service. We hope this leads to a deeper understanding of the new OpenSearch instance family, how this offering achieves high durability and better throughput, and how it can help you configure clusters based on the needs of your business.

If you’re excited to contribute to OpenSearch, open up a GitHub issue and let us know your thoughts. We would also love to hear about your success stories achieving high throughput and durability on OpenSearch Service. If you have other questions, please leave a comment.


About the Authors

Bukhtawar Khan is a Principal Engineer working on Amazon OpenSearch Service. He is interested in building distributed and autonomous systems. He is a maintainer and an active contributor to OpenSearch.

Gaurav Bafna is a Senior Software Engineer working on OpenSearch at Amazon Web Services. He is fascinated by solving problems in distributed systems. He is a maintainer and an active contributor to OpenSearch.

Sachin Kale is a senior software development engineer at AWS working on OpenSearch.

Rohin Bhargava is a Sr. Product Manager with the Amazon OpenSearch Service team. His passion at AWS is to help customers find the correct mix of AWS services to achieve success for their business goals.

Ranjith Ramachandra is a Senior Engineering Manager working on Amazon OpenSearch Service. He is passionate about highly scalable distributed systems, high performance and resilient systems.

Uplevel your data architecture with real-time streaming using Amazon Data Firehose and Snowflake

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/uplevel-your-data-architecture-with-real-time-streaming-using-amazon-data-firehose-and-snowflake/

Today’s fast-paced world demands timely insights and decisions, which is driving the importance of streaming data. Streaming data refers to data that is continuously generated from a variety of sources. The sources of this data, such as clickstream events, change data capture (CDC), application and service logs, and Internet of Things (IoT) data streams are proliferating. Snowflake offers two options to bring streaming data into its platform: Snowpipe and Snowflake Snowpipe Streaming. Snowpipe is suitable for file ingestion (batching) use cases, such as loading large files from Amazon Simple Storage Service (Amazon S3) to Snowflake. Snowpipe Streaming, a newer feature released in March 2023, is suitable for rowset ingestion (streaming) use cases, such as loading a continuous stream of data from Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK).

Before Snowpipe Streaming, AWS customers used Snowpipe for both use cases: file ingestion and rowset ingestion. First, you ingested streaming data to Kinesis Data Streams or Amazon MSK, then used Amazon Data Firehose to aggregate and write streams to Amazon S3, followed by using Snowpipe to load the data into Snowflake. However, this multi-step process can result in delays of up to an hour before data is available for analysis in Snowflake. Moreover, it’s expensive, especially when you have small files that Snowpipe has to upload to the Snowflake customer cluster.

To solve this issue, Amazon Data Firehose now integrates with Snowpipe Streaming, enabling you to capture, transform, and deliver data streams from Kinesis Data Streams, Amazon MSK, and Firehose Direct PUT to Snowflake in seconds at a low cost. With a few clicks on the Amazon Data Firehose console, you can set up a Firehose stream to deliver data to Snowflake. There are no commitments or upfront investments to use Amazon Data Firehose, and you only pay for the amount of data streamed.

Some key features of Amazon Data Firehose include:

  • Fully managed serverless service – You don’t need to manage resources, and Amazon Data Firehose automatically scales to match the throughput of your data source without ongoing administration.
  • Straightforward to use with no code – You don’t need to write applications.
  • Real-time data delivery – You can get data to your destinations quickly and efficiently in seconds.
  • Integration with over 20 AWS services – Seamless integration is available for many AWS services, such as Kinesis Data Streams, Amazon MSK, Amazon VPC Flow Logs, AWS WAF logs, Amazon CloudWatch Logs, Amazon EventBridge, AWS IoT Core, and more.
  • Pay-as-you-go model – You only pay for the data volume that Amazon Data Firehose processes.
  • Connectivity – Amazon Data Firehose can connect to public or private subnets in your VPC.

This post explains how you can bring streaming data from AWS into Snowflake within seconds to perform advanced analytics. We explore common architectures and illustrate how to set up a low-code, serverless, cost-effective solution for low-latency data streaming.

Overview of solution

The following are the steps to implement the solution to stream data from AWS to Snowflake:

  1. Create a Snowflake database, schema, and table.
  2. Create a Kinesis data stream.
  3. Create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination using a secure private link.
  4. To test the setup, generate sample stream data from the Amazon Kinesis Data Generator (KDG) with the Kinesis data stream as the destination.
  5. Query the Snowflake table to validate the data loaded into Snowflake.

The solution is depicted in the following architecture diagram.

Prerequisites

You should have the following prerequisites:

Create a Snowflake database, schema, and table

Complete the following steps to set up your data in Snowflake:

  • Log in to your Snowflake account and create the database:
    create database adf_snf;

  • Create a schema in the new database:
    create schema adf_snf.kds_blog;

  • Create a table in the new schema:
    create or replace table iot_sensors
    (sensorId number,
    sensorType varchar,
    internetIP varchar,
    connectionTime timestamp_ntz,
    currentTemperature number
    );

Create a Kinesis data stream

Complete the following steps to create your data stream:

  • On the Kinesis Data Streams console, choose Data streams in the navigation pane.
  • Choose Create data stream.
  • For Data stream name, enter a name (for example, KDS-Demo-Stream).
  • Leave the remaining settings as default.
  • Choose Create data stream.
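If you prefer to script this step, the following is a minimal sketch using the AWS SDK for Python (boto3). It assumes boto3 is installed and that AWS credentials and a default Region are configured. It requests on-demand capacity mode, which we believe matches the console default; adjust this if your console defaults differ. The function name is our own. It takes the client as a parameter so the logic can be exercised without calling AWS.

```python
def create_demo_stream(kinesis, stream_name="KDS-Demo-Stream"):
    """Create an on-demand Kinesis data stream and return its ARN once ACTIVE.

    `kinesis` is a boto3 Kinesis client (or any object with the same methods).
    """
    # On-demand mode: no ShardCount is passed; Kinesis scales shards for you.
    kinesis.create_stream(
        StreamName=stream_name,
        StreamModeDetails={"StreamMode": "ON_DEMAND"},
    )
    # The stream starts in CREATING status; wait until it can accept records.
    kinesis.get_waiter("stream_exists").wait(StreamName=stream_name)
    summary = kinesis.describe_stream_summary(StreamName=stream_name)
    return summary["StreamDescriptionSummary"]["StreamARN"]
```

To run it against your account, pass boto3.client("kinesis") as the first argument. Keep the returned ARN, because the Firehose stream needs it as its source.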

Create a Firehose delivery stream

Complete the following steps to create a Firehose delivery stream with Kinesis Data Streams as the source and Snowflake as its destination:

  • On the Amazon Data Firehose console, choose Create Firehose stream.
  • For Source, choose Amazon Kinesis Data Streams.
  • For Destination, choose Snowflake.
  • For Kinesis data stream, browse to the data stream you created earlier.
  • For Firehose stream name, leave the default generated name or enter a name of your preference.
  • Under Connection settings, provide the following information to connect Amazon Data Firehose to Snowflake:
    • For Snowflake account URL, enter your Snowflake account URL.
    • For User, enter the user name generated in the prerequisites.
    • For Private key, enter the private key generated in the prerequisites. Make sure the private key is in PKCS8 format. Do not include the PEM header (the -----BEGIN line) or footer (the -----END line) as part of the private key. If the key is split across multiple lines, remove the line breaks.
    • For Role, select Use custom Snowflake role and enter the Snowflake role that has write access to the database table.
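The private key formatting rules above are easy to get wrong by hand. The following is a minimal sketch of a helper that takes the text of a PKCS8 PEM file and returns the single-line value to paste into the Private key field. The function name is hypothetical. It accepts the two PKCS8 labels (PRIVATE KEY and ENCRYPTED PRIVATE KEY) and rejects the PKCS1 label (RSA PRIVATE KEY), which is a different format.

```python
PKCS8_LABELS = {"PRIVATE KEY", "ENCRYPTED PRIVATE KEY"}


def pem_to_firehose_key(pem_text: str) -> str:
    """Strip the PEM header/footer and join the base64 body into one line."""
    lines = [ln.strip() for ln in pem_text.strip().splitlines() if ln.strip()]
    if len(lines) < 3:
        raise ValueError("input does not look like a PEM block")
    header, body, footer = lines[0], lines[1:-1], lines[-1]
    # Extract the label between "-----BEGIN " and the trailing "-----".
    label = header.removeprefix("-----BEGIN ").removesuffix("-----")
    if label not in PKCS8_LABELS:
        raise ValueError(f"expected a PKCS8 key, got header {header!r}")
    if footer != f"-----END {label}-----":
        raise ValueError("PEM footer does not match the header")
    # Only the base64 body remains; remove the line breaks between its lines.
    return "".join(body)
```

For example, reading a hypothetical key file named rsa_key.p8 and passing its contents to pem_to_firehose_key prints the value for the console field. The helper requires Python 3.9 or later for removeprefix and removesuffix.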

You can connect to Snowflake using public or private connectivity. If you don’t provide a VPC endpoint, the default connectivity mode is public. To allowlist Firehose IP addresses in your Snowflake network policy, refer to Choose Snowflake for Your Destination. If you’re using a private link URL, retrieve the VPCE ID by calling SYSTEM$GET_PRIVATELINK_CONFIG:

select SYSTEM$GET_PRIVATELINK_CONFIG();

This function returns a JSON representation of the Snowflake account information necessary to facilitate the self-service configuration of private connectivity to the Snowflake service, as shown in the following screenshot.
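If you retrieve this configuration from a script rather than a worksheet, the following minimal sketch pulls the VPCE ID out of the returned JSON string. It assumes the ID is stored under the key privatelink-vpce-id, as we understand the Snowflake documentation to describe it; verify the key name against your own output. The function name and the sample values in the test are made up for illustration.

```python
import json


def vpce_id_from_privatelink_config(config_json: str) -> str:
    """Return the VPCE ID from the SYSTEM$GET_PRIVATELINK_CONFIG() JSON output.

    Assumption: the ID is stored under the "privatelink-vpce-id" key.
    """
    config = json.loads(config_json)
    vpce_id = config.get("privatelink-vpce-id")
    if not vpce_id:
        # Missing key usually means private connectivity is not set up
        # for this Snowflake account.
        raise ValueError("privatelink-vpce-id not found in the configuration")
    return vpce_id
```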

  • For this post, we’re using a private link, so for VPCE ID, enter the VPCE ID returned by SYSTEM$GET_PRIVATELINK_CONFIG.
  • Under Database configuration settings, enter your Snowflake database, schema, and table names.
  • In the Backup settings section, for S3 backup bucket, enter the bucket you created as part of the prerequisites.
  • Choose Create Firehose stream.
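The console steps above can also be expressed as a CreateDeliveryStream API request. The following is a minimal sketch that only builds the request arguments; you would pass them to create_delivery_stream on a boto3 Firehose client. The field names follow our reading of the Firehose API reference for Snowflake destinations and should be checked against the current documentation before use. Using one IAM role for both the Kinesis source and the Snowflake destination is a simplification for this example, and the function name is hypothetical.

```python
def build_snowflake_firehose_request(
    stream_name: str,
    kinesis_stream_arn: str,
    firehose_role_arn: str,
    account_url: str,
    user: str,
    private_key: str,
    snowflake_role: str,
    vpce_id: str,
    backup_bucket_arn: str,
    database: str = "adf_snf",
    schema: str = "kds_blog",
    table: str = "iot_sensors",
) -> dict:
    """Build CreateDeliveryStream arguments: Kinesis Data Streams in, Snowflake out."""
    return {
        "DeliveryStreamName": stream_name,
        "DeliveryStreamType": "KinesisStreamAsSource",
        "KinesisStreamSourceConfiguration": {
            "KinesisStreamARN": kinesis_stream_arn,
            "RoleARN": firehose_role_arn,
        },
        "SnowflakeDestinationConfiguration": {
            "AccountUrl": account_url,
            "User": user,
            "PrivateKey": private_key,  # single-line PKCS8 body, no header/footer
            "Database": database,
            "Schema": schema,
            "Table": table,
            # Equivalent of "Use custom Snowflake role" in the console.
            "SnowflakeRoleConfiguration": {"Enabled": True, "SnowflakeRole": snowflake_role},
            # Map top-level JSON keys to table columns.
            "DataLoadingOption": "JSON_MAPPING",
            # Private connectivity; omit this block for public connectivity.
            "SnowflakeVpcConfiguration": {"PrivateLinkVpceId": vpce_id},
            "RoleARN": firehose_role_arn,
            # Back up only records that fail delivery to the S3 bucket.
            "S3BackupMode": "FailedDataOnly",
            "S3Configuration": {"RoleARN": firehose_role_arn, "BucketARN": backup_bucket_arn},
        },
    }
```

Keeping the request construction separate from the API call lets you review or log the exact configuration before creating anything. Avoid logging the PrivateKey value.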

Alternatively, you can use an AWS CloudFormation template to create the Firehose delivery stream with Snowflake as the destination rather than using the Amazon Data Firehose console.

To use the CloudFormation stack, choose Launch Stack.

Generate sample stream data
Use the KDG to send sample records to the Kinesis data stream you created, with the following record template:

{
"sensorId": {{random.number(999999999)}},
"sensorType": "{{random.arrayElement( ["Thermostat","SmartWaterHeater","HVACTemperatureSensor","WaterPurifier"] )}}",
"internetIP": "{{internet.ip}}",
"connectionTime": "{{date.now("YYYY-MM-DDTHH:mm:ss")}}",
"currentTemperature": {{random.number({"min":10,"max":150})}}
}
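If you want to test without the KDG, the following is a minimal sketch that produces records with the same fields and value ranges as the template and groups them into PutRecords batches. The function names are hypothetical. Using sensorId as the partition key is our own choice for spreading records across shards. Timestamps are local time with zero-padded two-digit minutes, so they match the TIMESTAMP_NTZ column without a time zone.

```python
import json
import random
from datetime import datetime

# Same sensor types as the KDG template.
SENSOR_TYPES = ["Thermostat", "SmartWaterHeater", "HVACTemperatureSensor", "WaterPurifier"]


def make_sensor_record(rng: random.Random) -> dict:
    """Build one record shaped like the KDG template output."""
    return {
        "sensorId": rng.randint(0, 999_999_999),
        "sensorType": rng.choice(SENSOR_TYPES),
        "internetIP": ".".join(str(rng.randint(0, 255)) for _ in range(4)),
        # ISO 8601 style without a time zone, e.g. 2024-01-31T09:05:07.
        "connectionTime": datetime.now().strftime("%Y-%m-%dT%H:%M:%S"),
        "currentTemperature": rng.randint(10, 150),
    }


def put_records_batches(records, batch_size=500):
    """Yield PutRecords entry lists; Kinesis accepts at most 500 entries per call."""
    entries = [
        {"Data": json.dumps(r).encode("utf-8"), "PartitionKey": str(r["sensorId"])}
        for r in records
    ]
    for start in range(0, len(entries), batch_size):
        yield entries[start:start + batch_size]
```

To send the records, call put_records(StreamName="KDS-Demo-Stream", Records=batch) on a boto3 Kinesis client for each batch, and check FailedRecordCount in each response, because PutRecords can partially fail.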

Query the Snowflake table

Query the Snowflake table:

select * from adf_snf.kds_blog.iot_sensors;

You can confirm that the data generated by the KDG that was sent to Kinesis Data Streams is loaded into the Snowflake table through Amazon Data Firehose.

Troubleshooting

If data is not loaded into Kinesis Data Streams after the KDG sends data to the Kinesis data stream, refresh the KDG page and make sure you are logged in to the KDG.
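To check whether records are reaching the stream at all, you can read a few of them back. The following is a minimal sketch, with a hypothetical function name, that reads from the oldest available record in each shard using a boto3 Kinesis client passed in as a parameter. It ignores list_shards pagination, which is acceptable for a small demo stream. A single GetRecords call can return no records even when the shard contains data further along, so treat an empty result as a hint rather than proof.

```python
import json


def peek_stream(kinesis, stream_name, limit=10):
    """Return up to `limit` decoded JSON records from the start of the stream's shards."""
    found = []
    for shard in kinesis.list_shards(StreamName=stream_name)["Shards"]:
        # TRIM_HORIZON starts at the oldest record still retained in the shard.
        iterator = kinesis.get_shard_iterator(
            StreamName=stream_name,
            ShardId=shard["ShardId"],
            ShardIteratorType="TRIM_HORIZON",
        )["ShardIterator"]
        response = kinesis.get_records(ShardIterator=iterator, Limit=limit)
        # boto3 returns Data as bytes; json.loads accepts bytes directly.
        found.extend(json.loads(record["Data"]) for record in response["Records"])
        if len(found) >= limit:
            break
    return found[:limit]
```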

If you made any changes to the Snowflake destination table definition, recreate the Firehose delivery stream.

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you are not planning to use them further.

Conclusion

Amazon Data Firehose provides a straightforward way to deliver data to Snowpipe Streaming, enabling you to save costs and reduce latency to seconds. To try Amazon Data Firehose with Snowflake, refer to the Amazon Data Firehose with Snowflake as destination lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based, well-architected solutions. Outside of work, she enjoys spending time with her family.

Mostafa Mansour is a Principal Product Manager – Tech at Amazon Web Services where he works on Amazon Kinesis Data Firehose. He specializes in developing intuitive product experiences that solve complex challenges for customers at scale. When he’s not hard at work on Amazon Kinesis Data Firehose, you’ll likely find Mostafa on the squash court, where he loves to take on challengers and perfect his dropshots.

Bosco Albuquerque is a Sr. Partner Solutions Architect at AWS and has over 20 years of experience working with database and analytics products from enterprise database vendors and cloud providers. He has helped technology companies design and implement data analytics solutions and products.