Tag Archives: AWS Big Data

Reference guide to analyze transactional data in near-real time on AWS

Post Syndicated from Jason Dalba original https://aws.amazon.com/blogs/big-data/reference-guide-to-analyze-transactional-data-in-near-real-time-on-aws/

Business leaders and data analysts use near-real-time transaction data to understand buyer behavior to help evolve products. The primary challenge businesses face with near-real-time analytics is getting the data prepared for analytics in a timely manner, which can often take days. Companies commonly maintain entire teams to facilitate the flow of data from ingestion to analysis.

The consequence of delays in your organization’s analytics workflow can be costly. As online transactions have gained popularity with consumers, the volume and velocity of data ingestion has led to challenges in data processing. Consumers expect more fluid changes to service and products. Organizations that can’t quickly adapt their business strategy to align with consumer behavior may experience loss of opportunity and revenue in competitive markets.

To overcome these challenges, businesses need a solution that can provide near-real-time analytics on transactional data with services that don’t lead to latent processing and bloat from managing the pipeline. With a properly deployed architecture using the latest technologies in artificial intelligence (AI), data storage, streaming ingestions, and cloud computing, data will become more accurate, timely, and actionable. With such a solution, businesses can make actionable decisions in near-real time, allowing leaders to change strategic direction as soon as the market changes.

In this post, we discuss how to architect a near-real-time analytics solution with AWS managed analytics, AI and machine learning (ML), and database services.

Solution overview

The most common workloads, agnostic of industry, involve transactional data. Transactional data volumes and velocity have continued to rapidly expand as workloads have been pushed online. Near-real-time data is data stored, processed, and analyzed on a continual basis. It generates information that is available for use almost immediately after being generated. With the power of near-real-time analytics, business units across an organization, including sales, marketing, and operations, can make agile, strategic decisions. Without the proper architecture to support near real-time analytics, organizations will be dependent on delayed data and will not be able to capitalize on emerging opportunities. Missed opportunities could impact operational efficiency, customer satisfaction, or product innovation.

Managed AWS Analytics and Database services allow for each component of the solution, from ingestion to analysis, to be optimized for speed, with little management overhead. It is crucial for critical business solutions to follow the six pillars of the AWS Well-Architected Framework. The framework helps cloud architects build the most secure, high performing, resilient, and efficient infrastructure for critical workloads.

The following diagram illustrates the solution architecture.

Solution architecture

By combining the appropriate AWS services, your organization can run near-real-time analytics off a transactional data store. In the following sections, we discuss the key components of the solution.

Transactional data storage

In this solution, we use Amazon DynamoDB as our transactional data store. DynamoDB is a managed NoSQL database solution that acts as a key-value store for transactional data. As a NoSQL solution, DynamoDB is optimized for compute (as opposed to storage) and therefore the data needs to be modeled and served up to the application based on how the application needs it. This makes DynamoDB good for applications with known access patterns, which is a property of many transactional workloads.

In DynamoDB, you can create, read, update, or delete items in a table through a partition key. For example, if you want to keep track of how many fitness quests a user has completed in your application, you can query the partition key of the user ID to find the item with an attribute that holds data related to completed quests, then update the relevant attribute to reflect a specific quests completion. There are also some added benefits of DynamoDB by design, such as the ability to scale to support massive global internet-scale applications while maintaining consistent single-digit millisecond latency performance, because the date will be horizontally partitioned across the underlying storage nodes by the service itself through the partition keys. Modeling your data here is very important so DynamoDB can horizontally scale based on a partition key, which is again why it’s a good fit for a transactional store. In transactional workloads, when you know what the access patterns are, it will be easier to optimize a data model around those patterns as opposed to creating a data model to accept ad hoc requests. All that being said, DynamoDB doesn’t perform scans across many items as efficiently, so for this solution, we integrate DynamoDB with other services to help meet the data analysis requirements.

Data streaming

Now that we have stored our workload’s transactional data in DynamoDB, we need to move that data to another service that will be better suited for analysis of said data. The time to insights on this data matters, so rather than send data off in batches, we stream the data into an analytics service, which helps us get the near-real time aspect of this solution.

We use Amazon Kinesis Data Streams to stream the data from DynamoDB to Amazon Redshift for this specific solution. Kinesis Data Streams captures item-level modifications in DynamoDB tables and replicates them to a Kinesis data stream. Your applications can access this stream and view item-level changes in near-real time. You can continuously capture and store terabytes of data per hour. Additionally, with the enhanced fan-out capability, you can simultaneously reach two or more downstream applications. Kinesis Data Streams also provides durability and elasticity. The delay between the time a record is put into the stream and the time it can be retrieved (put-to-get delay) is typically less than 1 second. In other words, a Kinesis Data Streams application can start consuming the data from the stream almost immediately after the data is added. The managed service aspect of Kinesis Data Streams relieves you of the operational burden of creating and running a data intake pipeline. The elasticity of Kinesis Data Streams enables you to scale the stream up or down, so you never lose data records before they expire.

Analytical data storage

The next service in this solution is Amazon Redshift, a fully managed, petabyte-scale data warehouse service in the cloud. As opposed to DynamoDB, which is meant to update, delete, or read more specific pieces of data, Amazon Redshift is better suited for analytic queries where you are retrieving, comparing, and evaluating large amounts of data in multi-stage operations to produce a final result. Amazon Redshift achieves efficient storage and optimum query performance through a combination of massively parallel processing, columnar data storage, and very efficient, targeted data compression encoding schemes.

Beyond just the fact that Amazon Redshift is built for analytical queries, it can natively integrate with Amazon streaming engines. Amazon Redshift Streaming Ingestion ingests hundreds of megabytes of data per second, so you can query data in near-real time and drive your business forward with analytics. With this zero-ETL approach, Amazon Redshift Streaming Ingestion enables you to connect to multiple Kinesis data streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams and pull data directly to Amazon Redshift without staging data in Amazon Simple Storage Service (Amazon S3). You can define a schema or choose to ingest semi-structured data with the SUPER data type. With streaming ingestion, a materialized view is the landing area for the data read from the Kinesis data stream, and the data is processed as it arrives. When the view is refreshed, Redshift compute nodes allocate each data shard to a compute slice. We recommend you enable auto refresh for this materialized view so that your data is continuously updated.

Data analysis and visualization

After the data pipeline is set up, the last piece is data analysis with Amazon QuickSight to visualize the changes in consumer behavior. QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights to the people who you work with, wherever they are.

QuickSight connects to your data in the cloud and combines data from many different sources. In a single data dashboard, QuickSight can include AWS data, third-party data, big data, spreadsheet data, SaaS data, B2B data, and more. As a fully managed cloud-based service, QuickSight provides enterprise-grade security, global availability, and built-in redundancy. It also provides the user-management tools that you need to scale from 10 users to 10,000, all with no infrastructure to deploy or manage.

QuickSight gives decision-makers the opportunity to explore and interpret information in an interactive visual environment. They have secure access to dashboards from any device on your network and from mobile devices. Connecting QuickSight to the rest of our solution will complete the flow of data from being initially ingested into DynamoDB to being streamed into Amazon Redshift. QuickSight can create a visual analysis of the data in near-real time because that data is relatively up to date, so this solution can support use cases for making quick decisions on transactional data.

Using AWS for data services allows for each component of the solution, from ingestion to storage to analysis, to be optimized for speed and with little management overhead. With these AWS services, business leaders and analysts can get near-real-time insights to drive immediate change based on customer behavior, enabling organizational agility and ultimately leading to customer satisfaction.

Next steps

The next step to building a solution to analyze transactional data in near-real time on AWS would be to go through the workshop Enable near real-time analytics on data stored in Amazon DynamoDB using Amazon Redshift. In the workshop, you will get hands-on with AWS managed analytics, AI/ML, and database services to dive deep into an end-to-end solution delivering near-real-time analytics on transactional data. By the end of the workshop, you will have gone through the configuration and deployment of the critical pieces that will enable users to perform analytics on transactional workloads.

Conclusion

Developing an architecture that can serve transactional data to near-real-time analytics on AWS can help business become more agile in critical decisions. By ingesting and processing transactional data delivered directly from the application on AWS, businesses can optimize their inventory levels, reduce holding costs, increase revenue, and enhance customer satisfaction.

The end-to-end solution is designed for individuals in various roles, such as business users, data engineers, data scientists, and data analysts, who are responsible for comprehending, creating, and overseeing processes related to retail inventory forecasting. Overall, being able to analyze near-real time transactional data on AWS can provide businesses timely insight, allowing for quicker decision making in fast paced industries.


About the Authors

Jason D’Alba is an AWS Solutions Architect leader focused on database and enterprise applications, helping customers architect highly available and scalable database solutions.

Veerendra Nayak is a Principal Database Solutions Architect based in the Bay Area, California. He works with customers to share best practices on database migrations, resiliency, and integrating operational data with analytics and AI services.

Evan Day is a Database Solutions Architect at AWS, where he helps customers define technical solutions for business problems using the breadth of managed database services on AWS. He also focuses on building solutions that are reliable, performant, and cost efficient.

Design a data mesh on AWS that reflects the envisioned organization

Post Syndicated from Claudia Chitu original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-on-aws-that-reflects-the-envisioned-organization/

This post is written in collaboration with Claudia Chitu and Spyridon Dosis from ACAST.

Founded in 2014, Acast is the world’s leading independent podcast company, elevating podcast creators and podcast advertisers for the ultimate listening experience. By championing an independent and open ecosystem for podcasting, Acast aims to fuel podcasting with the tools and monetization needed to thrive.

The company uses AWS Cloud services to build data-driven products and scale engineering best practices. To ensure a sustainable data platform amid growth and profitability phases, their tech teams adopted a decentralized data mesh architecture.

In this post, we discuss how Acast overcame the challenge of coupled dependencies between teams working with data at scale by employing the concept of a data mesh.

The problem

With an accelerated growth and expansion, Acast encountered a challenge that resonates globally. Acast found itself with diverse business units and a vast amount of data generated across the organization. The existing monolith and centralized architecture was struggling to meet the growing demands of data consumers. Data engineers were finding it increasingly challenging to maintain and scale the data infrastructure, resulting in data access, data silos, and inefficiencies in data management. A key objective was to enhance the end-to-end user experience, starting from the business needs.

Acast needed to address these challenges in order to get to an operational scale, meaning a global maximum of the number of people that can independently operate and deliver value. In this case, Acast tried to tackle the challenge of this monolith structure and the high time to value for product teams, tech teams, end consumers. It’s worth mentioning that they also have other product and tech teams, including operational or business teams, without AWS accounts.

Acast has a variable number of product teams, continuously evolving by merging existing ones, splitting them, adding new people, or simply creating new teams. In the last 2 years, they have had between 10–20 teams, consisting of 4–10 people each. Each team owns at least two AWS accounts, up to 10 accounts, depending on the ownership. The majority of data produced by these accounts is used downstream for business intelligence (BI) purposes and in Amazon Athena, by hundreds of business users every day.

The solution Acast implemented is a data mesh, architected on AWS. The solution mirrors the organizational structure rather than an explicit architectural decision. As per the Inverse Conway Maneuver, Acast’s technology architecture displays isomorphism with the business architecture. In this case, the business users are enabled through the data mesh architecture to get faster time to insights and know directly who the domain specific owners are, speeding up collaboration. This will be further detailed when we discuss the AWS Identity and Access Management (IAM) roles used, because one of the roles is dedicated to the business group.

Parameters of success

Acast succeeded in bootstrapping and scaling a new team- and domain-oriented data product and its corresponding infrastructure and setup, resulting in less friction in gathering insights and happier users and consumers.

The success of the implementation meant assessing various aspects of the data infrastructure, data management, and business outcomes. They classified the metrics and indicators in the following categories:

  • Data usage – A clear understanding of who is consuming what data source, materialized with a mapping of consumers and producers. Discussions with users showed they were happier to have faster access to data in a simpler way, a more structured data organization, and a clear mapping of who the producer is. A lot of progress has been made to advance their data-driven culture (data literacy, data sharing, and collaboration across business units).
  • Data governance – With their service-level object stating when the data sources are available (among other details), teams know whom to notify and can do so in a shorter time when there is late data coming in or other issues with the data. With a data steward role in place, the ownership has been strengthened.
  • Data team productivity – Through engineering retrospectives, Acast found that their teams appreciate autonomy to make decisions regarding their data domains.
  • Cost and resource efficiency – This is an area where Acast observed a reduction in data duplication, and therefore cost reduction (in some accounts, removing the copy of data 100%), by reading data across accounts while enabling scaling.

Data mesh overview

A data mesh is a sociotechnical approach to build a decentralized data architecture by using a domain-oriented, self-serve design (in a software development perspective), and borrows Eric Evans’ theory of domain-driven design and Manuel Pais’ and Matthew Skelton’s theory of team topologies. It’s important to establish the context to understand what data mesh is because it sets the stage for the technical details that follow and can help you understand how the concepts discussed in this post fit into the broader framework of a data mesh.

To recap before diving deeper into Acast’s implementation, the data mesh concept is based on the following principles:

  • It’s domain driven, as opposed to pipelines as a first-class concern
  • It serves data as a product
  • It’s a good product that delights users (data is trustworthy, documentation is available, and it’s easily consumable)
  • It offers federated computational governance and decentralized ownership—a self-serve data platform

Domain-driven architecture

In Acast’s approach of owning the operational and analytical datasets, teams are structured with ownership based on domain, reading directly from the producer of the data, via an API or programmatically from Amazon S3 storage or using Athena as a SQL query engine. Some examples of Acast’s domains are presented in the following figure.

As illustrated in the preceding figure, some domains are loosely coupled to other domains’ operational or analytical endpoints, with a different ownership. Others might have stronger dependency, which is expected, for business (some podcasters can be also advertisers, creating sponsorship creatives and running campaigns for their own shows, or transacting ads using Acast’s software as a service).

Data as a product

Treating data as a product entails three key components: the data itself, the metadata, and the associated code and infrastructure. In this approach, teams responsible for generating data are referred to as producers. These producer teams possess in-depth knowledge about their consumers, understanding how their data product is utilized. Any changes planned by the data producers are communicated in advance to all consumers. This proactive notification ensures that downstream processes are not disrupted. By providing consumers with advance notice, they have sufficient time to prepare for and adapt to the upcoming changes, maintaining a smooth and uninterrupted workflow. The producers run a new version of the initial dataset in parallel, notify the consumers individually, and discuss with them their necessary timeframe to start consuming the new version. When all consumers are using the new version, the producers make the initial version unavailable.

Data schemas are inferred from the common agreed-upon format to share files between teams, which is Parquet in the case of Acast. Data can be shared in files, batched or stream events, and more. Each team has its own AWS account acting as an independent and autonomous entity with its own infrastructure. For orchestration, they use the AWS Cloud Development Kit (AWS CDK) for infrastructure as code (IaC) and AWS Glue Data Catalogs for metadata management. Users can also raise requests to producers to improve the way the data is presented or to enrich the data with new data points for generating a higher business value.

With each team owning an AWS account and a data catalog ID from Athena, it’s straightforward to see this through the lenses of a distributed data lake on top of Amazon S3, with a common catalog mapping all the catalogs from all the accounts.

At the same time, each team can also map other catalogs to their own account and use their own data, which they produce along with the data from other accounts. Unless it is sensitive data, the data can be accessed programmatically or from the AWS Management Console in a self-service manner without being dependent on the data infrastructure engineers. This is a domain-agnostic, shared way to self-serve data. The product discovery happens through the catalog registration. Using only a few standards commonly agreed upon and adopted across the company, for the purpose of interoperability, Acast addressed the fragmented silos and friction to exchange data or consume domain-agnostic data.

With this principle, teams get assurance that the data is secure, trustworthy, and accurate, and appropriate access controls are managed at each domain level. Moreover, on the central account, roles are defined for different types of permissions and access, using AWS IAM Identity Center permissions. All datasets are discoverable from a single central account. The following figure illustrates how it’s instrumented, where two IAM roles are assumed by two types of user (consumer) groups: one that has access to a limited dataset, which is restricted data, and one that has access to non-restricted data. There is also a way to assume any of these roles, for service accounts, such as those used by data processing jobs in Amazon Managed Workflows for Apache Airflow (Amazon MWAA), for example.

How Acast solved for high alignment and a loosely coupled architecture

The following diagram shows a conceptual architecture of how Acast’s teams are organizing data and collaborating with each other.

Acast used the Well-Architected Framework for the central account to improve its practice running analytical workloads in the cloud. Through the lenses of the tool, Acast was able to address better monitoring, cost optimization, performance, and security. It helped them understand the areas where they could improve their workloads and how to address common issues, with automated solutions, as well as how to measure the success, defining KPIs. It saved them time to get the learnings that otherwise would have been taking longer to find. Spyridon Dosis, Acast’s Information Security Officer, shares, “We are happy AWS is always ahead with releasing tools that enable the configuration, assessment, and review of multi-account setup. This is a big plus for us, working in a decentralized organization.” Spyridon also adds, “A very important concept we value is the AWS security defaults (e.g. default encryption for S3 buckets).”

In the architecture diagram, we can see that each team can be a data producer, except the team owning the central account, which serves as the central data platform, modeling the logic from multiple domains to paint the full business picture. All other teams can be data producers or data consumers. They can connect to the central account and discover datasets via the cross-account AWS Glue Data Catalog, analyze them in the Athena query editor or with Athena notebooks, or map the catalog to their own AWS account. Access to the central Athena catalog is implemented with IAM Identity Center, with roles for open data and restricted data access.

For non-sensitive data (open data), Acast uses a template where the datasets are by default open to the entire organization to read from, using a condition to provide the organization-assigned ID parameter, as shown in the following code snippet:

{
    "Version": "2012-10-17",
    "Statement": [
        
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
               "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"  
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalOrgID": "ORG-ID-NUMBER"
                }
            }
        }
    ]
}

When handling sensitive data like financials, the teams use a collaborative data steward model. The data steward works with the requester to evaluate access justification for the intended use case. Together, they determine appropriate access methods to meet the need while maintaining security. This could include IAM roles, service accounts, or specific AWS services. This approach enables business users outside the tech organization (which means they don’t have an AWS account) to independently access and analyze the information they need. By granting access through IAM policies on AWS Glue resources and S3 buckets, Acast provides self-serve capabilities while still governing delicate data through human review. The data steward role has been valuable for understanding use cases, assessing security risks, and ultimately facilitating access that accelerates the business through analytical insights.

For Acast’s use case, granular row- or column-level access controls weren’t needed, so the approach sufficed. However, other organizations may require more fine-grained governance over sensitive data fields. In those cases, solutions like AWS Lake Formation could implement permissions needed, while still providing a self-serve data access model. For more information, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue.

At the same time, teams can read from other producers directly, from Amazon S3 or via an API, keeping the dependency at minimum, which enhances the velocity of development and delivery. Therefore, an account can be a producer and a consumer in parallel. Each team is autonomous, and is accountable for their own tech stack.

Additional learnings

What did Acast learn? So far, we’ve discussed that the architectural design is an effect of the organizational structure. Because the tech organization consists of multiple cross-functional teams, and it’s straightforward to bootstrap a new team, following the common principles of data mesh, Acast learned this doesn’t go seamlessly every time. To set up a fully new account in AWS, teams go through the same journey, but slightly different, considering their own set of particularities.

This can create certain frictions, and it’s difficult to get all data producing teams to reach a high maturity of being data producers. This can be explained by the different data competencies in those cross-functional teams and not being dedicated data teams.

By implementing the decentralized solution, Acast effectively tackled the scalability challenge by adapting their teams to align with evolving business needs. This approach ensures high decoupling and alignment. Furthermore, they strengthened ownership, significantly reducing the time needed to identify and resolve issues because the upstream source is readily known and easily accessible with specified SLAs. The volume of data support inquiries has seen a reduction of over 50%, because business users are empowered to gain faster insights. Notably, they successfully eliminated tens of terabytes of redundant storage that were previously copied solely to fulfill downstream requests. This achievement was made possible through the implementation of cross-account reading, leading to the removal of associated development and maintenance costs for these pipelines.

Conclusion

Acast used the Inverse Conway Maneuver law and employed AWS services where each cross-functional product team has its own AWS account to build a data mesh architecture that allows scalability, high ownership, and self-service data consumption. This has been working well for the company, regarding how data ownership and operations were approached, to meet their engineering principles, resulting in having the data mesh as an effect rather than a deliberate intent. For other organizations, the desired data mesh might look different and the approach might have other learnings.

To conclude, a modern data architecture on AWS allows you to efficiently construct data products and data mesh infrastructure at a low cost without compromising on performance.

The following are some examples of AWS services you can use to design your desired data mesh on AWS:


About the Authors

Claudia Chitu is a Data strategist and an influential leader in the Analytics space. Focused on aligning data initiatives with the overall strategic goals of the organization, she employs data as a guiding force for long-term planning and sustainable growth.

Spyridon Dosis is an Information Security Professional in Acast. Spyridon supports the organization in designing, implementing and operating its services in a secure manner protecting the company and users’ data.

Srikant Das is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 13 years of experience in Big Data analytics and Data Engineering, where he enjoys building reliable, scalable, and efficient solutions. Outside of work, he enjoys traveling and blogging his experiences in social media.

How smava makes loans transparent and affordable using Amazon Redshift Serverless

Post Syndicated from Alex Naumov original https://aws.amazon.com/blogs/big-data/how-smava-makes-loans-transparent-and-affordable-using-amazon-redshift-serverless/

This is a guest post co-written by Alex Naumov, Principal Data Architect at smava.

smava GmbH is one of the leading financial services companies in Germany, making personal loans transparent, fair, and affordable for consumers. Based on digital processes, smava compares loan offers from more than 20 banks. In this way, borrowers can choose the deals that are most favorable to them in a fast, digitalized, and efficient way.

smava believes in and takes advantage of data-driven decisions in order to become the market leader. The Data Platform team is responsible for supporting data-driven decisions at smava by providing data products across all departments and branches of the company. The departments include teams from engineering to sales and marketing. Branches range by products, namely B2C loans, B2B loans, and formerly also B2C mortgages. The data products used inside the company include insights from user journeys, operational reports, and marketing campaign results, among others. The data platform serves on average 60 thousand queries per day. The data volume is in double-digit TBs with steady growth as business and data sources evolve.

smava’s Data Platform team faced the challenge to deliver data to stakeholders with different SLAs, while maintaining the flexibility to scale up and down while staying cost-efficient. It took up to 3 hours to generate daily reporting, which impacted business decision-making when re-calculations needed to happen during the day. To speed up the self-service analytics and foster innovation based on data, a solution was needed to provide ways to allow any team to create data products on their own in a decentralized manner. To create and manage the data products, smava uses Amazon Redshift, a cloud data warehouse.

In this post, we show how smava optimized their data platform by using Amazon Redshift Serverless and Amazon Redshift data sharing to overcome right-sizing challenges for unpredictable workloads and further improve price-performance. Through the optimizations, smava achieved up to 50% cost savings and up to three times faster report generation compared to the previous analytics infrastructure.

Overview of solution

As a data-driven company, smava relies on the AWS Cloud to power their analytics use cases. To bring their customers the best deals and user experience, smava follows the modern data architecture principles with a data lake as a scalable, durable data store and purpose-built data stores for analytical processing and data consumption.

smava ingests data from various external and internal data sources into a landing stage on the data lake based on Amazon Simple Storage Service (Amazon S3). To ingest the data, smava uses a set of popular third-party customer data platforms complemented by custom scripts.

After the data lands in Amazon S3, smava uses the AWS Glue Data Catalog and crawlers to automatically catalog the available data, capture the metadata, and provide an interface that allows querying all data assets.

Data analysts who require access to the raw assets on the data lake use Amazon Athena, a serverless, interactive analytics service for exploration with ad hoc queries. For the downstream consumption by all departments across the organization, smava’s Data Platform team prepares curated data products following the extract, load, and transform (ELT) pattern. smava uses Amazon Redshift as their cloud data warehouse to transform, store, and analyze data, and uses Amazon Redshift Spectrum to efficiently query and retrieve structured and semi-structured data from the data lake using SQL.

smava follows the data vault modeling methodology with the Raw Vault, Business Vault, and Data Mart stages to prepare the data products for end consumers. The Raw Vault describes objects loaded directly from the data sources and represents a copy of the landing stage in the data lake. The Business Vault is populated with data sourced from the Raw Vault and transformed according to the business rules. Finally, the data is aggregated into specific data products oriented to a specific business line. This is the Data Mart stage. The data products from the Business Vault and Data Mart stages are now available for consumers. smava decided to use Tableau for business intelligence, data visualization, and further analytics. The data transformations are managed with dbt to simplify the workflow governance and team collaboration.

The following diagram shows the high-level data platform architecture before the optimizations.

High-level Data Platform architecture before the optimizations

Evolution of the data platform requirements

smava started with a single Redshift cluster to host all three data stages. They chose provisioned cluster nodes of the RA3 type with Reserved Instances (RIs) for cost optimization. As data volumes grew 53% year over year, so did the complexity and requirements from various analytic workloads.

smava quickly addressed the growing data volumes by right-sizing the cluster and using Amazon Redshift Concurrency Scaling for peak workloads. Furthermore, smava wanted to give all teams the option to create their own data products in a self-service manner to increase the pace of innovation. To avoid any interference with the centrally managed data products, the decentralized product development environments needed to be strictly isolated. The same requirement was also applied for the isolation of different product stages curated by the Data Platform team.

Optimizing the architecture with data sharing and Redshift Serverless

To meet the evolved requirements, smava decided to separate the workload by splitting the single provisioned Redshift cluster into multiple data warehouses, with each warehouse serving a different stage. In addition, smava added new staging environments in the Business Vault to develop new data products without the risk of interfering with existing product pipelines. To avoid any interference with the centrally managed data products of the Data Platform team, smava introduced an additional Redshift cluster, isolating the decentralized workloads.

smava was looking for an out-of-the-box solution to achieve workload isolation without managing a complex data replication pipeline.

Right after the launch of Redshift data sharing capabilities in 2021, the Data Platform team recognized that this was the solution they had been looking for. smava adopted the data sharing feature to have the data from producer clusters available for read access on different consumer clusters, with each of those consumer clusters serving a different stage.

Redshift data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy data. It provides live access to data so that users always see the most up-to-date and consistent information as it’s updated in the data warehouse. With data sharing, you can securely share live data with Redshift clusters in the same or different AWS accounts and across Regions.

With Redshift data sharing, smava was able to optimize the data architecture by separating the data workloads to individual consumer clusters without having to replicate the data. The following diagram illustrates the high-level data platform architecture after splitting the single Redshift cluster into multiple clusters.

High-level Data Platform architecture after splitting the single Redshift cluster in multiple clusters

By providing a self-service data mart, smava increased data democratization by providing users with access to all aspects of the data. They also provided teams with a set of custom tools for data discovery, ad hoc analysis, prototyping, and operating the full lifecycle of mature data products.

After collecting operational data from the individual clusters, the Data Platform team identified further potential optimizations: the Raw Vault cluster was under steady load 24/7, but the Business Vault clusters were only updated nightly. To optimize for costs, smava used the pause and resume capabilities of Redshift provisioned clusters. These capabilities are useful for clusters that need to be available at specific times. While the cluster is paused, on-demand billing is suspended. Only the cluster’s storage incurs charges.

The pause and resume feature helped smava optimize for cost, but it required additional operational overhead to trigger the cluster operations. Additionally, the development clusters remained subject to idle times during working hours. These challenges were finally solved by adopting Redshift Serverless in 2022. The Data Platform team decided to move the Business Data Vault stage clusters to Redshift Serverless, which allows them to pay for the data warehouse only when in use, reliably and efficiently.

Redshift Serverless is ideal for cases when it’s difficult to predict compute needs such as variable workloads, periodic workloads with idle time, and steady-state workloads with spikes. Additionally, as usage demand evolves with new workloads and more concurrent users, Redshift Serverless automatically provisions the right compute resources, and the data warehouse scales seamlessly and automatically, without the need for manual intervention. Data sharing is supported in both directions between Redshift Serverless and provisioned Redshift clusters with RA3 nodes, so no changes to the smava architecture were needed. The following diagram shows the high-level architecture setup after the move to Redshift Serverless.

High-level Data Platform architecture after introducing Redshift Serverless for Business Vault clusters

smava combined the benefits of Redshift Serverless and dbt through a seamless CI/CD pipeline, adopting a trunk-based development methodology. Changes on the Git repository are automatically deployed to a test stage and validated using automated integration tests. This approach increased the efficiency of developers and decreased the average time to production from days to minutes.

smava adopted an architecture that utilizes both provisioned and serverless Redshift data warehouses, together with the data sharing capability to isolate the workloads. By choosing the right architectural patterns for their needs, smava was able to accomplish the following:

  • Simplify the data pipelines and reduce operational overhead
  • Reduce the feature release time from days to minutes
  • Increase price-performance by reducing idle times and right-sizing the workload
  • Achieve up to three times faster report generation (faster calculations and higher parallelization) at 50% of the original setup costs
  • Increase agility of all departments and support data-driven decision-making by democratizing access to data
  • Increase the speed of innovation by exposing self-service data capabilities for teams across all departments and strengthening the A/B test capabilities to cover the complete customer journey

Now, all departments at smava are using the available data products to make data-driven, accurate, and agile decisions.

Future vision

