All posts by Amit Maindola

Develop and deploy a generative AI application using Amazon SageMaker Unified Studio

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/develop-and-deploy-a-generative-ai-application-using-amazon-sagemaker-unified-studio/

Picture this: You’re a financial analyst starting your Monday morning with a steaming cup of coffee, ready to review your investment portfolio. But instead of manually scouring dozens of news websites, financial reports, and industry analyses, you simply ask your AI assistant: “What global events happened over the weekend that might impact my technology stock holdings?” Within seconds, you receive a comprehensive analysis of relevant news, sentiment scores, and potential investment implications—all powered by a sophisticated generative AI application you built yourself.

This scenario isn’t science fiction; it’s the reality that modern financial professionals can create today. In an era where information moves at the speed of light and industry conditions can shift dramatically overnight, staying informed isn’t just an advantage—it’s essential for survival in competitive financial landscapes. The challenge lies in processing the overwhelming volume of global information that could impact investments while distinguishing reliable insights from noise.

Amazon SageMaker – Develop and scale AI use cases with the broadest set of tools

Luckily for us, technology is making this more straightforward. The next generation of Amazon SageMaker with Amazon SageMaker Unified Studio is a single data and AI development environment where you can find and access the data in your organization and act on it using the best tools across different use cases. SageMaker Unified Studio brings together the functionality and tools from existing AWS analytics and artificial intelligence and machine learning (AI/ML) services, including Amazon EMR, AWS Glue, Amazon Athena, Amazon Redshift, Amazon Bedrock, and Amazon SageMaker AI. From within SageMaker Unified Studio, you can find, access, and query data and AI assets across your organization, then work together in projects to securely build and share analytics and AI artifacts, including data, models, and generative AI applications.

With SageMaker Unified Studio, you can efficiently build generative AI applications in a trusted and secure environment using Amazon Bedrock. You can choose from a selection of high-performing foundation models (FMs) and advanced customization capabilities like Amazon Bedrock Knowledge Bases, Amazon Bedrock Guardrails, Amazon Bedrock Agents, and Amazon Bedrock Flows. You can rapidly tailor and deploy generative AI applications and share with the built-in catalog for discovery.

What makes SageMaker Unified Studio particularly powerful for organizations is its integration with Amazon Bedrock Flows to build generative AI workflows, which is changing how organizations think about AI application development.

Amazon Bedrock Flows for generative AI application development

With Amazon Bedrock Flows, you can build and execute complex generative AI workflows without writing code, using an intuitive visual interface that democratizes AI development. This capability is transformative for organizations where speed, accuracy, and adaptability are paramount. It offers the following benefits:

  • Visual workflow development – Users can design AI applications by dragging and dropping components onto a canvas, making AI logic transparent and modifiable
  • Business logic flexibility – The service supports complex business logic through conditional branching, multi-path decision trees, and dynamic routing
  • Democratizing AI development – Business experts can directly contribute to AI application development without requiring extensive technical expertise
  • Seamless integration – Amazon Bedrock Flows integrates with FMs, knowledge bases, guardrails, and other AWS services
  • Reduced development complexity – The service handles infrastructure management and scaling through serverless execution and SDK APIs
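
The SDK access mentioned in the last benefit can be sketched in Python. This is a minimal sketch, not the post's own code: it assumes the flow has been deployed so that it has a flow ID and an alias ID, that the flow's input node uses the typical default name FlowInputNode, and that the client passed in is a boto3 `bedrock-agent-runtime` client. The function names are hypothetical.

```python
from typing import Any, Dict, Iterable, List


def build_flow_inputs(question: str) -> List[Dict[str, Any]]:
    """Wrap a question in the input structure that invoke_flow expects."""
    return [
        {
            "nodeName": "FlowInputNode",   # assumed: default input node name
            "nodeOutputName": "document",  # assumed: default input node output
            "content": {"document": question},
        }
    ]


def collect_flow_outputs(events: Iterable[Dict[str, Any]]) -> List[Any]:
    """Gather the documents emitted by flow output nodes in the response stream."""
    outputs = []
    for event in events:
        if "flowOutputEvent" in event:
            outputs.append(event["flowOutputEvent"]["content"]["document"])
    return outputs


def ask_flow(client: Any, flow_id: str, alias_id: str, question: str) -> List[Any]:
    """Invoke a flow and return its outputs.

    `client` is expected to be boto3.client("bedrock-agent-runtime"),
    or any stand-in object that exposes the same invoke_flow method.
    """
    response = client.invoke_flow(
        flowIdentifier=flow_id,
        flowAliasIdentifier=alias_id,
        inputs=build_flow_inputs(question),
    )
    return collect_flow_outputs(response["responseStream"])
```

Passing the client in as an argument keeps the helper easy to exercise with a stand-in object before pointing it at a real flow.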

Solution overview

In this post, we explore a financial use case in which we want to stay on top of the latest global events and determine our investment or financial exposure based on them. We can use a SageMaker Unified Studio flow application to pull in the latest news summaries, derive sentiment from each summary, and determine the effect on our investments. The following diagram illustrates this use case.

In the following sections, we show how to create a new project and build a flow application using a generative AI profile in SageMaker Unified Studio.

Prerequisites

For this walkthrough, you must have the following prerequisites:

  • A demo project – Create a demo project in your SageMaker Unified Studio domain. For instructions, see Create a project. For this example, we choose All capabilities in the project profile section, which includes the generative AI project profile.

Create new project and build a flow application in SageMaker Unified Studio

In this section, we create a new flow application that uses an Amazon Bedrock knowledge base to provide information about your personal portfolio. Complete the following steps:

  1. In SageMaker Unified Studio, open the project you created as a prerequisite and choose Build and then Flow.
  2. Drag Knowledge Base from Nodes to the design panel to add a knowledge base that will include the user’s investment portfolio, news articles, and other information such as earnings call transcripts and financial analyst reports.
  3. Choose the Knowledge Base node and configure the knowledge base as follows:
    1. Add a name for your knowledge base (for example, portfolio…).
    2. Choose the model (for example, Claude 3.5 Haiku).
  4. Choose Create new Knowledge Base.
  5. Enter a name for the knowledge base.
  6. Select Project data source.
  7. For Select a data source, choose the Amazon Simple Storage Service (Amazon S3) bucket location where you uploaded your data.
  8. Choose Create.

The knowledge base creation process takes a few minutes to complete.

  9. When the knowledge base is ready, choose Save to save it to the flow.
  10. Choose My components, and on the options menu (three vertical dots), choose Sync to sync the knowledge base.

Make sure the S3 bucket has all the data (user portfolio data and latest news information data) before syncing the knowledge base.
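
One way to run this check before syncing is to count the objects under each expected prefix. The following is a rough sketch: the bucket layout and prefix names are hypothetical, the helper names are our own, and `client` is assumed to be a boto3 S3 client (or a stand-in with the same paginator interface).

```python
from typing import Any, Dict, Iterable, List


def count_objects_by_prefix(
    client: Any, bucket: str, prefixes: Iterable[str]
) -> Dict[str, int]:
    """Count the objects under each prefix using the ListObjectsV2 paginator."""
    paginator = client.get_paginator("list_objects_v2")
    counts: Dict[str, int] = {}
    for prefix in prefixes:
        total = 0
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            # Pages for an empty prefix carry no "Contents" key.
            total += len(page.get("Contents", []))
        counts[prefix] = total
    return counts


def empty_prefixes(counts: Dict[str, int]) -> List[str]:
    """Return the prefixes that hold no objects, sorted for stable output."""
    return sorted(prefix for prefix, count in counts.items() if count == 0)
```

For example, with hypothetical prefixes such as `portfolio/` and `news/`, an empty list from `empty_prefixes(count_objects_by_prefix(client, bucket, ["portfolio/", "news/"]))` suggests both kinds of data are in place before you choose Sync.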

We don’t provide any financial or news information data as part of this post. Upload current events or news data and investment portfolio data from your own data sources.

Test the flow application

After the knowledge base sync is complete, you can return to the flow application and ask questions. Using SageMaker Unified Studio flows, a financial analyst can provide a more personalized and customized financial outlook to their customers using rich internal financial information on their customer’s investment portfolio and latest publicly available current events and news information. The following are some example questions that you can ask to test the knowledge base:

Check if Tesla or Apple is in any of user's investment portfolio

Please check latest news information to provide information if Tesla has positive, negative or neutral outlook in the near future

Flow-based applications offer a visual approach to creating complex AI workflows. By chaining different nodes, each optimized for specific functions, you can create sophisticated solutions that are more reliable, maintainable, and efficient than single-prompt approaches. These flows allow for conditional logic and branching paths, mimicking human decision-making processes and enabling more nuanced responses based on context and intermediate results.

Clean up

To avoid ongoing charges in your AWS account, delete the resources you created during this tutorial:

  1. Delete the project.
  2. Delete the domain created as part of the prerequisites.

Conclusion

In this post, we demonstrated how to use Amazon Bedrock Flows in SageMaker Unified Studio to build a sophisticated generative AI application for financial analysis and investment decision-making without extensive coding knowledge. With this integration, you can create sophisticated financial analysis workflows through an intuitive visual interface, where you can process industry data, analyze news sentiment, and assess investment implications in real time. The solution integrates seamlessly with AWS services and FMs while providing essential features like automatic scaling, compliance controls, and audit capabilities. The implementation process involves setting up a SageMaker Unified Studio domain, configuring knowledge bases with portfolio and news data, and creating visual workflows that can analyze complex financial information. This democratized approach to AI development allows both technical and business teams to collaborate effectively, significantly reducing development time while maintaining the sophisticated capabilities needed for modern financial analysis.

To get started, explore the SageMaker Unified Studio documentation, set up a project in your AWS environment, and discover how this solution can transform your organization’s data analytics capabilities.


About the authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Arghya Banerjee is a Sr. Solutions Architect at AWS in the San Francisco Bay Area, focused on helping customers adopt and use the AWS Cloud. He is focused on big data, data lakes, streaming and batch analytics services, and generative AI technologies.

Melody Yang is a Principal Analytics Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.

Gaurav Parekh is a Solutions Architect at AWS, specializing in generative AI and data analytics, with extensive experience building production AI systems on AWS.

Develop and monitor a Spark application using existing data in Amazon S3 with Amazon SageMaker Unified Studio

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/develop-and-monitor-a-spark-application-using-existing-data-in-amazon-s3-with-amazon-sagemaker-unified-studio/

Organizations face significant challenges managing their big data analytics workloads. Data teams struggle with fragmented development environments, complex resource management, inconsistent monitoring, and cumbersome manual scheduling processes. These issues lead to lengthy development cycles, inefficient resource utilization, reactive troubleshooting, and difficult-to-maintain data pipelines. These challenges are especially critical for enterprises processing terabytes of data daily for business intelligence (BI), reporting, and machine learning (ML). Such organizations need unified solutions that streamline their entire analytics workflow.

The next generation of Amazon SageMaker with Amazon EMR in Amazon SageMaker Unified Studio addresses these pain points through an integrated development environment (IDE) where data workers can develop, test, and refine Spark applications in one consistent environment. Amazon EMR Serverless alleviates cluster management overhead by dynamically allocating resources based on workload requirements, and built-in monitoring tools help teams quickly identify performance bottlenecks. Integration with Apache Airflow through Amazon Managed Workflows for Apache Airflow (Amazon MWAA) provides robust scheduling capabilities, and the pay-only-for-resources-used model delivers significant cost savings.

In this post, we demonstrate how to develop and monitor a Spark application using existing data in Amazon Simple Storage Service (Amazon S3) using SageMaker Unified Studio.

Solution overview

This solution uses SageMaker Unified Studio to execute and oversee a Spark application, highlighting its integrated capabilities. We cover the following key steps:

  1. Create an EMR Serverless compute environment for interactive applications using SageMaker Unified Studio.
  2. Create and configure a Spark application.
  3. Use TPC-DS data to build and run the Spark application using a Jupyter notebook in SageMaker Unified Studio.
  4. Monitor application performance and schedule recurring runs with the Amazon MWAA integration.
  5. Analyze results in SageMaker Unified Studio to optimize workflows.

Prerequisites

For this walkthrough, you must have the following prerequisites:

Add EMR Serverless as compute

Complete the following steps to create an EMR Serverless compute environment to build your Spark application:

  1. In SageMaker Unified Studio, open the project you created as a prerequisite and choose Compute.
  2. Choose Data processing, then choose Add compute.
  3. Choose Create new compute resources, then choose Next.
  4. Choose EMR Serverless, then choose Next.
  5. For Compute name, enter a name.
  6. For Release label, choose emr-7.5.0.
  7. For Permission mode, choose Compatibility.
  8. Choose Add compute.

It takes a few minutes to spin up the EMR Serverless application. After it’s created, you can view the compute in SageMaker Unified Studio.

The preceding steps demonstrate how you can set up an Amazon EMR Serverless application in SageMaker Unified Studio to run interactive PySpark workloads. In subsequent steps, we build and monitor Spark applications in an interactive JupyterLab workspace.

Develop, monitor, and debug a Spark application in a Jupyter notebook within SageMaker Unified Studio

In this section, we build a Spark application using the TPC-DS dataset within SageMaker Unified Studio. With Amazon SageMaker Data Processing, you can focus on transforming and analyzing your data without managing compute capacity or open source applications, saving you time and reducing costs. SageMaker Data Processing provides a unified developer experience from Amazon EMR, AWS Glue, Amazon Redshift, Amazon Athena, and Amazon MWAA in a single notebook and query interface. You can automatically provision your capacity on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) or EMR Serverless. Scaling rules manage changes to your compute demand to optimize performance and runtimes. Integration with Amazon MWAA simplifies workflow orchestration by alleviating infrastructure management needs. For this post, we use EMR Serverless to read and query the TPC-DS dataset within a notebook and run it using Amazon MWAA.

Complete the following steps:

  1. After you complete the previous steps and prerequisites, navigate to SageMaker Unified Studio and open your project.
  2. Choose Build and then JupyterLab.

The notebook takes about 30 seconds to initialize and connect to the space.

  1. Under Notebook, choose Python 3 (ipykernel).
  2. In the first cell, next to Local Python, choose the dropdown menu and choose PySpark.
  3. Choose the dropdown menu next to Project.Spark and choose EMR-S Compute.
  4. Run the following code to develop your Spark application. This example reads a 3 TB TPC-DS dataset in Parquet format from a publicly accessible S3 bucket:
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/store/").createOrReplaceTempView("store")

After the Spark session starts and execution logs start to populate, you can explore the Spark UI and driver logs to further debug and troubleshoot Spark programs. The following screenshots show an example of the Spark UI, an example of the driver logs, and the Executors tab, which provides access to the driver and executor logs.

  1. Use the following code to read more TPC-DS datasets. You can create temporary views and use the Spark UI to see the files being read. Refer to the appendix at the end of this post for details on using the TPC-DS dataset within your buckets.
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/item/").createOrReplaceTempView("item")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/store_sales/").createOrReplaceTempView("store_sales")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/date_dim/").createOrReplaceTempView("date_dim")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/customer/").createOrReplaceTempView("customer")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/catalog_sales/").createOrReplaceTempView("catalog_sales")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/web_sales/").createOrReplaceTempView("web_sales")
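
The repeated read statements can also be written as a loop. The following sketch assumes the `spark` session that the notebook's PySpark connection provides; the helper names are hypothetical, and the bucket path and table names are the ones used in this post.

```python
from typing import Any, Dict, Iterable

BASE_PATH = "s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned"
TABLES = [
    "store", "item", "store_sales", "date_dim",
    "customer", "catalog_sales", "web_sales",
]


def table_paths(base_path: str, tables: Iterable[str]) -> Dict[str, str]:
    """Map each table name to its Parquet folder under the base path."""
    base = base_path.rstrip("/")
    return {table: f"{base}/{table}/" for table in tables}


def register_views(spark: Any, paths: Dict[str, str]) -> None:
    """Read each Parquet folder and expose it as a temporary view."""
    for name, path in paths.items():
        spark.read.parquet(path).createOrReplaceTempView(name)
```

In the notebook, calling `register_views(spark, table_paths(BASE_PATH, TABLES))` registers the same seven views as the individual statements.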

In each cell of your notebook, you can expand Spark Job Progress to view the stages of the job submitted to EMR Serverless for a specific cell. You can see the time taken to complete each stage. In addition, if a failure occurs, you can examine the logs, making troubleshooting a seamless experience.

Because the files are partitioned based on date key column, you can observe that Spark runs parallel tasks for reads.

  1. Next, get the count across the date time keys on data that is partitioned based on the time key using the following code:
select count(1), ss_sold_date_sk from store_sales group by ss_sold_date_sk order by ss_sold_date_sk
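
From a PySpark cell, one way to run this statement is through `spark.sql`. The sketch below builds the same query for any table and partition column; the helper names and the `row_count` alias are our own additions, and `spark` is assumed to be the notebook's Spark session.

```python
from typing import Any


def partition_count_sql(table: str, partition_column: str) -> str:
    """Build a query that counts rows for each value of the partition column."""
    return (
        f"select count(1) as row_count, {partition_column} "
        f"from {table} "
        f"group by {partition_column} "
        f"order by {partition_column}"
    )


def run_partition_counts(
    spark: Any,
    table: str = "store_sales",
    partition_column: str = "ss_sold_date_sk",
) -> Any:
    """Run the count query and return the resulting DataFrame."""
    return spark.sql(partition_count_sql(table, partition_column))
```

Calling `run_partition_counts(spark).show()` in the notebook triggers the job whose stages you can then inspect under Spark Job Progress.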

Monitor jobs in the Spark UI

On the Jobs tab of the Spark UI, you can see a list of complete or actively running jobs, with the following details:

  • The action that triggered the job
  • The time it took (for this example, 41 seconds, but timing will vary)
  • The number of stages (2) and tasks (3,428); these values are for reference and are specific to this example

You can choose the job to view more details, particularly around the stages. Our job has two stages; a new stage is created whenever there is a shuffle. We have one stage for the initial reading of each dataset, and one for the aggregation. In the following example, we run some TPC-DS SQL statements that are used for performance benchmarking:

 with frequent_ss_items as
 (select substr(i_item_desc,1,30) itemdesc,i_item_sk item_sk,d_date solddate,count(*) cnt
  from store_sales, date_dim, item
  where ss_sold_date_sk = d_date_sk
    and ss_item_sk = i_item_sk
    and d_year in (2000, 2000+1, 2000+2,2000+3)
  group by substr(i_item_desc,1,30),i_item_sk,d_date
  having count(*) >4),
 max_store_sales as
 (select max(csales) tpcds_cmax
  from (select c_customer_sk,sum(ss_quantity*ss_sales_price) csales
        from store_sales, customer, date_dim
        where ss_customer_sk = c_customer_sk
         and ss_sold_date_sk = d_date_sk
         and d_year in (2000, 2000+1, 2000+2,2000+3)
        group by c_customer_sk) x),
 best_ss_customer as
 (select c_customer_sk,sum(ss_quantity*ss_sales_price) ssales
  from store_sales, customer
  where ss_customer_sk = c_customer_sk
  group by c_customer_sk
  having sum(ss_quantity*ss_sales_price) > (95/100.0) *
    (select * from max_store_sales))
 select sum(sales)
 from (select cs_quantity*cs_list_price sales
       from catalog_sales, date_dim
       where d_year = 2000
         and d_moy = 2
         and cs_sold_date_sk = d_date_sk
         and cs_item_sk in (select item_sk from frequent_ss_items)
         and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer)
      union all
      (select ws_quantity*ws_list_price sales
       from web_sales, date_dim
       where d_year = 2000
         and d_moy = 2
         and ws_sold_date_sk = d_date_sk
         and ws_item_sk in (select item_sk from frequent_ss_items)
         and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer))) x
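
Because a new stage is created whenever there is a shuffle, it can help to count the shuffle boundaries in a query plan before you open the Spark UI. The following is a rough heuristic, not a Spark API: it assumes that `DataFrame.explain()` prints the physical plan from Python (as it does in standard PySpark) and that shuffles appear in the plan text as `Exchange` operators. The helper names are hypothetical.

```python
import contextlib
import io
import re
from typing import Any


def explain_text(df: Any) -> str:
    """Capture the plan text that df.explain() prints to standard output."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        df.explain()
    return buffer.getvalue()


def count_shuffles(plan_text: str) -> int:
    """Count standalone Exchange operators in the plan text.

    The word boundary keeps BroadcastExchange and ReusedExchange
    out of the count, because neither adds a new shuffle.
    """
    return len(re.findall(r"\bExchange\b", plan_text))
```

For a DataFrame `df` returned by `spark.sql(...)`, `count_shuffles(explain_text(df))` gives a quick estimate of how many shuffle boundaries to expect; the Spark UI remains the authoritative view of the stages that actually ran.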

You can monitor your Spark job in SageMaker Unified Studio using two methods. Jupyter notebooks provide basic monitoring, showing real-time job status and execution progress. For more detailed analysis, use the Spark UI. You can examine specific stages, tasks, and execution plans. The Spark UI is particularly useful for troubleshooting performance issues and optimizing queries. You can track estimated stages, running tasks, and task timing details. This comprehensive view helps you understand resource utilization and track job progress in depth.

In this section, we explained how you can use EMR Serverless compute in SageMaker Unified Studio to build an interactive Spark application. Through the Spark UI, the interactive application provides fine-grained task-level status, I/O, and shuffle details, as well as links to the corresponding task logs for each stage directly from your notebook, enabling a seamless troubleshooting experience.

Clean up

To avoid ongoing charges in your AWS account, delete the resources you created during this tutorial:

  1. Delete the connection.
  2. Delete the EMR job.
  3. Delete the EMR output S3 buckets.
  4. Delete the Amazon MWAA resources, such as workflows and environments.

Conclusion

In this post, we demonstrated how the next generation of SageMaker, combined with EMR Serverless, provides a powerful solution for developing, monitoring, and scheduling Spark applications using data in Amazon S3. The integrated experience significantly reduces complexity by offering a unified development environment, automatic resource management, and comprehensive monitoring capabilities through Spark UI, while maintaining cost-efficiency through a pay-as-you-go model. For businesses, this means faster time-to-insight, improved team collaboration, and reduced operational overhead, so data teams can focus on analytics rather than infrastructure management.

To get started, explore the Amazon SageMaker Unified Studio User Guide, set up a project in your AWS environment, and discover how this solution can transform your organization’s data analytics capabilities.

Appendix

In the following sections, we discuss how to run a workload on a schedule and provide details about the TPC-DS dataset for building the Spark application using EMR Serverless.

Run a workload on a schedule

In this section, we deploy a JupyterLab notebook and create a workflow using Amazon MWAA. You can use workflows to orchestrate notebooks, querybooks, and more in your project repositories. With workflows, you can define a collection of tasks organized as a directed acyclic graph (DAG) that can run on a user-defined schedule. Complete the following steps:

  1. In SageMaker Unified Studio, choose Build, and under Orchestration, choose Workflows.
  2. Choose Create Workflow in Editor.

You will be redirected to the JupyterLab notebook with a new DAG called untitled.py created under the /src/workflows/dag folder.

  3. Rename this file to tpcds_data_queries.py.
  4. Reuse the existing template with the following updates:
    1. Update line 17 with the schedule on which you want your code to run.
    2. Update line 26 with your NOTEBOOK_PATH. This should be in the form src/<notebook_name>.ipynb. Note the name of the automatically generated dag_id; you can name it based on your requirements.
  5. Choose File and Save notebook.

To test, you can trigger a manual run of your workload.

  1. In SageMaker Unified Studio, choose Build, and under Orchestration, choose Workflows.
  2. Choose your workflow, then choose Run.

You can monitor the success of your job on the Runs tab.

To debug your notebook job by accessing the Spark UI within your Airflow job console, you must use EMR Serverless Airflow Operators to submit your job. The link is available on the Details tab of your query.

This option has the following key limitations: it’s not available for Amazon EMR on EC2, and SageMaker notebook job operators don’t work.

You can configure the operator to generate one-time links to the application UIs and Spark stdout logs by passing enable_application_ui_links=True as a parameter. After the job starts running, these links are available on the Details tab of the relevant task. If enable_application_ui_links=False, then the links will be present but grayed out.
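
As a sketch of how this parameter fits into a DAG, the following helper assembles keyword arguments for the `EmrServerlessStartJobOperator` from the Amazon provider package for Apache Airflow. The helper name, the task ID, and the example values are hypothetical; the application ID, execution role, and script location must come from your own environment.

```python
from typing import Any, Dict


def emr_serverless_task_kwargs(
    application_id: str,
    execution_role_arn: str,
    entry_point: str,
    enable_ui_links: bool = True,
) -> Dict[str, Any]:
    """Assemble keyword arguments for EmrServerlessStartJobOperator."""
    return {
        "task_id": "run_spark_job",  # hypothetical task name
        "application_id": application_id,
        "execution_role_arn": execution_role_arn,
        # entry_point is the S3 location of the PySpark script to run.
        "job_driver": {"sparkSubmit": {"entryPoint": entry_point}},
        # Generates one-time links to the Spark UI and stdout logs.
        "enable_application_ui_links": enable_ui_links,
    }


# Inside a DAG file, the arguments would be used roughly like this:
#
#   from airflow.providers.amazon.aws.operators.emr import (
#       EmrServerlessStartJobOperator,
#   )
#   run_job = EmrServerlessStartJobOperator(
#       **emr_serverless_task_kwargs(app_id, role_arn, script_s3_uri)
#   )
```

Keeping the arguments in a plain dictionary makes it easy to review the job definition before it is handed to the operator.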

Make sure you have the emr-serverless:GetDashboardForJobRun AWS Identity and Access Management (IAM) permission to generate the dashboard link.
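
A policy statement that grants this permission might look like the following sketch. The helper name and the example identifiers are hypothetical, and the resource ARN assumes the usual EMR Serverless layout of job runs under an application; adjust the scope to match your own security requirements.

```python
import json
from typing import Any, Dict


def dashboard_policy(region: str, account_id: str, application_id: str) -> Dict[str, Any]:
    """Build an IAM policy document that allows dashboard links for job runs."""
    resource = (
        f"arn:aws:emr-serverless:{region}:{account_id}:"
        f"/applications/{application_id}/jobruns/*"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["emr-serverless:GetDashboardForJobRun"],
                "Resource": resource,
            }
        ],
    }


def policy_json(region: str, account_id: str, application_id: str) -> str:
    """Serialize the policy so it can be pasted into the IAM console."""
    return json.dumps(dashboard_policy(region, account_id, application_id), indent=2)
```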

Open the Airflow UI for your job. The Spark UI and history server dashboard options are visible on the Details tab, as shown in the following screenshot.

The following screenshot shows the Jobs tab of the Spark UI.

Use the TPC-DS dataset to build the Spark application using EMR Serverless

To use the TPC-DS dataset to run the Spark application against a dataset in an S3 bucket, you need to copy the TPC-DS dataset into your S3 bucket:

  1. Create a new S3 bucket in your test account if needed. In the following code, replace <Your bucket name> with your S3 bucket name. We suggest you export YOUR_S3_BUCKET as an environment variable:
export YOUR_S3_BUCKET=<Your bucket name>
  2. Copy the TPC-DS source data as input to your S3 bucket. If it’s not exported as an environment variable, replace $YOUR_S3_BUCKET with your S3 bucket name:
aws s3 sync s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/ s3://$YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned/

About the Authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Abhilash is a senior specialist solutions architect at Amazon Web Services (AWS), helping public sector customers on their cloud journey with a focus on AWS Data and AI services. Outside of work, Abhilash enjoys learning new technologies, watching movies, and visiting new places.

How Stifel built a modern data platform using AWS Glue and an event-driven domain architecture

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/how-stifel-built-a-modern-data-platform-using-aws-glue-and-an-event-driven-domain-architecture/

Stifel Financial Corp. is an American multinational independent investment bank and financial services company, founded in 1890 and headquartered in downtown St. Louis, Missouri. Stifel offers securities-related financial services in the United States and Europe through several wholly owned subsidiaries. Stifel provides both equity and fixed income research and is the largest provider of US equity research.

In this post, we show you how Stifel implemented a modern data platform using AWS services and open data standards, building an event-driven architecture for domain data products while centralizing the metadata to facilitate discovery and sharing of data products.

Stifel’s modern data platform use case

Stifel envisioned a data platform that delivers accurate, timely, and properly governed data, providing consistency throughout the organization whenever users access the information. Traditional data management practices showed limitations as data complexity increased, data volumes grew, and demand for quick, business-driven insights rose. Financial institutions worldwide encounter these challenges, which has led to a reassessment of those practices. Under the federated governance model, Stifel developed a modern data strategy based on the following objectives:

  • Managing ingestion and metadata
  • Creating source-aligned data products complying with Stifel business streams
  • Integrating source-aligned data products from other domains (Stifel business units)
  • Producing consumer-aligned data products for specific business purposes
  • Publishing data products to a centralized data catalog

Some of the Stifel challenges highlighted in the preceding list required building a data platform that can:

  • Boost agility by democratizing data, thus reducing time to market and enhancing the customer experience
  • Improve data quality and trust in the data
  • Standardize tools and eliminate the shadow information technology (IT) culture to increase scalability, reduce risk, and minimize operational inefficiencies

Following the federated governance model, Stifel has organized its domain structure to provide autonomy to various functional teams while preserving the core values of data mesh. The following diagram depicts a high-level architecture of the data mesh implementation at Stifel.

Each data domain has the flexibility to create data products that can be published to the centralized catalog, while maintaining the autonomy for teams to develop data products that are exclusively accessible to teams within the domain. These products aren’t available to others until they are deemed ready for broader enterprise use. Domains have the freedom to decide which data they want to share. They can either:

  • Make their data products visible to everyone through the central catalog
  • Keep their data products visible only within their own domain

By implementing an event-driven domain architecture, organizations can achieve significant business advantages while positioning themselves for future growth and innovation. Stifel’s data product refreshes depended on data assets that arrive at variable cadences. Event-driven architecture enables real-time or near real-time updates by allowing data products to automatically respond to changes in underlying data assets as they occur, rather than relying on fixed batch schedules that might miss critical updates or waste resources on unnecessary refreshes. The key is to plan the implementation carefully and align it with business objectives while considering both technical and organizational factors. This architecture style particularly suits organizations that:

  • Need real-time processing capabilities
  • Have complex domain interactions
  • Require high scalability
  • Want to improve business agility
  • Need better system integration
  • Are pursuing digital transformation

The following are some of the key AWS services that helped Stifel build its modern data platform:

  • AWS Glue is a serverless data integration service that’s used for data processing to build data assets and data products in the domains. Data is also cataloged in the AWS Glue Data Catalog, making it straightforward to discover and query with supported engines.
  • Amazon EventBridge provides a scalable and flexible serverless event bus that facilitates seamless communication between different domains and services. By using EventBridge, Stifel was able to implement a publish-subscribe model where domain events can be emitted, filtered, and routed to appropriate consumers based on configurable rules. EventBridge supports custom event buses for domain-specific events, enabling clear separation of concerns and improved manageability.
  • AWS Lake Formation helped in providing centralized security, governance, and catalog capabilities while preserving domain autonomy in data product creation and management. With Lake Formation, data domains were able to maintain their independent data products within a federated structure while enforcing consistent access controls, data quality standards, and metadata management across the organization.
  • Apache Hudi on Amazon Simple Storage Service (Amazon S3) offers an optimized way to store data assets and products and promotes interoperability across other services.
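
To make the publish-subscribe model concrete, the following sketch shows what emitting and filtering a domain event on a custom EventBridge bus could look like. It is an illustration only and not Stifel's implementation: the bus name, the source naming scheme, the detail type, and the detail fields are all hypothetical, and `client` is assumed to be a boto3 `events` client or a stand-in with the same `put_events` method.

```python
import json
from typing import Any, Dict


def build_entry(bus_name: str, domain: str, data_product: str, status: str) -> Dict[str, Any]:
    """Build one PutEvents entry that announces a data product refresh."""
    return {
        "EventBusName": bus_name,
        "Source": f"dataplatform.{domain}",    # hypothetical naming scheme
        "DetailType": "DataProductRefreshed",  # hypothetical event type
        # EventBridge expects the detail as a JSON string.
        "Detail": json.dumps({"dataProduct": data_product, "status": status}),
    }


def subscription_pattern(domain: str) -> Dict[str, Any]:
    """Event pattern for a rule that routes one domain's refresh events."""
    return {
        "source": [f"dataplatform.{domain}"],
        "detail-type": ["DataProductRefreshed"],
    }


def publish(client: Any, entry: Dict[str, Any]) -> bool:
    """Send the entry and report whether EventBridge accepted it."""
    response = client.put_events(Entries=[entry])
    return response.get("FailedEntryCount", 0) == 0
```

A consuming domain would attach the pattern from `subscription_pattern` to a rule on the same bus, so that only the events it cares about reach its targets.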

Stifel’s solution architecture

The following diagram illustrates the data mesh architecture that Stifel uses to build a domain-driven architecture. In this system, various domains create data products and share them with other domains through a central governance account that uses Lake Formation.

Let’s look at some of the key design components that are used to implement the data mesh and the event-driven design.

Data ingestion framework

The data ingestion framework consists of several processor modules that are built using several AWS services and a metadata-driven architecture. The following diagram shows the architecture of the raw data ingestion framework.

The framework gets raw data files from both internal Stifel systems and third-party data sources. These files are processed and stored in a raw data ingestion account on Amazon S3 in the Apache Hudi open table format. This stored data is then shared with different parts of the organization, called data domains. Each domain can use this shared data to create its own data products.

As a file (in CSV, XML, JSON, or a custom format) lands in the landing bucket, an Amazon S3 event notification is created and placed in an Amazon Simple Queue Service (Amazon SQS) queue. The SQS queue triggers an AWS Lambda function, which saves the file metadata (such as the file name, the date and time the file was received, and the file size) to a file audit data store (Amazon Aurora PostgreSQL-Compatible Edition).
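The metadata capture step can be sketched in Python as follows. This is a minimal illustration, not Stifel's implementation: the function names, the output field names, and the UNPROCESSED status value are assumptions, and the write to the audit data store is left as a comment. It relies only on the standard shape of an SQS-triggered Lambda event whose message bodies contain S3 event notifications.

```python
import json
from urllib.parse import unquote_plus


def extract_file_metadata(sqs_event):
    """Return one metadata dict per S3 object referenced in an SQS-triggered Lambda event."""
    rows = []
    for message in sqs_event.get("Records", []):
        # Each SQS message body is the JSON text of an S3 event notification.
        notification = json.loads(message["body"])
        for record in notification.get("Records", []):
            s3_info = record["s3"]
            rows.append({
                "bucket": s3_info["bucket"]["name"],
                # Object keys arrive URL-encoded in S3 notifications.
                "file_name": unquote_plus(s3_info["object"]["key"]),
                "size_bytes": s3_info["object"].get("size"),
                "received_at": record["eventTime"],
                "status": "UNPROCESSED",  # hypothetical initial audit status
            })
    return rows


def lambda_handler(event, context):
    rows = extract_file_metadata(event)
    # A real function would insert `rows` into the audit table here.
    return {"files_registered": len(rows)}
```

Keeping the parsing in a separate function makes it possible to test the logic locally without a queue or a database.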

An EventBridge time scheduler invokes an AWS Step Functions workflow at pre-determined intervals. The Step Functions workflow orchestrates the batch ingestion from raw to staging layer.

  1. The Step Functions workflow orchestrates a set of Lambda functions to get the list of unprocessed raw files from the audit data store and create batches of raw files to process them in parallel. The Step Functions workflow then triggers parallel AWS Glue jobs that process each batch of raw files.
  2. Each raw file is validated against data quality checks, and the data is saved to staging tables in Hudi format. Any errors encountered are logged in an audit table, and a notification is generated for the support team. For all successfully processed raw files, the file status is updated to PROCESSED and logged in an audit table.
  3. After the Hudi table is updated, a data refresh event is sent to EventBridge and then passed to the Central Mesh Account. The Central Mesh Account forwards these events to the data domains to notify them that the raw tables are refreshed, allowing the data domains to use this data for creating their own data products.
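Two of the steps above lend themselves to a short sketch: splitting the unprocessed files into batches, and building the data refresh event. The code below is an assumption-laden illustration. The batch size, the source and detail-type strings, the detail fields, and the event bus name are all hypothetical; only the entry keys (Source, DetailType, Detail, EventBusName) follow the EventBridge PutEvents request shape.

```python
import json


def make_batches(file_names, batch_size):
    """Split a list of unprocessed files into fixed-size batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [file_names[i:i + batch_size] for i in range(0, len(file_names), batch_size)]


def build_refresh_event(table_name, file_count, event_bus_name="central-mesh-bus"):
    """Build one PutEvents entry announcing that a raw table was refreshed.

    The source, detail type, detail fields, and bus name are hypothetical.
    """
    return {
        "Source": "datamesh.raw-ingestion",
        "DetailType": "DataRefresh",
        "Detail": json.dumps({"table": table_name, "files_processed": file_count}),
        "EventBusName": event_bus_name,
    }
```

Each batch would map to one parallel AWS Glue job run, and the resulting entry could be passed in the Entries list of a PutEvents call.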

Event driven data product refresh

The Stifel data lake is based on a data mesh architecture where several data producers share data across data domains. A mechanism is needed to alert consumers who depend on other data producers’ data products when those source data products are refreshed, so that the consumers can update their own data products accordingly. The following diagram describes the technical architecture of event-based data processing. The central governance account acts as the central event bus, which receives all data refresh events from all data producers. The central event bus forwards the events to consumer accounts. Each consumer account then filters the events and keeps only those from the data producers that its own data processing depends on.
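To make the consumer-side filtering concrete, here is a simplified matcher in the spirit of EventBridge event patterns. It is a sketch only: it supports exact-value matching on nested fields and none of the richer operators that EventBridge offers, and the source, detail-type, and table names in the usage note are hypothetical.

```python
def matches(pattern, event):
    """Return True if every field in the pattern matches the event.

    A pattern maps each field to a list of accepted values, or to a nested
    pattern for nested objects. Only exact-value matching is implemented.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True
```

A consumer that depends on two raw tables might use a pattern such as `{"detail-type": ["DataRefresh"], "detail": {"table": ["raw.trades", "raw.prices"]}}`, so that refresh events for any other table are ignored.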

Orchestration design

Stifel designed and implemented an event-based data pipeline orchestration system that triggers data pipelines when specific events occur. This system processes data immediately after receiving all required dependency events, enabling efficient workflow management.

The following diagram describes the logical architecture of the domain data pipeline orchestration framework.

The orchestration framework includes the components described in the following list. The data dependencies and data pipeline state management metadata are hosted in an Aurora PostgreSQL database.

  1. Data refresh processor: Receives data refresh events from the central mesh and the local data domain, and evaluates whether the data dependencies of the domain's data products are met.
  2. Data product dependency processor: Retrieves metadata for the product, kicks off the corresponding data domain AWS Glue job, and updates the metadata with the job information.
  3. Data pipeline state change processor: Monitors the domain data jobs, takes actions based on each job’s final status (SUCCEEDED or FAILED), and creates incident tickets for failed jobs.
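The dependency evaluation performed by the data refresh processor can be sketched as follows. This is an in-memory illustration under stated assumptions: the class name, the product and table names, and the reset-after-trigger behavior are hypothetical, and the source keeps this state in an Aurora PostgreSQL database rather than in process memory.

```python
from collections import defaultdict


class DependencyTracker:
    """Track refresh events and report data products whose dependencies are all met."""

    def __init__(self, dependencies):
        # dependencies: data product name -> upstream tables it needs
        self.dependencies = {product: set(tables) for product, tables in dependencies.items()}
        self.received = defaultdict(set)

    def record_refresh(self, table):
        """Register a refresh event and return the products that are now ready to run."""
        ready = []
        for product, needed in self.dependencies.items():
            if table in needed:
                self.received[product].add(table)
                if self.received[product] == needed:
                    ready.append(product)
                    # Reset so the next cycle waits for fresh events again.
                    self.received[product].clear()
        return ready
```

For example, a product that depends on two tables is returned only after refresh events for both have arrived, which is the point at which the dependency processor would start the corresponding AWS Glue job.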

Conclusion

Stifel has improved its data management and reduced data silos by adopting a data product approach. This strategy has positioned Stifel to become a data-driven, customer-centric organization. The company combines federated platform practices with AWS and open standards. As a result, Stifel is achieving its decentralization objectives through a scalable data platform. This platform empowers domain teams to make informed decisions, drive innovation, and maintain a competitive edge. The following are some of the advantages Stifel gained from an event-driven domain architecture (EDDA):

  • Business agility: Rapid market response, new business capability integration, scalable domains, quicker feature deployment, and flexible process modification
  • Customer experience: Real-time processing, responsive interactions, personalized services, consistent omnichannel presence, and enhanced service availability
  • Operational efficiency: Reduced system coupling, optimal resource use, scalable systems, lower maintenance overhead, and efficient data processing
  • Cost benefits: Lower development costs, reduced infrastructure expenses, decreased maintenance costs, efficient resource usage, and a better ROI on technology investments

In this post, we demonstrated how Stifel is building a modern data platform by recognizing the critical importance of data in today’s financial landscape. This strategic approach not only enhances operational efficiency but also positions Stifel at the forefront of technological innovation in the financial services industry. To learn more and get started, see the following resources:


About the authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Srinivas Kandi is a Senior Architect at Stifel focusing on delivering the next generation of cloud data platform on AWS. Prior to joining Stifel, Srini was a delivery specialist in cloud data analytics at AWS helping several customers in their transformational journey into AWS cloud. In his free time, Srini likes to explore cooking, travel and learn new trends and innovations in AI and cloud computing.

Hossein Johari is a seasoned data and analytics leader with over 25 years of experience architecting enterprise-scale platforms. As Lead and Senior Architect at Stifel Financial Corp. in St. Louis, Missouri, he spearheads initiatives in Data Platforms and Strategic Solutions, driving the design and implementation of innovative frameworks that support enterprise-wide analytics, strategic decision-making, and digital transformation. Known for aligning technical vision with business objectives, he works closely with cross-functional teams to deliver scalable, forward-looking solutions that advance organizational agility and performance.

Ahmad Rawashdeh is a Senior Architect at Stifel Financial. He supports Stifel and its clients in designing, implementing, and building scalable and reliable data architectures on Amazon Web Services (AWS), with a strong focus on data lake strategies, database services, and efficient data ingestion and transformation pipelines.

Lei Meng is a data architect at Stifel. He focuses on designing and implementing scalable and secure data solutions on AWS and on supporting Stifel’s cloud migration from on-premises systems.

Run Apache Hive workloads using Spark SQL with Amazon EMR on EKS

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/run-apache-hive-workloads-using-spark-sql-with-amazon-emr-on-eks/

Apache Hive is a distributed, fault-tolerant data warehouse system that enables analytics at a massive scale. Using Spark SQL to run Hive workloads provides not only the simplicity of SQL-like queries but also taps into the exceptional speed and performance provided by Spark. Spark SQL is an Apache Spark module for structured data processing. One of its most popular use cases is to read and write Hive tables with connectivity to a persistent Hive metastore, supporting Hive SerDes and user-defined functions.

Starting from version 1.2.0, Apache Spark has supported queries written in HiveQL. HiveQL is a SQL-like language that produces data queries containing business logic, which can be converted to Spark jobs. However, this feature is only supported by YARN or standalone Spark mode. To run HiveQL-based data workloads with Spark on Kubernetes mode, engineers must embed their SQL queries into programmatic code such as PySpark, which requires additional effort to manually change code.

Amazon EMR on Amazon EKS provides a deployment option for Amazon EMR that you can use to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS).

Amazon EMR on EKS releases 6.7.0 and later include the ability to run SparkSQL through the StartJobRun API. As a result of this enhancement, customers can supply SQL entry-point files and run HiveQL queries as Spark jobs on EMR on EKS directly. The feature is available in all AWS Regions where EMR on EKS is available.

Use case

FINRA is one of the largest Amazon EMR customers running SQL-based workloads using the Hive on Spark approach. FINRA, the Financial Industry Regulatory Authority, is a private sector regulator responsible for analyzing equities and option trading activity in the US. To look for fraud, market manipulation, insider trading, and abuse, FINRA’s technology group has developed a robust set of big data tools in the AWS Cloud to support these activities.

FINRA centralizes all its data in Amazon Simple Storage Service (Amazon S3) with a remote Hive metastore on Amazon Relational Database Service (Amazon RDS) to manage its metadata. It uses various AWS analytics services, such as Amazon EMR, to enable its analysts and data scientists to apply advanced analytics techniques to interactively develop and test new surveillance patterns and improve investor protection. To make these interactions more efficient and productive, FINRA modernized its Hive workloads in Amazon EMR from the legacy Hive on MapReduce to Hive on Spark, which resulted in query performance gains between 50 and 80 percent.

Going forward, FINRA wants to further innovate the interactive big data platform by moving from a monolithic design pattern to a job-centric paradigm, so that it can fulfill future capacity requirements as its business grows. The capability of running Hive workloads using SparkSQL directly with EMR on EKS is one of the key enablers that helps FINRA continuously pursue that goal.

Additionally, EMR on EKS offers the following benefits to accelerate adoption:

  • Fine-grained access controls (IRSA) that are job-centric to harden customers’ security posture
  • Minimized adoption effort as it enables direct Hive query submission as a Spark job without code changes
  • Reduced run costs by consolidating multiple software versions for Hive or Spark, unifying artificial intelligence and machine learning (AI/ML) and extract, transform, and load (ETL) pipelines into a single environment
  • Simplified cluster management through multi-Availability Zone support and highly responsive autoscaling and provisioning
  • Reduced operational overhead by hosting multiple compute and storage types or CPU architectures (x86 & Arm64) in a single configuration
  • Increased application reusability and portability supported by custom Docker images, which allow customers to encapsulate all necessary dependencies

Running Hive SQL queries on EMR on EKS

Prerequisites

Make sure that you have AWS Command Line Interface (AWS CLI) version 1.25.70 or later installed. If you’re running AWS CLI version 2, you need version 2.7.31 or later. Use the following command to check your AWS CLI version:

aws --version

If necessary, install or update the latest version of the AWS CLI.

Solution Overview

To get started, let’s look at the following diagram. It illustrates a high-level architectural design and the different services that can be used in the Hive workload. To match FINRA’s use case, we chose an Amazon RDS database as the remote Hive metastore. Alternatively, you can use the AWS Glue Data Catalog as the metastore for Hive if needed. For more details, see the aws-samples GitHub project.

The minimum required infrastructure is:

  • An S3 bucket to store a Hive SQL script file
  • An Amazon EKS cluster with EMR on EKS enabled
  • An Amazon RDS for MySQL database in the same virtual private cloud (VPC) as the Amazon EKS cluster
  • A standalone Hive metastore service (HMS) running on the EKS cluster or a small Amazon EMR on EC2 cluster with the Hive application installed

To have a quick start, run the sample CloudFormation deployment. The infrastructure deployment includes the following resources:

Create a Hive script file

Store a few lines of Hive queries in a single file, then upload the file to your S3 bucket. You can find the bucket name on the AWS CloudFormation Outputs tab in the AWS Management Console. Search for the key value of CODEBUCKET as shown in the preceding screenshot. For a quick start, you can skip this step and use the sample file stored in s3://<YOUR_S3BUCKET>/app_code/job/set-of-hive-queries.sql. The following is a code snippet from the sample file:

-- drop database in case switch between different hive metastore

DROP DATABASE IF EXISTS hiveonspark CASCADE;
CREATE DATABASE hiveonspark;
USE hiveonspark;

--create hive managed table
DROP TABLE IF EXISTS testtable purge;
CREATE TABLE IF NOT EXISTS testtable (`key` INT, `value` STRING) using hive;
LOAD DATA LOCAL INPATH '/usr/lib/spark/examples/src/main/resources/kv1.txt' INTO TABLE testtable;
SELECT * FROM testtable WHERE key=238;

-- test1: add column
ALTER TABLE testtable ADD COLUMNS (`arrayCol` Array<int>);
-- test2: insert
INSERT INTO testtable VALUES 
(238,'val_238',array(1,3)),
(238,'val_238',array(2,3));
SELECT * FROM testtable WHERE key=238;
-- test3: UDF
CREATE TEMPORARY FUNCTION hiveUDF AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDTFExplode';
SELECT `key`,`value`,hiveUDF(arrayCol) FROM testtable WHERE key=238;
-- test4: CTAS table with parameter
DROP TABLE IF EXISTS ctas_testtable purge;
CREATE TABLE ctas_testtable 
STORED AS ORC
AS
SELECT * FROM testtable;
SELECT * FROM ctas_testtable WHERE key=${key_ID};
-- test5: External table mapped to S3
CREATE EXTERNAL TABLE IF NOT EXISTS amazonreview
( 
  marketplace string, 
  customer_id string, 
  review_id  string, 
  product_id  string, 
  product_parent  string, 
  product_title  string, 
  star_rating  integer, 
  helpful_votes  integer, 
  total_votes  integer, 
  vine  string, 
  verified_purchase  string, 
  review_headline  string, 
  review_body  string, 
  review_date  date, 
  year  integer
) 
STORED AS PARQUET 
LOCATION 's3://${S3Bucket}/app_code/data/toy/';
SELECT count(*) FROM amazonreview;

Submit the Hive script to EMR on EKS

First, set up the required environment variables. See the shell script post-deployment.sh:

stack_name='HiveEMRonEKS'
export VIRTUAL_CLUSTER_ID=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='VirtualClusterId'].OutputValue" --output text)
export EMR_ROLE_ARN=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='EMRExecRoleARN'].OutputValue" --output text)
export S3BUCKET=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='CODEBUCKET'].OutputValue" --output text)

Connect to the demo EKS cluster:

echo `aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?starts_with(OutputKey,'eksclusterEKSConfig')].OutputValue" --output text` | bash
kubectl get svc

Ensure the entryPoint path is correct, then submit the set-of-hive-queries.sql to EMR on EKS.

aws emr-containers start-job-run \
--virtual-cluster-id $VIRTUAL_CLUSTER_ID \
--name sparksql-test \
--execution-role-arn $EMR_ROLE_ARN \
--release-label emr-6.8.0-latest \
--job-driver '{
  "sparkSqlJobDriver": {
      "entryPoint": "s3://'$S3BUCKET'/app_code/job/set-of-hive-queries.sql",
      "sparkSqlParameters": "-hivevar S3Bucket='$S3BUCKET' -hivevar Key_ID=238"}}' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.sql.warehouse.dir": "s3://'$S3BUCKET'/warehouse/",
          "spark.hive.metastore.uris": "thrift://hive-metastore:9083"
        }
      }
    ], 
    "monitoringConfiguration": {
      "s3MonitoringConfiguration": {"logUri": "s3://'$S3BUCKET'/elasticmapreduce/emr-containers"}}}'

Note that the command references the set-of-hive-queries.sql Hive script file as the entry point script. It uses the sparkSqlJobDriver attribute, not the usual sparkSubmitJobDriver designed for Spark applications. In the sparkSqlParameters section, we pass two variables, S3Bucket and Key_ID, to the Hive script as Hive variables (-hivevar).
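If you prefer to submit the job from Python rather than the AWS CLI, the same request can be assembled as a dictionary. The following is a minimal sketch: the function name is hypothetical, the job name and release label are copied from the CLI example, and no AWS call is made inside the function.

```python
def build_sparksql_job_request(virtual_cluster_id, role_arn, bucket, hive_vars):
    """Build keyword arguments for an EMR on EKS StartJobRun call with a Spark SQL entry point."""
    # Each Hive variable becomes one "-hivevar name=value" pair.
    params = " ".join(f"-hivevar {name}={value}" for name, value in hive_vars.items())
    return {
        "virtualClusterId": virtual_cluster_id,
        "name": "sparksql-test",
        "executionRoleArn": role_arn,
        "releaseLabel": "emr-6.8.0-latest",
        "jobDriver": {
            "sparkSqlJobDriver": {
                "entryPoint": f"s3://{bucket}/app_code/job/set-of-hive-queries.sql",
                "sparkSqlParameters": params,
            }
        },
    }
```

Assuming boto3 is installed and credentials are configured, the result could be passed to `boto3.client("emr-containers").start_job_run(**request)`, after adding a `configurationOverrides` entry equivalent to the one in the CLI command.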

The property "spark.hive.metastore.uris": "thrift://hive-metastore:9083" sets a connection to a Hive Metastore Service (HMS) called hive-metastore, which is running as a Kubernetes service on the demo EKS cluster as shown in the following screenshot. If you’re running the thrift service on Amazon EMR on EC2, the URI should be thrift://<YOUR_EMR_MASTER_NODE_DNS_NAME>:9083. If you chose AWS Glue Data Catalog as your Hive metastore, replace the entire property with "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory".
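The two metastore choices differ by a single Spark property, which the following sketch makes explicit. The helper names and the "thrift" and "glue" selector strings are hypothetical; the property names and values are the ones given in this post.

```python
GLUE_FACTORY = "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"


def metastore_properties(kind, thrift_host="hive-metastore"):
    """Return the Spark property that selects the Hive metastore."""
    if kind == "thrift":
        return {"spark.hive.metastore.uris": f"thrift://{thrift_host}:9083"}
    if kind == "glue":
        return {"spark.hadoop.hive.metastore.client.factory.class": GLUE_FACTORY}
    raise ValueError(f"unknown metastore kind: {kind}")


def spark_defaults(bucket, kind, thrift_host="hive-metastore"):
    """Build the spark-defaults classification for the configuration overrides."""
    properties = {"spark.sql.warehouse.dir": f"s3://{bucket}/warehouse/"}
    properties.update(metastore_properties(kind, thrift_host))
    return {"classification": "spark-defaults", "properties": properties}
```

For a thrift service on Amazon EMR on EC2, pass the master node DNS name as `thrift_host`.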

Finally, check the job status using the kubectl command line tool: kubectl get po -n emr --watch

Expected output

  1. Go to the Amazon EMR console.
  2. In the side menu, choose Virtual clusters, then select the HiveDemo cluster. You can see an entry for the SparkSQL test job.
  3. Choose the Spark UI hyperlink to monitor each query’s duration and status on a web interface.
  4. To query the Amazon RDS based Hive metastore, you need a MySQL client tool installed. To make it easier, the sample CloudFormation template has installed the query tool on the master node of a small Amazon EMR on EC2 cluster.
  5. Find the EMR master node by running the following command:
aws ec2 describe-instances --filter Name=tag:project,Values=$stack_name Name=tag:aws:elasticmapreduce:instance-group-role,Values=MASTER --query 'Reservations[].Instances[].InstanceId[]'

  6. Go to the Amazon EC2 console and connect to the master node through Session Manager.
  7. Before querying the MySQL RDS database (the Hive metastore), run the following commands on your local machine to get the credentials:
    stack_name='HiveEMRonEKS' 
    export secret_name=$(aws cloudformation describe-stacks --stack-name $stack_name --query "Stacks[0].Outputs[?OutputKey=='HiveSecretName'].OutputValue" --output text) 
    export HOST_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.host')
    export PASSWORD=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.password')
    export DB_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.dbname')
    export USER_NAME=$(aws secretsmanager get-secret-value --secret-id $secret_name --query SecretString --output text | jq -r '.username')
    echo -e "\n host: $HOST_NAME\n DB: $DB_NAME\n password: $PASSWORD\n username: $USER_NAME\n"
    

  8. After you connect through Session Manager, query the Hive metastore from your Amazon EMR master node.
    mysql -u admin -P 3306 -p -h <YOUR_HOST_NAME>
    Enter password:<YOUR_PASSWORD>
    
    # Query the metastore
    MySQL[(none)]> Use HiveEMRonEKS;
    MySQL[HiveEMRonEKS]> select * from DBS;
    MySQL[HiveEMRonEKS]> select * from TBLS;
    MySQL[HiveEMRonEKS]> exit;

  9. Validate the Hive tables (created by set-of-hive-queries.sql) through the interactive Hive CLI tool.

Note: Your query environment must have the Hive client tool installed and a connection to your Hive metastore or AWS Glue Data Catalog. For testing purposes, you can connect to the same Amazon EMR on EC2 master node and query your Hive tables. The EMR cluster has been preconfigured with the required setup.

sudo su
hive
hive> show databases;

Clean up

To avoid incurring future charges, delete the resources generated if you don’t need the solution anymore. Run the following cleanup script.

curl https://raw.githubusercontent.com/aws-samples/hive-emr-on-eks/main/deployment/app_code/delete_all.sh | bash

Go to the CloudFormation console and manually delete the remaining resources if needed.

Conclusion

Amazon EMR on EKS releases 6.7.0 and higher include a Spark SQL job driver so that you can directly run Spark SQL scripts via the StartJobRun API. Without any modifications to your existing Hive scripts, you can directly execute them as a SparkSQL job on Amazon EMR on EKS.

FINRA is one of the largest Amazon EMR customers. It runs over 400 Hive clusters for its analysts who need to interactively query multi-petabyte data sets. Modernizing its Hive workloads with SparkSQL gives FINRA a 50 to 80 percent query performance improvement. The support to run Spark SQL through the StartJobRun API in EMR on EKS has further enabled FINRA’s innovation in data analytics.

In this post, we demonstrated how to submit a Hive script to Amazon EMR on EKS and run it as a SparkSQL job. We encourage you to give it a try and are keen to hear your feedback and suggestions.


About the authors

Amit Maindola is a Senior Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Melody Yang is a Senior Big Data Solutions Architect for Amazon EMR at AWS. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering, and DataOps.

Get started with Apache Hudi using AWS Glue by implementing key design concepts – Part 1

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/part-1-get-started-with-apache-hudi-using-aws-glue-by-implementing-key-design-concepts/

Many organizations build data lakes on Amazon Simple Storage Service (Amazon S3) using a modern architecture for a scalable and cost-effective solution. Open-source storage formats like Parquet and Avro are commonly used, and data is stored in these formats as immutable files. As the data lake expands to additional use cases, some of them remain very difficult to implement, such as CDC (change data capture), time travel (querying point-in-time data), privacy regulations requiring deletion of data, concurrent writes, and consistent handling of the small file problem.

Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and streaming data ingestion. However, organizations new to data lakes may struggle to adopt Apache Hudi due to unfamiliarity with the technology and lack of internal expertise.

In this post, we show how to get started with Apache Hudi, focusing on the Hudi CoW (Copy on Write) table type on AWS using AWS Glue, and implementing key design concepts for different use cases. We expect readers to have a basic understanding of data lakes, AWS Glue, and Amazon S3. We walk you through common batch data ingestion use cases with actual test results using a TPC-DS dataset to show how the design decisions can influence the outcome.

Apache Hudi key concepts

Before diving deep into the design concepts, let’s review the key concepts of Apache Hudi, which are important to understand before you make design decisions.

Hudi table and query types

Hudi supports two table types: Copy on Write (CoW) and Merge on Read (MoR). You have to choose the table type in advance, which influences the performance of read and write operations.

The difference in performance depends on the volume of data, operations, file size, and other factors. For more information, refer to Table & Query Types.

When you use the CoW table type, committed data is implicitly compacted, meaning it’s rewritten into columnar file format during the write operation. With the MoR table type, data isn’t compacted with every commit. As a result, for the MoR table type, compacted data lives in columnar storage (Parquet) and deltas are stored in row-based log files (Avro) until compaction merges the changes into the columnar file format. Hudi supports snapshot, incremental, and read-optimized queries for Hudi tables, and the query result depends on the query type.
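In Spark, the table type and the query type are both selected through Hudi datasource options. The sketch below only assembles the option dictionaries; the helper names and the "cow" and "mor" shorthand keys are hypothetical, while the option keys are Hudi Spark datasource configuration names.

```python
TABLE_TYPES = {"cow": "COPY_ON_WRITE", "mor": "MERGE_ON_READ"}
QUERY_TYPES = ("snapshot", "incremental", "read_optimized")


def table_type_option(table_type):
    """Return the write option that fixes the table type at creation time."""
    return {"hoodie.datasource.write.table.type": TABLE_TYPES[table_type]}


def read_options(query_type, begin_instant=None):
    """Return read options for a snapshot, incremental, or read-optimized query."""
    if query_type not in QUERY_TYPES:
        raise ValueError(f"unsupported query type: {query_type}")
    options = {"hoodie.datasource.query.type": query_type}
    if query_type == "incremental":
        if begin_instant is None:
            raise ValueError("incremental queries need a begin instant time")
        # Only commits after this instant are returned.
        options["hoodie.datasource.read.begin.instanttime"] = begin_instant
    return options
```

With a Spark session available, the read options would typically be applied as `spark.read.format("hudi").options(**read_options("snapshot")).load(table_path)`, where `table_path` is the S3 location of the table.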

Indexing

Indexing is another key concept for the design. Hudi provides efficient upserts and deletes with fast indexing for both CoW and MoR tables. For CoW tables, indexing enables fast upsert and delete operations by avoiding the need to join against the entire dataset to determine which files to rewrite. For MoR, this design allows Hudi to bound the number of records any given base file needs to be merged against. Specifically, a given base file needs to be merged only against updates for records that are part of that base file. In contrast, designs without an indexing component could end up having to merge all the base files against all incoming update and delete records.
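The following toy example shows the idea behind the index for a CoW table: a mapping from record key to file lets a writer find the few files affected by an upsert without scanning the whole dataset. It is a conceptual sketch with a plain dictionary, not how Hudi implements its indexes, and the function and file names are hypothetical.

```python
def files_to_rewrite(index, incoming_keys):
    """Split incoming record keys into files that must be rewritten and new inserts.

    index maps an existing record key to the ID of the file that stores it.
    """
    touched_files = set()
    new_keys = []
    for key in incoming_keys:
        file_id = index.get(key)
        if file_id is None:
            new_keys.append(key)  # key not present yet, so this is an insert
        else:
            touched_files.add(file_id)  # only this file needs to be rewritten
    return touched_files, new_keys
```

With an index of four keys spread over three files, an upsert batch that contains one existing key and one new key touches a single file, and the other two files are left alone.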

Solution overview

The following diagram describes the high-level architecture for our solution. We ingest the TPC-DS (store_sales) dataset from the source S3 bucket in CSV format and write it to the target S3 bucket using AWS Glue in Hudi format. We can query the Hudi tables on Amazon S3 using Amazon Athena and AWS Glue Studio Notebooks.

The following diagram illustrates the relationships between our tables.

For our post, we use the following tables from the TPC-DS dataset: one fact table, store_sales, and the dimension tables store, item, and date_dim. The following table summarizes the table row counts.

Table | Approximate row count
store_sales | 2.8 billion
store | 1,000
item | 300,000
date_dim | 73,000

Set up the environment

After you sign in to your test AWS account, launch the provided AWS CloudFormation template by choosing Launch Stack:

This template configures the following resources:

  • AWS Glue jobs hudi_bulk_insert, hudi_upsert_cow, and hudi_bulk_insert_dim. We use these jobs for the use cases covered in this post.
  • An S3 bucket to store the output of the AWS Glue job runs.
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions.

Before you run the AWS Glue jobs, you need to subscribe to the AWS Glue Apache Hudi Connector (latest version: 0.10.1). The connector is available on AWS Marketplace. Follow the connector installation and activation process from the AWS Marketplace link, or refer to Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook to set it up.

After you create the Hudi connection, add the connector name to all the AWS Glue scripts under Advanced properties.

Bulk insert job

To run the bulk insert job, choose the job hudi_bulk_insert on the AWS Glue console.

The job parameters as shown in the following screenshot are added as part of the CloudFormation stack setup. You can use different values to create CoW partitioned tables with different bulk insert options.

The parameters are as follows:

  • HUDI_DB_NAME – The database in the AWS Glue Data Catalog where the catalog table is created.
  • HUDI_INIT_SORT_OPTION – The options for bulk_insert include GLOBAL_SORT, which is the default. Other options include NONE and PARTITION_SORT.
  • HUDI_TABLE_NAME – The table name prefix that you want to use to identify the table created. In the code, we append the sort option to the name you specify in this parameter.
  • OUTPUT_BUCKET – The S3 bucket created through the CloudFormation stack where the Hudi table datasets are written. The bucket name format is <account number><bucket name>. The bucket name is the one given while creating the CloudFormation stack.
  • CATEGORY_ID – The default for this parameter is ALL, which processes all categories of test data in a single AWS Glue job. To test parallel writes to the same table, run multiple AWS Glue jobs in parallel and set this parameter to a different category (3, 5, or 8 in the dataset that we use) for each job.

Upsert job for the CoW table

To run the upsert job, choose the job hudi_upsert_cow on the AWS Glue console.

The following job parameters are added as part of the CloudFormation stack setup. You can run upsert and delete operations on CoW partitioned tables with different bulk insert options based on the values provided for these parameters.

  • OUTPUT-BUCKET – The same value as the previous job parameter.
  • HUDI_TABLE_NAME – The name of the table created in your AWS Glue Data Catalog.
  • HUDI_DB_NAME – The same value as the previous job parameter. The default value is Default.

Bulk insert job for the Dimension tables

To test the queries on the CoW tables, the fact table that is created using the bulk insert operation needs supplemental dimensional tables. This AWS Glue job has to be run before you can test the TPC queries provided later in this post. To run this job, choose hudi_bulk_insert_dim on the AWS Glue console and use the parameters shown in the following screenshot.

The parameters are as follows:

  • OUTPUT-BUCKET – The same value as the previous job parameter.
  • HUDI_INIT_SORT_OPTION – The options for bulk_insert include GLOBAL_SORT, which is the default. Other available options are NONE and PARTITION_SORT.
  • HUDI_DB_NAME – The Hudi database name. The default value is default.

Hudi design considerations

In this section, we walk you through a few use cases to demonstrate the difference in the outcome for different settings and operations.

Data migration use case

In Apache Hudi, you ingest data into CoW or MoR table types using insert, upsert, or bulk insert operations. Data migration initiatives often involve one-time initial loads into the target datastore, and we recommend using the bulk insert operation for initial loads.

The bulk insert operation provides the same semantics as insert while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, it only makes a best-effort attempt at sizing files rather than guaranteeing file sizes as inserts and upserts do. Also, because the primary keys aren’t sorted during an insert, we don’t advise using insert for the initial data load. By default, a Bloom index is created for the table, which enables faster lookups for upsert and delete operations.

Bulk insert has the following three sort options, which have different outcomes.

  • GLOBAL_SORT – Sorts the record key for the entire dataset before writing.
  • PARTITION_SORT – Applies only to partitioned tables. In this option, the record key is sorted within each partition, and the insert time is faster than the default sort.
  • NONE – Doesn’t sort data before writing.
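The sort option maps to a single Hudi write configuration. The following sketch assembles a set of bulk insert options for a CoW table. The helper name, the table name suffix format, and the column names in the usage note are assumptions made for illustration and aren't taken from the hudi_bulk_insert script; the option keys are Hudi write configuration names.

```python
SORT_MODES = ("GLOBAL_SORT", "PARTITION_SORT", "NONE")


def bulk_insert_options(table_prefix, sort_mode, record_key, partition_field, precombine_field):
    """Return Hudi write options for a bulk insert into a CoW table."""
    if sort_mode not in SORT_MODES:
        raise ValueError(f"sort_mode must be one of {SORT_MODES}")
    return {
        # Hypothetical naming: the sort option is appended to the table prefix.
        "hoodie.table.name": f"{table_prefix}_{sort_mode.lower()}",
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.bulkinsert.sort.mode": sort_mode,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
    }
```

In a Spark or AWS Glue job, such a dictionary would typically be applied with `df.write.format("hudi").options(**opts).mode("overwrite").save(target_path)`. A call such as `bulk_insert_options("store_sales", "PARTITION_SORT", "ss_ticket_number", "ss_sold_date_sk", "ss_sold_time_sk")` uses TPC-DS column names purely as placeholders.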

For testing the bulk insert with the three sort options, we use the following AWS Glue job configuration, which is part of the script hudi_bulk_insert:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 200
  • Input file: TPC-DS/2.13/1TB/store_sales
  • Input file format: CSV (TPC-DS)
  • Number of input files: 1,431
  • Number of rows in the input dataset: Approximately 2.8 billion

The following charts illustrate the behavior of the bulk insert operations with GLOBAL_SORT, PARTITION_SORT, and NONE as sort options for a CoW table. The statistics in the charts are created by using an average of 10 bulk insert operation runs for each sort option.

Because bulk insert does a best-effort job to pack the data in files, you see a different number of files created with different sort options.

We can observe the following:

  • Bulk insert with GLOBAL_SORT creates the fewest files, because Hudi tries to create optimally sized files. However, it takes the most time.
  • Bulk insert with NONE as the sort option has the fastest write time, but results in a greater number of files.
  • Bulk insert with PARTITION_SORT also has a faster write time than GLOBAL_SORT, but also results in a greater number of files.

Based on these results, although GLOBAL_SORT takes more time to ingest the data, it creates a smaller number of files, which has better upsert and read performance.

The following diagrams illustrate the Spark run plans for the bulk_insert operation using various sort options.

The first shows the Spark run plan for bulk_insert when the sort option is PARTITION_SORT.

The next is the Spark run plan for bulk_insert when the sort option is NONE.

The last is the Spark run plan for bulk_insert when the sort option is GLOBAL_SORT.

The Spark run plan for bulk_insert with GLOBAL_SORT involves shuffling of data to create optimal sized files. For the other two sort options, data shuffling isn’t involved. As a result, bulk_insert with GLOBAL_SORT takes more time compared to the other sort options.

To test the bulk insert with various bulk insert sort data options on a partitioned table, modify the Hudi AWS Glue job (hudi_bulk_insert) parameter --HUDI_INIT_SORT_OPTION.

We change the parameter --HUDI_INIT_SORT_OPTION to PARTITION_SORT or NONE to test the bulk insert with different data sort options. You need to run the job hudi_bulk_insert_dim, which loads the rest of the tables needed to test the SQL queries.

Now, look at the query performance difference between these three options. To compare query runtimes, we ran two TPC-DS queries (q52.sql and q53.sql, as shown in the following query snippets) in an interactive session in an AWS Glue Studio notebook with the following notebook configuration.

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 50

Before executing the following queries, replace the table names in the queries with the tables you generate in your account.
q52

SELECT
  dt.d_year,
  item.i_brand_id brand_id,
  item.i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manager_id = 1
  AND dt.d_moy = 11
  AND dt.d_year = 2000
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, ext_price DESC, brand_id
LIMIT 100

q53

SELECT *
FROM
  (SELECT
    i_manufact_id,
    sum(ss_sales_price) sum_sales,
    avg(sum(ss_sales_price))
    OVER (PARTITION BY i_manufact_id) avg_quarterly_sales
  FROM item, store_sales, date_dim, store
  WHERE ss_item_sk = i_item_sk AND
    ss_sold_date_sk = d_date_sk AND
    ss_store_sk = s_store_sk AND
    d_month_seq IN (1200, 1200 + 1, 1200 + 2, 1200 + 3, 1200 + 4, 1200 + 5, 1200 + 6,
                          1200 + 7, 1200 + 8, 1200 + 9, 1200 + 10, 1200 + 11) AND
    ((i_category IN ('Books', 'Children', 'Electronics') AND

As you can see in the following chart, the GLOBAL_SORT table outperforms the NONE and PARTITION_SORT tables because fewer files were created in the bulk insert operation.

Ongoing replication use case

For ongoing replication, updates and deletes usually come from transactional databases. As you saw in the previous section, the bulk insert operation with GLOBAL_SORT took the most time and the operation with NONE took the least time. When you anticipate a higher volume of updates and deletes on an ongoing basis, the sort option is critical for your write performance.

To illustrate the ongoing replication using Apache Hudi upsert and delete operations, we tested using the following configuration:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 100

To test the upsert and delete operations, we use the store_sales CoW table, which was created using the bulk insert operation in the previous section with all three sort options. We make the following changes:

  • Insert data into a new partition (month 1 and year 2004) using the existing data from month 1 of year 2002 with a new primary key; total of 32,164,890 records
  • Update the ss_list_price column by $1 for the existing partition (month 1 and year 2003); total of 5,997,571 records
  • Delete month 5 data for year 2001; total of 26,997,957 records
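To make the three changes concrete, the following toy sketch applies the same rules to a handful of in-memory rows. It is an illustration only, not the hudi_upsert_cow job: the row layout, the key column, and the sample values are hypothetical. In a real job, the first batch would be written with the Hudi upsert operation and the second with the delete operation.

```python
def build_change_sets(rows, first_new_key):
    """Split rows into an upsert batch and a delete batch using the rules above."""
    upserts, deletes = [], []
    next_key = first_new_key
    for row in rows:
        period = (row["year"], row["month"])
        if period == (2002, 1):
            # Copy month 1 of 2002 into the new 2004 partition under a new key.
            upserts.append(dict(row, year=2004, key=next_key))
            next_key += 1
        elif period == (2003, 1):
            # Raise the list price by $1 in the existing partition.
            upserts.append(dict(row, ss_list_price=row["ss_list_price"] + 1))
        elif period == (2001, 5):
            # Month 5 of 2001 is removed.
            deletes.append(row)
    return upserts, deletes


# Hypothetical sample rows, one per rule plus one untouched row.
sample_rows = [
    {"key": 1, "year": 2002, "month": 1, "ss_list_price": 10.0},
    {"key": 2, "year": 2003, "month": 1, "ss_list_price": 20.0},
    {"key": 3, "year": 2001, "month": 5, "ss_list_price": 30.0},
    {"key": 4, "year": 2000, "month": 7, "ss_list_price": 40.0},
]
upserts, deletes = build_change_sets(sample_rows, first_new_key=100)
```

Rows copied into the new partition get a fresh key, so the upsert treats them as inserts, whereas the price change reuses the existing key and is applied as an update.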

The following chart illustrates the runtimes for the upsert operation for the CoW table with different sort options used during the bulk insert.

As you can see from the test run, the runtime of the upsert is higher for NONE and PARTITION_SORT CoW tables. The Bloom index, which is created by default during the bulk insert operation, enables faster lookup for upsert and delete operations.

To test the upsert and delete operations on a CoW table for tables with different data sort options, modify the AWS Glue job (hudi_upsert_cow) parameter HUDI_TABLE_NAME to the desired table, as shown in the following screenshot.

For workloads where updates are performed on the most recent partitions, a Bloom index works fine. For workloads where the update volume is lower but the updates are spread across partitions, a simple index is more efficient. You can specify the index type while creating the Hudi table by using the parameter hoodie.index.type. Both the Bloom index and simple index enforce uniqueness of table keys within a partition. If you need uniqueness of keys for the entire table, you must create a global Bloom index or global simple index based on the update workloads.
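The guidance above can be expressed as a small helper. This is a sketch of the decision logic only; the function name and its two boolean inputs are hypothetical, while BLOOM, SIMPLE, GLOBAL_BLOOM, and GLOBAL_SIMPLE are values that hoodie.index.type accepts.

```python
def choose_index_type(updates_on_recent_partitions, table_wide_unique_keys):
    """Pick a hoodie.index.type value from two workload characteristics."""
    # Bloom suits updates concentrated on recent partitions;
    # simple suits lower-volume updates spread across partitions.
    base = "BLOOM" if updates_on_recent_partitions else "SIMPLE"
    # The global variants enforce key uniqueness across the whole table.
    return f"GLOBAL_{base}" if table_wide_unique_keys else base


index_options = {
    "hoodie.index.type": choose_index_type(
        updates_on_recent_partitions=False,
        table_wide_unique_keys=True,
    )
}
```

The resulting dictionary can be merged into the other Hudi write options when the table is first created.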

Multi-tenant partitioned design use case

In this section, we cover Hudi optimistic concurrency using a multi-tenant table design, where each tenant’s data is stored in a separate table partition. In a real-world scenario, you may encounter a business need to process different tenants’ data simultaneously, such as a strict SLA to make the data available for downstream consumption as quickly as possible. Without Hudi optimistic concurrency, you can’t have concurrent writes to the same Hudi table. With it, you can speed up the data writes when each job operates on a different part of the table. In our multi-tenant table design using Hudi optimistic concurrency, you can run concurrent jobs, where each job writes data to a separate table partition.

For AWS Glue, you can implement Hudi optimistic concurrency using an Amazon DynamoDB lock provider, which was introduced with Apache Hudi 0.10.0. The initial bulk insert script has all the configurations needed to allow multiple writes. The role being used for AWS Glue needs to have DynamoDB permissions added to make it work. For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.
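As a rough illustration, the required settings from the appendix of this post could be assembled as follows. This is a sketch under assumptions: the function name, the lock table name, and the partition key value are hypothetical, and the lock provider class name is copied from the appendix, so verify it against the Hudi version you run.

```python
def occ_lock_options(lock_table, partition_key, region="us-east-1"):
    """Build Hudi options for optimistic concurrency with a DynamoDB lock."""
    return {
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        # Class name as listed in the appendix; it may differ by Hudi version.
        "hoodie.write.lock.provider":
            "org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider",
        "hoodie.write.lock.dynamodb.table": lock_table,
        "hoodie.write.lock.dynamodb.partition_key": partition_key,
        "hoodie.write.lock.dynamodb.region": region,
        "hoodie.write.lock.dynamodb.billing_mode": "PAY_PER_REQUEST",
        "hoodie.write.lock.dynamodb.endpoint_url":
            f"dynamodb.{region}.amazonaws.com",
    }


lock_options = occ_lock_options(
    lock_table="hudi-lock-table",  # hypothetical DynamoDB table name
    partition_key="store_sales",   # should uniquely identify the Hudi table
)
```

These options are merged into the writer options of every job that writes to the same table, so that all writers contend for the same lock.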

To simulate concurrent writes, we presume your tenant is based on the category field from the TPC-DS test dataset, and the table is accordingly partitioned on the category ID field (i_category_id). Let’s modify the script hudi_bulk_insert to run an initial load for different categories. You need to configure your AWS Glue job to run concurrently based on the Maximum concurrency parameter, located under the advanced properties. We describe the Hudi configuration parameters that are needed in the appendix at the end of this post.

The TPC-DS dataset includes data from years 1998–2003. We use i_category_id as the tenant ID. The following screenshot shows the distribution of data for multiple tenants (i_category_id). In our testing, we load the data for i_category_id values 3, 5, and 8.

The AWS Glue job hudi_bulk_insert is designed to insert data into specific partitions based on the parameter CATEGORY_ID. If you haven’t already run the bulk insert job for the dimension tables, run the job hudi_bulk_insert_dim, which loads the rest of the tables needed to test the SQL queries.

Now we run three concurrent jobs, each with respective values 3, 5, and 8 to simulate concurrent writes for multiple tenants. The following screenshot illustrates the AWS Glue job parameter to modify for CATEGORY_ID.
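If you prefer to start the three runs programmatically rather than from the console, one possible approach uses the boto3 Glue client. This is a sketch, not part of the post's setup: the helper names are hypothetical, it assumes the job is named hudi_bulk_insert, that AWS credentials are configured, and that the job's Maximum concurrency is at least 3.

```python
def job_run_requests(job_name, category_ids):
    """Build one start_job_run request per tenant category."""
    return [
        {"JobName": job_name, "Arguments": {"--CATEGORY_ID": str(category_id)}}
        for category_id in category_ids
    ]


def start_runs(requests):
    """Start each run and return the job run IDs (calls AWS when invoked)."""
    import boto3  # imported lazily so the request builder has no dependencies

    glue = boto3.client("glue")
    return [glue.start_job_run(**request)["JobRunId"] for request in requests]


requests = job_run_requests("hudi_bulk_insert", [3, 5, 8])
# run_ids = start_runs(requests)  # uncomment to start the jobs
```

Each call returns as soon as the run is queued, so the three jobs start within seconds of each other.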

We used the following AWS Glue job configuration for each of the three parallel AWS Glue jobs:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 100
  • Input file: TPC-DS/2.13/1TB/store_sales
  • Input file format: CSV (TPC-DS)

The following screenshot shows all three concurrent jobs started around the same time for three categories, which loaded 867 million rows (50.1 GB of data) into the store_sales table. We used the GLOBAL_SORT option for all three concurrent AWS Glue jobs.

The following screenshot shows the data from the Hudi table where all three concurrent writers inserted data into different partitions, which is illustrated by different colors. All the AWS Glue jobs were run in US Central Time zone (UTC -5). The _hoodie_commit_time is in UTC.

The first two results highlighted in blue correspond to the AWS Glue job CATEGORY_ID = 3, which had the start time of 09/27/2022 21:23:39 US CST (09/28/2022 02:23:39 UTC).

The next two results highlighted in green correspond to the AWS Glue job CATEGORY_ID = 8, which had the start time of 09/27/2022 21:23:50 US CST (09/28/2022 02:23:50 UTC).

The last two results highlighted in green correspond to the AWS Glue job CATEGORY_ID = 5, which had the start time of 09/27/2022 21:23:44 US CST (09/28/2022 02:23:44 UTC).

The sample data from the Hudi table has _hoodie_commit_time values corresponding to the AWS Glue job run times.
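To check this correspondence yourself, you can convert a job's local start time to UTC. The sketch below uses the fixed UTC-5 offset stated above. It assumes that _hoodie_commit_time is a timestamp string beginning with yyyyMMddHHmmss, so verify that format against the values in your own table.

```python
from datetime import datetime, timedelta, timezone

# Fixed UTC-5 offset, as stated for the job runs in this post.
LOCAL_TZ = timezone(timedelta(hours=-5))


def to_utc(local_text):
    """Parse an MM/DD/YYYY HH:MM:SS local timestamp and convert it to UTC."""
    local = datetime.strptime(local_text, "%m/%d/%Y %H:%M:%S")
    return local.replace(tzinfo=LOCAL_TZ).astimezone(timezone.utc)


start_utc = to_utc("09/27/2022 21:23:39")  # start of the CATEGORY_ID = 3 job
# Commit times written by that job should sort at or after this prefix.
commit_prefix = start_utc.strftime("%Y%m%d%H%M%S")
print(start_utc.isoformat(), commit_prefix)
```

Because the prefix is a fixed-width digit string, a plain string comparison is enough to order it against the commit times.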

As you can see, we were able to load data into multiple partitions of the same Hudi table concurrently using Hudi optimistic concurrency.

Key findings

As the results show, bulk_insert with GLOBAL_SORT scales well for loading TBs of data in the initial load process. This option is recommended for use cases that require frequent changes after a large migration. Also, when query performance is critical in your use case, we recommend the GLOBAL_SORT option because of the smaller number of files being created with this option.

PARTITION_SORT has better performance for data load compared to GLOBAL_SORT, but it generates a significantly larger number of files, which negatively impacts query performance. You can use this option when the query involves a lot of joins between partitioned tables on record key columns.

The NONE option doesn’t sort the data, but it’s useful when you need the fastest initial load time and expect only minimal updates, while still supporting record changes.

Clean up

When you’re done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. On the Amazon S3 console, empty the buckets created by the CloudFormation stack.
  2. On the CloudFormation console, select your stack and choose Delete.

This cleans up all the resources created by the stack.

Conclusion

In this post, we covered some of the Hudi concepts that are important for design decisions. We used AWS Glue and the TPC-DS dataset to collect the results of different use cases for comparison. You can learn from the use cases covered in this post to make the key design decisions, particularly when you’re at the early stage of Apache Hudi adoption. You can go through the steps in this post to start a proof of concept using AWS Glue and Apache Hudi.

Appendix

The following table summarizes the Hudi configuration parameters that are needed.

| Configuration | Value | Description | Required |
| --- | --- | --- | --- |
| hoodie.write.concurrency.mode | optimistic_concurrency_control | Property to turn on optimistic concurrency control. | Yes |
| hoodie.cleaner.policy.failed.writes | LAZY | Cleaning policy for failed writes. LAZY is required when optimistic concurrency control is turned on. | Yes |
| hoodie.write.lock.provider | org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider | Lock provider implementation to use. | Yes |
| hoodie.write.lock.dynamodb.table | <String> | The DynamoDB table name to use for acquiring locks. If the table doesn’t exist, it will be created. You can use the same table across all your Hudi jobs operating on the same or different tables. | Yes |
| hoodie.write.lock.dynamodb.partition_key | <String> | The string value to be used for the locks table partition key attribute. It must be a string that uniquely identifies a Hudi table, such as the Hudi table name. | Yes: ‘tablename’ |
| hoodie.write.lock.dynamodb.region | <String> | The AWS Region in which the DynamoDB locks table exists, or must be created. | Yes. Default: us-east-1 |
| hoodie.write.lock.dynamodb.billing_mode | <String> | The DynamoDB billing mode to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | Yes. Default: PAY_PER_REQUEST |
| hoodie.write.lock.dynamodb.endpoint_url | <String> | The DynamoDB URL for the Region where you’re creating the table. | Yes: dynamodb.us-east-1.amazonaws.com |
| hoodie.write.lock.dynamodb.read_capacity | <Integer> | The DynamoDB read capacity to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | No. Default: 20 |
| hoodie.write.lock.dynamodb.write_capacity | <Integer> | The DynamoDB write capacity to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | No. Default: 10 |

About the Authors

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Srinivas Kandi is a Data Architect with a focus on data lakes and analytics at Amazon Web Services. He helps customers deploy data analytics solutions on AWS that enable prescriptive and predictive analytics.

Mitesh Patel is a Principal Solutions Architect at AWS. His main area of depth is application and data modernization. He helps customers build scalable, secure, and cost-effective solutions on AWS.