For the future, smava plans to continue to optimize the Data Platform based on operational metrics. They’re considering switching more provisioned clusters like the Self-Service Data Mart cluster to serverless. Additionally, smava is optimizing the ELT orchestration toolchain to increase the number of parallel data pipelines to be run. This will increase the utilization of provisioned Redshift resources and allow for cost reductions.

With the introduction of the decentralized, self-service for data product creation, smava made a step forward towards a data mesh architecture. In the future, the Data Platform team plans to further evaluate the needs of their service users and establish further data mesh principles like federated data governance.

Conclusion

In this post, we showed how smava optimized their data platform by isolating environments and workloads using Redshift Serverless and data sharing features. Those Redshift environments are well integrated with their infrastructure, flexible in scaling on demand, and highly available, and they require minimum administration efforts. Overall, smava has increased performance by three times while reducing the total platform costs by 50%. Additionally, they reduced operational overhead to a minimum while maintaining the existing SLAs for report generation times. Moreover, smava has strengthened the culture of innovation by providing self-service data product capabilities to speed up their time to market.

If you’re interested in learning more about Amazon Redshift capabilities, we recommend watching the most recent What’s new with Amazon Redshift session in the AWS Events channel to get an overview of the features recently added to the service. You can also explore the self-service, hands-on Amazon Redshift labs to experiment with key Amazon Redshift functionalities in a guided manner.

You can also dive deeper into Redshift Serverless use cases and data sharing use cases. Additionally, check out the data sharing best practices and discover how other customers optimized for cost and performance with Redshift data sharing to get inspired for your own workloads.

If you prefer books, check out Amazon Redshift: The Definitive Guide by O’Reilly, where the authors detail the capabilities of Amazon Redshift and provide you with insights on corresponding patterns and techniques.


About the Authors

Blog author: Alex NaumovAlex Naumov is a Principal Data Architect at smava GmbH, and leads the transformation projects at the Data department. Alex previously worked 10 years as a consultant and data/solution architect in a wide variety of domains, such as telecommunications, banking, energy, and finance, using various tech stacks, and in many different countries. He has a great passion for data and transforming organizations to become data-driven and the best in what they do.

Blog author: Lingli ZhengLingli Zheng works as a Business Development Manager in the AWS worldwide specialist organization, supporting customers in the DACH region to get the best value out of Amazon analytics services. With over 12 years of experience in energy, automation, and the software industry with a focus on data analytics, AI, and ML, she is dedicated to helping customers achieve tangible business results through digital transformation.

Blog author: Alexander SpivakAlexander Spivak is a Senior Startup Solutions Architect at AWS, focusing on B2B ISV customers across EMEA North. Prior to AWS, Alexander worked as a consultant in financial services engagements, including various roles in software development and architecture. He is passionate about data analytics, serverless architectures, and creating efficient organizations.


This post was reviewed for technical accuracy by David Greenshtein, Senior Analytics Solutions Architect.

Unlocking the value of data as your differentiator

Post Syndicated from G2 Krishnamoorthy original https://aws.amazon.com/blogs/big-data/unlocking-the-value-of-data-as-your-differentiator/

Today on the AWS re:Invent keynote stage, Swami Sivasubramanian, VP of Data and AI, AWS, spoke about the beneficial relationship among data, generative AI, and humans—all working together to unleash new possibilities in efficiency and creativity. There has never been a more exciting time in modern technology. Innovation is accelerating everywhere, and the future is rife with possibility. While Swami explored many facets of this beneficial relationship in the keynote today, one area that is especially critical for our customers to get right if they want to see success in generative AI is data. When you want to build generative AI applications that are unique to your business needs, data is the differentiator. This week, we launched many new tools to help you turn your data into your differentiator. This includes tools to help you customize your foundation models, and new services and features to build a strong data foundation to fuel your generative AI applications.

Customizing foundation models

The need for data is quite obvious if you are building your own foundation models (FMs). These models need vast amounts of data. But data is necessary even when you are building on top of FMs. If you think about it, everyone has access to the same models for building generative AI applications. It’s data that is the key to moving from generic applications to generative AI applications that create real value for your customers and your business. For instance, Intuit’s new generative AI-powered assistant, Intuit Assist, uses relevant contextual datasets spanning small business, consumer finance, and tax information to deliver personalized financial insights to their customers. With Amazon Bedrock, you can privately customize FMs for your specific use case using a small set of your own labeled data through a visual interface without writing any code. Today, we announced the ability to fine-tune Cohere Command and Meta Llama 2 in addition to Amazon Titan. In addition to fine-tuning, we’re also making it easier for you to provide models with up-to-date and contextually relevant information from your data sources using Retrieval Augmented Generation (RAG). Amazon Bedrock’s Knowledge Bases feature, which went to general availability today, supports the entire RAG workflow, from ingestion, to retrieval, and prompt augmentation. Knowledge Bases works with popular vector databases and engines including Amazon OpenSearch Serverless, Redis Enterprise Cloud, and Pinecone, with support for Amazon Aurora and MongoDB coming soon.

Building a strong data foundation

To produce the high-quality data that you need to build or customize FMs for generative AI, you need a strong data foundation. Of course, the value of a strong data foundation is not new and the need for one spans well beyond generative AI. Across all types of use cases, from generative AI to business intelligence (BI), we’ve found that a strong data foundation includes a comprehensive set of services to meet all your use case needs, integrations across those services to break down data silos, and tools to govern data across the end-to-end data workflow so you can innovate more quickly. These tools also need to be intelligent to remove the heavy lifting from data management.

Comprehensive

First, you need a comprehensive set of data services so you can get the price/performance, speed, flexibility, and capabilities for any use case. AWS offers a broad set of tools that enable you to store, organize, access, and act upon various types of data. We have the broadest selection of database services, including relational databases like Aurora and Amazon Relational Database Service (Amazon RDS)—and on Monday, we introduced the newest addition to the RDS family: Amazon RDS for Db2. Now Db2 customers can easily set up, operate, and scale highly available Db2 databases in the cloud. We also offer non-relational databases like Amazon DynamoDB, used by over 1 million customers for its serverless, single-digit millisecond performance at any scale. You also need services to store data for analysis and machine learning (ML) like Amazon Simple Storage Service (Amazon S3). Customers have created hundreds of thousands of data lakes on Amazon S3. It also includes our data warehouse, Amazon Redshift, which delivers more than 6 times better price/performance than other cloud data warehouses. We also have tools that enable you to act on your data, including Amazon QuickSight for BI, Amazon SageMaker for ML, and of course, Amazon Bedrock for generative AI.

Serverless enhancements

The dynamic nature of data makes it perfectly suited to serverless technologies, which is why AWS offers a broad range of serverless database and analytics offerings that help support our customers’ most demanding workloads. This week, we made even more improvements to our serverless options in this area, including a new Aurora capability that automatically scales to millions of write transactions per second and manages petabytes of data while maintaining the simplicity of operating a single database. We also released a new serverless option for Amazon ElastiCache, which makes it faster and easier to create highly available caches and instantly scales to meet application demand. Finally, we announced new AI-driven scaling and optimizations for Amazon Redshift Serverless that enable the service to learn from your patterns and proactively scale on multiple dimensions, including concurrent users, data variability, and query complexity. It does all of this while factoring in your price/performance targets so you can optimize between cost and performance.

Vector capabilities across more databases

Your data foundation also needs to include services to store, index, retrieve, and search vector data. As our customers need vector embeddings as part as part of their generative AI application workflows, they told us they want to use vector capabilities in their existing databases to eliminate the steep learning curve for new programming tools, APIs, and SDKs. They also feel more confident knowing their existing databases are proven in production and meet requirements for scalability, availability, and storage and compute. And when your vectors and business data are stored in the same place, your applications will run faster—and there’s no data sync or data movement to worry about.

For all of these reasons, we’ve invested in adding vector capabilities to some of our most popular data services, including Amazon OpenSearch Service and OpenSearch Serverless, Aurora, and Amazon RDS. Today, we added four more to that list, with the addition of vector support in Amazon MemoryDB for Redis, Amazon DocumentDB (with MongoDB compatibility), DynamoDB, and Amazon Neptune. Now you can use vectors and generative AI with your database of choice.

Integrated

Another key to your data foundation is integrating data across your data sources for a more complete view of your business. Typically, connecting data across different data sources requires complex extract, transform, and load (ETL) pipelines, which can take hours—if not days—to build. These pipelines also have to be continuously maintained and can be brittle. AWS is investing in a zero-ETL future so you can quickly and easily connect and act on all your data, no matter where it lives. We’re delivering on this vision in a number of ways, including zero-ETL integrations between our most popular data stores. Earlier this year, we brought you our fully managed zero-ETL integration between Amazon Aurora MySQL-Compatible Edition and Amazon Redshift. Within seconds of data being written into Aurora, you can use Amazon Redshift to do near-real-time analytics and ML on petabytes of data. Woolworths, a pioneer in retail who helped build the retail model of today, was able to reduce development time for analysis of promotions and other events from 2 months to 1 day using the Aurora zero-ETL integration with Amazon Redshift.

More zero-ETL options

At re:Invent, we announced three more zero-ETL integrations with Amazon Redshift, including Amazon Aurora PostgreSQL-Compatible Edition, Amazon RDS for MySQL, and DynamoDB, to make it easier for you to take advantage of near-real-time analytics to improve your business outcomes. In addition to Amazon Redshift, we’ve also expanded our zero ETL support to OpenSearch Service, which tens of thousands of customers use for real-time search, monitoring, and analysis of business and operational data. This includes zero-ETL integrations with DynamoDB and Amazon S3. With all of these zero-ETL integrations, we’re making it even easier to leverage relevant data for your applications, including generative AI.

Governed

Finally, your data foundation needs to be secure and governed to ensure the data that’s used throughout the development cycle of your generative AI applications is high quality and compliant. To help with this, we launched Amazon DataZone last year. Amazon DataZone is being used by companies like Guardant Health and Bristol Meyers Squibb to catalog, discover, share, and govern data across their organization. Amazon DataZone uses ML to automatically add metadata to your data catalog, making all of your data more discoverable. This week, we added a new feature to Amazon DataZone that uses generative AI to automatically create business descriptions and context for your datasets with just a few clicks, making data even easier to understand and apply. While Amazon DataZone helps you share data in a governed way within your organization, many customers also want to securely share data with their partners.

Infusing intelligence across the data foundation

Not only have we added generative AI to Amazon DataZone, but we’re leveraging intelligent technology across our data services to make data easier to use, more intuitive to work with, and more accessible. Amazon Q, our new generative AI assistant, helps you in QuickSight to author dashboards and create compelling visual stories from your dashboard data using natural language. We also announced that Amazon Q can help you create data integration pipelines using natural language. For example, you can ask Q to “read JSON files from S3, join on ‘accountid’, and load into DynamoDB,” and Q will return an end-to-end data integration job to perform this action. Amazon Q is also making it easier to query data in your data warehouse with generative AI SQL in Amazon Redshift Query Editor (in preview). Now data analysts, scientists, and engineers can be more productive using generative AI text-to-code functionality. You can also improve accuracy by enabling query history access to specific users—without compromising data privacy.

These new innovations are going to make it easy for you to leverage data to differentiate your generative AI applications and create new value for your customers and your business. We look forward to seeing what you create!


About the authors

G2 Krishnamoorthy is VP of Analytics, leading AWS data lake services, data integration, Amazon OpenSearch Service, and Amazon QuickSight. Prior to his current role, G2 built and ran the Analytics and ML Platform at Facebook/Meta, and built various parts of the SQL Server database, Azure Analytics, and Azure ML at Microsoft.

Rahul Pathak is VP of Relational Database Engines, leading Amazon Aurora, Amazon Redshift, and Amazon QLDB. Prior to his current role, he was VP of Analytics at AWS, where he worked across the entire AWS database portfolio. He has co-founded two companies, one focused on digital media analytics and the other on IP-geolocation.

BMW Cloud Efficiency Analytics powered by Amazon QuickSight and Amazon Athena

Post Syndicated from Phillip Karg original https://aws.amazon.com/blogs/big-data/bmw-cloud-efficiency-analytics-powered-by-amazon-quicksight-and-amazon-athena/

This post is written in collaboration with Philipp Karg and Alex Gutfreund  from BMW Group.

Bayerische Motoren Werke AG (BMW) is a motor vehicle manufacturer headquartered in Germany with 149,475 employees worldwide and the profit before tax in the financial year 2022 was € 23.5 billion on revenues amounting to € 142.6 billion. BMW Group is one of the world’s leading premium manufacturers of automobiles and motorcycles, also providing premium financial and mobility services.

BMW Group uses 4,500 AWS Cloud accounts across the entire organization but is faced with the challenge of reducing unnecessary costs, optimizing spend, and having a central place to monitor costs. BMW Cloud Efficiency Analytics (CLEA) is a homegrown tool developed within the BMW FinOps CoE (Center of Excellence) aiming to optimize and reduce costs across all these accounts.

In this post, we explore how the BMW Group FinOps CoE implemented their Cloud Efficiency Analytics tool (CLEA), powered by Amazon QuickSight and Amazon Athena. With this tool, they effectively reduced costs and optimized spend across all their AWS Cloud accounts, utilizing a centralized cost monitoring system and using key AWS services. The CLEA dashboards were built on the foundation of the Well-Architected Lab. For more information on this foundation, refer to A Detailed Overview of the Cost Intelligence Dashboard.

CLEA gives full transparency into cloud costs, usage, and efficiency from a high-level overview to granular service, resource, and operational levels. It seamlessly consolidates data from various data sources within AWS, including AWS Cost Explorer (and forecasting with Cost Explorer), AWS Trusted Advisor, and AWS Compute Optimizer. Additionally, it incorporates BMW Group’s internal system to integrate essential metadata, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications.

The ultimate goal is to raise awareness of cloud efficiency and optimize cloud utilization in a cost-effective and sustainable manner. The dashboards, which offer a holistic view together with a variety of cost and BMW Group-related dimensions, were successfully launched in May 2023 and became accessible to users within the BMW Group.

Overview of the BMW Cloud Data Hub

At the BMW Group, Cloud Data Hub (CDH) is the central platform for managing company-wide data and data solutions. It works as a bundle for resources that are bound to a specific staging environment and Region to store data on Amazon Simple Storage Service (Amazon S3), which is renowned for its industry-leading scalability, data availability, security, and performance. Additionally, it manages table definitions in the AWS Glue Data Catalog, containing references to data sources and targets of extract, transform, and load (ETL) jobs in AWS Glue.

Data providers and consumers are the two fundamental users of a CDH dataset. Providers create datasets within assigned domain and as the owner of a dataset, they are responsible for the actual content and for providing appropriate metadata. They can use their own toolsets or rely on provided blueprints to ingest the data from source systems. Once released, consumers use datasets from different providers for analysis, machine learning (ML) workloads, and visualization.

Each CDH dataset has three processing layers: source (raw data), prepared (transformed data in Parquet), and semantic (combined datasets). It is possible to define stages (DEV, INT, PROD) in each layer to allow structured release and test without affecting PROD. Within each stage, it’s possible to create resources for storing actual data. Two resource types are associated with each database in a layer:

  • File store – S3 buckets for data storage
  • Database – AWS Glue databases for metadata sharing

Overview of the CLEA Landscape

The following diagram is a high-level overview of some of the technologies used for the extract, load, and transform (ELT) stages, as well as the final visualization and analysis layer. You might notice that this differs slightly from traditional ETL. The difference lies in when and where data transformation takes place. In ETL, data is transformed before it’s loaded into the data warehouse. In ELT, raw data is loaded into the data warehouse first, then it’s transformed directly within the warehouse. The ELT process has gained popularity with the rise of cloud-based, high-performance data warehouses, where transformation can be done more efficiently after loading.

Regardless of the method used, the goal is to provide high-quality, reliable data that can be used to drive business decisions.

CLEA Architecture

In this section, we take a closer look at the three essential stages mentioned previously: extract, load and transform.

Extract

The extract stage plays a pivotal role in the CLEA, serving as the initial step where data related to cost and usage and optimization is collected from a diverse range of sources within AWS. These sources encompass the AWS Cost and Usage Reports, Cost Explorer (and forecasting with Cost Explorer), Trusted Advisor, and Compute Optimizer. Furthermore, it fetches essential metadata from BMW Group’s internal system, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications in the later stages of data transformation.

The following diagram illustrates one of the data collection architectures that we use to collect Trusted Advisor data from nearly 4,500 AWS accounts and subsequently load that into Cloud Data Hub.

Let’s go through each numbered step as outlined in the architecture:

  1. A time-based rule in Amazon EventBridge triggers the CLEA Shared Workflow AWS Step Functions state machine.
  2. Based on the inputs, the Shared Workflow state machine invokes the Account Collector AWS Lambda function to retrieve AWS account details from AWS Organizations.
  3. The Account Collector Lambda function assumes an AWS Identity and Access Management (IAM) role to access linked account details via the Organizations API and writes them to Amazon Simple Queue Service (Amazon SQS) queues.
  4. The SQS queues trigger the Data Collector Lambda function using SQS Lambda triggers.
  5. The Data Collector Lambda function assumes an IAM role in each linked account to retrieve the relevant data and load it into the CDH source S3 bucket.
  6. When all linked accounts data is collected, the Shared Workflow state machine triggers an AWS Glue job for further data transformation.
  7. The AWS Glue job reads raw data from the CDH source bucket and transforms it into a compact Parquet format.

Load and transform

For the data transformations, we used an open-source data transformation tool called dbt (Data Build Tool), modifying and preprocessing the data through a number of abstract data layers:

  • Source – This layer contains the raw data the data source provides. The preferred data format is Parquet, but JSON, CSV, or plain text file are also allowed.
  • Prepared – The source layer is transformed and stored as the prepared layer in Parquet format for optimized columnar access. Initial cleaning, filtering, and basic transformations are performed in this layer.
  • Semantic – A semantic layer combines several prepared layer datasets to a single dataset that contains transformations, calculations, and business logic to deliver business-friendly insights.
  • QuickSight – QuickSight is the final presentation layer, which is directly ingested into QuickSight SPICE from Athena via incremental daily ingestion queries. These ingested datasets are used as a source in CLEA dashboards.

Overall, using dbt’s data modeling and the pay-as-you-go pricing of Athena, BMW Group can control costs by running efficient queries on demand. Furthermore, with the serverless architecture of Athena and dbt’s structured transformations, you can scale data processing without worrying about infrastructure management. In CLEA there are currently more than 120 dbt models implemented with complex transformations. The semantic layer is incrementally materialized and partially ingested into QuickSight with up to 4 TB of SPICE capacity. For dbt deployment and scheduling, we use GitHub Actions which allows us to introduce new dbt models and changes easily with automatic deployments and tests.

CLEA Access control

In this section, we explain how we implemented access control using row-level security in QuickSight and QuickSight embedding for authentication and authorization.

RLS for QuickSight

Row-level security (RLS) is a key feature that governs data access and privacy, which we implemented for CLEA. RLS is a mechanism that allows us to control the visibility of data at the row level based on user attributes. In essence, it ensures that users can only access the data that they are authorized to view, adding an additional layer of data protection within the QuickSight environment.

Understanding the importance of RLS requires a broader view of the data landscape. In organizations where multiple users interact with the same datasets but require different access levels due to their roles, RLS becomes a pivotal tool. It ensures data security and compliance with privacy regulations, preventing unauthorized access to sensitive data. Additionally, it offers a tailored user experience by displaying only relevant data to the user, thereby enhancing the effectiveness of data analysis.

For CLEA, we collected BMW Group metadata such as department, application, and organization, which are quite important to allow users to only see the accounts within their department, application, organization, and so on. This is achieved using both a user name and group name for access control. We use the user name for user-specific access control and the group name for adding some users to a specific group to extend their permissions for different use cases.

Lastly, because there are many dashboards created by CLEA, we also control which users a unique user can see and also the data itself in the dashboard. This is done at the group level. By default, all users are assigned to CLEA-READER, which is granted access to core dashboards that we want to share with users, but there are different groups that allow users to see additional dashboards after they’re assigned to that group.

The RLS dataset is refreshed daily to catch recent changes regarding new user additions, group changes, or any other user access changes. This dataset is also ingested to SPICE daily, which automatically updates all datasets restricted via this RLS dataset.

QuickSight embedding

CLEA is a cross-platform application that provides secure access to QuickSight embedded content with custom-built authentication and authorization logic that sits on top of BMW Group identity and role management services (referred to as BMW IAM).

CLEA provides access to sensitive data to multiple users with different permissions, which is why it is designed with fine-grained access control rules. It enforces access control using role-based access control (RBAC) and attribute-based access control (ABAC) models at two different levels:

  • At the dashboard level via QuickSight user groups (RBAC)
  • At the dashboard data level via QuickSight RLS (RBAC and ABAC)

Dashboard-level permissions define the list of dashboards users are able to visualize.

Dashboard data-level permissions define the subsets of dashboard data shown to the user and are applied using RLS with the user attributes mentioned earlier. Although the majority of roles defined in CLEA are used for dashboard-level permissions, some specific roles are strategically defined to grant permissions at the dashboard data level, taking priority over the ABAC model.

BMW has a defined set of guidelines suggesting the usage of their IAM services as the single source of truth for identity and access control, which the team took into careful consideration when designing the authentication and authorization processes for CLEA.

Upon their first login, users are automatically registered in CLEA and assigned a base role that grants them access to a basic set of dashboards.

The process of registering users in CLEA consists of mapping a user’s identity as retrieved from BMW’s identity provider (IdP) to a QuickSight user, then assigning the newly created user to the respective QuickSight user group.

For users that require more extensive permissions (at one of the levels mentioned before), it is possible to order additional role assignments via BMW’s self-service portal for role management. Authorized reviewers will then review it and either accept or reject the role assignments.

Role assignments will take effect the next time the user logs in, at which time the user’s assigned roles in BMW Group IAM are synced to the user’s QuickSight groups—internally referred to as the identity and permissions sync. As shown in the following diagram, the sync groups step calculates which users’ group memberships should be kept, created, and deleted following the logic.

Usage Insights

Amazon CloudWatch plays an indispensable role in enhancing the efficiency and usability of CLEA dashboards. Not only does CloudWatch offer real-time monitoring of AWS resources, but it also allows to track user activity and dashboard utilization. By analyzing usage metrics and logs, we can see who has logged in to the CLEA dashboards, what features are most frequently accessed, and how long users interact with various elements. These insights are invaluable for making data-driven decisions on how to improve the dashboards for a better user experience. Through the intuitive interface of CloudWatch, it’s possible to set up alarms for alerting about abnormal activities or performance issues. Ultimately, employing CloudWatch for monitoring offers a comprehensive view of both system health and user engagement, helping us refine and enhance our dashboards continually.

Conclusion

BMW Group’s CLEA platform offers a comprehensive and effective solution to manage and optimize cloud resources. By providing full transparency into cloud costs, usage, and efficiency, CLEA offers insights from high-level overviews to granular details at the service, resource, and operational level.

CLEA aggregates data from various sources, enabling a detailed roadmap of the cloud operations, tracking footprints across primes, departments, products, applications, resources, and tags. This dynamic vision helps identify trends, anticipate future needs, and make strategic decisions.

Future plans for CLEA include enhancing capabilities with data consistency and accuracy, integrating additional sources like Amazon S3 Storage Lens for deeper insights, and introducing Amazon QuickSight Q for intelligent recommendations powered by machine learning, further streamlining cloud operations.

By following the practices here, you can unlock the potential of efficient cloud resource management by implementing Cloud Intelligence Dashboards, providing you with precise insights into costs, savings, and operational effectiveness.


About the Authors

Philipp Karg is Lead FinOps Engineer at BMW Group and founder of the CLEA platform. He focus on boosting cloud efficiency initiatives and establishing a cost-aware culture within the company to ultimately leverage the cloud in a sustainable way.

Alex Gutfreund is Head of Product and Technology Integration at the BMW Group. He spearheads the digital transformation with a particular focus on platforms ecosystems and efficiencies. With extensive experience at the interface of business and IT, he drives change and makes an impact in various organizations. His industry knowledge spans from automotive, semiconductor, public transportation, and renewable energies.

Cizer Pereira is a Senior DevOps Architect at AWS Professional Services. He works closely with AWS customers to accelerate their journey to the cloud. He has a deep passion for Cloud Native and DevOps, and in his free time, he also enjoys contributing to open-source projects.

Selman Ay is a Data Architect in the AWS Professional Services team. He has worked with customers from various industries such as e-commerce, pharma, automotive and finance to build scalable data architectures and generate insights from the data. Outside of work, he enjoys playing tennis and engaging in outdoor activities.

Nick McCarthy is a Senior Machine Learning Engineer in the AWS Professional Services team. He has worked with AWS clients across various industries including healthcare, finance, sports, telecoms and energy to accelerate their business outcomes through the use of AI/ML. Outside of work Nick loves to travel, exploring new cuisines and cultures in the process.

Miguel Henriques is a Cloud Application Architect in the AWS Professional Services team with 4 years of experience in the automotive industry delivering cloud native solutions. In his free time, he is constantly looking for advancements in the web development space and searching for the next great pastel de nata.

Clean up your Excel and CSV files without writing code using AWS Glue DataBrew

Post Syndicated from Ismail Makhlouf original https://aws.amazon.com/blogs/big-data/clean-up-your-excel-and-csv-files-without-writing-code-using-aws-glue-databrew/

Managing data within an organization is complex. Handling data from outside the organization adds even more complexity. As the organization receives data from multiple external vendors, it often arrives in different formats, typically Excel or CSV files, with each vendor using their own unique data layout and structure. In this blog post, we’ll explore a solution that streamlines this process by leveraging the capabilities of AWS Glue DataBrew.

DataBrew is an excellent tool for data quality and preprocessing. You can use its built-in transformations, recipes, as well as integrations with the AWS Glue Data Catalog and Amazon Simple Storage Service (Amazon S3) to preprocess the data in your landing zone, clean it up, and send it downstream for analytical processing.

In this post, we demonstrate the following:

  • Extracting non-transactional metadata from the top rows of a file and merging it with transactional data
  • Combining multi-line rows into single-line rows
  • Extracting unique identifiers from within strings or text

Solution overview

For this use case, imagine you’re a data analyst working at your organization. The sales leadership have requested a consolidated view of the net sales they are making from each of the organization’s suppliers. Unfortunately, this information is not available in a database. The sales data comes from each supplier in layouts like the following example.

However, with hundreds of resellers, manually extracting the information at the top is not feasible. Your goal is to clean up and flatten the data into the following output layout.

image2

To achieve this, you can use pre-built transformations in DataBrew to quickly get the data in the layout you want.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Connect to the dataset

The first thing we need to do is upload the input dataset to Amazon S3. Create an S3 bucket for the project and create a folder to upload the raw input data. The output data will be stored in another folder in a later step.

Next, we need to connect DataBrew to our CSV file. We create what we call a dataset, which
is an artifact that points to whatever data source we will be using. Navigate to “Datasets” on
the left hand menu.

Ensure the Column header values field is set to Add default header. The input CSV has an irregular format, so the first row will not have the needed column values.

Create a project

To create a new project, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter FoodMartSales-AllUpProject.
  4. For Attached recipe, choose Create new recipe.
  5. For Recipe name, enter FoodMartSales-AllUpProject-recipe.
  6. For Select a dataset, select My datasets.
  7. Select the FoodMartSales-AllUp dataset.
  8. Under Permissions, for Role name, choose the IAM role you created as a prerequisite or create a new role.
  9. Choose Create project.

After the project is opened, an interactive session is created where you can author transformations on a sample of the data.

Extract non-transactional metadata from within the contents of the file and merge it with transactional data

In this section, we consider data that has metadata on the first few rows of the file, followed by transactional data. We walk through how to extract data relevant to the whole file from the top of the document and combine it with the transactional data into one flat table.

Extract metadata from the header and remove invalid rows

Complete the following steps to extract metadata from the header:

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Column_1.
  4. For Logical condition, choose Is exactly.
  5. For Enter a value, choose Enter custom value and enter RESELLER NAME.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column and set Value of to Column_2.
  8. For Value if false, choose Enter custom value and enter INVALID.
  9. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Name value extracted to a column by itself.

Next, you remove invalid rows and fill the rows with the Reseller Name value.

  1. Choose Clean and then choose Custom values.
  2. For Source column, choose ResellerName.
  3. For Specify values to remove, choose Custom value.
  4. For Values to remove, choose Invalid.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.
  7. Choose Missing and then choose Fill with most frequent value.
  8. For Source column, choose FirstTransactionDate.
  9. For Missing value action, choose Fill with most frequent value.
  10. For Apply transform to, choose All rows.
  11. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Name value extracted to a column by itself.

Repeat the same steps in this section for the rest of the metadata, including Reseller Email Address, Reseller ID, and First Transaction Date.

Promote column headers and clean up data

To promote column headers, complete the following steps:

  1. Reorder the columns to put the metadata columns to the left of the dataset by choosing Column, Move column, and Start of the table.
  2. Rename the columns with the appropriate names.

Now you can clean up some columns and rows.

  1. Delete unnecessary columns, such as Column_7.

You can also delete invalid rows by filtering out records that don’t have a transaction date value.

  1. Choose the ABC icon on the menu of the Transaction_Date column and choose date.

  2. For Handle invalid values, select Delete rows, then choose Apply.

The dataset should now have the metadata extracted and the column headers promoted.

Combine multi-line rows into single-line rows

The next issue to address is transactions pertaining to the same row that are split across multiple lines. In the following steps, we extract the needed data from the rows and merge it into single-line transactions. For this example specifically, the Reseller Margin data is split across two lines.


Complete the following steps to get the Reseller Margin value on the same line as the corresponding transaction. First, we identify the Reseller Margin rows and store them in a temporary column.

  1. Choose Conditions and then choose IF.
  2. For Matching conditions, choose Match all conditions.
  3. For Source, choose Value of and Transaction_ID.
  4. For Logical condition, choose Contains.
  5. For Enter a value, choose Enter custom value and enter Reseller Margin.
  6. For Flag result value as, choose Custom value.
  7. For Value if true, choose Select source column set Value of to TransactionAmount.
  8. For Value if false, choose Enter custom value and enter Invalid.
  9. For Destination column, choose ResellerMargin_Temp.
  10. Choose Apply.

Next, you shift the Reseller Margin value up one row.

  1. Choose Functions and then choose NEXT.
  2. For Source column, choose ResellerMargin_Temp.
  3. For Number of rows, enter 1.
  4. For Destination column, choose ResellerMargin.
  5. For Apply transform to, choose All rows.
  6. Choose Apply.

Next, delete the invalid rows.

  1. Choose Missing and then choose Remove missing rows.
  2. For Source column, choose TransactionDate.
  3. For Missing value action, choose Delete rows with missing values.
  4. For Apply transform to, choose All rows.
  5. Choose Apply.

Your dataset should now look like the following screenshot, with the Reseller Margin value extracted to a column by itself.

With the data structured properly, we can move on to mining the cleaned data.

Extract unique identifiers from within strings and text

Many types of data contain important information stored as unstructured text in a cell. In this section, we look at how to extract this data. Within the sample dataset, the BankTransferText column has valuable information around our resellers’ registered bank account numbers as well as the currency of the transaction, namely IBAN, SWIFT Code, and Currency.

Complete the following steps to extract IBAN, SWIFT code, and Currency into separate columns. First, you extract the IBAN number from the text using a regular expression (regex).

  1. Choose Extract and then choose Custom value or pattern.
  2. For Create column options, choose Extract values.
  3. For Source column, choose BankTransferText.
  4. For Extract options, choose Custom value or pattern.
  5. For Values to extract, enter [a-zA-Z][a-zA-Z][0-9]{2}[A-Z0-9]{1,30}.
  6. For Destination column, choose IBAN.
  7. For Apply transform to, choose All rows.
  8. Choose Apply.
  9. Extract the SWIFT code from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(SWIFT Code: )([A-Z]{2}[A-Z0-9]+).

Next, remove the SWIFT Code: label from the extracted text.

  1. Choose Remove and then choose Custom values.
  2. For Source column, choose SWIFT Code.
  3. For Specify values to remove, choose Custom value.
  4. For Apply transform to, choose All rows.
  5. Extract the currency from the text using a regex following the same steps used to extract the IBAN number, but using the following regex instead: (?!^)(Currency: )([A-Z]{3}).
  6. Remove the Currency: label from the extracted text following the same steps used to remove the SWIFT Code: label.

You can clean up by deleting any unnecessary columns.

  1. Choose Column and then choose Delete.
  2. For Source columns, choose BankTransferText.
  3. Choose Apply.
  4. Repeat for any remaining columns.

Your dataset should now look like the following screenshot, with IBAN, SWIFT Code, and Currency extracted to separate columns.

Write the transformed data to Amazon S3

With all the steps captured in the recipe, the last step is to write the transformed data to Amazon S3.

  1. On the DataBrew console, choose Run job.
  1. For Job name, enter FoodMartSalesToDataLake.
  2. For Output to, choose Amazon S3.
  3. For File type, choose CSV.
  4. For Delimiter, choose Comma (,).
  5. For Compression, choose None.
  6. For S3 bucket owners’ account, select Current AWS account.
  7. For S3 location, enter s3://{name of S3 bucket}/clean/.
  8. For Role name, choose the IAM role created as a prerequisite or create a new role.
  9. Choose Create and run job.
  10. Go to the Jobs tab and wait for the job to complete.
  11. Navigate to the job output folder on the Amazon S3 console.
  12. Download the CSV file and view the transformed output.

Your dataset should look similar to the following screenshot.

Clean up

To optimize cost, make sure to clean up the resources deployed for this project by completing the following steps:

  1. Delete every DataBrew project along with their linked recipes.
  2. Delete all the DataBrew datasets.
  3. Delete the contents in your S3 bucket.
  4. Delete the S3 bucket.

Conclusion

The reality of exchanging data with suppliers is that we can’t always control the shape of the input data. With DataBrew, we can use a list of pre-built transformations and repeatable steps to transform incoming data into a desired layout and extract relevant data and insights from Excel or CSV files. Start using DataBrew today and transform 3 rd party files into structured datasets ready for consumption by your business.


About the Author

Ismail Makhlouf is a Senior Specialist Solutions Architect for Data Analytics at AWS. Ismail focuses on architecting solutions for organizations across their end-to-end data analytics estate, including batch and real-time streaming, big data, data warehousing, and data lake workloads. He primarily works with direct-to-consumer platform companies in the ecommerce, FinTech, PropTech, and HealthTech space to achieve their business objectives with well-architected data platforms.

Connect your data for faster decisions with AWS

Post Syndicated from Rahul Pathak original https://aws.amazon.com/blogs/big-data/connect-your-data-for-faster-decisions-with-aws/

The most impactful data-driven insights come from connecting the dots between all your data sources—across departments, services, on-premises tools, and third-party applications. But typically, connecting data requires complex extract, transform, and load (ETL) pipelines, taking hours or days. That’s too slow for decision-making speed. ETL needs to be easier and sometimes eliminated.

AWS is investing in addressing this in several ways. First, for common use cases where ETL is repeated with little value-add, we’re integrating services to decrease or eliminate the need for ETL. Second, organizations still need transformations like cleansing, deduplication, and combining datasets for analysis and machine learning (ML). For these, AWS Glue provides fast, scalable data transformation. Third, AWS continues adding support for more data sources including connections to software as a service (SaaS) applications, on-premises applications, and other clouds so organizations can act on their data.

In this post, we discuss how we’re delivering on these investments with a number of data integration innovations that span AWS databases, analytics, business intelligence (BI), and ML services.

Amazon Aurora MySQL zero-ETL integration with Amazon Redshift is now generally available

In June 2023, we announced the public preview of Amazon Aurora MySQL-Compatible Edition zero-ETL integration with Amazon Redshift. We are thrilled to announce that this zero-ETL integration is now generally available. Amazon Aurora MySQL zero-ETL integration with Amazon Redshift processes over 1 million transactions per minute, enabling near-real-time analytics. Within seconds of new data coming into Amazon Aurora MySQL, the data is replicated to Amazon Redshift. Updates in Amazon Aurora MySQL are automatically and continuously propagated to Amazon Redshift. Customers and partners can derive tremendous time savings by reducing traditional ETL bottlenecks. They can now analyze business metrics in near-real time and make data-driven decisions faster than ever before.

In the retail industry, for example, Infosys wanted to gain faster insights about their business, such as best-selling products and high-revenue stores, based on transactions in a store management system. They used Amazon Aurora MySQL zero-ETL integration with Amazon Redshift to achieve this. With this integration, Infosys replicated Aurora data to Amazon Redshift and created Amazon QuickSight dashboards for product managers and channel leaders in just a few seconds, instead of several hours. Now, as part of Infosys Cobalt and Infosys Topaz blueprints, enterprises can have near real-time analytics on transactional data, which can help them make informed decisions related to store management.

– Sunil Senan, SVP and Global Head of Data, Analytics, and AI, Infosys

Amazon SageMaker Canvas integration with Amazon QuickSight

We are empowering business analysts to create predictive, interactive dashboards by connecting Amazon SageMaker Canvas, our no-code ML service, with Amazon QuickSight, our BI service. Business analysts use SageMaker Canvas to build ML models and generate predictions without needing to write code. They can then seamlessly integrate these predictions in QuickSight to create interactive dashboards that can be shared across their organization. This enables democratization of predictive insights for better decision-making.

Moreover, we have enabled deep, bidirectional integration between SageMaker Canvas and QuickSight. Business analysts can send ML models from SageMaker Canvas to QuickSight and run predictions from within QuickSight. Analysts can now also directly send data from QuickSight to SageMaker Canvas with just a few clicks to rapidly build ML models using a simple point-and-click interface, without needing to create or maintain complex data pipelines between the two services. This integration empowers users to go from data to predictions and visualizations faster than ever.

Connecting to SaaS applications

AWS services already connect to hundreds of AWS and third-party data sources. Data engineers can use services such as Amazon AppFlow and AWS Glue to make data quickly accessible from diverse sources. This enables organizations to derive unified insights across siloed datasets. We recently added new Amazon AppFlow and AWS Glue integrations to our existing portfolio.

Amazon AppFlow now supports concurrent processing for data transfers from SAP applications

Amazon AppFlow, a fully managed integration service that helps you securely transfer data between AWS services and SaaS applications, now supports concurrent processing and configurable page sizes for faster data transfers from SAP. This reduces the time taken to move SAP data into AWS data and artificial intelligence (AI) services.

Google BigQuery connectivity for AWS Glue for Apache Spark is now generally available

AWS Glue for Apache Spark has added native connectivity to Google BigQuery, enabling reading and writing of BigQuery data directly without the need to install or manage libraries. You can now add BigQuery as a source or target in AWS Glue Studio’s visual interface or directly in an AWS Glue ETL script.

Summary

The data integration innovations we have highlighted show our commitment to empowering organizations to easily connect their data. Whether it’s achieving near-real-time insights, democratizing predictive analytics, or connecting diverse data sources, we are focused on helping you derive more value from your data. With the new capabilities of Amazon Aurora MySQL, Amazon Redshift, SageMaker Canvas, QuickSight, Amazon AppFlow, and AWS Glue, data engineers and business analysts can break down data silos to uncover insights.

Visit Data integration with AWS to learn more.


About the authors

Rahul Pathak is VP of Relational Database Engines, leading Amazon Aurora, Amazon Redshift, and Amazon QLDB. Prior to his current role, he was VP of Analytics at AWS, where he worked across the entire AWS database portfolio. He has co-founded two companies, one focused on digital media analytics and the other on IP-geolocation.

G2 Krishnamoorthy is VP of Analytics, leading AWS data lake services, data integration, Amazon OpenSearch Service, and Amazon QuickSight. Prior to his current role, G2 built and ran the Analytics and ML Platform at Facebook/Meta, and built various parts of the SQL Server database, Azure Analytics, and Azure ML at Microsoft.

Unlock scalable analytics with AWS Glue and Google BigQuery

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/unlock-scalable-analytics-with-aws-glue-and-google-bigquery/

Data integration is the foundation of robust data analytics. It encompasses the discovery, preparation, and composition of data from diverse sources. In the modern data landscape, accessing, integrating, and transforming data from diverse sources is a vital process for data-driven decision-making. AWS Glue, a serverless data integration and extract, transform, and load (ETL) service, has revolutionized this process, making it more accessible and efficient. AWS Glue eliminates complexities and costs, allowing organizations to perform data integration tasks in minutes, boosting efficiency.

This blog post explores the newly announced managed connector for Google BigQuery and demonstrates how to build a modern ETL pipeline with AWS Glue Studio without writing code.

Overview of AWS Glue

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, and combine data for analytics, machine learning (ML), and application development. AWS Glue provides all the capabilities needed for data integration, so you can start analyzing your data and putting it to use in minutes instead of months. AWS Glue provides both visual and code-based interfaces to make data integration easier. Users can more easily find and access data using the AWS Glue Data Catalog. Data engineers and ETL (extract, transform, and load) developers can visually create, run, and monitor ETL workflows in a few steps in AWS Glue Studio. Data analysts and data scientists can use AWS Glue DataBrew to visually enrich, clean, and normalize data without writing code.

Introducing Google BigQuery Spark connector

To meet the demands of diverse data integration use cases, AWS Glue now offers a native spark connector for Google BigQuery. Customers can now use AWS Glue 4.0 for Spark to read from and write to tables in Google BigQuery. Additionally, you can read an entire table or run a custom query and write your data using direct and indirect writing methods. You connect to BigQuery using service account credentials stored securely in AWS Secrets Manager.

Benefits of Google BigQuery Spark connector

  • Seamless integration: The native connector offers an intuitive and streamlined interface for data integration, reducing the learning curve.
  • Cost efficiency: Building and maintaining custom connectors can be expensive. The native connector provided by AWS Glue is a cost-effective alternative.
  • Efficiency: Data transformation tasks that previously took weeks or months can now be accomplished within minutes, optimizing efficiency.

Solution overview

In this example, you create two ETL jobs using AWS Glue with the native Google BigQuery connector.

  1. Query a BigQuery table and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format.
  2. Use the data extracted from the first job to transform and create an aggregated result to be stored in Google BigQuery.

solution architecture

Prerequisites

The dataset used in this solution is the NCEI/WDS Global Significant Earthquake Database, with a global listing of over 5,700 earthquakes from 2150 BC to the present. Copy this public data into your Google BigQuery project or use your existing dataset.

Configure BigQuery connections

To connect to Google BigQuery from AWS Glue, see Configuring BigQuery connections. You must create and store your Google Cloud Platform credentials in a Secrets Manager secret, then associate that secret with a Google BigQuery AWS Glue connection.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

To create an S3 bucket:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique Name for your bucket; for example, awsglue-demo.
  3. Choose Create bucket.

Create an IAM role for the AWS Glue ETL job

When you create the AWS Glue ETL job, you specify an AWS Identity and Access Management (IAM) role for the job to use. The role must grant access to all resources used by the job, including Amazon S3 (for any sources, targets, scripts, driver files, and temporary directories), and Secrets Manager.

For instructions, see Configure an IAM role for your ETL job.

Solution walkthrough

Create a visual ETL job in AWS Glue Studio to transfer data from Google BigQuery to Amazon S3

  1. Open the AWS Glue console.
  2. In AWS Glue, navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Enter a Name for your AWS Glue job, for example, bq-s3-dataflow.
  4. Select Google BigQuery as the data source.
    1. Enter a name for your Google BigQuery source node, for example, noaa_significant_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Enter a Parent project, for example, bigquery-public-datasources.
    4. Select Choose a single table for the BigQuery Source.
    5. Enter the table you want to migrate in the form [dataset].[table], for example, noaa_significant_earthquakes.earthquakes.
      big query data source for bq to amazon s3 dataflow
  5. Next, choose the data target as Amazon S3.
    1. Enter a Name for the target Amazon S3 node, for example, earthquakes.
    2. Select the output data Format as Parquet.
    3. Select the Compression Type as Snappy.
    4. For the S3 Target Location, enter the bucket created in the prerequisites, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    5. You should replace <YourBucketName> with the name of your bucket.
      s3 target node for bq to amazon s3 dataflow
  6. Next go to the Job details. In the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for bq to amazon s3 dataflow
  7. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue will run the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

run and monitor bq to amazon s3 dataflow

Data validation

  1. After the job has run successfully, validate the data in your S3 bucket to ensure it matches your expectations. You can see the results using Amazon S3 Select.

review results in amazon s3 from the bq to s3 dataflow run

Automate and schedule

  1. If needed, set up job scheduling to run the ETL process regularly. You can use AWS to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery.

You’ve successfully configured an AWS Glue ETL job to transfer data from Google BigQuery to Amazon S3. Next, you create the ETL job to aggregate this data and transfer it to Google BigQuery.

Finding earthquake hotspots with AWS Glue Studio Visual ETL.

  1. Open AWS Glue console.
  2. In AWS Glue navigate to Visual ETL under the ETL jobs section and create a new ETL job using Visual with a blank canvas.
  3. Provide a name for your AWS Glue job, for example, s3-bq-dataflow.
  4. Choose Amazon S3 as the data source.
    1. Enter a Name for the source Amazon S3 node, for example, earthquakes.
    2. Select S3 location as the S3 source type.
    3. Enter the S3 bucket created in the prerequisites as the S3 URL, for example, s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/.
    4. You should replace <YourBucketName> with the name of your bucket.
    5. Select the Data format as Parquet.
    6. Select Infer schema.
      amazon s3 source node for s3 to bq dataflow
  5. Next, choose Select Fields transformation.
    1. Select earthquakes as Node parents.
    2. Select fields: id, eq_primary, and country.
      select field node for amazon s3 to bq dataflow
  6. Next, choose Aggregate transformation.
    1. Enter a Name, for example Aggregate.
    2. Choose Select Fields as Node parents.
    3. Choose eq_primary and country as the group by columns.
    4. Add id as the aggregate column and count as the aggregation function.
      aggregate node for amazon s3 to bq dataflow
  7. Next, choose RenameField transformation.
    1. Enter a name for the source Amazon S3 node, for example, Rename eq_primary.
    2. Choose Aggregate as Node parents.
    3. Choose eq_primary as the Current field name and enter earthquake_magnitude as the New field name.
      rename eq_primary field for amazon s3 to bq dataflow
  8. Next, choose RenameField transformation
    1. Enter a name for the source Amazon S3 node, for example, Rename count(id).
    2. Choose Rename eq_primary as Node parents.
    3. Choose count(id) as the Current field name and enter number_of_earthquakes as the New field name.
      rename cound(id) field for amazon s3 to bq dataflow
  9. Next, choose the data target as Google BigQuery.
    1. Provide a name for your Google BigQuery source node, for example, most_powerful_earthquakes.
    2. Select a Google BigQuery connection, for example, bq-connection.
    3. Select Parent project, for example, bigquery-public-datasources.
    4. Enter the name of the Table you want to create in the form [dataset].[table], for example, noaa_significant_earthquakes.most_powerful_earthquakes.
    5. Choose Direct as the Write method.
      bq destination for amazon s3 to bq dataflow
  10. Next go to the Job details tab and in the IAM Role, select the IAM role from the prerequisites, for example, AWSGlueRole.
    IAM role for amazon s3 to bq dataflow
  11. Choose Save.

Run and monitor the job

  1. After your ETL job is configured, you can run the job. AWS Glue runs the ETL process, extracting data from Google BigQuery and loading it into your specified S3 location.
  2. Monitor the job’s progress in the AWS Glue console. You can see logs and job run history to ensure everything is running smoothly.

monitor and run for amazon s3 to bq dataflow

Data validation

  1. After the job has run successfully, validate the data in your Google BigQuery dataset. This ETL job returns a list of countries where the most powerful earthquakes have occurred. It provides these by counting the number of earthquakes for a given magnitude by country.

aggregated results for amazon s3 to bq dataflow

Automate and schedule

  1. You can set up job scheduling to run the ETL process regularly. AWS Glue allows you to automate your ETL jobs, ensuring your S3 bucket is always up to date with the latest data from Google BigQuery.

That’s it! You’ve successfully set up an AWS Glue ETL job to transfer data from Amazon S3 to Google BigQuery. You can use this integration to automate the process of data extraction, transformation, and loading between these two platforms, making your data readily available for analysis and other applications.

Clean up

To avoid incurring charges, clean up the resources used in this blog post from your AWS account by completing the following steps:

  1. On the AWS Glue console, choose Visual ETL in the navigation pane.
  2. From the list of jobs, select the job bq-s3-data-flow and delete it.
  3. From the list of jobs, select the job s3-bq-data-flow and delete it.
  4. On the AWS Glue console, choose Connections in the navigation pane under Data Catalog.
  5. Choose the BiqQuery connection you created and delete it.
  6. On the Secrets Manager console, choose the secret you created and delete it.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for the AWS Glue ETL job and delete it.
  8. On the Amazon S3 console, search for the S3 bucket you created, choose Empty to delete the objects, then delete the bucket.
  9. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The integration of AWS Glue with Google BigQuery simplifies the analytics pipeline, reduces time-to-insight, and facilitates data-driven decision-making. It empowers organizations to streamline data integration and analytics. The serverless nature of AWS Glue means no infrastructure management, and you pay only for the resources consumed while your jobs are running. As organizations increasingly rely on data for decision-making, this native spark connector provides an efficient, cost-effective, and agile solution to swiftly meet data analytics needs.

If you’re interested to see how to read from and write to tables in Google BigQuery in AWS Glue, take a look at step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on AWS Glue, visit AWS Glue.

Appendix

If you are looking to implement this example, using code instead of the AWS Glue console, use the following code snippets.

Reading data from Google BigQuery and writing data into Amazon S3

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Big Query Table 
noaa_significant_earthquakes_node1697123333266 = (
    glueContext.create_dynamic_frame.from_options(
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "sourceType": "table",
            "table": "noaa_significant_earthquakes.earthquakes",
        },
        transformation_ctx="noaa_significant_earthquakes_node1697123333266",
    )
)
# STEP-2 Write the data read from Big Query Table into S3
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697157772747 = glueContext.write_dynamic_frame.from_options(
    frame=noaa_significant_earthquakes_node1697123333266,
    connection_type="s3",
    format="glueparquet",
    connection_options={
        "path": "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/",
        "partitionKeys": [],
    },
    format_options={"compression": "snappy"},
    transformation_ctx="earthquakes_node1697157772747",
)

job.commit()

Reading and aggregating data from Amazon S3 and writing into Google BigQuery

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from awsglue import DynamicFrame
from pyspark.sql import functions as SqlFuncs

def sparkAggregate(
    glueContext, parentFrame, groups, aggs, transformation_ctx
) -> DynamicFrame:
    aggsFuncs = []
    for column, func in aggs:
        aggsFuncs.append(getattr(SqlFuncs, func)(column))
    result = (
        parentFrame.toDF().groupBy(*groups).agg(*aggsFuncs)
        if len(groups) > 0
        else parentFrame.toDF().agg(*aggsFuncs)
    )
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# STEP-1 Read the data from Amazon S3 bucket
# You should replace <YourBucketName> with the name of your bucket.
earthquakes_node1697218776818 = glueContext.create_dynamic_frame.from_options(
    format_options={},
    connection_type="s3",
    format="parquet",
    connection_options={
        "paths": [
            "s3://<YourBucketName>/noaa_significant_earthquakes/earthquakes/"
        ],
        "recurse": True,
    },
    transformation_ctx="earthquakes_node1697218776818",
)

# STEP-2 Select fields
SelectFields_node1697218800361 = SelectFields.apply(
    frame=earthquakes_node1697218776818,
    paths=["id", "eq_primary", "country"],
    transformation_ctx="SelectFields_node1697218800361",
)

# STEP-3 Aggregate data
Aggregate_node1697218823404 = sparkAggregate(
    glueContext,
    parentFrame=SelectFields_node1697218800361,
    groups=["eq_primary", "country"],
    aggs=[["id", "count"]],
    transformation_ctx="Aggregate_node1697218823404",
)

Renameeq_primary_node1697219483114 = RenameField.apply(
    frame=Aggregate_node1697218823404,
    old_name="eq_primary",
    new_name="earthquake_magnitude",
    transformation_ctx="Renameeq_primary_node1697219483114",
)

Renamecountid_node1697220511786 = RenameField.apply(
    frame=Renameeq_primary_node1697219483114,
    old_name="`count(id)`",
    new_name="number_of_earthquakes",
    transformation_ctx="Renamecountid_node1697220511786",
)

# STEP-1 Write the aggregated data in Google BigQuery
most_powerful_earthquakes_node1697220563923 = (
    glueContext.write_dynamic_frame.from_options(
        frame=Renamecountid_node1697220511786,
        connection_type="bigquery",
        connection_options={
            "connectionName": "bq-connection",
            "parentProject": "bigquery-public-datasources",
            "writeMethod": "direct",
            "table": "noaa_significant_earthquakes.most_powerful_earthquakes",
        },
        transformation_ctx="most_powerful_earthquakes_node1697220563923",
    )
)

job.commit()


About the authors

Kartikay Khator is a Solutions Architect in Global Life Sciences at Amazon Web Services (AWS). He is passionate about building innovative and scalable solutions to meet the needs of customers, focusing on AWS Analytics services. Beyond the tech world, he is an avid runner and enjoys hiking.

Kamen SharlandjievKamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Anshul SharmaAnshul Sharma is a Software Development Engineer in AWS Glue Team. He is driving the connectivity charter which provide Glue customer native way of connecting any Data source (Data-warehouse, Data-lakes, NoSQL etc) to Glue ETL Jobs. Beyond the tech world, he is a cricket and soccer lover.

Create, train, and deploy Amazon Redshift ML model integrating features from Amazon SageMaker Feature Store

Post Syndicated from Anirban Sinha original https://aws.amazon.com/blogs/big-data/create-train-and-deploy-amazon-redshift-ml-model-integrating-features-from-amazon-sagemaker-feature-store/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Data analysts and database developers want to use this data to train machine learning (ML) models, which can then be used to generate insights on new data for use cases such as forecasting revenue, predicting customer churn, and detecting anomalies. Amazon Redshift ML makes it easy for SQL users to create, train, and deploy ML models using SQL commands familiar to many roles such as executives, business analysts, and data analysts. We covered in a previous post how you can use data in Amazon Redshift to train models in Amazon SageMaker, a fully managed ML service, and then make predictions within your Redshift data warehouse.

Redshift ML currently supports ML algorithms such as XGBoost, multilayer perceptron (MLP), KMEANS, and Linear Learner. Additionally, you can import existing SageMaker models into Amazon Redshift for in-database inference or remotely invoke a SageMaker endpoint.

Amazon SageMaker Feature Store is a fully managed, purpose-built repository to store, share, and manage features for ML models. However, one challenge in training a production-ready ML model using SageMaker Feature Store is access to a diverse set of features that aren’t always owned and maintained by the team that is building the model. For example, an ML model to identify fraudulent financial transactions needs access to both identifying (device type, browser) and transaction (amount, credit or debit, and so on) related features. As a data scientist building an ML model, you may have access to the identifying information but not the transaction information, and having access to a feature store solves this.

In this post, we discuss the combined feature store pattern, which allows teams to maintain their own local feature stores using a local Redshift table while still being able to access shared features from the centralized feature store. In a local feature store, you can store sensitive data that can’t be shared across the organization for regulatory and compliance reasons.

We also show you how to use familiar SQL statements to create and train ML models by combining shared features from the centralized store with local features and use these models to make in-database predictions on new data for use cases such as fraud risk scoring.

Overview of solution

For this post, we create an ML model to predict if a transaction is fraudulent or not, given the transaction record. To build this, we need to engineer features that describe an individual credit card’s spending pattern, such as the number of transactions or the average transaction amount, and also information about the merchant, the cardholder, the device used to make the payment, and any other data that may be relevant to detecting fraud.

To get started, we need an Amazon Redshift Serverless data warehouse with the Redshift ML feature enabled and an Amazon SageMaker Studio environment with access to SageMaker Feature Store. For an introduction to Redshift ML and instructions on setting it up, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

We also need an offline feature store to store features in feature groups. The offline store uses an Amazon Simple Storage Service (Amazon S3) bucket for storage and can also fetch data using Amazon Athena queries. For an introduction to SageMaker Feature Store and instructions on setting it up, see Getting started with Amazon SageMaker Feature Store.

The following diagram illustrates solution architecture.

The workflow contains the following steps:

  1. Create the offline feature group in SageMaker Feature Store and ingest data into the feature group.
  2. Create a Redshift table and load local feature data into the table.
  3. Create an external schema for Amazon Redshift Spectrum to access the offline store data stored in Amazon S3 using the AWS Glue Data Catalog.
  4. Train and validate a fraud risk scoring ML model using local feature data and external offline feature store data.
  5. Use the offline feature store and local store for inference.

Dataset

To demonstrate this use case, we use a synthetic dataset with two tables: identity and transactions. They can both be joined by the TransactionID column. The transaction table contains information about a particular transaction, such as amount, credit or debit card, and so on, and the identity table contains information about the user, such as device type and browser. The transaction must exist in the transaction table, but might not always be available in the identity table.

The following is an example of the transactions dataset.

The following is an example of the identity dataset.

Let’s assume that across the organization, data science teams centrally manage the identity data and process it to extract features in a centralized offline feature store. The data warehouse team ingests and analyzes transaction data in a Redshift table, owned by them.

We work through this use case to understand how the data warehouse team can securely retrieve the latest features from the identity feature group and join it with transaction data in Amazon Redshift to create a feature set for training and inferencing a fraud detection model.

Create the offline feature group and ingest data

To start, we set up SageMaker Feature Store, create a feature group for the identity dataset, inspect and process the dataset, and ingest some sample data. We then prepare the transaction features from the transaction data and store it in Amazon S3 for further loading into the Redshift table.

Alternatively, you can author features using Amazon SageMaker Data Wrangler, create feature groups in SageMaker Feature Store, and ingest features in batches using an Amazon SageMaker Processing job with a notebook exported from SageMaker Data Wrangler. This mode allows for batch ingestion into the offline store.

Let’s explore some of the key steps in this section.

  1. Download the sample notebook.
  2. On the SageMaker console, under Notebook in the navigation pane, choose Notebook instances.
  3. Locate your notebook instance and choose Open Jupyter.
  4. Choose Upload and upload the notebook you just downloaded.
  5. Open the notebook sagemaker_featurestore_fraud_redshiftml_python_sdk.ipynb.
  6. Follow the instructions and run all the cells up to the Cleanup Resources section.

The following are key steps from the notebook:

  1. We create a Pandas DataFrame with the initial CSV data. We apply feature transformations for this dataset.
    identity_data = pd.read_csv(io.BytesIO(identity_data_object["Body"].read()))
    transaction_data = pd.read_csv(io.BytesIO(transaction_data_object["Body"].read()))
    
    identity_data = identity_data.round(5)
    transaction_data = transaction_data.round(5)
    
    identity_data = identity_data.fillna(0)
    transaction_data = transaction_data.fillna(0)
    
    # Feature transformations for this dataset are applied 
    # One hot encode card4, card6
    encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")
    encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")
    
    transformed_transaction_data = pd.concat(
        [transaction_data, encoded_card_type, encoded_card_bank], axis=1
    )

  2. We store the processed and transformed transaction dataset in an S3 bucket. This transaction data will be loaded later in the Redshift table for building the local feature store.
    transformed_transaction_data.to_csv("transformed_transaction_data.csv", header=False, index=False)
    s3_client.upload_file("transformed_transaction_data.csv", default_s3_bucket_name, prefix + "/training_input/transformed_transaction_data.csv")

  3. Next, we need a record identifier name and an event time feature name. In our fraud detection example, the column of interest is TransactionID.EventTime can be appended to your data when no timestamp is available. In the following code, you can see how these variables are set, and then EventTime is appended to both features’ data.
    # record identifier and event time feature names
    record_identifier_feature_name = "TransactionID"
    event_time_feature_name = "EventTime"
    
    # append EventTime feature
    identity_data[event_time_feature_name] = pd.Series(
        [current_time_sec] * len(identity_data), dtype="float64"
    )

  4. We then create and ingest the data into the feature group using the SageMaker SDK FeatureGroup.ingest API. This is a small dataset and therefore can be loaded into a Pandas DataFrame. When we work with large amounts of data and millions of rows, there are other scalable mechanisms to ingest data into SageMaker Feature Store, such as batch ingestion with Apache Spark.
    identity_feature_group.create(
        s3_uri=<S3_Path_Feature_Store>,
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=<role_arn>,
        enable_online_store=False,
    )
    
    identity_feature_group_name = "identity-feature-group"
    
    # load feature definitions to the feature group. SageMaker FeatureStore Python SDK will auto-detect the data schema based on input data.
    identity_feature_group.load_feature_definitions(data_frame=identity_data)
    identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)
    

  5. We can verify that data has been ingested into the feature group by running Athena queries in the notebook or running queries on the Athena console.

At this point, the identity feature group is created in an offline feature store with historical data persisted in Amazon S3. SageMaker Feature Store automatically creates an AWS Glue Data Catalog for the offline store, which enables us to run SQL queries against the offline data using Athena or Redshift Spectrum.

Create a Redshift table and load local feature data

To build a Redshift ML model, we build a training dataset joining the identity data and transaction data using SQL queries. The identity data is in a centralized feature store where the historical set of records are persisted in Amazon S3. The transaction data is a local feature for training data that needs to made available in the Redshift table.

Let’s explore how to create the schema and load the processed transaction data from Amazon S3 into a Redshift table.

  1. Create the customer_transaction table and load daily transaction data into the table, which you’ll use to train the ML model:
    DROP TABLE customer_transaction;
    CREATE TABLE customer_transaction (
      TransactionID INT,    
      isFraud INT,  
      TransactionDT INT,    
      TransactionAmt decimal(10,2), 
      card1 INT,    
      card2 decimal(10,2),card3 decimal(10,2),  
      card4 VARCHAR(20),card5 decimal(10,2),    
      card6 VARCHAR(20),    
      B1 INT,B2 INT,B3 INT,B4 INT,B5 INT,B6 INT,
      B7 INT,B8 INT,B9 INT,B10 INT,B11 INT,B12 INT,
      F1 INT,F2 INT,F3 INT,F4 INT,F5 INT,F6 INT,
      F7 INT,F8 INT,F9 INT,F10 INT,F11 INT,F12 INT,
      F13 INT,F14 INT,F15 INT,F16 INT,F17 INT,  
      N1 VARCHAR(20),N2 VARCHAR(20),N3 VARCHAR(20), 
      N4 VARCHAR(20),N5 VARCHAR(20),N6 VARCHAR(20), 
      N7 VARCHAR(20),N8 VARCHAR(20),N9 VARCHAR(20), 
      card_type_0  boolean,
      card_type_credit boolean,
      card_type_debit  boolean,
      card_bank_0  boolean,
      card_bank_american_express boolean,
      card_bank_discover  boolean,
      card_bank_mastercard  boolean,
      card_bank_visa boolean  
    );

  2. Load the sample data by using the following command. Replace your Region and S3 path as appropriate. You will find the S3 path in the S3 Bucket Setup For The OfflineStore section in the notebook or by checking the dataset_uri_prefix in the notebook.
    COPY customer_transaction
    FROM '<s3path>/transformed_transaction_data.csv' 
    IAM_ROLE default delimiter ',' 
    region 'your-region';

Now that we have created a local feature store for the transaction data, we focus on integrating a centralized feature store with Amazon Redshift to access the identity data.

Create an external schema for Redshift Spectrum to access the offline store data

We have created a centralized feature store for identity features, and we can access this offline feature store using services such as Redshift Spectrum. When the identity data is available through the Redshift Spectrum table, we can create a training dataset with feature values from the identity feature group and customer_transaction, joining on the TransactionId column.

This section provides an overview of how to enable Redshift Spectrum to query data directly from files on Amazon S3 through an external database in an AWS Glue Data Catalog.

  1. First, check that the identity-feature-group table is present in the Data Catalog under the sagemamker_featurestore database.
  2. Using Redshift Query Editor V2, create an external schema using the following command:
    CREATE EXTERNAL SCHEMA sagemaker_featurestore
    FROM DATA CATALOG
    DATABASE 'sagemaker_featurestore'
    IAM_ROLE default
    create external database if not exists;

All the tables, including identity-feature-group external tables, are visible under the sagemaker_featurestore external schema. In Redshift Query Editor v2, you can check the contents of the external schema.

  1. Run the following query to sample a few records—note that your table name may be different:
    Select * from sagemaker_featurestore.identity_feature_group_1680208535 limit 10;

  2. Create a view to join the latest data from identity-feature-group and customer_transaction on the TransactionId column. Be sure to change the external table name to match your external table name:
    create or replace view public.credit_fraud_detection_v
    AS select  "isfraud",
            "transactiondt",
            "transactionamt",
            "card1","card2","card3","card5",
             case when "card_type_credit" = 'False' then 0 else 1 end as card_type_credit,
             case when "card_type_debit" = 'False' then 0 else 1 end as card_type_debit,
             case when "card_bank_american_express" = 'False' then 0 else 1 end as card_bank_american_express,
             case when "card_bank_discover" = 'False' then 0 else 1 end as card_bank_discover,
             case when "card_bank_mastercard" = 'False' then 0 else 1 end as card_bank_mastercard,
             case when "card_bank_visa" = 'False' then 0 else 1 end as card_bank_visa,
            "id_01","id_02","id_03","id_04","id_05"
    from public.customer_transaction ct left join sagemaker_featurestore.identity_feature_group_1680208535 id
    on id.transactionid = ct.transactionid with no schema binding;

Train and validate the fraud risk scoring ML model

Redshift ML gives you the flexibility to specify your own algorithms and model types and also to provide your own advanced parameters, which can include preprocessors, problem type, and hyperparameters. In this post, we create a customer model by specifying AUTO OFF and the model type of XGBOOST. By turning AUTO OFF and using XGBoost, we are providing the necessary inputs for SageMaker to train the model. A benefit of this can be faster training times. XGBoost is as open-source version of the gradient boosted trees algorithm. For more details on XGBoost, refer to Build XGBoost models with Amazon Redshift ML.

We train the model using 80% of the dataset by filtering on transactiondt < 12517618. The other 20% will be used for inference. A centralized feature store is useful in providing the latest supplementing data for training requests. Note that you will need to provide an S3 bucket name in the create model statement. It will take approximately 10 minutes to create the model.

CREATE MODEL frauddetection_xgboost
FROM (select  "isfraud",
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05"
from credit_fraud_detection_v where transactiondt < 12517618
)
TARGET isfraud
FUNCTION ml_fn_frauddetection_xgboost
IAM_ROLE default
AUTO OFF
MODEL_TYPE XGBOOST
OBJECTIVE 'binary:logistic'
PREPROCESSORS 'none'
HYPERPARAMETERS DEFAULT EXCEPT(NUM_ROUND '100')
SETTINGS (S3_BUCKET <s3_bucket>);

When you run the create model command, it will complete quickly in Amazon Redshift while the model training is happening in the background using SageMaker. You can check the status of the model by running a show model command:

show model frauddetection_xgboost;

The output of the show model command shows that the model state is TRAINING. It also shows other information such as the model type and the training job name that SageMaker assigned.
After a few minutes, we run the show model command again:

show model frauddetection_xgboost;

Now the output shows the model state is READY. We can also see the train:error score here, which at 0 tells us we have a good model. Now that the model is trained, we can use it for running inference queries.

Use the offline feature store and local store for inference

We can use the SQL function to apply the ML model to data in queries, reports, and dashboards. Let’s use the function ml_fn_frauddetection_xgboost created by our model against our test dataset by filtering where transactiondt >=12517618, to predict whether a transaction is fraudulent or not. SageMaker Feature Store can be useful in supplementing data for inference requests.

Run the following query to predict whether transactions are fraudulent or not:

select  "isfraud" as "Actual",
        ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") as "Predicted"
from credit_fraud_detection_v where transactiondt >= 12517618;

For binary and multi-class classification problems, we compute the accuracy as the model metric. Accuracy can be calculated based on the following:

accuracy = (sum (actual == predicted)/total) *100

Let’s apply the preceding code to our use case to find the accuracy of the model. We use the test data (transactiondt >= 12517618) to test the accuracy, and use the newly created function ml_fn_frauddetection_xgboost to predict and take the columns other than the target and label as the input:

-- check accuracy 
WITH infer_data AS (
SELECT "isfraud" AS label,
ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") AS predicted,
CASE 
   WHEN label IS NULL
       THEN 0
   ELSE label
   END AS actual,
CASE 
   WHEN actual = predicted
       THEN 1::INT
   ELSE 0::INT
   END AS correct
FROM credit_fraud_detection_v where transactiondt >= 12517618),
aggr_data AS (
SELECT SUM(correct) AS num_correct,
COUNT(*) AS total
FROM infer_data) 

SELECT (num_correct::FLOAT / total::FLOAT) AS accuracy FROM aggr_data;

Clean up

As a final step, clean up the resources:

  1. Delete the Redshift cluster.
  2. Run the Cleanup Resources section of your notebook.

Conclusion

Redshift ML enables you to bring machine learning to your data, powering fast and informed decision-making. SageMaker Feature Store provides a purpose-built feature management solution to help organizations scale ML development across business units and data science teams.

In this post, we showed how you can train an XGBoost model using Redshift ML with data spread across SageMaker Feature Store and a Redshift table. Additionally, we showed how you can make inferences on a trained model to detect fraud using Amazon Redshift SQL commands.


About the authors

Anirban Sinha is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions working closely with customers. He works with large ISVs customers, in helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Unstructured data management and governance using AWS AI/ML and analytics services

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/unstructured-data-management-and-governance-using-aws-ai-ml-and-analytics-services/

Unstructured data is information that doesn’t conform to a predefined schema or isn’t organized according to a preset data model. Unstructured information may have a little or a lot of structure but in ways that are unexpected or inconsistent. Text, images, audio, and videos are common examples of unstructured data. Most companies produce and consume unstructured data such as documents, emails, web pages, engagement center phone calls, and social media. By some estimates, unstructured data can make up to 80–90% of all new enterprise data and is growing many times faster than structured data. After decades of digitizing everything in your enterprise, you may have an enormous amount of data, but with dormant value. However, with the help of AI and machine learning (ML), new software tools are now available to unearth the value of unstructured data.

In this post, we discuss how AWS can help you successfully address the challenges of extracting insights from unstructured data. We discuss various design patterns and architectures for extracting and cataloging valuable insights from unstructured data using AWS. Additionally, we show how to use AWS AI/ML services for analyzing unstructured data.

Why it’s challenging to process and manage unstructured data

Unstructured data makes up a large proportion of the data in the enterprise that can’t be stored in a traditional relational database management systems (RDBMS). Understanding the data, categorizing it, storing it, and extracting insights from it can be challenging. In addition, identifying incremental changes requires specialized patterns and detecting sensitive data and meeting compliance requirements calls for sophisticated functions. It can be difficult to integrate unstructured data with structured data from existing information systems. Some view structured and unstructured data as apples and oranges, instead of being complementary. But most important of all, the assumed dormant value in the unstructured data is a question mark, which can only be answered after these sophisticated techniques have been applied. Therefore, there is a need to being able to analyze and extract value from the data economically and flexibly.

Solution overview

Data and metadata discovery is one of the primary requirements in data analytics, where data consumers explore what data is available and in what format, and then consume or query it for analysis. If you can apply a schema on top of the dataset, then it’s straightforward to query because you can load the data into a database or impose a virtual table schema for querying. But in the case of unstructured data, metadata discovery is challenging because the raw data isn’t easily readable.

You can integrate different technologies or tools to build a solution. In this post, we explain how to integrate different AWS services to provide an end-to-end solution that includes data extraction, management, and governance.

The solution integrates data in three tiers. The first is the raw input data that gets ingested by source systems, the second is the output data that gets extracted from input data using AI, and the third is the metadata layer that maintains a relationship between them for data discovery.

The following is a high-level architecture of the solution we can build to process the unstructured data, assuming the input data is being ingested to the raw input object store.

Unstructured Data Management - Block Level Architecture Diagram

The steps of the workflow are as follows:

  1. Integrated AI services extract data from the unstructured data.
  2. These services write the output to a data lake.
  3. A metadata layer helps build the relationship between the raw data and AI extracted output. When the data and metadata are available for end-users, we can break the user access pattern into additional steps.
  4. In the metadata catalog discovery step, we can use query engines to access the metadata for discovery and apply filters as per our analytics needs. Then we move to the next stage of accessing the actual data extracted from the raw unstructured data.
  5. The end-user accesses the output of the AI services and uses the query engines to query the structured data available in the data lake. We can optionally integrate additional tools that help control access and provide governance.
  6. There might be scenarios where, after accessing the AI extracted output, the end-user wants to access the original raw object (such as media files) for further analysis. Additionally, we need to make sure we have access control policies so the end-user has access only to the respective raw data they want to access.

Now that we understand the high-level architecture, let’s discuss what AWS services we can integrate in each step of the architecture to provide an end-to-end solution.

The following diagram is the enhanced version of our solution architecture, where we have integrated AWS services.

Unstructured Data Management - AWS Native Architecture

Let’s understand how these AWS services are integrated in detail. We have divided the steps into two broad user flows: data processing and metadata enrichment (Steps 1–3) and end-users accessing the data and metadata with fine-grained access control (Steps 4–6).

  1. Various AI services (which we discuss in the next section) extract data from the unstructured datasets.
  2. The output is written to an Amazon Simple Storage Service (Amazon S3) bucket (labeled Extracted JSON in the preceding diagram). Optionally, we can restructure the input raw objects for better partitioning, which can help while implementing fine-grained access control on the raw input data (labeled as the Partitioned bucket in the diagram).
  3. After the initial data extraction phase, we can apply additional transformations to enrich the datasets using AWS Glue. We also build an additional metadata layer, which maintains a relationship between the raw S3 object path, the AI extracted output path, the optional enriched version S3 path, and any other metadata that will help the end-user discover the data.
  4. In the metadata catalog discovery step, we use the AWS Glue Data Catalog as the technical catalog, Amazon Athena and Amazon Redshift Spectrum as query engines, AWS Lake Formation for fine-grained access control, and Amazon DataZone for additional governance.
  5. The AI extracted output is expected to be available as a delimited file or in JSON format. We can create an AWS Glue Data Catalog table for querying using Athena or Redshift Spectrum. Like the previous step, we can use Lake Formation policies for fine-grained access control.
  6. Lastly, the end-user accesses the raw unstructured data available in Amazon S3 for further analysis. We have proposed integrating Amazon S3 Access Points for access control at this layer. We explain this in detail later in this post.

Now let’s expand the following parts of the architecture to understand the implementation better:

  • Using AWS AI services to process unstructured data
  • Using S3 Access Points to integrate access control on raw S3 unstructured data

Process unstructured data with AWS AI services

As we discussed earlier, unstructured data can come in a variety of formats, such as text, audio, video, and images, and each type of data requires a different approach for extracting metadata. AWS AI services are designed to extract metadata from different types of unstructured data. The following are the most commonly used services for unstructured data processing:

  • Amazon Comprehend – This natural language processing (NLP) service uses ML to extract metadata from text data. It can analyze text in multiple languages, detect entities, extract key phrases, determine sentiment, and more. With Amazon Comprehend, you can easily gain insights from large volumes of text data such as extracting product entity, customer name, and sentiment from social media posts.
  • Amazon Transcribe – This speech-to-text service uses ML to convert speech to text and extract metadata from audio data. It can recognize multiple speakers, transcribe conversations, identify keywords, and more. With Amazon Transcribe, you can convert unstructured data such as customer support recordings into text and further derive insights from it.
  • Amazon Rekognition – This image and video analysis service uses ML to extract metadata from visual data. It can recognize objects, people, faces, and text, detect inappropriate content, and more. With Amazon Rekognition, you can easily analyze images and videos to gain insights such as identifying entity type (human or other) and identifying if the person is a known celebrity in an image.
  • Amazon Textract – You can use this ML service to extract metadata from scanned documents and images. It can extract text, tables, and forms from images, PDFs, and scanned documents. With Amazon Textract, you can digitize documents and extract data such as customer name, product name, product price, and date from an invoice.
  • Amazon SageMaker – This service enables you to build and deploy custom ML models for a wide range of use cases, including extracting metadata from unstructured data. With SageMaker, you can build custom models that are tailored to your specific needs, which can be particularly useful for extracting metadata from unstructured data that requires a high degree of accuracy or domain-specific knowledge.
  • Amazon Bedrock – This fully managed service offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon with a single API. It also offers a broad set of capabilities to build generative AI applications, simplifying development while maintaining privacy and security.

With these specialized AI services, you can efficiently extract metadata from unstructured data and use it for further analysis and insights. It’s important to note that each service has its own strengths and limitations, and choosing the right service for your specific use case is critical for achieving accurate and reliable results.

AWS AI services are available via various APIs, which enables you to integrate AI capabilities into your applications and workflows. AWS Step Functions is a serverless workflow service that allows you to coordinate and orchestrate multiple AWS services, including AI services, into a single workflow. This can be particularly useful when you need to process large amounts of unstructured data and perform multiple AI-related tasks, such as text analysis, image recognition, and NLP.

With Step Functions and AWS Lambda functions, you can create sophisticated workflows that include AI services and other AWS services. For instance, you can use Amazon S3 to store input data, invoke a Lambda function to trigger an Amazon Transcribe job to transcribe an audio file, and use the output to trigger an Amazon Comprehend analysis job to generate sentiment metadata for the transcribed text. This enables you to create complex, multi-step workflows that are straightforward to manage, scalable, and cost-effective.

The following is an example architecture that shows how Step Functions can help invoke AWS AI services using Lambda functions.

AWS AI Services - Lambda Event Workflow -Unstructured Data

The workflow steps are as follows:

  1. Unstructured data, such as text files, audio files, and video files, are ingested into the S3 raw bucket.
  2. A Lambda function is triggered to read the data from the S3 bucket and call Step Functions to orchestrate the workflow required to extract the metadata.
  3. The Step Functions workflow checks the type of file, calls the corresponding AWS AI service APIs, checks the job status, and performs any postprocessing required on the output.
  4. AWS AI services can be accessed via APIs and invoked as batch jobs. To extract metadata from different types of unstructured data, you can use multiple AI services in sequence, with each service processing the corresponding file type.
  5. After the Step Functions workflow completes the metadata extraction process and performs any required postprocessing, the resulting output is stored in an S3 bucket for cataloging.

Next, let’s understand how can we implement security or access control on both the extracted output as well as the raw input objects.

Implement access control on raw and processed data in Amazon S3

We just consider access controls for three types of data when managing unstructured data: the AI-extracted semi-structured output, the metadata, and the raw unstructured original files. When it comes to AI extracted output, it’s in JSON format and can be restricted via Lake Formation and Amazon DataZone. We recommend keeping the metadata (information that captures which unstructured datasets are already processed by the pipeline and available for analysis) open to your organization, which will enable metadata discovery across the organization.

To control access of raw unstructured data, you can integrate S3 Access Points and explore additional support in the future as AWS services evolve. S3 Access Points simplify data access for any AWS service or customer application that stores data in Amazon S3. Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy that is attached to the underlying bucket. With S3 Access Points, you can create unique access control policies for each access point to easily control access to specific datasets within an S3 bucket. This works well in multi-tenant or shared bucket scenarios where users or teams are assigned to unique prefixes within one S3 bucket.

An access point can support a single user or application, or groups of users or applications within and across accounts, allowing separate management of each access point. Every access point is associated with a single bucket and contains a network origin control and a Block Public Access control. For example, you can create an access point with a network origin control that only permits storage access from your virtual private cloud (VPC), a logically isolated section of the AWS Cloud. You can also create an access point with the access point policy configured to only allow access to objects with a defined prefix or to objects with specific tags. You can also configure custom Block Public Access settings for each access point.

The following architecture provides an overview of how an end-user can get access to specific S3 objects by assuming a specific AWS Identity and Access Management (IAM) role. If you have a large number of S3 objects to control access, consider grouping the S3 objects, assigning them tags, and then defining access control by tags.

S3 Access Points - Unstructured Data Management - Access Control

If you are implementing a solution that integrates S3 data available in multiple AWS accounts, you can take advantage of cross-account support for S3 Access Points.

Conclusion

This post explained how you can use AWS AI services to extract readable data from unstructured datasets, build a metadata layer on top of them to allow data discovery, and build an access control mechanism on top of the raw S3 objects and extracted data using Lake Formation, Amazon DataZone, and S3 Access Points.

In addition to AWS AI services, you can also integrate large language models with vector databases to enable semantic or similarity search on top of unstructured datasets. To learn more about how to enable semantic search on unstructured data by integrating Amazon OpenSearch Service as a vector database, refer to Try semantic search with the Amazon OpenSearch Service vector engine.

As of writing this post, S3 Access Points is one of the best solutions to implement access control on raw S3 objects using tagging, but as AWS service features evolve in the future, you can explore alternative options as well.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Bhavana Chirumamilla is a Senior Resident Architect at AWS with a strong passion for data and machine learning operations. She brings a wealth of experience and enthusiasm to help enterprises build effective data and ML strategies. In her spare time, Bhavana enjoys spending time with her family and engaging in various activities such as traveling, hiking, gardening, and watching documentaries.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and trade-offs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family—usually on tennis courts.

Daniel Bruno is a Principal Resident Architect at AWS. He had been building analytics and machine learning solutions for over 20 years and splits his time helping customers build data science programs and designing impactful ML products.

Processing large records with Amazon Kinesis Data Streams

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/

In today’s digital era, data is abundant and constantly flowing. Businesses across industries are seeking ways to harness this wealth of information to gain valuable insights and make real-time decisions. To meet this need, AWS offers Amazon Kinesis Data Streams, a powerful and scalable real-time data streaming service. With Kinesis Data Streams, you can effortlessly collect, process, and analyze streaming data in real time at any scale. This service seamlessly integrates into your data architecture, allowing you to tap into the full potential of your data for informed decision-making.

Data streaming technologies like Kinesis Data Streams are designed to efficiently process and manage continuous streams of data in real time at large scale. The individual pieces of data within these streams are often referred to as records. In scenarios like large file processing or performing image, audio, or video analytics, your record may exceed 1 MB. You may struggle to ingest such a large record with Kinesis Data Streams because, as of this writing, the service has a 1 MB upper limit for maximum data record size.

In this post, we show you some different options for handling large records within Kinesis Data Streams and the benefits and disadvantages of each approach. We provide some sample code for each option to help you get started with any of these approaches with your own workloads.

Understanding the default behavior of Kinesis Data Streams

You can send records to Kinesis Data Streams using the PutRecord or PutRecords API calls. These APIs include a mandatory field known as PartitionKey, where you must provide a specific value. This partition key is used by the service to map records with the same partition keys to the same shard to ensure ordering and locality for consumption. Locality means that you want the same consumer to process all records for a given partition key. This helps ensure that data with the same partition key stays together within the same shard, maintaining data order.

Each shard, which holds your data, can handle writing up to 1 MB per second. Let’s consider a scenario where you define a partition key and attempt to send a data record that exceeds 1 MB in size. Based on the explanation so far, the service will reject this request because the record size is over 1 MB. To help you understand better, we experimented by trying to send a record of 1.5 MB to a stream, and the outcome was the following exception message:

import json
import boto3
client = boto3.client('kinesis', region_name='ap-southeast-2')

def lambda_handler(event, context):
    try:
        response = client.put_record(
            StreamName='test',
            Data=b'Sample 1 MB....',
            PartitionKey='string'
            #StreamARN='string'
        )
    
    except Exception as e:
        print (e)

START RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb Version: $LATEST An error occurred (ValidationException) when calling the PutRecord operation: 1 validation error detected: Value at 'data' failed to satisfy constraint: Member must have length less than or equal to 1048576 END RequestId: 84b3ab0c-3f30-4267-aec1-549c2d59dfdb

Strategies for handling large records

Now that we understand the behavior of the PutRecord and PutRecords APIs, let’s discuss strategies you can use to overcome this situation. One thing to keep in mind is that there is no single best solution; in the following sections, we discuss some of the approaches that you can evaluate based on your use case:

  • Store large records in Amazon Simple Storage Service (Amazon S3) with a reference in Kinesis Data Streams
  • Split one large record into multiple records
  • Compress your large records

Let’s discuss these points one by one.

Store large records in Amazon S3 with a reference in Kinesis Data Streams

A useful approach for storing large records involves utilizing an alternative storage solution while employing a reference within Kinesis Data Streams. In this context, Amazon S3 stands out as an excellent choice due to its exceptional durability and cost-effectiveness. The procedure involves uploading the record as an object to an S3 bucket and subsequently writing a reference entry in Kinesis Data Streams. This entry incorporates an attribute that serves as a pointer, indicating the location of the object within Amazon S3.

With this approach, you can generate a pre-signed URL associated with the S3 object’s location. This link can be shared with the requester, offering them direct access to the object without the need for intermediary server-side data transfers.

The following diagram illustrates the architecture of this solution.

The following is the sample code to write data to Kinesis Data Streams using this approach:

import json
import boto3
import random

def lambda_handler(event, context):
    try:
        s3 = boto3.client('s3', region_name='ap-southeast-2')
        kds = boto3.client('kinesis', region_name='ap-southeast-2')
        expiration=3600
        pk=str(random.randint(100,100000000))
        bucket_name = 'MY_BUCKET'
        object_key = 'air/' + pk + '.txt'
        file_content = b'LARGE OBJECT'
        response = s3.put_object(Bucket=bucket_name, Key=object_key, Body=file_content)
        presigned_url = s3.generate_presigned_url(
            'get_object',
            Params={'Bucket': bucket_name, 'Key': object_key},
            ExpiresIn=expiration
        )
        
        kdata = {'message': presigned_url}
        response = kds.put_record(
            StreamName='test',
            Data=json.dumps(kdata),
            PartitionKey=pk
        )
        print (response)
    except Exception as e:
        print (e)

If you are using an AWS Lambda consumer to process this data, you can now decode the record to get the S3 pre-signed URL to efficiently retrieve the object from Amazon S3. Then you can implement your business logic to effectively process the data. The following is sample code for reference:

import json
import base64
import json

def lambda_handler(event, context):
    item = None
    decoded_record_data = [base64.b64decode(record['kinesis']['data']).decode().replace('\n','') for record in event['Records']]
    deserialized_data = [json.loads(decoded_record) for decoded_record in decoded_record_data]
    
    
    for item in deserialized_data:
        LOB=(item['message'])
        #process LOB implementing your business logic

An inherent benefit of adopting this technique is the capability to store data in Amazon S3, accommodating an extensive range of sizes per individual object. This method helps you reduce the costs of using Kinesis Data Streams because it uses less storage space and requires fewer read and write throughput for item access. This optimization is achieved by storing just the URL within Kinesis Data Streams. However, it’s important to acknowledge that accessing the sizable object necessitates an additional call to Amazon S3, thereby introducing higher latency for clients as they manage the additional request.

Split one large record into multiple records

Splitting large records into smaller ones in Kinesis Data Streams brings advantages like faster processing, improved throughput, efficient resource use, and more straightforward error handling. Let’s say you have a large record that you want to split into smaller chunks before sending them to a Kinesis data stream. First, you need to set up a Kinesis producer. Suppose you have a large record as a string. You can split it into smaller chunks of a predefined size. For this example, let’s say you’re splitting the record into chunks of 100 characters each. After you split that, loop through the record chunks and send each chunk as a separate message to a Kinesis data stream. The following is the sample code:

import boto3
kinesis = boto3.client('kinesis', region_name='ap-southeast-2')  

def split_record(record, chunk_size):
    chunks = [record[i:i + chunk_size] for i in range(0, len(record), chunk_size)]
    return chunks

def send_to_kinesis(stream_name, record):
    response = kinesis.put_record(
        StreamName=stream_name,
        Data=record,
        PartitionKey= '100'
    )
    return response

def main():
    stream_name = 'test'  
    large_record = 'Your large record'  # Replace with your actual record
    chunk_size = 100  

    record_chunks = split_record(large_record, chunk_size)

    for chunk in record_chunks:
        response = send_to_kinesis(stream_name, chunk)
        print(f"Record sent: {response['SequenceNumber']}")

if __name__ == "__main__":
    main()

Ensure that all chunks of a given message are directed to a single partition, thereby guaranteeing the preservation of their order. In the final chunk, include metadata within the header indicating the conclusion of the message during production. This enables consumers to identify the ultimate chunk and facilitates seamless message reconstruction. The drawback of this method is that it adds complexity to the client-side tasks of dividing and putting back together the different parts. Therefore, these functions need thorough testing to prevent any loss of data.

Compress your large records

Applying data compression prior to transmitting it to Kinesis Data Streams has numerous advantages. This approach not only reduces the data’s size, enabling swifter travel and more efficient utilization of network resources, but also leads to cost savings in terms of storage expenses while optimizing overall resource consumption. Additionally, this practice simplifies storage and data retention. By using compression algorithms such as GZIP, Snappy, or LZ4, you can achieve substantial reduction in the size of large records. Compression brings the benefit of simplicity because it’s implemented seamlessly without requiring the caller to make changes to the item or use extra AWS services to support storage. However, compression introduces additional CPU overhead and latency on the producer side, and its impact on the compression ratio and efficiency can vary depending on the data type and format. Also, compression can enhance consumer throughput at the expense of some decompression overhead.

Conclusion

For real-time data streaming use cases, it’s essential to carefully consider the handling of large records when using Kinesis Data Streams. In this post, we discussed the challenges associated with managing large records and explored strategies such as utilizing Amazon S3 references, record splitting, and compression. Each approach has its own set of benefits and drawbacks, so it’s crucial to evaluate the nature of your data and the tasks you need to perform. Select the most suitable approach based on your data’s characteristics and your processing task requirements.

We encourage you to try out the approaches discussed in this post and share your thoughts in the comments section.


About the author

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Orchestrate Amazon EMR Serverless jobs with AWS Step functions

Post Syndicated from Naveen Balaraman original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-jobs-with-aws-step-functions/

Amazon EMR Serverless provides a serverless runtime environment that simplifies the operation of analytics applications that use the latest open source frameworks, such as Apache Spark and Apache Hive. With EMR Serverless, you don’t have to configure, optimize, secure, or operate clusters to run applications with these frameworks. You can run analytics workloads at any scale with automatic scaling that resizes resources in seconds to meet changing data volumes and processing requirements. EMR Serverless automatically scales resources up and down to provide just the right amount of capacity for your application, and you only pay for what you use.

AWS Step Functions is a serverless orchestration service that enables developers to build visual workflows for applications as a series of event-driven steps. Step Functions ensures that the steps in the serverless workflow are followed reliably, that the information is passed between stages, and errors are handled automatically.

The integration between AWS Step Functions and Amazon EMR Serverless makes it easier to manage and orchestrate big data workflows. Before this integration, you had to manually poll for job statuses or implement waiting mechanisms through API calls. Now, with the support for “Run a Job (.sync)” integration, you can more efficiently manage your EMR Serverless jobs. Using .sync allows your Step Functions workflow to wait for the EMR Serverless job to complete before moving on to the next step, effectively making job execution part of your state machine. Similarly, the “Request Response” pattern can be useful for triggering a job and immediately getting a response back, all within the confines of your Step Functions workflow. This integration simplifies your architecture by eliminating the need for additional steps to monitor job status, making the whole system more efficient and easier to manage.

In this post, we explain how you can orchestrate a PySpark application using Amazon EMR Serverless and AWS Step Functions. We run a Spark job on EMR Serverless that processes Citi Bike dataset data in an Amazon Simple Storage Service (Amazon S3) bucket and stores the aggregated results in Amazon S3.

Solution Overview

We demonstrate this solution with an example using the Citi Bike dataset. This dataset includes numerous parameters such as Rideable type, Start station, Started at, End station, Ended at, and various other elements about Citi Bikers ride. Our objective is to find the minimum, maximum, and average bike trip duration in a given month.

In this solution, the input data is read from the S3 input path, transformations and aggregations are applied with the PySpark code, and the summarized output is written to the S3 output path s3://<bucket-name>/serverlessout/.

The solution is implemented as follows:

  • Creates an EMR Serverless application with Spark runtime. After the application is created, you can submit the data-processing jobs to that application. This API step waits for Application creation to complete.
  • Submits the PySpark job and waits for its completion with the StartJobRun (.sync) API. This allows you to submit a job to an Amazon EMR Serverless application and wait until the job completes.
  • After the PySpark job completes, the summarized output is available in the S3 output directory.
  • If the job encounters an error, the state machine workflow will indicate a failure. You can inspect the specific error within the state machine. For a more detailed analysis, you can also check the EMR job failure logs in the EMR studio console.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • An AWS account
  • An IAM user with administrator access
  • An S3 bucket

Solution Architecture

To automate the complete process, we use the following architecture, which integrates Step Functions for orchestration and Amazon EMR Serverless for data transformations. Summarized output is then written to Amazon S3 bucket.

The following diagram illustrates the architecture for this use case

Deployment steps

Before beginning this tutorial, ensure that the role being used to deploy has all the relevant permissions to create the required resources as part of the solution. The roles with the appropriate permissions will be created through a CloudFormation template using the following steps.

Step 1: Create a Step Functions state machine

You can create a Step Functions State Machine workflow in two ways— either through the code directly or through the Step Functions studio graphical interface. To create a state machine, you can follow the steps from either option 1 or option 2 below.

Option 1: Create the state machine through code directly

To create a Step Functions state machine along with the necessary IAM roles, complete the following steps:

  1. Launch the CloudFormation stack using this link. On the Cloud Formation console, provide a stack name and accept the defaults to create the stack. Once the CloudFormation deployment completes, the following resources are created, in addition EMR Service Linked Role will be automatically created by this CloudFormation stack to access EMR Serverless:
    • S3 bucket to upload the PySpark script and write output data from EMR Serverless job. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.
    • State Machine with EMR Serverless steps.

  1. To prepare the S3 bucket with PySpark script, open AWS Cloudshell from the toolbar on the top right corner of AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  1. To prepare the S3 bucket with Input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

Option 2: Create the Step Functions state machine through Workflow Studio

Prerequisites

Before creating the State Machine though Workshop Studio, please ensure that all the relevant roles and resources are created as part of the solution.

  1. To deploy the necessary IAM roles and S3 bucket into your AWS account, launch the CloudFormation stack using this link. Once the CloudFormation deployment completes, the following resources are created:
    • S3 bucket to upload the PySpark script and write output data. We recommend enabling default encryption on your S3 bucket to encrypt new objects, as well as enabling access logging to log all requests made to the bucket. Following these recommendations will improve security and provide visibility into access of the bucket.
    • EMR Serverless Runtime role that provides granular permissions to specific resources that are required when EMR Serverless jobs run.
    • Step Functions Role to grant AWS Step Functions permissions to access the AWS resources that will be used by its state machines.

  1. To prepare the S3 bucket with PySpark script, open AWS Cloudshell from the toolbar on the top right of the AWS console and run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/bikeaggregator.py s3://serverless-<<ACCOUNT-ID>>-blog/scripts/

  1. To prepare the S3 bucket with Input data, run the following AWS CLI command in CloudShell (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID):

aws s3 cp s3://aws-blogs-artifacts-public/artifacts/BDB-3594/201306-citibike-tripdata.csv s3://serverless-<<ACCOUNT-ID>>-blog/data/ --copy-props none

To create a Step Functions state machine, complete the following steps:

  1. On the Step Functions console, choose Create state machine.
  2. Keep the Blank template selected, and click Select.
  3. In the Actions Menu on the left, Step Functions provides a list of AWS services APIs that you can drag and drop into your workflow graph in the design canvas. Type EMR Serverless in the search and drag the Amazon EMR Serverless CreateApplication state to the workflow graph:

  1. In the canvas, select Amazon EMR Serverless CreateApplication state to configure its properties. The Inspector panel on the right shows configuration options. Provide the following Configuration values:
    • Change the State name to Create EMR Serverless Application
    • Provide the following values to the API Parameters. This creates an EMR Serverless Application with Apache Spark based on Amazon EMR release 6.12.0 using default configuration settings.
      {
          "Name": "ServerlessBikeAggr",
          "ReleaseLabel": "emr-6.12.0",
          "Type": "SPARK"
      }

    • Click the Wait for task to complete – optional check box to wait for EMR Serverless Application creation state to complete before executing the next state.
    • Under Next state, select the Add new state option from the drop-down.
  2. Drag EMR Serverless StartJobRun state from the left browser to the next state in the workflow.
    • Rename State name to Submit PySpark Job
    • Provide the following values in the API parameters and click Wait for task to complete – optional (make sure to replace <<ACCOUNT-ID>> with your AWS Account ID).
{
"ApplicationId.$": "$.ApplicationId",
    "ExecutionRoleArn": "arn:aws:iam::<<ACCOUNT-ID>>:role/EMR-Serverless-Role-<<ACCOUNT-ID>>",
    "JobDriver": {
        "SparkSubmit": {
            "EntryPoint": "s3://serverless-<<ACCOUNT-ID>>-blog/scripts/bikeaggregator.py",
            "EntryPointArguments": [
                "s3://serverless-<<ACCOUNT-ID>>-blog/data/",
                "s3://serverless-<<ACCOUNT-ID>>-blog/serverlessout/"
            ],
            "SparkSubmitParameters": "--conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"
        }
    }
}

  1. Select the Config tab for the state machine from the top and change the following configurations:
    • Change State machine name to EMRServerless-BikeAggr found in Details.
    • In the Permissions section, select StateMachine-Role-<<ACCOUNT-ID>> from the dropdown for Execution role. (Make sure that you replace <<ACCOUNT-ID>> with your AWS Account ID).
  2. Continue to add steps for Check Job Success from the studio as shown in the following diagram.

  1. Click Create to create the Step Functions State Machine for orchestrating the EMR Serverless jobs.

Step 2: Invoke the Step Functions

Now that the Step Function is created, we can invoke it by clicking on the Start execution button:

When the step function is being invoked, it presents its run flow as shown in the following screenshot. Because we have selected Wait for task to complete config (.sync API) for this step, the next step would not start wait until EMR Serverless Application is created (blue represents the Amazon EMR Serverless Application being created).

After successfully creating the EMR Serverless Application, we submit a PySpark Job to that Application.

When the EMR Serverless job completes, the Submit PySpark Job step changes to green. This is because we have selected the Wait for task to complete configuration (using the .sync API) for this step.

The EMR Serverless Application ID as well as PySpark Job run Id from Output tab for Submit PySpark Job step.

Step 3: Validation

To confirm the successful completion of the job, navigate to EMR Serverless console and find the EMR Serverless Application Id. Click the Application Id to find the execution details for the PySpark Job run submitted from the Step Functions.

To verify the output of the job execution, you can check the S3 bucket where the output will be stored in a .csv file as shown in the following graphic.

Cleanup

Log in to the AWS Management Console and delete any S3 buckets created by this deployment to avoid unwanted charges to your AWS account. For example: s3://serverless-<<ACCOUNT-ID>>-blog/

Then clean up your environment, delete the CloudFormation template you created in the Solution configuration steps.

Delete Step function you created as part of this solution.

Conclusion

In this post, we explained how to launch an Amazon EMR Serverless Spark job with Step Functions using Workflow Studio to implement a simple ETL pipeline that creates aggregated output from the Citi Bike dataset and generate reports.

We hope this gives you a great starting point for using this solution with your datasets and applying more complex business rules to solve your transient cluster use cases.

Do you have follow-up questions or feedback? Leave a comment. We’d love to hear your thoughts and suggestions.

References


About the Authors

Naveen Balaraman is a Sr Cloud Application Architect at Amazon Web Services. He is passionate about Containers, Serverless, Architecting Microservices and helping customers leverage the power of AWS cloud.

Karthik Prabhakar is a Senior Big Data Solutions Architect for Amazon EMR at AWS. He is an experienced analytics engineer working with AWS customers to provide best practices and technical advice in order to assist their success in their data journey.

Parul Saxena is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where she provides architectural guidance to customers for running complex big data workloads over AWS platform. In her spare time, she enjoys traveling and spending time with her family and friends.

Query big data with resilience using Trino in Amazon EMR with Amazon EC2 Spot Instances for less cost

Post Syndicated from Ashwini Kumar original https://aws.amazon.com/blogs/big-data/query-big-data-with-resilience-using-trino-in-amazon-emr-with-amazon-ec2-spot-instances-for-less-cost/

Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. Amazon EMR provides a managed Hadoop framework that makes it straightforward, fast, and cost-effective to process vast amounts of data using EC2 instances. Amazon EMR with Spot Instances allows you to reduce costs for running your big data workloads on AWS. Amazon EC2 can interrupt Spot Instances with a 2-minute notification whenever Amazon EC2 needs to reclaim capacity for On-Demand customers. Spot Instances are best suited for running stateless and fault-tolerant big data applications such as Apache Spark with Amazon EMR, which are resilient against Spot node interruptions.

Trino (formerly PrestoSQL) is an open-source, highly parallel, distributed SQL query engine to run interactive queries as well as batch processing on petabytes of data. It can perform in-place, federated queries on data stored in a multitude of data sources, including relational databases (MySQL, PostgreSQL, and others), distributed data stores (Cassandra, MongoDB, Elasticsearch, and others), and Amazon Simple Storage Service (Amazon S3), without the need for complex and expensive processes of copying the data to a single location.

Before Project Tardigrade, Trino queries failed whenever any of the nodes in Trino clusters failed, and there was no automatic retry mechanism with iterative querying capability. Also, failed queries had to be restarted from scratch. Due to this limitation, the cost of failures of long-running extract, transform, and load (ETL) and batch queries on Trino was high in terms of completion time, compute wastage, and spend. Spot Instances were not appropriate for long-running queries with Trino clusters and only suited for short-lived Trino queries.

In October 2022, Amazon EMR announced a new capability in the Trino engine to detect 2-minute Spot interruption notifications and determine if the existing queries can complete within 2 minutes on those nodes. If the queries can’t finish, Trino will fail them quickly and retry the queries on different nodes. Also, Trino doesn’t schedule new queries on these Spot nodes, which are about to be reclaimed. In November 2022, Amazon EMR added support for Project Tardigrade’s fault-tolerant option in the Trino engine with Amazon EMR 6.8 and above. Enabling this feature mitigates Trino task failures caused by worker node failures due to Spot interruptions or On-Demand node stops. Trino now retries failed tasks using intermediate exchange data checkpointed on Amazon S3 or HDFS.

These new enhancements in Trino with Amazon EMR provide improved resiliency for running ETL and batch workloads on Spot Instances with reduced costs. This post showcases the resilience of Amazon EMR with Trino using fault-tolerant configuration to run long-running queries on Spot Instances to save costs. We simulate Spot interruptions on Trino worker nodes by using AWS Fault Injection Simulator (AWS FIS).

Trino architecture overview

Trino runs a query by breaking up the run into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. This pipelined execution model runs multiple stages in parallel and streams data from one stage to another as the data becomes available. This parallel architecture reduces end-to-end latency and makes Trino a fast tool for ad hoc data exploration and ETL jobs over very large datasets. The following diagram illustrates this architecture.

In a Trino cluster, the coordinator is the server responsible for parsing statements, planning queries, and managing workers. The coordinator is also the node to which a client connects and submits statements to run. Every Trino cluster must have at least one coordinator. The coordinator creates a logical model of a query involving a series of stages, which is then translated into a series of connected tasks running on Trino workers. In Amazon EMR, the Trino coordinator runs on the EMR primary node and workers run on core and task nodes.

Faster insights with lower costs with EC2 Spot

You can save significant costs for your ETL and batch workloads running on EMR Trino clusters with a blend of Spot and On-Demand Instances. You can also reduce time-to-insight with faster query runs with lower costs by running more worker nodes on Spot Instances, using the parallel architecture of Trino.

For example, a long-running query on EMR Trino that takes an hour can be finished faster by provisioning more worker nodes on Spot Instances, as shown in the following figure.

Fault-tolerant Trino configuration in Amazon EMR

Fault-tolerant execution in Trino is disabled by default; you can enable it by setting a retry policy in the Amazon EMR configuration. Trino supports two types of retry policies:

  • QUERY – The QUERY retry policy instructs Trino to retry the whole query automatically when an error occurs on a worker node. This policy is only suitable for short-running queries because the whole query is retried from scratch.
  • TASK – The TASK retry policy instructs Trino to retry individual query tasks in the event of failure. This policy is recommended for long-running ETL and batch queries.

With fault-tolerant execution enabled, intermediate exchange data is spooled on an exchange manager so that another worker node can reuse it in the event of a node failure to complete the query run. The exchange manager uses a storage location on Amazon S3 or Hadoop Distributed File System (HDFS) to store and manage spooled data, which is spilled beyond in-memory buffer size of worker nodes. By default, Amazon EMR release 6.9.0 and later uses HDFS as an exchange manager.

Solution overview

In this post, we create an EMR cluster with following architecture.

We provision the following resources using Amazon EMR and AWS FIS:

  • An EMR 6.9.0 cluster with the following configuration:
    • Apache Hadoop, Hue, and Trino applications
    • EMR instance fleets with the following:
      • One primary node (On-Demand) as the Trino coordinator
      • Two core nodes (On-Demand) as the Trino workers and exchange manager
      • Four task nodes (Spot Instances) as Trino workers
    • Trino’s fault-tolerant configuration with following:
      • TPCDS connector
      • The TASK retry policy
      • Exchange manager directory on HDFS
      • Optional recommended settings for query performance optimization
  • An FIS experiment template to target Spot worker nodes in the Trino cluster with interruptions to demonstrate fault-tolerance of EMR Trino with Spot Instances

We use the new Amazon EMR console to create an EMR 6.9.0 cluster. For more information about the new console, refer to Summary of differences.

Create an EMR 6.9.0 cluster

Complete the following steps to create your EMR cluster:

  1. On the Amazon EMR console, create an EMR 6.9.0 cluster named emr-trino-cluster with Hadoop, Hue, and Trino applications using the Custom application bundle.

We need Hue’s web-based interface for submitting SQL queries to the Trino engine and HDFS on core nodes to store intermediate exchange data for Trino’s fault-tolerant runs.

Using multiple Spot capacity pools (each instance type in each Availability Zone is a separate pool) is a best practice to increase your chances of getting large-scale Spot capacity and minimize the impact of a specific instance type being reclaimed in EMR clusters. The Amazon EMR console allows you to configure up to 5 instance types for your core fleet and 15 instance types for your task fleet with the Spot allocation strategy, which allows up to 30 instance types for each fleet from the AWS Command Line Interface (AWS CLI) or Amazon EMR API.

  1. Configure the primary, core, and task fleets with primary and core nodes with On-Demand Instances (m5.xlarge) and task nodes with Spot Instances using multiple instance types.

When you use the Amazon EMR console, the number of vCPUs of the EC2 instance type are used as the count towards the total target capacity of a core or task fleet by default. For example, an m5.xlarge instance type with 4 vCPUs is considered as 4 units of capacity by default.

  1. On the Actions menu under Core or Task fleet, choose Edit weighted capacity.

  1. Because each instance type with 4 vCPUs (xlarge size) is 4 units of capacity, let’s set the cluster size with 8 core units (2 nodes) with On-Demand and 16 task units (4 nodes) with Spot.

Unlike core and task fleets, the primary fleet is always one instance, so no sizing configuration is needed or available for the primary node on the Amazon EMR console.

  1. Select Price-capacity optimized as your Spot allocation strategy, which launches the lowest-priced Spot Instances from your most available pools.

  1. Configure Trino’s fault-tolerant settings in the Software settings section:
[
  {
    "Classification": "trino-connector-tpcds",
    "Properties": {
      "connector.name": "tpcds"
    }
  },
  {
    "Classification": "trino-config",
    "Properties": {
      "exchange.compression-enabled": "true",
      "query.low-memory-killer.delay": "0s",
      "query.remote-task.max-error-duration": "1m",
      "retry-policy": "TASK"
    }
  },
  {
    "Classification": "trino-exchange-manager",
    "Properties": {
      "exchange.base-directories": "/exchange",
      "exchange.use-local-hdfs": "true"
    }
  }
]

Alternatively, you can create a JSON config file with the configuration, store it in an S3 bucket, and select the file path from its S3 location by selecting Load JSON from Amazon S3.

Let’s understand some optional settings for query performance optimization that we have configured:

  • “exchange.compression-enabled”:”true” – This is recommended to enable compression to reduce the amount of data spooled on exchange manager.
  • “query.low-memory-killer.delay”: “0s” – This will reduce the low memory killer delay to allow the Trino engine to unblock nodes running short on memory faster.
  • “query.remote-task.max-error-duration”: “1m” – By default, Trino waits for up to 5 minutes for the task to recover before considering it lost and rescheduling it. This timeout can be reduced for faster retrying of the failed tasks.

For more details of Trino’s fault-tolerant configuration parameters, refer to Fault-tolerant execution.

  1. Let’s also add a tag key called Name with the value MyTrinoCluster to launch EC2 instances with this tag name.

We’ll use this tag to target Spot Instances in the cluster with AWS FIS.

The EMR cluster will take few minutes to be ready in the Waiting state.

Configure an FIS experiment template to target Spot Instances with interruptions in the EMR Trino cluster

We now use the AWS FIS console to simulate interruptions of Spot Instances in the EMR Trino cluster and showcase the fault-tolerance of the Trino engine. Complete the following steps:

  1. On the AWS FIS console, create an experiment template.

  1. Under Actions, choose Add action.
  2. Create an AWS FIS action with Action type as aws:ec2:send-spot-instance-interruptions and Duration Before Interruption as 2 minutes.
  3. Choose Save.

This means FIS will interrupt targeted Spot Instances after 2 minutes of running the experiment.

  1. Under Targets, choose Edit to target all Spot Instances running in the EMR cluster.
  2. For Resource tags, use Name= MyTrinoCluster.
  3. For Resource filters, use as State.Name=running.
  4. For Selection mode, set to ALL.
  5. Choose Save.

  1. Create a new AWS Identity and Access Management (IAM) role automatically to provide permissions to AWS FIS.

  1. Choose Create experiment template.

Launch Hue and Trino web interfaces

When your EMR cluster is in the Waiting state, connect to the Hue web interface for Trino queries and the Trino web interface for monitoring. Alternatively, you can submit your Trino queries using trino-cli after connecting via SSH to your EMR cluster’s primary node. In this post, we will use the Hue web interface for running queries on the EMR Trino engine.

  1. To connect to Hue interface on the primary node from your local computer, navigate to the EMR cluster’s Properties, Network and security, and EC2 security groups (firewall) section.
  2. Edit the primary node security group’s inbound rule to add your IP address and port (port 22).
  3. Retrieve your EMR cluster’s primary node public DNS from your EMR cluster’s Summary tab.

Refer to View web interfaces hosted on Amazon EMR clusters for details on connecting to web interfaces in the primary node from your local computer. You can set up an SSH tunnel with dynamic port forwarding between your local computer and the EMR primary node. Then you can configure proxy settings for your internet browser by using an add-ons such as FoxyProxy for Firefox or SwitchyOmega for Chrome to manage your SOCKS proxy settings.

  1. Connect to Hue by copying the URL (http://<youremrcluster-primary-node-public-dns>:8888/) in your web browser.
  2. Create an account with your choice of user name and password.

After you log in to your account, you can see the query editor on Hue’s web interface.

By default, Amazon EMR configures the Trino web interface on the Trino coordinator (EMR primary node) to use port 8889.

  1. To connect to the Trino web interface, copy the URL (http://<youremrcluster-primary-node-public-dns>:8889/) in your web browser, where you can monitor the Trino cluster and query performance.

In the following screenshot, we can see six active Trino workers (two core and four task nodes of EMR cluster) and no running queries.

  1. Let’s run the Trino query

    select * from system.runtime.nodes from the Hue query editor to see the coordinator and worker nodes’ status and details.

We can see all cluster nodes are in the active state.

Test fault tolerance on Spot interruptions

To test the fault tolerance on Spot interruptions, complete the following steps:

  1. Run the following Trino query using Hue’s query editor:
with inv as
(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stdev,mean, case mean when 0 then null else stdev/mean end cov
from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy
,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean
from tpcds.sf100.inventory
,tpcds.sf100.item
,tpcds.sf100.warehouse
,tpcds.sf100.date_dim
where inv_item_sk = i_item_sk
and inv_warehouse_sk = w_warehouse_sk
and inv_date_sk = d_date_sk
and d_year =1999
group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo
where case mean when 0 then 0 else stdev/mean end > 1)
select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov
,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov
from inv inv1,inv inv2
where inv1.i_item_sk = inv2.i_item_sk
and inv1.w_warehouse_sk = inv2.w_warehouse_sk
and inv1.d_moy=4
and inv2.d_moy=4+1
and inv1.cov > 1.5
order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov ,inv2.d_moy,inv2.mean, inv2.cov

When you go to the Trino web interface, you can see the query running on six active worker nodes (two core On-Demand and four task nodes on Spot Instances).

  1. On the AWS FIS console, choose Experiment templates in the navigation pane.
  2. Select the experiment template EMR_Trino_Interrupter and choose Start experiment.

After a few seconds, the experiment will be in the Completed state and it will trigger stopping all four Spot Instances (four Trino workers) after 2 minutes.

After some time, we can observe in the Trino web UI that we have lost four Trino workers (task nodes running on Spot Instances) but the query is still running with the two remaining On-Demand worker nodes (core nodes). Without the fault-tolerant configuration in EMR Trino, the whole query would fail with even a single worker node failure.

  1. Run the select * from system.runtime.nodes query again in Hue to check the Trino cluster nodes status.

We can see four Spot worker nodes with the status shutting_down.

Trino starts shutting down the four Spot worker nodes as soon as they receive the 2-minute Spot interruption notification sent by the AWS FIS experiment. It will start retrying any failed tasks of these four Spot workers on the remaining active workers (two core nodes) of the cluster. The Trino engine will also not schedule tasks of any new queries on Spot worker nodes in the shutting_down state.

The Trino query will keep running on the remaining two worker nodes and succeed despite the interruption of the four Spot worker nodes. Soon after the Spot nodes stop, Amazon EMR will replenish the stopped capacity (four task nodes) by launching four replacement Spot nodes.

Achieve faster query performance for lower cost with more Trino workers on Spot

Now let’s increase Trino workers capacity from 6 to 10 nodes by manually resizing EMR task nodes on Spot Instances (from 4 to 8 nodes).

We run the same query on a larger cluster with 10 Trino workers. Let’s compare the query completion time (wall time in the Trino Web UI) with the earlier smaller cluster with six workers. We can see 32% faster query performance (1.57 minutes vs. 2.33 minutes).

You can run more Trino workers on Spot Instances to run queries faster to meet your SLAs or process a larger number of queries. With Spot Instances available at discounts up to 90% off On-Demand prices, your cluster costs will not increase significantly vs. running the whole compute capacity on On-Demand Instances.

Clean up

To avoid ongoing charges for resources, navigate to the Amazon EMR console and delete the cluster emr-trino-cluster.

Conclusion

In this post, we showed how you can configure and launch EMR clusters with the Trino engine using its fault-tolerant configuration. With the fault tolerant feature, Trino worker nodes can be run as EMR task nodes on Spot Instances with resilience. You can configure a well-diversified task fleet with multiple instance types using the price-capacity optimized allocation strategy. This will make Amazon EMR request and launch task nodes from the most available, lower-priced Spot capacity pools to minimize costs, interruptions, and capacity challenges. We also demonstrated the resilience of EMR Trino against Spot interruptions using an AWS FIS Spot interruption experiment. EMR Trino continues to run queries by retrying failed tasks on remaining available worker nodes in the event of any Spot node interruption. With fault-tolerant EMR Trino and Spot Instances, you can run big data queries with resilience, while saving costs. For your SLA-driven workloads, you can also add more compute on Spot to adhere to or exceed your SLAs for faster query performance with lower costs compared to On-Demand Instances.


About the Authors

Ashwini Kumar is a Senior Specialist Solutions Architect at AWS based in Delhi, India. Ashwini has more than 18 years of industry experience in systems integration, architecture, and software design, with more recent experience in cloud architecture, DevOps, containers, and big data engineering. He helps customers optimize their cloud spend, minimize compute waste, and improve performance at scale on AWS. He focuses on architectural best practices for various workloads with services including EC2 Spot, AWS Graviton, EC2 Auto Scaling, Amazon EKS, Amazon ECS, and AWS Fargate.

Dipayan Sarkar is a Specialist Solutions Architect for Analytics at AWS, where he helps customers modernize their data platform using AWS Analytics services. He works with customers to design and build analytics solutions, enabling businesses to make data-driven decisions.

Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical contents to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger (such as in terms of record volume) than others, and some are updated more frequently than others. In the data lake, the data to be organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logics, usually fed as inputs to ML pipelines or any consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, the Apache Hudi framework can solve the first requirement. Therefore, we will put our emphasis on the other requirements. We begin with a Data lake reference architecture followed by an overview of operational data processing framework. By showing you our open-source solution on GitHub, we delve into framework components and walk through their design and implementation aspects. Finally, by testing the framework, we summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with a big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that has support for change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets and separate AWS Glue jobs to do feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts the metadata to the DynamoDB table odpf_file_tracker. These records will then be processed by File Processor, which we discuss in the next section.

ODPF Component: File Manager

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It usually goes with AWS Glue worker type (one of G.1X, G.2X, G.4X, G.8X, G.025X, and so on) and batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combined key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence.
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables.
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates running File Processor state machine with a configuration that includes 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. Being the first state in the state machine, the Batch Manager Lambda function reads configurations from DynamoDB tables.
  3. The Lambda function creates four batches: three of them will be mapped to five operational tables each, and the fourth one is mapped to three operational tables. Then it feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function will be invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of an operational table and acquires a lock when it is not processed by any other job. The odpf_file_processing_tracker DynamoDB table is used for this purpose. When a lock is acquired, it inserts a record in the DynamoDB table with the status updating_table for the first time; otherwise, it updates the record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog.
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note: a failure to process CDC files for one of the operational tables of a given batch does not impact the processing of other operational tables.

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details to the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following instructions from this README and this README, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder.
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = e.g., us-west-1
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json.
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match with the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. As a result of successful full load completion, you will find Parquet files in the S3 staging bucket—one Parquet file per table in a dedicated folder, similar to the following screenshot. Similarly, you will find 17 such folders in the bucket.
    DMS Output in S3 Staging Bucket for Table 1

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. On the subsequent tenth minute (since the activation of the EventBridge rule), the event rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. Then, you will notice that the File Processor Glue Job runs create source-aligned datasets in Hudi format in the S3 raw bucket. For Table 1, you will see an output similar to the following screenshot. There will be 17 such folders in the S3 raw bucket.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query. You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena

Validation summary: The counts in Amazon Athena match with the counts of the operational tables and it proves that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.

Outcomes

Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating them to EventBridge rules, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs. With the AWS Glue worker type configuration setting, we selected the appropriate compute resources while running the AWS Glue jobs (the instances of the AWS Glue job).
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data is stored in S3 buckets, metadata data is stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) are to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.
    • As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.

Conclusion

In this post, we took a real-world operational data processing use case and presented you the framework we developed at AWS ProServe. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi will expedite your journey in integrating operational databases into your modern data platforms built on AWS.


About the authors

Ravi-IthaRavi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

srinivas-kandiSrinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading history and civilizations.

Derive operational insights from application logs using Automated Data Analytics on AWS

Post Syndicated from Aparajithan Vaidyanathan original https://aws.amazon.com/blogs/big-data/derive-operational-insights-from-application-logs-using-automated-data-analytics-on-aws/

Automated Data Analytics (ADA) on AWS is an AWS solution that enables you to derive meaningful insights from data in a matter of minutes through a simple and intuitive user interface. ADA offers an AWS-native data analytics platform that is ready to use out of the box by data analysts for a variety of use cases. With ADA, teams can ingest, transform, govern, and query diverse datasets from a range of data sources without requiring specialist technical skills. ADA provides a set of pre-built connectors to ingest data from a wide range of sources including Amazon Simple Storage Service (Amazon S3), Amazon Kinesis Data Streams, Amazon CloudWatch, Amazon CloudTrail, and Amazon DynamoDB as well as many others.

ADA provides a foundational platform that can be used by data analysts in a diverse set of use cases including IT, finance, marketing, sales, and security. ADA’s out-of-the-box CloudWatch data connector allows data ingestion from CloudWatch logs in the same AWS account in which ADA has been deployed, or from a different AWS account.

In this post, we demonstrate how an application developer or application tester is able to use ADA to derive operational insights of applications running in AWS. We also demonstrate how you can use the ADA solution to connect to different data sources in AWS. We first deploy the ADA solution into an AWS account and set up the ADA solution by creating data products using data connectors. We then use the ADA Query Workbench to join the separate datasets and query the correlated data, using familiar Structured Query Language (SQL), to gain insights. We also demonstrate how ADA can be integrated with business intelligence (BI) tools such as Tableau to visualize the data and to build reports.

Solution overview

In this section, we present the solution architecture for the demo and explain the workflow. For the purposes of demonstration, the bespoke application is simulated using an AWS Lambda function that emits logs in Apache Log Format at a preset interval using Amazon EventBridge. This standard format can be produced by many different web servers and be read by many log analysis programs. The application (Lambda function) logs are sent to a CloudWatch log group. The historical application logs are stored in an S3 bucket for reference and for querying purposes. A lookup table with a list of HTTP status codes along with the descriptions is stored in a DynamoDB table. These three serve as sources from which data is ingested into ADA for correlation, query, and analysis. We deploy the ADA solution into an AWS account and set up ADA. We then create the data products within ADA for the CloudWatch log group, S3 bucket, and DynamoDB. As the data products are configured, ADA provisions data pipelines to ingest the data from the sources. With the ADA Query Workbench, you can query the ingested data using plain SQL for application troubleshooting or issue diagnosis.

The following diagram provides an overview of the architecture and workflow of using ADA to gain insights into application logs.

The workflow includes the following steps:

  1. A Lambda function is scheduled to be triggered at 2-minute intervals using EventBridge.
  2. The Lambda function emits logs that are stored at a specified CloudWatch log group under /aws/lambda/CdkStack-AdaLogGenLambdaFunction. The application logs are generated using the Apache Log Format schema but stored in the CloudWatch log group in JSON format.
  3. The data products for CloudWatch, Amazon S3, and DynamoDB are created in ADA. The CloudWatch data product connects to the CloudWatch log group where the application (Lambda function) logs are stored. The Amazon S3 connector connects to an S3 bucket folder where the historical logs are stored. The DynamoDB connector connects to a DynamoDB table where the status codes that are referred by the application and historical logs are stored.
  4. For each of the data products, ADA deploys the data pipeline infrastructure to ingest data from the sources. When the data ingestion is complete, you can write queries using SQL via the ADA Query Workbench.
  5. You can log in to the ADA portal and compose SQL queries from the Query Workbench to gain insights in to the application logs. You can optionally save the query and share the query with other ADA users in the same domain. The ADA query feature is powered by Amazon Athena, which is a serverless, interactive analytics service that provides a simplified, flexible way to analyze petabytes of data.
  6. Tableau is configured to access the ADA data products via ADA egress endpoints. You then create a dashboard with two charts. The first chart is a heat map that shows the prevalence of HTTP error codes correlated with the application API endpoints. The second chart is a bar chart that shows the top 10 application APIs with a total count of HTTP error codes from the historical data.

Prerequisites

For this post, you need to complete the following prerequisites:

  1. Install the AWS Command Line Interface (AWS CLI), AWS Cloud Development Kit (AWS CDK) prerequisites, TypeScript-specific prerequisites, and git.
  2. Deploy the ADA solution in your AWS account in the us-east-1 Region.
    1. Provide an admin email while launching the ADA AWS CloudFormation stack. This is needed for ADA to send the root user password. An admin phone number is required to receive a one-time password message if multi-factor authentication (MFA) is enabled. For this demo, MFA is not enabled.
  3. Build and deploy the sample application (available on the GitHub repo) solution so that the following resources can be provisioned in your account in the us-east-1 Region:
    1. A Lambda function that simulates the logging application and an EventBridge rule that invokes the application function at 2-minute intervals.
    2. An S3 bucket with the relevant bucket policies and a CSV file that contains the historical application logs.
    3. A DynamoDB table with the lookup data.
    4. Relevant AWS Identity and Access Management (IAM) roles and permissions required for the services.
  4. Optionally, install Tableau Desktop, a third-party BI provider. For this post, we use Tableau Desktop version 2021.2. There is a cost involved in using a licensed version of the Tableau Desktop application. For additional details, refer to the Tableau licensing information.

Deploy and set up ADA

After ADA is deployed successfully, you can log in using the admin email provided during the installation. You then create a domain named CW_Domain. A domain is a user-defined collection of data products. For example, a domain might be a team or a project. Domains provide a structured way for users to organize their data products and manage access permissions.

  1. On the ADA console, choose Domains in the navigation pane.
  2. Choose Create domain.
  3. Enter a name (CW_Domain) and description, then choose Submit.

Set up the sample application infrastructure using AWS CDK

The AWS CDK solution that deploys the demo application is hosted on GitHub. The steps to clone the repo and to set up the AWS CDK project are detailed in this section. Before you run these commands, be sure to configure your AWS credentials. Create a folder, open the terminal, and navigate to the folder where the AWS CDK solution needs to be installed. Run the following code:

gh repo clone aws-samples/operational-insights-with-automated-data-analytics-on-aws
cd operational-insights-with-automated-data-analytics-on-aws
npm install
npm run build
cdk synth
cdk deploy

These steps perform the following actions:

  • Install the library dependencies
  • Build the project
  • Generate a valid CloudFormation template
  • Deploy the stack using AWS CloudFormation in your AWS account

The deployment takes about 1–2 minutes and creates the DynamoDB lookup table, Lambda function, and S3 bucket containing the historical log files as outputs. Copy these values to a text editing application, such as Notepad.

Create ADA data products

We create three different data products for this demo, one for each data source that you’ll be querying to gain operational insights. A data product is a dataset (a collection of data such as a table or a CSV file) that has been successfully imported into ADA and that can be queried.

Create a CloudWatch data product

First, we create a data product for the application logs by setting up ADA to ingest the CloudWatch log group for the sample application (Lambda function). Use the CdkStack.LambdaFunction output to get the Lambda function ARN and locate the corresponding CloudWatch log group ARN on the CloudWatch console.

Then complete the following steps:

  1. On the ADA console, navigate to the ADA domain and create a CloudWatch data product.
  2. For Name¸ enter a name.
  3. For Source type, choose Amazon CloudWatch.
  4. Disable Automatic PII.

ADA has a feature that automatically detects personally identifiable information (PII) data during import that is enabled by default. For this demo, we disable this option for the data product because the discovery of PII data is not in the scope of this demo.

  1. Choose Next.
  2. Search for and choose the CloudWatch log group ARN copied from the previous step.
  3. Copy the log group ARN.
  4. On the data product page, enter the log group ARN.
  5. For CloudWatch Query, enter a query that you want ADA to get from the log group.

In this demo, we query the @message field because we’re interested in getting the application logs from the log group.

  1. Select how the data updates are triggered after initial import.

ADA can be configured to ingest the data from the source at flexible intervals (up to 15 minutes or later) or on demand. For the demo, we set the data updates to run hourly.

  1. Choose Next.

Next, ADA will connect to the log group and query the schema. Because the logs are in Apache Log Format, we transform the logs into separate fields so that we can run queries on the specific log fields. ADA provides four default transformations and supports custom transformation through a Python script. In this demo, we run a custom Python script to transform the JSON message field into Apache Log Format fields.

  1. Choose Transform schema.
  2. Choose Create new transform.
  3. Upload the apache-log-extractor-transform.py script from the /asset/transform_logs/ folder.
  4. Choose Submit.

ADA will transform the CloudWatch logs using the script and present the processed schema.

  1. Choose Next.
  2. In the last step, review the steps and choose Submit.

ADA will start the data processing, create the data pipelines, and prepare the CloudWatch log groups to be queried from the Query Workbench. This process will take a few minutes to complete and will be shown on the ADA console under Data Products.

Create an Amazon S3 data product

We repeat the steps to add the historical logs from the Amazon S3 data source and look up reference data from the DynamoDB table. For these two data sources, we don’t create custom transforms because the data formats are in CSV (for historical logs) and key attributes (for reference lookup data).

  1. On the ADA console, create a new data product.
  2. Enter a name (hist_logs) and choose Amazon S3.
  3. Copy the Amazon S3 URI (the text after arn:aws:s3:::) from the CdkStack.S3 output variable and navigate to the Amazon S3 console.
  4. In the search box, enter the copied text, open the S3 bucket, select the /logs folder, and choose Copy S3 URI.

The historical logs are stored in this path.

  1. Navigate back to the ADA console and enter the copied S3 URI for S3 location.
  2. For Update Trigger, select On Demand because the historical logs are updated at an unspecified frequency.
  3. For Update Policy, select Append to append newly imported data to the existing data.
  4. Choose Next.

ADA processes the schema for the files in the selected folder path. Because the logs are in CSV format, ADA is able to read the column names without requiring additional transformations. However, the columns status_code and request_size are inferred as long type by ADA. We want to keep the column data types consistent among the data products so that we can join the data tables and query the data. The column status_code will be used to create joins across the data tables.

  1. Choose Transform schema to change the data types of the two columns to string data type.

Note the highlighted column names in the Schema preview pane prior to applying the data type transformations.

  1. In the Transform plan pane, under Built-in transforms, choose Apply Mapping.

This option allows you to change the data type from one type to another.

  1. In the Apply Mapping section, deselect Drop other fields.

If this option is not disabled, only the transformed columns will be preserved and all other columns will be dropped. Because we want to retain all the columns, we disable this option.

  1. Under Field Mappings¸ for Old name and New name, enter status_code and for New type, enter string.
  2. Choose Add Item.
  3. For Old name and New name¸ enter request_size and for New data type, enter string.
  4. Choose Submit.

ADA will apply the mapping transformation on the Amazon S3 data source. Note the column types in the Schema preview pane.

  1. Choose View sample to preview the data with the transformation applied.

ADA will display the PII data acknowledgement to ensure that either only authorized users can view the data or that the dataset doesn’t contain any PII data.

  1. Choose Agree to continue to view the sample data.

Note that the schema is identical to the CloudWatch log group schema because both the current application and historical application logs are in Apache Log Format.

  1. In the final step, review the configuration and choose Submit.

ADA starts processing the data from the Amazon S3 source, creates the backend infrastructure, and prepares the data product. This process takes a few minutes depending upon the size of the data.

Create a DynamoDB data product

Lastly, we create a DynamoDB data product. Complete the following steps:

  1. On the ADA console, create a new data product.
  2. Enter a name (lookup) and choose Amazon DynamoDB.
  3. Enter the Cdk.DynamoDBTable output variable for DynamoDB Table ARN.

This table contains key attributes that will be used as a lookup table in this demo. For the lookup data, we are using the HTTP codes and long and short descriptions of the codes. You can also use PostgreSQL, MySQL, or a CSV file source as an alternative.

  1. For Update Trigger, select On-Demand.

The updates will be on demand because the lookup is mostly for reference purpose while querying and any updates to the lookup data can be updated in ADA using on-demand triggers.

  1. Choose Next.

ADA reads the schema from the underlying DynamoDB schema and presents the column name and type for optional transformation. We will proceed with the default schema selection because the column types are consistent with the types from the CloudWatch log group and Amazon S3 CSV data source. Having data types that are consistent across the data sources allows us to write queries to fetch records by joining the tables using the column fields. For example, the column key in the DynamoDB schema corresponds to the status_code in the Amazon S3 and CloudWatch data products. We can write queries that can join the three tables using the column name key. An example is shown in the next section.

  1. Choose Continue with current schema.
  2. Review the configuration and choose Submit.

ADA will process the data from the DynamoDB table data source and prepare the data product. Depending upon the size of the data, this process takes a few minutes.

Now we have all the three data products processed by ADA and available for you to run queries.

Use the Query Workbench to query the data

ADA allows you to run queries against the data products while abstracting the data source and making it accessible using SQL (Structured Query Language). You can write queries and join the tables just as you would query against tables in a relational database. We demonstrate ADA’s querying capability via two user scenarios. In both the scenarios, we join an application log dataset to the error codes lookup table. In the first use case, we query the current application logs to identify the top 10 most accessed application endpoints along with the corresponding HTTP status codes:

--Query the top 10 Application endpoints along with the corresponding HTTP request type and HTTP status code.

SELECT logs.endpoint AS Application_EndPoint, logs.http_request AS REQUEST, count(logs.endpoint) as Endpoint_Count, ref.key as HTTP_Status_Code, ref.short as Description
FROM cw_domain.cloud_watch_application_logs logs
INNER JOIN cw_domain.lookup ref ON logs.status_code = ref.key
where logs.status_code LIKE '4%%' OR logs.status_code LIKE '5%%' -- = '/v1/server'
GROUP BY logs.endpoint, logs.http_request, ref.key, ref.short
ORDER BY Endpoint_Count DESC
LIMIT 10

In the second example, we query the historical logs table to get the top 10 application endpoints with the most errors to understand the endpoint call pattern:

-- Query Historical Logs to get the top 10 Application Endpoints with most number of errors along with an explanation of the error code.

SELECT endpoint as Application_EndPoint, count(status_code) as Error_Count, ref.long as Description FROM cw_domain.hist_logs hist
INNER JOIN cw_domain.lookup ref ON hist.status_code = ref.key
WHERE hist.status_code LIKE '4%%' OR hist.status_code LIKE '5%%'
GROUP BY endpoint, status_code, ref.long
ORDER BY Error_Count desc
LIMIT 10

In addition to querying, you can optionally save the query and share the saved query with other users in the same domain. The shared queries are accessible directly from the Query Workbench. The query results can also be exported to CSV format.

Visualize ADA data products in Tableau

ADA offers the ability to connect to third-party BI tools to visualize data and create reports from the ADA data products. In this demo, we use ADA’s native integration with Tableau to visualize the data from the three data products we configured earlier. Using Tableau’s Athena connector and following the steps in Tableau configuration, you can configure ADA as a data source in Tableau. After a successful connection has been established between Tableau and ADA, Tableau will populate the three data products under the Tableau catalog cw_domain.

We then establish a relationship across the three databases using the HTTP status code as the joining column, as shown in the following screenshot. Tableau allows us to work in online and offline mode with the data sources. In online mode, Tableau will connect to ADA and query the data products live. In offline mode, we can use the Extract option to extract the data from ADA and import the data in to Tableau. In this demo, we import the data in to Tableau to make the querying more responsive. We then save the Tableau workbook. We can inspect the data from the data sources by choosing the database and Update Now.

With the data source configurations in place in Tableau, we can create custom reports, charts, and visualizations on the ADA data products. Let’s consider two use cases for visualizations.

As shown in the following figure, we visualized the frequency of the HTTP errors by application endpoints using Tableau’s built-in heat map chart. We filtered out the HTTP status codes to only include error codes in the 4xx and 5xx range.

We also created a bar chart to depict the application endpoints from the historical logs ordered by the count of HTTP error codes. In this chart, we can see that the /v1/server/admin endpoint has generated the most HTTP error status codes.

Clean up

Cleaning up the sample application infrastructure is a two-step process. First, to remove the infrastructure provisioned for the purposes of this demo, run the following command in the terminal:

cdk destroy

For the following question, enter y and AWS CDK will delete the resources deployed for the demo:

Are you sure you want to delete: CdkStack (y/n)? y

Alternatively, you can remove the resources via the AWS CloudFormation console by navigating to the CdkStack stack and choosing Delete.

The second step is to uninstall ADA. For instructions, refer to Uninstall the solution.

Conclusion

In this post, we demonstrated how to use the ADA solution to derive insights from application logs stored across two different data sources. We demonstrated how to install ADA on an AWS account and deploy the demo components using AWS CDK. We created data products in ADA and configured the data products with the respective data sources using the ADA’s built-in data connectors. We demonstrated how to query the data products using standard SQL queries and generate insights on the log data. We also connected the Tableau Desktop client, a third-party BI product, to ADA and demonstrated how to build visualizations against the data products.

ADA automates the process of ingesting, transforming, governing, and querying diverse datasets and simplifying the lifecycle management of data. ADA’s pre-built connectors allow you to ingest data from diverse data sources. Software teams with basic knowledge of AWS products and services will be able to set up an operational data analytics platform in a few hours and provide secure access to the data. The data can then be easily and quickly queried using an intuitive and standalone web user interface.

Try out ADA today to easily manage and gain insights from data.


About the authors

Aparajithan Vaidyanathan is a Principal Enterprise Solutions Architect at AWS. He supports enterprise customers migrate and modernize their workloads on AWS cloud. He is a Cloud Architect with 23+ years of experience designing and developing enterprise, large-scale and distributed software systems. He specializes in Machine Learning & Data Analytics with focus on Data and Feature Engineering domain. He is an aspiring marathon runner and his hobbies include hiking, bike riding and spending time with his wife and two boys.

Rashim Rahman is a Software Developer based out of Sydney, Australia with 10+ years of experience in software development and architecture. He works primarily on building large scale open-source AWS solutions for common customer use cases and business problems. In his spare time, he enjoys sports and spending time with friends and family.

Hafiz Saadullah is a Principal Technical Product Manager at Amazon Web Services. Hafiz focuses on AWS Solutions, designed to help customers by addressing common business problems and use cases.

The art and science of data product portfolio management

Post Syndicated from Faris Haddad original https://aws.amazon.com/blogs/big-data/the-art-and-science-of-data-product-portfolio-management/

This post is the first in a series dedicated to the art and science of practical data mesh implementation (for an overview of data mesh, read the original whitepaper The data mesh shift). The series attempts to bridge the gap between the tenets of data mesh and its real-life implementation by deep-diving into the functional and non-functional capabilities essential to a working operating model, laying out the decisions that need to be made for each capability, and describing the key business and technical processes required to implement them. Taken together, the posts in this series lay out some possible operating models for data mesh within an organization.

Kudzu

Kudzu—or kuzu (クズ)—is native to Japan and southeast China. First introduced to the southeastern United States in 1876 as a promising solution for erosion control, it now represents a cautionary tale about unintended consequences, as Kudzu’s speed of growth outcompetes everything from native grasses to tree systems by growing over and shading them from the sunlight they need to photosynthesize—eventually leading to species extinction and loss of biodiversity. The story of Kudzu offers a powerful analogy to the dangers and consequences of implementing data mesh architectures without fully understanding or appreciating how they are intended to be used. When the “Kudzu” of unmanaged pseudo-data products (methods of sharing data that masquerade as data products while failing to fulfill the myriad obligations associated with them) has overwhelmed the local ecosystem of true data products, eradication is costly and prone to failure, and can represent significant wasted effort and resources, as well as lost time.

Desert

While Kudzu was taking over the south in the 1930s, desertification caused by extensive deforestation was overwhelming the Midwest, with large tracts of land becoming barren and residents forced to leave and find other places to make a living. In the same way, overly restrictive data governance practices that either prevent data products from taking root at all, or pare them back too aggressively (deforestation), can over time create “data deserts” that drive both the producers and consumers of data within an organization to look elsewhere for their data needs. At the same time, unstructured approaches to data mesh management that don’t have a vision for what types of products should exist and how to ensure they are developed are at high risk of creating the same effect through simple neglect. This is due to a  common misconception about data mesh as a data strategy, which is that it is effectively self-organizing—meaning that once presented with the opportunity, data owners within the organization will spring to the responsibilities and obligations associated with publishing high-quality data products. In reality, the work of a data producer is often thankless, and without clear incentive strategies, organizations may end up with data deserts that create more data governance issues as producers and consumers go elsewhere to seek out the data they need to perform work.

Bonsai

Bonsai (盆栽) is an art form originating from an ancient Chinese tradition called penjing (盆景), and later shaped by the minimalist teachings of Zen Buddhism into the practice we know and recognize today. The patient practice of Bonsai offers useful analogies to the concepts and processes required to avoid the chaos of Kudzu as well as the specter of organizational data deserts. Bonsai artists carefully observe the naturally occurring buds that are produced by the tree and encourage those that add to the overall aesthetics of the tree, while pruning those that don’t work well with their neighbors. The same ideas apply equally well to data products within a data mesh—by encouraging the growth and adoption of those data products that add value to our data mesh, and continuously pruning those that do not, we maximize the value and sustainability of our data mesh implementations. In a similar vein, Bonsai artists must balance their vision for the shape of the tree with a respect for the natural characteristics and innate structure of the species they have chosen to work with—to ignore the biology of the tree would be disastrous to the longevity of the tree, as well as to the quality of the art itself. In the same way, organizations seeking to implement successful data mesh strategies must respect the nature and structure (legal, political, commercial, technology) of their organizations in their implementation.

Of the key capabilities proposed for the implementation of a sustainable data mesh operating model, the one that is most relevant to the problems we’ve described—and explore later in this post—is data product portfolio management.

Overview of data product portfolio management

Data mesh architectures are, by their nature, ideal for implementation within federated organizations, with decentralized ownership of data and clear legal, regulatory, or commercial boundaries between entities or lines of business. The same organizational characteristics that make data mesh architectures valuable, however, also put them at risk of turning into one of the twin nightmares of Kudzu or data deserts.

To define the shape and nature of an organizational data mesh, a number of key questions need to be answered, including but not limited to:

  • What are the key data domains within the organization? What are the key data products within these domains needed to solve current business problems? How do we iterate on this discovery process to add value while we are mapping our domains?
  • Who are the consumers in our organization, and what logical, regulatory, physical, or commercial boundaries might separate them from producers and their data products?
  • How do we encourage the development and maintenance of key data products in a decentralized organization?
  • How do we monitor data products against their SLAs, and ensure alerting and escalation on failure so that the organization is protected from bad data?
  • How do we enable those we see as being autonomous producers and consumers with the right skills, the right tools, and the right mindset to actually want to (and be able to) take more ownership of independently publishing data as a product and consuming it responsibly?
  • What is the lifecycle of a data product? When do new data products get created, and who is allowed to create them? When are data products deprecated, and who is accountable for the consequences to their consumers?
  • How do we define “risk” and “value” in the context of data products, and how can we measure this? Whose responsibility is it to justify the existence of a given data product?

To answer questions such as these and plan accordingly, organizations must implement data product portfolio management (DPPM). DPPM does not exist in a vacuum—by its nature, DPPM is closely related to and interdependent with enterprise architecture practices like business capability management and project portfolio management. DPPM itself may therefore also be considered, in part, an enterprise architecture practice.

As an enterprise architecture practice, DPPM is responsible for its implementation, which should reside within a function whose remit is appropriately global and cross-functional. This may be within the CDO office for those organizations that have a CDO or equivalent central data function, or the enterprise architecture team in organizations that do not.

Goals of DPPM

The goals of DPPM can be summarized as follows:

  • Protect value – DPPM protects the value of the organizational data strategy by developing, implementing, and enforcing frameworks to measure the contribution of data products to organizational goals in objective terms. Examples may include associated revenue, savings, or reductions in operational losses. Earlier in their lifecycle, data products may be measured by alternative metrics, including adoption (number of consumers) and level of activity (releases, interaction with consumers, and so on). In the pursuit of this goal, the DPPM capability is accountable for engaging with the business to continuously priorities where data as a product can add value and align delivery priority accordingly. Strategies for measuring value and prioritizing data products are explored later in this post.
  • Manage risk – All data products introduce risk to the organization—risk of wasted money and effort through non-adoption, risk of operational loss associated with improper use, and risk of failure on the part of the data product to meet requirements on availability, completeness, or quality. These risks are exacerbated in the case of proliferation of low-quality or unsupervised data products. DPPM seeks to understand and measure these risks on an individual and aggregated basis. This is a particularly challenging goal because what constitutes risk associated with the existence of a particular data product is determined largely by its consumers and is likely to change over time (though like entropy, is only ever likely to increase).
  • Guide evolution – The final goal of DPPM is to guide the evolution of the data product landscape to meet overarching organizational data goals, such as mutually exclusive or collectively exhaustive domains and data products, the identification and enablement of single-threaded ownership of product definitions, or the agile inclusion of new sources of data and creation of products to serve tactical or strategic business goals. Some principles for the management of data mesh evolution, and the evaluation of data products against organizational goals, are explored later in this post.

Challenges of DPPM

In this section, we explore some of the challenges of DPPM, and the pragmatic ways some of these challenges could be addressed.

Infancy

Data mesh as a concept is still relatively new. As such, there is little standardization associated with practical operating models for building and managing data mesh architectures, and no access to fully fledged out-of-the-box reference operating models, frameworks, or tools to support the practice of DPPM.

Some elements of DPPM are supported in disparate tools (for example, some data catalogs include basic community features that contribute to measuring value), but not in a holistic way. Over time, standardization of the processes associated with DPPM will likely occur as a side-effect of commoditization, driven by the popularity and adoption of new services that take on and automate more of the undifferentiated heavy lifting associated with mesh supervision. In the meantime, however, organizations adopting data mesh architectures are left largely to their own devices around how to operate them effectively.

Resistance

The purest expression of democracy is anarchy, and the more federated an organization is (itself a supporting factor in choosing data mesh architectures), the more resistance may be observed to any forms of centralized governance. This is a challenge for DPPM, because in some way it must come together in one place. Just as the Bonsai artist knows the vision for the entire tree, there must be a cohesive vision for and ability to guide the evolution of a data mesh, no matter how broadly federated and autonomous individual domains or data products might be.

Balancing this with the need to respect the natural shape (and culture) of an organization, however, requires organizations that implement DPPM to think about how to do so in a way that doesn’t conflict with the reality of the organization. This might mean, for example, that DPPM may need to happen at several layers—at minimum within data domains, possibly within lines of business, and then at an enterprise level through appropriate data committees, guilds, or other structures that bring stakeholders together. All of this complicates the processes and collaboration needed to perform DPPM effectively.

Maturity

Data mesh architectures, and therefore DPPM, presume relatively high levels of data maturity within an organization—a clear data strategy, understanding of data ownership and stewardship, principles and policies that govern the use of data, and a moderate-to-high level of education and training around data within the organization. A lack of data maturity within the organization, or a weak or immature enterprise architecture function, will face significant hurdles in the implementation of any data mesh architecture, let alone a strong and useful DPPM practice.

In reality, however, data maturity is not uniform across organizations. Even in seemingly low-maturity organizations, there are often teams who are more mature and have a higher appetite to engage. By leaning into these teams and showing value through them first, then using them as evangelists, organizations can gain maturity while benefitting earlier from the advantages of data mesh strategies.

The following sections explore the implementation of DPPM along the lines of people, process, and technology, as well as describing the key characteristics of data products—scope, value, risk, uniqueness, and fitness—and how they relate to data mesh practices.

People

To implement DPPM effectively, a wide variety of stakeholders in the organization may need to be involved in one capacity or another. The following table suggests some key roles, but it’s up to an individual organization to determine how and if these map to their own roles and functions.

Function RACI Role Responsibility
Senior Leadership A Chief Data Officer Ultimately accountable for organizational data strategy and implementation. Approves changes to DPPM principles and operating model. Acts as chair of, and appoints members to, the data council.
. R Data Council** Stakeholder body representing organizational governance around data strategy. Acts as steering body for the governance of DPPM as a practice (KPI monitoring, maturity assessments, auditing, and so on). Approves changes to guidelines and methodologies. Approves changes to data product portfolio (discussed later in this post). Approves and governs centrally funded and prioritized data product development activities.
Enterprise Architecture AR Head of Enterprise Architecture Responsible for development and enforcement of data strategy. Accountable and responsible for the design and implementation of DPPM as an organizational capability.
. R Domain Architect Responsible for the implementing screening, data product analysis, periodic evaluation, and optimal portfolio selection practices. Responsible for the development of methodologies and their selection criteria.
Legal & Compliance C Legal & Compliance Officer Consults on permissibility of data products with reference to local regulation. Consults on permissibility of data sharing with reference to local regulation or commercial agreements.
. C Data Privacy Officer Consults on permissibility of data use with reference to local data privacy law. Consults on permissibility of cross-entity or border data sharing with reference to data privacy law.
Information Security RC Information Security Officer Consults on maturity assessments (discussed later in this post) for information security-relevant data product capabilities. Approves changes to data product technology architecture. Approves changes to IAM procedures relating to data products.
Business Functions A Data Domain Owner Ultimately accountable for the appropriate use of domain data, as well as its quality and availability. Accountable for domain data products. Approves changes to the domain data model and domain data product portfolio.
c R Data Domain Steward Responsible for implementing data domain responsibilities, including operational (day-to-day) governance of domain data products. Approves use of domain data in new data products, and performs regular (such as yearly) attestation of data products using domain data.
. A Data Owner Ultimately accountable for the appropriate use of owned data (for example, CRM data), as well as its quality and availability.
. R Data Steward Responsible for implementing data responsibilities. Approves use of owned data in new data products, and performs regular (such as yearly) attestation of data products using owned data.
. AR Data Product Owner Accountable and responsible for the design, development, and delivery of data products against their stated SLOs. Contributes to data product analysis and portfolio adjustment practices for own data products.

** The data council typically consists of permanent representatives from each function (data domain owners), enterprise architecture, and the chief data officer or equivalent.

Process

The following diagram illustrates the strategic, tactical, and operational practices associated with DPPM. Some considerations for the implementation of these practices is explored in more detail in this post, though their specific interpretation and implementation is dependent on the individual organization.

Boundaries

When reading this section, it’s important to bear in mind the impact of boundaries—although strategy development may be established as a global practice, other practices within DPPM must respect relevant organizational boundaries (which may be physical, geographical, operational, legal, commercial, or regulatory in nature). In some cases, the existence of boundaries may require some or all tactical and operational practices to be duplicated within each associated boundary. For example, an insurance company with a property and casualty legal entity in North America and a life entity in Germany may need to implement DPPM separately within each entity.

Strategy development

This practice deals with answering questions associated with the overall data mesh strategy, including the following:

  • The overall scope (data domains, participating entities, and so on) of the data mesh
  • The degree of freedom of participating entities in their definition and implementation of the data mesh (for example, a mesh of meshes vs. a single mesh)
  • The distribution of responsibilities for activities and capabilities associated with the data mesh (degree of democratization)
  • The definition and documentation of key performance indicators (KPIs) against which the data mesh should be governed (such as risk and value)
  • The governance operating model (including this practice)

Key deliverables include the following:

  • Organizational guidelines for operational processes around pre-screening and screening of data products
  • Well-defined KPIs that guide methodology development and selection for practices like data product analysis, screening, and optimal portfolio selection
  • Allocation of organizational resources (people, budget, time) to the implementation of tactical processes around methodology development, optimal portfolio selection, and portfolio adjustment

Key considerations

In this section, we discuss some key considerations for strategy development.

Data mesh structure

This diagram illustrates the analogous relationship between data products in a data mesh, and the structure of the mesh itself.

The following considerations relate to screening, data product analysis, and optimal portfolio selection.

  • Trunk (core data products) – Core data products are those that are central to the organization’s ability to function, and from which the majority of other data products are derived. These may be data products consumed in the implementation of key business activities, or associated with critical processes such as regulatory reporting and risk management. Organizational governance for these data products typically favors availability and data accuracy over agility.
  • Branch (cross-domain data products) – Cross-domain data products represent the most common cross-domain use cases for data (for example, joining customer data with product data). These data products may be widely used across business functions to support reporting and analytics, and—to a lesser extent—operational processes. Because these data products may consume a variety of sources, organizational governance may favor a balanced view on agility vs. reliability, accepting some degree of risk in return for being able to adapt to changes in data sources. Data product versioning can offer mitigation of risks associated with change.
  • Leaf (everything else) – These are the myriad data products that may arise within a data mesh, either as permanent additions to support individual teams and use cases or as temporary data products to fill data gaps or support time-limited initiatives. Because the number of these data products may be high and risks are typically limited to a single process or a small part of the organization, organizational governance typically favors a light touch and may prefer to govern through guidelines and best practices, rather than through active participation in the data product lifecycle.

Data products vs. data definitions

The following figure illustrates how data definitions are defined and inherited throughout the lineage of data products.

In a data mesh architecture, data products may inherit data from each other (one data product consumes another in its data pipeline) or independently publish data within (or related to) the same domain. For example, a customer data product may be inherited by a customer support data product, while another the customer journey data product may directly publish customer-relevant data from independent sources. When no standards are applied to how domain data attributes are used and published, data products even within the same data domain may lose interoperability because it becomes difficult or impossible to join them together for reporting or analytics purposes.

To prevent this, it can be useful to distinguish between data products and data definitions. Typically, organizations will select a single-threaded owner (often a data owner or steward, or a domain data owner or steward) who is responsible for defining minimal data definitions for common and reusable data entities within data domains. For example, a data owner responsible for the sales and marketing data domain may identify a customer data product as a reusable data entity within the domain and publish a minimal data definition that all producers of customer-relevant data must incorporate within their data products, to ensure that all data products associated with customer data are interoperable.

DPPM can assist in the identification and production of data definitions as part of its data product analysis activities, as well as enforce their incorporation as part of oversight of data product development.

Service management thinking

These considerations relate to data product analysis, periodic evaluation, and methodology selection.

Data products are services provided to the organization or externally to customers and partners. As such, it may make sense to adapt a service management framework like ITIL, in combination with the ITIL Maturity Model, for use in evaluating the fitness of data products for their scope and audience, as well as in describing the roles, processes, and acceptable technologies that should form the operating model for any data product.

At the operational level, the stakeholders required to implement each practice may change depending on the scope of the data product. For example, the release management practice for a core data product may require involvement of the data council, whereas the same practice for a team data product may only involve the team or functional head. To avoid creating decision-making bottlenecks, organizations should aim to minimize the number of stakeholders in each case and focus on single-threaded owners wherever possible.

The following table proposes a subset of capabilities and how they might be applied to data products of different scopes. Suggested target maturity levels, between 1 and 5, are included for each scope. (1= Initial, 5= Optimizing)

Target Maturity Data Product Scope.
4 – 5 3 – 4 2 – 3 2
Capability    Core
  Cross-Domain
  Function / Team
  Personal
Information Security Management X X X X
Knowledge Management X X X .
Release Management X X X .
Service-Level Management X X X .
Measurement and Reporting X X . .
Availability Management X X . .
Capacity and Performance Management X X . .
Incident Management X X . .
Monitoring and Event Management X X . .
Service Validation and Testing X X . .

Methodology development

This practice deals with the development of concrete, objective frameworks, metrics, and processes for the measurement of data product value and risk. Because the driving factors behind risk and value are not necessarily the same between products, it may be necessary to develop several methodologies or variants thereof.

Key deliverables include the following:

  • Well-defined frameworks for measuring risk and value of data products, as well as for determining the optimal portfolio of data products
  • Operationally feasible, measurable metrics associated with value and risk

Key considerations

A key consideration for assessing data products is that of consumer value or risk vs. uniqueness. The following diagram illustrates how value and risk of a data product are driven by its consumers.

Data products don’t inherently present risk or add value, but rather indirectly pose—in an aggregated fashion—the risk and value created by their consumers.

In a consumer-centric value and risk model, governance of consumers ensures that all data use meets the following requirements:

  • Is associated with a business case justifying the use of data (for example, new business, cost reduction through business process automation, and so on)
  • Is regularly evaluated with reference to the risk associated with the use case (for example, regulatory reporting

The value and risk associated with the linked data products are then calculated as an aggregation. Where organizations already track use cases associated with data, either as part of data privacy governance or as a by-product of the access approval process, these existing systems and databases can be reused or extended.

Conversely, where data products overlap with each other, their value to the organization is reduced accordingly, because redundancies between data products represent an inefficient use of resources and increase organizational complexity associated with data quality management.

To ensure that the model is operationally feasible (see the key deliverables of methodology development), it may be sufficient to consider simple aggregations, rather than attempting to calculate value and risk attribution at a product or use case level.

Optimal portfolio selection

This practice deals with the determination of which combination of data products (existing, new, or potential) would best meet the organization’s current and known future needs. This practice takes input from data product analysis and data product proposals, as well as other enterprise architecture practices (for example, business architecture), and considers trade-offs between data-debt and time-to-value, as well as other considerations such as redundancy between data products to determine the optimal mix of permanent and temporary data products at any given point in time.

Because the number of data products in an organization may become significant over time, it may be useful to apply heuristics to the problem of optimal portfolio selection. For example, it may be sufficient to consider core and cross-domain data products (trunk and branches) during quarterly portfolio reviews, with other data products (leaves) audited on a yearly basis.

Key deliverables include the following:

  • A target state definition for the data mesh, including all relevant data products
  • An indication of organizational priorities for use by the portfolio adjustment practice

Key considerations

The following are key considerations regarding the data product half-life:

  • Long-term or strategic data products – These data products fill a long-term organizational need, are often associated with key source systems in various domains, and anchor the overall data strategy. Over time, as an organization’s data mesh matures, long-term data products should form the bulk of the mesh.
  • Time-bound data products – These data products fill a gap in data strategy and allow the organization to move on data opportunities until core data products can be updated. An example of this might be data products created and used in the context of mergers and acquisitions transactions and post-acquisition, to provide consistent data for reporting and business intelligence until mid-term and long-term application consolidation has taken place. Time-bound data products are considered as data-debt and should be managed accordingly.
  • Purpose-driven data products – These data products serve a narrow, finite purpose. Purpose-driven data products may or may not be time-bound, but are characterized primarily by a strict set of consumers known in advance. Examples of this might include:
    • Data products developed to support system-of-record harmonization between lines of business (for example, deduplication of customer records between insurance lines of business using separate CRM systems
    • Data products created explicitly for the monitoring of other data products (data quality, update frequency, and so on)

Portfolio adjustment

This practice implements the feasibility analysis, planning and project management, as well as communication and organizational change management activities associated with changes to the optimal portfolio. As part of this practice, a gap analysis is conducted between the current and target data product portfolio, and a set of required actions and estimated time and effort prepared for review by the organization. During such a period, data products may be marked for development (new data products to fill a need), changes, consolidation (merging two or more data products into a single data product), or deprecation. Several iterations of optimal portfolio selection and portfolio adjustment may be required to find an appropriate balance between optimality and feasibility of implementation.

Key deliverables include the following:

  • A gap analysis between the current and target data product portfolio, as well as proposed remediation activities
  • High-level project plans and effort or budget assessments associated with required changes, for approval by relevant stakeholders (such as the data council)

Data product proposals

This practice organizes the collection and prioritization of requests for new, or changes to existing, data products within the organization. Its implementation may be adapted from or managed by existing demand management processes within the organization.

Key deliverables include a registry of demand against new or existing data products, including metadata on source systems, attributes, known use cases, proposed data product owners, and suggested organizational priority.

Methodology selection

This practice is associated with the identification and application of the most appropriate methodologies (such as value and risk) during data product analysis, screening, and optimal portfolio selection. The selection of an appropriate methodology for the type, maturity, and scope of a data product (or an entire portfolio) is a key element in avoiding either a “Kudzu” mesh or a “data desert.”

Key deliverables include reusable selection criteria for mapping methodologies to data products during data product analysis, screening, and optimal portfolio selection.

Pre-screening

This optional practice is primarily a mechanism to avoid unnecessary time and effort in the comparatively expensive practice of data product analysis by offering simple self-service applications of guidelines to the evaluation of data products. An example might include the automated approval of data products that fall under the classification of personal data products, requiring only attestation on the part of the requester that they will uphold the relevant portions of the guideline that governs such data products.

Key deliverables include tools and checklists for the self-service evaluation of data products against guidelines and automated registration of approved data products.

Data product analysis

This practice incorporates guidelines, methodologies, as well as (where available) metadata relating to data products (performance against SLOs, service management metrics, degree of overlap with other data products) to establish an understanding of the value and risk associated with individual data products, as well as gaps between current and target capability maturities, and compliance with published product definitions and standards.

Key deliverables include a summary of findings for a particular data product, including scores for relevant value, risk, and maturity metrics, as well as operational gaps requiring remediation and recommendations on next steps (repair, enhance, decommission, and so on).

Screening

This optional practice is a mechanism to reduce complexity in optimal portfolio selection by ensuring the early removal of data products from consideration that fail to meet value or risk targets, or have been identified as redundant to other data products already available in the organization.

Key deliverables include a list of data products that should be slated for removal (direct-to-decommissioning).

Data product development

This practice is not performed directly under DPPM, but is managed in part by the portfolio adjustment practice, and may be governed by standards that are developed as part of DPPM. In the context of DPPM, this practice is primarily associated with ensuring that data products are developed according to the specifications agreed as part of portfolio adjustment.

Key deliverables include project management and software or service development deliverables and artefacts.

Data product decommissioning

This practice manages the decommissioning of data products and the migration of affected consumers to new or other data products where relevant. Unmanaged decommissioning of data products, especially those with many downstream consumers, can threaten the stability of the entire data mesh, as well as have significant consequences to business functions.

Key deliverables include a decommissioning plan, including stakeholder assessment and sign-off, timelines, migration plans for affected consumers, and back-out strategies.

Periodic evaluation

This practice manages the calendar and implementation of periodic reviews of the data mesh, both in its entirety as well as at the data product level, and is primarily an exercise in project management.

Key deliverables include the following:

  • yearly review calendar, published and made available to all data product owners and affected stakeholders
  • Project management deliverables and artefacts, including evidence of evaluations having been performed against each data product

Technology

Although most practices within DPPM don’t rely heavily on technology and automation, some key supporting applications and services are required to implement DPPM effectively:

  • Data catalog – Core to the delivery of DPPM is the organizational data catalog. Beyond providing transparency into what data products exist within an organization, a data catalog can provide key insights into data lineage between data products (key to the implementation of portfolio adjustment) and adoption of data products by the organization. The data catalog can also be used to capture and make available both the documented as well as the realized SLO for any given data product, and—through the use of a business glossary—assist in the identification of redundancy between data products.
  • Service management – Service management solutions (such as ServiceNOW) used in the context of data product management offer important insights into the fitness of data products by capturing and tracking incidents, problems, requests, and other metrics against data products.
  • Demand management – A demand management solution supports self-service implementation and automation of data product proposal and pre-screening activities, as well as prioritization activities associated with selection and development of data products.

Conclusion

Although this post focused on implementing DPPM in the context of a data mesh, this capability—like data product thinking—is not exclusive to data mesh architectures. The practices outlined here can be practiced at any scale to ensure that the production and use of data within the organization is always in line with its current and future needs, that governance is implemented in a consistent way, and that the organization can have Bonsai, not Kudzu.

For more information about data mesh and data management, refer to the following:

In upcoming posts, we will cover other aspects of data mesh operating models, including data mesh supervision and service management models for data product owners.


About the Authors


Maximilian Mayrhofer
is a Principal Solutions Architect working in the AWS Financial Services EMEA Go-to-Market team. He has over 12 years experience in digital transformation within private banking and asset management. In his free time, he is an avid reader of science fiction and enjoys bouldering.


Faris Haddad
is the Data & Insights Lead in the AABG Strategic Pursuits team. He helps enterprises successfully become data-driven.

Near-real-time analytics using Amazon Redshift streaming ingestion with Amazon Kinesis Data Streams and Amazon DynamoDB

Post Syndicated from Poulomi Dasgupta original https://aws.amazon.com/blogs/big-data/near-real-time-analytics-using-amazon-redshift-streaming-ingestion-with-amazon-kinesis-data-streams-and-amazon-dynamodb/

Amazon Redshift is a fully managed, scalable cloud data warehouse that accelerates your time to insights with fast, easy, and secure analytics at scale. Tens of thousands of customers rely on Amazon Redshift to analyze exabytes of data and run complex analytical queries, making it the widely used cloud data warehouse. You can run and scale analytics in seconds on all your data without having to manage your data warehouse infrastructure.

You can use the Amazon Redshift streaming ingestion capability to update your analytics databases in near-real time. Amazon Redshift streaming ingestion simplifies data pipelines by letting you create materialized views directly on top of data streams. With this capability in Amazon Redshift, you can use SQL (Structured Query Language) to connect to and directly ingest data from data streams, such as Amazon Kinesis Data Streams or Amazon Managed Streaming for Apache Kafka (Amazon MSK) data streams, and pull data directly to Amazon Redshift.

In this post, we discuss a solution that uses Amazon Redshift streaming ingestion to provide near-real-time analytics.

Overview of solution

We walk through an example pipeline to ingest data from an Amazon DynamoDB source table in near-real time using Kinesis Data Streams in combination with Amazon Redshift streaming ingestion. We also walk through using PartiQL in Amazon Redshift to unnest nested JSON documents and build fact and dimension tables that are used in your data warehouse refresh. The solution uses Kinesis Data Streams to capture item-level changes from an application DynamoDB table.

As shown in the following reference architecture, DynamoDB table data changes are streamed into Amazon Redshift through Kinesis Data Streams and Amazon Redshift streaming ingestion for near-real-time analytics dashboard visualization using Amazon QuickSight.

The process flow includes the following steps:

  1. Create a Kinesis data stream and turn on the data stream from DynamoDB to capture item-level changes in your DynamoDB table.
  2. Create a streaming materialized view in your Amazon Redshift cluster to consume live streaming data from the data stream.
  3. The streaming data gets ingested into a JSON payload. Use a combination of a PartiQL statement and dot notation to unnest the JSON document into data columns of a staging table in Amazon Redshift.
  4. Create fact and dimension tables in the Amazon Redshift cluster and keep loading the latest data at regular intervals from the staging table using transformation logic.
  5. Establish connectivity between a QuickSight dashboard and Amazon Redshift to deliver visualization and insights.

Prerequisites

You must have the following:

Set up a Kinesis data stream

To configure your Kinesis data stream, complete the following steps:

  1. Create a Kinesis data stream called demo-data-stream. For instructions, refer to Step 1 in Set up streaming ETL pipelines.

Configure the stream to capture changes from the DynamoDB table.

  1. On the DynamoDB console, choose Tables in the navigation pane.
  2. Open your table.
  3. On the Exports and streams tab, choose Turn on under Amazon Kinesis data stream details.

  1. For Destination Kinesis data stream, choose demo-data-stream.
  2. Choose Turn on stream.

Item-level changes in the DynamoDB table should now be flowing to the Kinesis data stream.

  1. To verify if the data is entering the stream, on the Kinesis Data Streams console, open demo-data-stream.
  2. On the Monitoring tab, find the PutRecord success – average (Percent) and PutRecord – sum (Bytes) metrics to validate record ingestion.

Set up streaming ingestion

To set up streaming ingestion, complete the following steps:

  1. Set up the AWS Identity and Access Management (IAM) role and trust policy required for streaming ingestion. For instructions, refer to Steps 1 and 2 in Getting started with streaming ingestion from Amazon Kinesis Data Streams.
  2. Launch the Query Editor v2 from the Amazon Redshift console or use your preferred SQL client to connect to your Amazon Redshift cluster for the next steps.
  3. Create an external schema:
CREATE EXTERNAL SCHEMA demo_schema
FROM KINESIS
IAM_ROLE { default | 'iam-role-arn' };
  1. To use case-sensitive identifiers, set enable_case_sensitive_identifier to true at either the session or cluster level.
  2. Create a materialized view to consume the stream data and store stream records in semi-structured SUPER format:
CREATE MATERIALIZED VIEW demo_stream_vw AS
    SELECT approximate_arrival_timestamp,
    partition_key,
    shard_id,
    sequence_number,
    json_parse(kinesis_data) as payload    
    FROM demo_schema."demo-data-stream";
  1. Refresh the view, which triggers Amazon Redshift to read from the stream and load data into the materialized view:
REFRESH MATERIALIZED VIEW demo_stream_vw;

You can also set your streaming materialized view to use auto refresh capabilities. This will automatically refresh your materialized view as data arrives in the stream. See CREATE MATERIALIZED VIEW for instructions on how to create a materialized view with auto refresh.

Unnest the JSON document

The following is a sample of a JSON document that was ingested from the Kinesis data stream to the payload column of the streaming materialized view demo_stream_vw:

{
  "awsRegion": "us-east-1",
  "eventID": "6d24680a-6d12-49e2-8a6b-86ffdc7306c1",
  "eventName": "INSERT",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "sample-dynamoDB",
  "dynamodb": {
    "ApproximateCreationDateTime": 1657294923614,
    "Keys": {
      "pk": {
        "S": "CUSTOMER#CUST_123"
      },
      "sk": {
        "S": "TRANSACTION#2022-07-08T23:59:59Z#CUST_345"
      }
    },
    "NewImage": {
      "completionDateTime": {
        "S": "2022-07-08T23:59:59Z"
      },
      "OutofPockPercent": {
        "N": 50.00
      },
      "calculationRequirements": {
        "M": {
          "dependentIds": {
            "L": [
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              },
              {
                "M": {
                  "sk": {
                    "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_890"
                  },
                  "pk": {
                    "S": "CUSTOMER#CUST_123"
                  }
                }
              }
            ]
          }
        }
      },
      "Event": {
        "S": "SAMPLE"
      },
      "Provider": {
        "S": "PV-123"
      },
      "OutofPockAmount": {
        "N": 1000
      },
      "lastCalculationDateTime": {
        "S": "2022-07-08T00:00:00Z"
      },
      "sk": {
        "S": "CUSTOMER#2022-07-08T23:59:59Z#CUST_567"
      },
      "OutofPockMax": {
        "N": 2000
      },
      "pk": {
        "S": "CUSTOMER#CUST_123"
      }
    },
    "SizeBytes": 694
  },
  "eventSource": "aws:dynamodb"
}

We can use dot notation to unnest the JSON document. But in addition to that, we should use a PartiQL statement to handle arrays if applicable. For example, in the preceding JSON document, there is an array under the element:

"dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L".

The following SQL query uses a combination of dot notation and a PartiQL statement to unnest the JSON document:

select 
substring(a."payload"."dynamodb"."Keys"."pk"."S"::varchar, position('#' in "payload"."dynamodb"."Keys"."pk"."S"::varchar)+1) as Customer_ID,
substring(a."payload"."dynamodb"."Keys"."sk"."S"::varchar, position('#TRANSACTION' in "payload"."dynamodb"."Keys"."sk"."S"::varchar)+1) as Transaction_ID,
substring(b."M"."sk"."S"::varchar, position('#CUSTOMER' in b."M"."sk"."S"::varchar)+1) Dependent_ID,
a."payload"."dynamodb"."NewImage"."OutofPockMax"."N"::int as OutofPocket_Max,
a."payload"."dynamodb"."NewImage"."OutofPockPercent"."N"::decimal(5,2) as OutofPocket_Percent,
a."payload"."dynamodb"."NewImage"."OutofPockAmount"."N"::int as OutofPock_Amount,
a."payload"."dynamodb"."NewImage"."Provider"."S"::varchar as Provider,
a."payload"."dynamodb"."NewImage"."completionDateTime"."S"::timestamptz as Completion_DateTime,
a."payload"."eventName"::varchar Event_Name,
a.approximate_arrival_timestamp
from demo_stream_vw a
left outer join a."payload"."dynamodb"."NewImage"."calculationRequirements"."M"."dependentIds"."L" b on true;

The query unnests the JSON document to the following result set.

Precompute the result set using a materialized view

Optionally, to precompute and store the unnested result set from the preceding query, you can create a materialized view and schedule it to refresh at regular intervals. In this post, we maintain the preceding unnested data in a materialized view called mv_demo_super_unnest, which will be refreshed at regular intervals and used for further processing.

To capture the latest data from the DynamoDB table, the Amazon Redshift streaming materialized view needs to be refreshed at regular intervals, and then the incremental data should be transformed and loaded into the final fact and dimension table. To avoid reprocessing the same data, a metadata table can be maintained at Amazon Redshift to keep track of each ELT process with status, start time, and end time, as explained in the following section.

Maintain an audit table in Amazon Redshift

The following is a sample DDL of a metadata table that is maintained for each process or job:

create table MetaData_ETL
(
JobName varchar(100),
StartDate timestamp, 
EndDate timestamp, 
Status varchar(50)
);

The following is a sample initial entry of the metadata audit table that can be maintained at job level. The insert statement is the initial entry for the ELT process to load the Customer_Transaction_Fact table:

insert into MetaData_ETL 
values
('Customer_Transaction_Fact_Load', current_timestamp, current_timestamp,'Ready' );

Build a fact table with the latest data

In this post, we demonstrate the loading of a fact table using specific transformation logic. We are skipping the dimension table load, which uses similar logic.

As a prerequisite, create the fact and dimension tables in a preferred schema. In following example, we create the fact table Customer_Transaction_Fact in Amazon Redshift:

CREATE TABLE public.Customer_Transaction_Fact (
Transaction_ID character varying(500),
Customer_ID character varying(500),
OutofPocket_Percent numeric(5,2),
OutofPock_Amount integer,
OutofPocket_Max integer,
Provider character varying(500),
completion_datetime timestamp
);

Transform data using a stored procedure

We load this fact table from the unnested data using a stored procedure. For more information, refer to Creating stored procedures in Amazon Redshift.

Note that in this sample use case, we are using transformation logic to identify and load the latest value of each column for a customer transaction.

The stored procedure contains the following components:

  • In the first step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Running and StartDate as the current timestamp, which indicates that the fact load process is starting.
  • Refresh the materialized view mv_demo_super_unnest, which contains the unnested data.
  • In the following example, we load the fact table Customer_Transaction_Fact using the latest data from the streaming materialized view based on the column approximate_arrival_timestamp, which is available as a system column in the streaming materialized view. The value of approximate_arrival_timestamp is set when a Kinesis data stream successfully receives and stores a record.
  • The following logic in the stored procedure checks if the approximate_arrival_timestamp in mv_demo_super_unnest is greater than the EndDate timestamp in the MetaData_ETL audit table, so that it can only process the incremental data.
  • Additionally, while loading the fact table, we identify the latest non-null value of each column for every Transaction_ID depending on the order of the approximate_arrival_timestamp column using the rank and min
  • The transformed data is loaded into the intermediate staging table
  • The impacted records with the same Transaction_ID values are deleted and reloaded into the Customer_Transaction_Fact table from the staging table
  • In the last step of the stored procedure, the job entry in the MetaData_ETL table is updated to change the status to Complete and EndDate as the current timestamp, which indicates that the fact load process has completed successfully.

See the following code:

CREATE OR REPLACE PROCEDURE SP_Customer_Transaction_Fact()
AS $$
BEGIN

set enable_case_sensitive_identifier to true;

--Update metadata audit table entry to indicate that the fact load process is running
update MetaData_ETL
set status = 'Running',
StartDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';

refresh materialized view mv_demo_super_unnest;

drop table if exists Customer_Transaction_Fact_Stg;

--Create latest record by Merging records based on approximate_arrival_timestamp
create table Customer_Transaction_Fact_Stg as
select 
m.Transaction_ID,
min(case when m.rank_Customer_ID =1 then m.Customer_ID end) Customer_ID,
min(case when m.rank_OutofPocket_Percent =1 then m.OutofPocket_Percent end) OutofPocket_Percent,
min(case when m.rank_OutofPock_Amount =1 then m.OutofPock_Amount end) OutofPock_Amount,
min(case when m.rank_OutofPocket_Max =1 then m.OutofPocket_Max end) OutofPocket_Max,
min(case when m.rank_Provider =1 then m.Provider end) Provider,
min(case when m.rank_Completion_DateTime =1 then m.Completion_DateTime end) Completion_DateTime
from
(
select *,
rank() over(partition by Transaction_ID order by case when mqp.Customer_ID is not null then 1 end, approximate_arrival_timestamp desc ) rank_Customer_ID,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Percent is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Percent,
rank() over(partition by Transaction_ID order by case when mqp.OutofPock_Amount is not null then 1 end, approximate_arrival_timestamp  desc )  rank_OutofPock_Amount,
rank() over(partition by Transaction_ID order by case when mqp.OutofPocket_Max is not null then 1 end, approximate_arrival_timestamp desc ) rank_OutofPocket_Max,
rank() over(partition by Transaction_ID order by case when mqp.Provider is not null then 1 end, approximate_arrival_timestamp  desc ) rank_Provider,
rank() over(partition by Transaction_ID order by case when mqp.Completion_DateTime is not null then 1 end, approximate_arrival_timestamp desc )  rank_Completion_DateTime
from mv_demo_super_unnest mqp
where upper(mqp.event_Name) <> 'REMOVE' and mqp.approximate_arrival_timestamp > (select mde.EndDate from MetaData_ETL mde where mde.JobName = 'Customer_Transaction_Fact_Load') 
) m
group by m.Transaction_ID 
order by m.Transaction_ID
;

--Delete only impacted Transaction_ID from Fact table
delete from Customer_Transaction_Fact  
where Transaction_ID in ( select mqp.Transaction_ID from Customer_Transaction_Fact_Stg mqp);

--Insert latest records from staging table to Fact table
insert into Customer_Transaction_Fact
select * from Customer_Transaction_Fact_Stg; 

--Update metadata audit table entry to indicate that the fact load process is completed
update MetaData_ETL
set status = 'Complete',
EndDate = getdate()
where JobName = 'Customer_Transaction_Fact_Load';
END;
$$ LANGUAGE plpgsql;

Additional considerations for implementation

There are several additional capabilities that you could utilize to modify this solution to meet your needs. Many customers utilize multiple AWS accounts, and it’s common that the Kinesis data stream may be in a different AWS account than the Amazon Redshift data warehouse. If this is the case, you can utilize an Amazon Redshift IAM role that assumes a role in the Kinesis data stream AWS account in order to read from the data stream. For more information, refer to Cross-account streaming ingestion for Amazon Redshift.

Another common use case is that you need to schedule the refresh of your Amazon Redshift data warehouse jobs so that the data warehouse’s data is continuously updated. To do this, you can utilize Amazon EventBridge to schedule the jobs in your data warehouse to run on a regular basis. For more information, refer to Creating an Amazon EventBridge rule that runs on a schedule. Another option is to use Amazon Redshift Query Editor v2 to schedule the refresh. For details, refer to Scheduling a query with query editor v2.

If you have a requirement to load data from a DynamoDB table with existing data, refer to Loading data from DynamoDB into Amazon Redshift.

For more information on Amazon Redshift streaming ingestion capabilities, refer to Real-time analytics with Amazon Redshift streaming ingestion.

Clean up

To avoid unnecessary charges, clean up any resources that you built as part of this architecture that are no longer in use. This includes dropping the materialized view, stored procedure, external schema, and tables created as part of this post. Additionally, make sure you delete the DynamoDB table and delete the Kinesis data stream.

Conclusion

After following the solution in this post, you’re now able to build near-real-time analytics using Amazon Redshift streaming ingestion. We showed how you can ingest data from a DynamoDB source table using a Kinesis data stream in order to refresh your Amazon Redshift data warehouse. With the capabilities presented in this post, you should be able to increase the refresh rate of your Amazon Redshift data warehouse in order to provide the most up-to-date data in your data warehouse for your use case.


About the authors

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Matt Nispel is an Enterprise Solutions Architect at AWS. He has more than 10 years of experience building cloud architectures for large enterprise companies. At AWS, Matt helps customers rearchitect their applications to take full advantage of the cloud. Matt lives in Minneapolis, Minnesota, and in his free time enjoys spending time with friends and family.

Dan Dressel is a Senior Analytics Specialist Solutions Architect at AWS. He is passionate about databases, analytics, machine learning, and architecting solutions. In his spare time, he enjoys spending time with family, nature walking, and playing foosball.

Improved scalability and resiliency for Amazon EMR on EC2 clusters

Post Syndicated from Ravi Kumar Singh original https://aws.amazon.com/blogs/big-data/improved-scalability-and-resiliency-for-amazon-emr-on-ec2-clusters/

Amazon EMR is the cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Customers asked us for features that would further improve the resiliency and scalability of their Amazon EMR on EC2 clusters, including their large, long-running clusters. We have been hard at work to meet those needs. Over the past 12 months, we have worked backward from customer requirements and launched over 30 new features that improve the resiliency and scalability of your Amazon EMR on EC2 clusters. This post covers some of these key enhancements across three main areas:

  • Improved cluster utilization with optimized scaling experience
  • Minimized interruptions with enhanced resiliency and availability
  • Improved cluster resiliency with upgraded logging and debugging capabilities

Let’s dive into each of these areas.

Improved cluster utilization with optimized scaling experience

Customers use Amazon EMR to run diverse analytics workloads with varying SLAs, ranging from near-real-time streaming jobs to exploratory interactive workloads and everything in between. To cater to these dynamic workloads, you can resize your clusters either manually or by enabling automatic scaling. You can also use the Amazon EMR managed scaling feature to automatically resize your clusters for optimal performance at the lowest possible cost. To ensure swift cluster resizes, we implemented multiple improvements that are available in the latest Amazon EMR releases:

  • Enhanced resiliency of cluster scaling workflow to EC2 Spot Instance interruptions – Many Amazon EMR customers use EC2 Spot Instances for their Amazon EMR on EC2 clusters to reduce costs. Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity offered at discounts of up to 90% compared to On-Demand pricing. However, Amazon EC2 can reclaim Spot capacity with a two-minute warning, which can lead to interruptions in workload. We identified an issue where the cluster’s scaling operation gets stuck when over a hundred core nodes launched on Spot Instances are reclaimed by Amazon EC2 throughout the life of the cluster. Starting with Amazon EMR version 6.8.0, we mitigated this issue by fixing a gap in the process HDFS uses to decommission nodes that caused the scaling operations to get stuck. We contributed this improvement back to the open-source community, enabling seamless recovery and efficient scaling in the event of Spot interruptions.
  • Improve cluster utilization by recommissioning recently decommissioned nodes for Spark workloads within seconds – Amazon EMR allows you to scale down your cluster without affecting your workload by gracefully decommissioning core and task nodes. Furthermore, to prevent task failures, Apache Spark ensures that decommissioning nodes are not assigned any new tasks. However, if a new job is submitted immediately before these nodes are fully decommissioned, Amazon EMR will trigger a scale-up operation for the cluster. This results in these decommissioning nodes to be immediately recommissioned and added back into the cluster. Due to a gap in Apache Spark’s recommissioning logic, these recommissioned nodes would not accept new Spark tasks for up to 60 minutes. We enhanced the recommissioning logic, which ensures recommissioned nodes would start accepting new tasks within seconds, thereby improving cluster utilization. This improvement is available in Amazon EMR release 6.11 and higher.
  • Minimized cluster scaling interruptions due to disk over-utilization – The YARN ResourceManager exclude file is a key component of Apache Hadoop that Amazon EMR uses to centrally manage cluster resources for multiple data-processing frameworks. This exclude file contains a list of nodes to be removed from the cluster to facilitate a cluster scale-down operation. With Amazon EMR release 6.11.0, we improved the cluster scaling workflow to reduce scale-down failures. This improvement minimizes failures due to partial updates or corruption in the exclude file caused by low disk space. Additionally, we built a robust file recovery mechanism to restore the exclude file in case of corruption, ensuring uninterrupted cluster scaling operations.

Minimized interruptions with enhanced resiliency and availability

Amazon EMR offers high availability and fault tolerance for your big data workloads. Let’s look at a few key improvements we launched in this area:

  • Improved fault tolerance to hardware reconfiguration – Amazon EMR offers the flexibility to decouple storage and compute. We observed that customers often increase the size of or add incremental block-level storage to their EC2 instances as their data processing volume and concurrency grow. Starting with Amazon EMR release 6.11.0, we made the EMR cluster’s local storage file system more resilient to unpredictable instance reconfigurations such as instance restarts. By addressing scenarios where an instant restart could result in the block storage device name to change, we eliminated the risk of the cluster becoming inoperable or losing data.
  • Reduce cluster startup time for Kerberos-enabled EMR clusters with long-running bootstrap actions – Multiple customers use Kerberos for authentication and run long-running bootstrap actions on their EMR clusters. In Amazon EMR 6.9.0 and higher releases, we fixed a timing sequence mismatch issue that occurs between Apache BigTop and the Amazon EMR on EC2 cluster startup sequence. This timing sequence mismatch occurs when a system attempts to perform two or more operations at the same time instead of doing them in the proper sequence. This issue caused certain cluster configurations to experience instance startup timeouts. We contributed a fix to the open-source community and made additional improvements to the Amazon EMR startup sequence to prevent this condition, resulting in cluster start time improvements of up to 200% for such clusters.

Improved cluster resiliency with upgraded logging and debugging capabilities

Effective log management is essential to ensure log availability and maintain the health of EMR clusters. This becomes especially critical when you’re running multiple custom client tools and third-party applications on your Amazon EMR on EC2 clusters. Customers depend on EMR logs, in addition to EMR events, to monitor cluster and workload health, troubleshoot urgent issues, simplify security audit, and enhance compliance. Let’s look at a few key enhancements we made in this area:

  • Upgraded on-cluster log management daemon – Amazon EMR now automatically restarts the log management daemon if it’s interrupted. The Amazon EMR on-cluster log management daemon archives logs to Amazon Simple Storage Service (Amazon S3) and deletes them from instance storage. This minimizes cluster failures due to disk over-utilization, while allowing the log files to remain accessible even after the cluster or node stops. This upgrade is available in Amazon EMR release 6.10.0 and higher. For more information, see Configure cluster logging and debugging.
  • Enhanced cluster stability with improved log rotation and monitoring – Many of our customers have long-running clusters that have been operating for years. Some open-source application logs such as Hive and Kerberos logs that are never rotated can continue to grow on these long-running clusters. This could lead to disk over-utilization and eventually result in cluster failures. We enabled log rotation for such log files to minimize disk, memory, and CPU over-utilization scenarios. Furthermore, we expanded our log monitoring to include additional log folders. These changes, available starting with Amazon EMR version 6.10.0, minimize situations where EMR cluster resources are over-utilized, while ensuring log files are archived to Amazon S3 for a wider variety of use cases.

Conclusion

In this post, we highlighted the improvements that we made in Amazon EMR on EC2 with the goal to make your EMR clusters more resilient and stable. We focused on improving cluster utilization with the improved and optimized scaling experience for EMR workloads, minimized interruptions with enhanced resiliency and availability for Amazon EMR on EC2 clusters, and improved cluster resiliency with upgraded logging and debugging capabilities. We will continue to deliver further enhancements with new Amazon EMR releases. We invite you to try new features and capabilities in the latest Amazon EMR releases and get in touch with us through your AWS account team to share your valuable feedback and comments. To learn more and get started with Amazon EMR, check out the tutorial Getting started with Amazon EMR.


About the Authors

Ravi Kumar is a Senior Product Manager for Amazon EMR at Amazon Web Services.

Kevin Wikant is a Software Development Engineer for Amazon EMR at Amazon Web Services.

How Amazon Finance Automation built a data mesh to support distributed data ownership and centralize governance

Post Syndicated from Nitin Arora original https://aws.amazon.com/blogs/big-data/how-amazon-finance-automation-built-a-data-mesh-to-support-distributed-data-ownership-and-centralize-governance/

Amazon Finance Automation (FinAuto) is the tech organization of Amazon Finance Operations (FinOps). Its mission is to enable FinOps to support the growth and expansion of Amazon businesses. It works as a force multiplier through automation and self-service, while providing accurate and on-time payments and collections. FinAuto has a unique position to look across FinOps and provide solutions that help satisfy multiple use cases with accurate, consistent, and governed delivery of data and related services.

In this post, we discuss how the Amazon Finance Automation team used AWS Lake Formation and the AWS Glue Data Catalog to build a data mesh architecture that simplified data governance at scale and provided seamless data access for analytics, AI, and machine learning (ML) use cases.

Challenges

Amazon businesses have grown over the years. In the early days, financial transactions could be stored and processed on a single relational database. In today’s business world, however, even a subset of the financial space dedicated to entities such as Accounts Payable (AP) and Accounts Receivable (AR) requires separate systems handling terabytes of data per day. Within FinOps, we can curate more than 300 datasets and consume many more raw datasets from dozens of systems. These datasets can then be used to power front end systems, ML pipelines, and data engineering teams.

This exponential growth necessitated a data landscape that was geared towards keeping FinOps operating. However, as we added more transactional systems, data started to grow in operational data stores. Data copies were common, with duplicate pipelines creating redundant and often out-of-sync domain datasets. Multiple curated data assets were available with similar attributes. To resolve these challenges, FinAuto decided to build a data services layer based on a data mesh architecture. FinAuto wanted to verify that the data domain owners would retain ownership of their datasets while users got access to the data by using a data mesh architecture.

Solution overview

Being customer focused, we started by understanding our data producers’ and consumers’ needs and priorities. Consumers prioritized data discoverability, fast data access, low latency, and high accuracy of data. Producers prioritized ownership, governance, access management, and reuse of their datasets. These inputs reinforced the need of a unified data strategy across the FinOps teams. We decided to build a scalable data management product that is based on the best practices of modern data architecture. Our source system and domain teams were mapped as data producers, and they would have ownership of the datasets. FinAuto provided the data services’ tools and controls necessary to enable data owners to apply data classification, access permissions, and usage policies. It was necessary for domain owners to continue this responsibility because they had visibility to the business rules or classifications and applied that to the dataset. This enabled producers to publish data products that were curated and authoritative assets for their domain. For example, the AR team created and governed their cash application dataset in their AWS account AWS Glue Data Catalog.

With many such partners building their data products, we needed a way to centralize data discovery, access management, and vending of these data products. So we built a global data catalog in a central governance account based on the AWS Glue Data Catalog. The FinAuto team built AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, and API tools to maintain a metadata store that ingests from domain owner catalogs into the global catalog. This global catalog captures new or updated partitions from the data producer AWS Glue Data Catalogs. The global catalog is also periodically fully refreshed to resolve issues during metadata sync processes to maintain resiliency. With this structure in place, we then needed to add governance and access management. We selected AWS Lake Formation in our central governance account to help secure the data catalog, and added secure vending mechanisms around it. We also built a front-end discovery and access control application where consumers can browse datasets and request access. When a consumer requests access, the application validates the request and routes them to a respective producer via internal tickets for approval. Only after the data producer approves the request are permissions provisioned in the central governance account through Lake Formation.

Solution tenets

A data mesh architecture has its own advantages and challenges. By democratizing the data product creation, we removed dependencies on a central team. We made reuse of data possible with data discoverability and minimized data duplicates. This also helped remove data movement pipelines, thereby reducing data transfer and maintenance costs.

We realized, however, that our implementation could potentially impact day-to-day tasks and inhibit adoption. For example, data producers need to onboard their dataset to the global catalog, and complete their permissions management before they can share that with consumers. To overcome this obstacle, we prioritized self-service tools and automation with a reliable and simple-to-use interface. We made interaction, including producer-consumer onboarding, data access request, approvals, and governance, quicker through the self-service tools in our application.

Solution architecture

Within Amazon, we isolate different teams and business processes with separate AWS accounts. From a security perspective, the account boundary is one of the strongest security boundaries in AWS. Because of this, the global catalog resides in its own locked-down AWS account.

The following diagram shows AWS account boundaries for producers, consumers, and the central catalog. It also describes the steps involved for data producers to register their datasets as well as how data consumers get access. Most of these steps are automated through convenience scripts with both AWS CDK and CloudFormation templates for our producers and consumer to use.

Solution Architecture Diagram

The workflow contains the following steps:

  1. Data is saved by the producer in their own Amazon Simple Storage Service (Amazon S3) buckets.
  2. Data source locations hosted by the producer are created within the producer’s AWS Glue Data Catalog.
  3. Data source locations are registered with Lake Formation.
  4. An onboarding AWS CDK script creates a role for the central catalog to use to read metadata and generate the tables in the global catalog.
  5. The metadata sync is set up to continuously sync data schema and partition updates to the central data catalog.
  6. When a consumer requests table access from the central data catalog, the producer grants Lake Formation permissions to the consumer account AWS Identity and Access Management (IAM) role and tables are visible in the consumer account.
  7. The consumer account accepts the AWS Resource Access Manager (AWS RAM) share and creates resource links in Lake Formation.
  8. The consumer data lake admin provides grants to IAM users and roles mapping to data consumers within the account.

The global catalog

The basic building block of our business-focused solutions are data products. A data product is a single domain attribute that a business understands as accurate, current, and available. This could be a dataset (a table) representing a business attribute like a global AR invoice, invoice aging, aggregated invoices by a line of business, or a current ledger balance. These attributes are calculated by the domain team and are available for consumers who need that attribute, without duplicating pipelines to recreate it. Data products, along with raw datasets, reside within their data owner’s AWS account. Data producers register their data catalog’s metadata to the central catalog. We have services to review source catalogs to identify and recommend classification of sensitive data columns such as name, email address, customer ID, and bank account numbers. Producers can review and accept those recommendations, which results in corresponding tags applied to the columns.

Producer experience

Producers onboard their accounts when they want to publish a data product. Our job is to sync the metadata between the AWS Glue Data Catalog in the producer account with the central catalog account, and register the Amazon S3 data location with Lake Formation. Producers and data owners can use Lake Formation for fine-grained access controls on the table. It is also now searchable and discoverable via the central catalog application.

Consumer experience

When a data consumer discovers the data product that they’re interested in, they submit a data access request from the application UI. Internally, we route the request to the data owner for the disposition of the request (approval or rejection). We then create an internal ticket to track the request for auditing and traceability. If the data owner approves the request, we run automation to create an AWS RAM resource share to share with the consumer account covering the AWS Glue database and tables approved for access. These consumers can now query the datasets using the AWS analytics services of their choice like Amazon Redshift Spectrum, Amazon Athena, and Amazon EMR.

Operational excellence

Along with building the data mesh, it’s also important to verify that we can operate with efficiency and reliability. We recognize that the metadata sync process is at the heart of this global data catalog. As such, we are hypervigilant of this process and have built alarms, notifications, and dashboards to verify that this process doesn’t fail silently and create a single point of failure for the global data catalog. We also have a backup repair service that syncs the metadata from producer catalogs into the central governance account catalog periodically. This is a self-healing mechanism to maintain reliability and resiliency.

Empowering customers with the data mesh

The FinAuto data mesh hosts around 850 discoverable and shareable datasets from multiple partner accounts. There are more than 300 curated data products to which producers can provide access and apply governance with fine-grained access controls. Our consumers use AWS analytics services such as Redshift Spectrum, Athena, Amazon EMR, and Amazon QuickSight to access their data. This capability with standardized data vending from the data mesh, along with self-serve capabilities, allows you to innovate faster without dependency on technical teams. You can now get access to data faster with automation that continuously improves the process.

By serving the FinOps team’s data needs with high availability and security, we enabled them to effectively support operation and reporting. Data science teams can now use the data mesh for their finance-related AI/ML use cases such as fraud detection, credit risk modeling, and account grouping. Our finance operations analysts are now enabled to dive deep into their customer issues, which is most important to them.

Conclusion

FinOps implemented a data mesh architecture with Lake Formation to improve data governance with fine-grained access controls. With these improvements, the FinOps team is now able to innovate faster with access to the right data at the right time in a self-serve manner to drive business outcomes. The FinOps team will continue to innovate in this space with AWS services to further provide for customer needs.

To learn more about how to use Lake Formation to build a data mesh architecture, see Design a data mesh architecture using AWS Lake Formation and AWS Glue.


About the Authors

Nitin Arora PicNitin Arora is a Sr. Software Development Manager for Finance Automation in Amazon. He has over 18 years of experience building business critical, scalable, high-performance software. Nitin leads several data and analytics initiatives within Finance, which includes building Data Mesh. In his spare time, he enjoys listening to music and read.

Pradeep Misra PicPradeep Misra is a Specialist Solutions Architect at AWS. He works across Amazon to architect and design modern distributed analytics and AI/ML platform solutions. He is passionate about solving customer challenges using data, analytics, and AI/ML. Outside of work, Pradeep likes exploring new places, trying new cuisines, and playing board games with his family. He also likes doing science experiments with his daughters.

Rajesh Rao PicRajesh Rao is a Sr. Technical Program Manager in Amazon Finance. He works with Data Services teams within Amazon to build and deliver data processing and data analytics solutions for Financial Operations teams. He is passionate about delivering innovative and optimal solutions using AWS to enable data-driven business outcomes for his customers.

Andrew Long PicAndrew Long, the lead developer for data mesh, has designed and built many of the big data processing systems that have fueled Amazon’s financial data processing infrastructure. His work encompasses a range of areas, including S3-based table formats for Spark, diverse Spark performance optimizations, distributed orchestration engines and the development of data cataloging systems. Additionally, Andrew finds pleasure in sharing his knowledge of partner acrobatics.

Satyen GauravKumar Satyen Gaurav, is an experienced Software Development Manager at Amazon, with over 16 years of expertise in big data analytics and software development. He leads a team of engineers to build products and services using AWS big data technologies, for providing key business insights for Amazon Finance Operations across diverse business verticals. Beyond work, he finds joy in reading, traveling and learning strategic challenges of chess.

How AWS helped Altron Group accelerate their vision for optimized customer engagement

Post Syndicated from Jason Yung original https://aws.amazon.com/blogs/big-data/how-aws-helped-altron-group-accelerate-their-vision-for-optimized-customer-engagement/

This is a guest post co-authored by Jacques Steyn, Senior Manager Professional Services at Altron Group.

Altron is a pioneer of providing data-driven solutions for their customers by combining technical expertise with in-depth customer understanding to provide highly differentiated technology solutions. Alongside their partner AWS, they participated in AWS Data-Driven Everything (D2E) workshops and a bespoke AWS Immersion Day workshop that catered to their needs to improve their engagement with their customers.

This post discusses the journey that took Altron from their initial goals, to technical implementation, to the business value created from understanding their customers and their unique opportunities better. They were able to think big but start small with a working solution involving rich business intelligence (BI) and insights provided to their key business areas.

Data-Driven Everything engagement

Altron has provided information technology services since 1965 across South Africa, the Middle East, and Australia. Although the group saw strong results at 2022-year end, the region continues to experience challenging operating conditions with global supply chains disrupted, electronic component shortages, and scarcity of IT talent.

To reflect the needs of their customers spread across different geographies and industries, Altron has organized their operating model across individual Operating Companies (OpCos). These are run autonomously with different sales teams, creating siloed operations and engagement with customers and making it difficult to have a holistic and unified sales motion.

To succeed further, their vision of data requires it to be accessible and actionable to all, with key roles and responsibilities defined by those who produce and consume data, as shown in the following figure. This allows for transparency, speed to action, and collaboration across the group while enabling the platform team to evangelize the use of data:

Altron engaged with AWS to seek advice on their data strategy and cloud modernization to bring their vision to fruition. The D2E program was selected to help identify the best way to think big but start small by working collaboratively to ideate on the opportunities to build data as a product, particularly focused on federating customer profile data in an agile and scalable approach.

Amazon mechanisms such as Working Backwards were employed to devise the most delightful and meaningful solution and put customers at the heart of the experience. The workshop helped devise the think big solution that starting with the Systems Integration (SI) OpCo as the first flywheel turn would be the best way to start small and prototype the initial data foundation collaboratively with AWS Solutions Architects.

Preparing for an AWS Immersion Day workshop

The typical preparation of an AWS Immersion Day involves identifying examples of common use case patterns and utilizing demonstration data. To maximize its success, the Immersion Day was stretched across multiple days as a hands-on workshop to enable Altron to bring their own data, build a robust data pipeline, and scale their long-term architecture. In addition, AWS and Altron identified and resolved any external dependencies, such as network connectivity to data sources and targets, where Altron was able to put the connectivity to the sources in place.

Identifying key use cases

After a number of preparation meetings to discuss business and technical aspects of the use case, AWS and Altron identified two uses cases to resolve their two business challenges:

  • Business intelligence for business-to-business accounts – Altron wanted to focus on their business-to-business (B2B) accounts and customer data. In particular, they wanted to enable their account managers, sales executives, and analysts to use actual data and facts to get a 360 view of their accounts.
    • Goals – Grow revenue, increase the conversion ratio of opportunities, reduce the average sales cycle, improve the customer renewal rate.
    • Target – Dashboards to be refreshed on a daily basis that would provide insights on sales, gross profit, sales pipelines, and customers.
  • Data quality for account and customer data – Altron wanted to enable data quality and data governance best practices.
    • Goals – Lay the foundation for a data platform that can be used in the future by internal and external stakeholders.

Conducting the Immersion Day workshop

Altron set aside 4 days for their Immersion Day, during which time AWS had assigned a dedicated Solutions Architect to work alongside them to build the following prototype architecture:

This solution includes the following components:

  1. AWS Glue is a serverless data integration service that makes it simple to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning, and application development. The Altron team created an AWS Glue crawler and configured it to run against Azure SQL to discover its tables. The AWS Glue crawler populates the table definition with its schema in AWS Glue Data Catalog.
  2. This step consists of two components:
    1. A set of AWS Glue PySpark jobs reads the source tables from Azure SQL and outputs the resulting data in the Amazon Simple Storage Service “Raw Zone”. Basic formatting and readability of the data is standardized here. The jobs run on a scheduled basis, according to the upstream data availability (which currently is daily).
    2. Users are able to manually upload reference files (CSV and Excel format) via the Amazon Web Services console directly to the Amazon S3 buckets. Depending on the frequency of upload, the Altron team will consider automated mechanisms and remove manual upload.
  3. The reporting zone is based on a set of Amazon Athena views, which are consumed for BI purposes. The Altron team used Athena to explore the source tables and create the views in SQL language. Depending on the needs, the Altron team will materialize these views or create corresponding AWS Glue jobs.
  4. Athena exposes the content of the reporting zone for consumption.
  5. The content of the reporting zone is ingested via SPICE in Amazon QuickSight. BI users create dashboards and reports in QuickSight. Business users can access QuickSight dashboards from their mobile, thanks to the QuickSight native application, configured to use single sign-on (SSO).
  6. An AWS Step Functions state machine orchestrates the run of the AWS Glue jobs. The Altron team will expand the state machine to include automated refresh of QuickSight SPICE datasets.
  7. To verify the data quality of the sources through statistically-relevant metrics, AWS Glue Data Quality runs data quality tasks on relevant AWS Glue tables. This can be run manually or scheduled via Amazon Eventbridge (Optional).

Generating business outcomes

In 4 days, the Altron SI team left the Immersion Day workshop with the following:

  • A data pipeline ingesting data from 21 sources (SQL tables and files) and combining them into three mastered and harmonized views that are cataloged for Altron’s B2B accounts.
  • A set of QuickSight dashboards to be consumed via browser and mobile.
  • Foundations for a data lake with data governance controls and data quality checks. The datasets used for the workshop originate from different systems; by integrating the datasets during the workshop implementation, the Altron team can have a comprehensive overview of their customers.

Altron’s sales teams are now able to quickly refresh dashboards encompassing previously disparate datasets that are now centralized to get insights about sales pipelines and forecasts on their desktop or mobile. The technical teams are equally adept at adjusting to business needs by autonomously onboarding new data sources and further enriching the user experience and trust in the data.

Conclusion

In this post, we walked you through the journey the Altron team took with AWS. The outcomes to identify the opportunities that were most pressing to Altron, applying a working backward approach and coming up with a best-fit architecture, led to the subsequent AWS Immersion Day to implement a working prototype that helped them become more data-driven.

With their new focus on AWS skills and mechanisms, increasing trust in their internal data, and understanding the importance of driving change in data literacy and mindset, Altron is better set up for success to best serve their customers in the region.

To find out more about how Altron and AWS can help work backward on your data strategy and employ the agile methodologies discussed in this post, check out Data Management. To learn more about how can help you turn your ideas into solutions, visit the D2E website and the series of AWS Immersion Days that you can choose from. For more hands-on bespoke options, contact your AWS Account Manager, who can provide more details.

Special thanks to everyone at Altron Group who helped contribute to the success of the D2E and Build Lab workshops:

  • The Analysts (Liesl Kok, Carmen Kotze)
  • Data Engineers (Banele Ngemntu, James Owen, Andrew Corry, Thembelani Mdlankomo)
  • QuickSight BI Developers (Ricardo De Gavino Dias, Simei Antoniades)
  • Cloud Administrator (Shamiel Galant)

About the authors

Jacques Steyn runs the Altron Data Analytics Professional Services. He has been leading the building of data warehouses and analytic solutions for the past 20 years. In his free time, he spends time with his family, whether it be on the golf , walking in the mountains, or camping in South Africa, Botswana, and Namibia.

Jason Yung is a Principal Analytics Specialist with Amazon Web Services. Working with Senior Executives across the Europe and Asia-Pacific Regions, he helps customers become data-driven by understanding their use cases and articulating business value through Amazon mechanisms. In his free time, he spends time looking after a very active 1-year-old daughter, alongside juggling geeky activities with respectable hobbies such as cooking sub-par food.

Michele Lamarca is a Senior Solutions Architect with Amazon Web Services. He helps architect and run Solutions Accelerators in Europe to enable customers to become hands-on with AWS services and build prototypes quickly to release the value of data in the organization. In his free time, he reads books and tries (hopelessly) to improve his jazz piano skills.

Hamza is a Specialist Solutions Architect with Amazon Web Services. He runs Solutions Accelerators in EMEA regions to help customers accelerate their journey to move from an idea into a solution in production. In his free time, he spends time with his family, meets with friends, swims in the municipal swimming pool, and learns new skills.