Tag Archives: Analytics

Workload management in OpenSearch-based multi-tenant centralized logging platforms

Post Syndicated from Ezat Karimi original https://aws.amazon.com/blogs/big-data/workload-management-in-opensearch-based-multi-tenant-centralized-logging-platforms/

Modern architectures use many different technologies to achieve their goals. Service-oriented architectures, cloud services, distributed tracing, and more create streams of telemetry and other signal data. Each of these data streams becomes a tenant in your logging backend. If your company runs more than one application, the IT team will frequently centralize the storage and processing of log data, making each application a tenant in the overall observability system.

When you use Amazon OpenSearch Service to store and analyze log data, whether as a developer or an IT admin, you must balance these tenants to make sure you deliver the resources to each tenant so they can ingest, store, and query their data. In this post, we present a multi-layered workload management framework with a rules-based proxy and OpenSearch workload management that can effectively address these challenges.

Example use case

In this post, we discuss GlobalLog, a fictional company supporting healthcare, finance, retail, security, and internal tenants, that built a centralized logging system with OpenSearch Service. Each tenant has unique logging patterns based on their business requirements. Financial tenants generate complex, high-volume queries, healthcare tenants focus on compliance with moderate volume logs and queries, and retail tenants experience seasonal spikes with heavy dashboard usage. Internal operations have steady, low-volume logs and infrequent, simple queries. Security monitoring has a constant, high-volume presence throughout the system.

As GlobalLog’s tenants scaled, operational challenges emerged: high-priority tenant performance suffered during peak hours, resource-intensive queries caused node crashes, and unpredictable traffic created instability. Limited visibility into tenant resource usage complicated troubleshooting and cross-domain security investigations. The platform required robust handling of varied workload patterns and peak usage times, strong performance isolation to prevent tenant interference, and scalability to manage 30% annual data growth.

Solution overview

GlobalLog implemented a comprehensive workload management strategy to handle the diverse demands of its tenants. The solution has three layers: tiered tenant placement; a rule-based proxy layer that shapes incoming traffic based on the tenant profile and the status of the OpenSearch cluster; and the OpenSearch workload management plugin, which provides granular resource governance by allocating resources such as CPU and memory in proportion to each tenant’s tier. A monitoring component supplies the data the solution needs to assess conditions and make reactive and proactive scaling and performance decisions by adjusting the traffic governance rules and policies in a timely manner.

The following diagram illustrates the architecture.

GlobalLog multi tier workload management


Tenant tiering and placement

GlobalLog categorized tenants into four tiers based on their logging requirements (volume, retention, query frequency) and allocated resources accordingly. The tiering system, enforced through the integrated proxy layer and OpenSearch workload management, prevents resource over-allocation while making sure service levels match business priorities. The specification for each tier is detailed in the following table.

 
| Tier | Workload profile | SLA | Resources | Limits | Behavior |
|---|---|---|---|---|---|
| Tier 1 (Enterprise Critical) | High-volume complex queries (over 100 concurrent) | 24/7 SLA with 99.99% availability | 50% CPU, 50% memory | 100 concurrent requests, 20 MB request size, 180-second timeout | Priority query routing and dedicated search threads |
| Tier 2 (Business Critical) | Moderate volume, compliance-oriented queries | Business hours SLA with 99.9% availability | 30% CPU, 25% memory | 50 concurrent requests, 10 MB request size, 120-second timeout | Compliance-optimized search pipelines |
| Tier 3 (Business Standard) | Variable volume, dashboard-heavy usage | Standard business hours support, no SLA | 10% CPU, 20% memory | 25 concurrent requests, 5 MB request size, 60-second timeout | Burst capacity for seasonal peaks |
| Tier 4 (Basic) | Internal IT operations, development environments | Best-effort support, no SLA | 10% CPU, 5% memory | 10 concurrent requests, 2 MB request size, 30-second timeout | Automated query optimization for efficiency |

Operations, seasonal businesses
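The per-tier limits in the table can be captured as data and checked before a request is forwarded. The following Python sketch is illustrative only: the tier keys, the TierLimits class, and the admit function are hypothetical names and are not part of the OpenSearch Traffic Gateway. Only the numeric values come from the table.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class TierLimits:
    cpu_share: float        # fraction of cluster CPU reserved for the tier
    memory_share: float     # fraction of cluster memory reserved for the tier
    max_concurrent: int     # concurrent requests allowed at the proxy
    max_request_mb: int     # largest accepted request body, in MB
    timeout_seconds: int    # per-request timeout


# Values copied from the tier table; the keys are hypothetical tier identifiers.
TIER_LIMITS = {
    "tier1": TierLimits(0.50, 0.50, 100, 20, 180),
    "tier2": TierLimits(0.30, 0.25, 50, 10, 120),
    "tier3": TierLimits(0.10, 0.20, 25, 5, 60),
    "tier4": TierLimits(0.10, 0.05, 10, 2, 30),
}


def admit(tier: str, in_flight: int, request_bytes: int) -> tuple[bool, str]:
    """Return (allowed, reason) for a request from a tenant in the given tier."""
    limits = TIER_LIMITS[tier]
    if in_flight >= limits.max_concurrent:
        return False, "concurrency limit reached"
    if request_bytes > limits.max_request_mb * 1024 * 1024:
        return False, "request too large"
    return True, "ok"
```

Keeping the limits in one structure makes it straightforward to verify that the CPU and memory shares across tiers do not exceed 100%.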

GlobalLog’s integrated architecture streamlines its cost allocation and resource distribution model. Financial industry tenants pay premium rates for their guaranteed high-performance resources, effectively subsidizing the infrastructure that supports more variable workloads. These tenants are categorized into Tier 1. Healthcare tenants benefit from isolation that enforces compliance without bearing the full cost of dedicated infrastructure. These tenants are categorized into Tier 2. Retail tenants are categorized into Tier 3 because they appreciate the elastic capacity during peak seasons without maintaining excess capacity year-round. Tier 4 includes the administrative tenants with access to enterprise-grade logging at affordable rates through efficient resource sharing.

This balanced ecosystem helps GlobalLog maintain profitability while delivering appropriate service levels to every tenant regardless of their industry-specific workload characteristics.

In the next sections, we discuss GlobalLog’s workload management system.

Proxy layer

GlobalLog’s continuous feedback loop architecture creates a dynamic ecosystem that optimizes resource allocation across diverse tenant workloads in OpenSearch Service. Rather than depending on static configurations, the architecture monitors performance metrics and tenant usage patterns to drive scaling and remediation decisions. This makes sure the system evolves as workloads change over time.

The core component of the proxy layer is the OpenSearch Traffic Gateway, which functions as an intermediary between clients and OpenSearch clusters. It features the following key capabilities:

  • Rule-based traffic shaping through pattern matching for request paths and parameters
  • Metrics for resource cost allocation
  • Traffic replay

GlobalLog expanded the capabilities of their OpenSearch Traffic Gateway through a comprehensive set of enhancements focused on centralization, dynamism, and adaptability. At the core of this evolution, they used Amazon DynamoDB as the centralized repository for critical gateway data. This central database houses the complete ecosystem of rules, policies, and tenant profiles, alongside crucial operational data including metrics, usage patterns, SLA requirements, tier configurations, and real-time cluster status information.

Beyond this centralization effort, GlobalLog transformed the gateway with a dynamic mechanism capable of real-time adjustments and responsive decision-making. This architectural shift allows the gateway to react intelligently to changing conditions rather than following predetermined pathways.

Additionally, GlobalLog implemented an adaptive rule system with sophisticated contextual awareness. The system now activates specific rules based on current cluster states and tenant usage patterns, enabling precise resource allocation and protection mechanisms that respond to actual conditions rather than hypothetical scenarios. The system implements time-based rule scheduling, providing flexibility by allowing different limits and policies to automatically engage during specific periods such as maintenance windows. This provides optimal performance while accommodating necessary system operations.

The solution implements a continuous feedback loop between the monitoring system, the OpenSearch cluster, and the proxy layer. The flow of performance metrics and tenant usage patterns drives automated, rule-based scaling and optimization decisions, helping the system evolve as workloads change over time. In this architecture, Amazon EventBridge triggers an AWS Lambda function when predefined criteria are met (for example, when an anomaly is detected in OpenSearch Service). The Lambda function then remediates the issue by adjusting the traffic shaping rules and uploading them to the OpenSearch Traffic Gateway. To stabilize the feedback loop, GlobalLog took the following steps:

  • Added dampening mechanisms to prevent rapid rule changes
  • Implemented gradual adjustment patterns instead of binary switches
  • Created circuit breakers for automatic fallback to baseline rules
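The three stabilization steps above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not GlobalLog's implementation: the StabilizedLimit class, its parameters, and the choice to trip the circuit breaker when no monitoring signal is available (target is None) are all hypothetical.

```python
from typing import Optional


class StabilizedLimit:
    """A single traffic limit (for example, concurrent requests) adjusted cautiously."""

    def __init__(self, baseline: int, floor: int,
                 cooldown_seconds: float = 300.0,
                 max_step_fraction: float = 0.2):
        self.baseline = baseline                  # value used by the baseline rules
        self.floor = floor                        # never throttle below this value
        self.cooldown_seconds = cooldown_seconds  # dampening interval
        self.max_step_fraction = max_step_fraction
        self.current = baseline
        self._last_change = float("-inf")

    def propose(self, target: Optional[int], now: float) -> int:
        # Circuit breaker: without a trustworthy signal, fall back to the baseline.
        if target is None:
            self.current = self.baseline
            self._last_change = now
            return self.current
        # Dampening: ignore proposals that arrive too soon after the last change.
        if now - self._last_change < self.cooldown_seconds:
            return self.current
        # Gradual adjustment: move toward the target by a bounded step,
        # staying between the floor and the baseline.
        max_step = max(1, int(self.baseline * self.max_step_fraction))
        step = max(-max_step, min(max_step, target - self.current))
        new_value = max(self.floor, min(self.baseline, self.current + step))
        if new_value != self.current:
            self.current = new_value
            self._last_change = now
        return self.current
```

With a baseline of 50 and a requested target of 10, the limit moves down by at most 10 per cooldown interval instead of dropping in a single step, and it returns to 50 as soon as the monitoring signal is missing.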

OpenSearch workload management layer

GlobalLog implemented tenant-level admission control and reactive query management through OpenSearch workload management. The system uses workload management to define resource limits based on tenant criticality, which provides efficient resource allocation and prevents bottlenecks.

A key component of OpenSearch’s workload management is its workload groups. A workload group refers to a logical grouping of queries, typically used for managing resources and prioritizing workloads. GlobalLog uses workload groups to manage resource allocation based on the previously defined tenant tiers. Enterprise-critical workloads receive substantial CPU and memory guarantees, providing consistent performance for financial operations. Business Critical tenants operate with moderate resource guarantees, and Standard and Basic tiers function with more constrained resources, reflecting their lower priority status. The following example shows the workload group setup for Enterprise Critical and Business Critical tiers:

PUT _wlm/workload_group
{
  "name": "Enterprise Critical",
  "resiliency_mode": "enforced",
  "resource_limits": {
    "cpu": 0.5,
    "memory": 0.5
  }
}

PUT _wlm/workload_group
{
  "name": "Business Critical",
  "resiliency_mode": "enforced",
  "resource_limits": {
    "cpu": 0.3,
    "memory": 0.25
  }
}

OpenSearch responds with the set resource limits and the ID for the workload group for Enterprise Critical tier tenants:

{
  "_id": "preXpc67RbKKeCyka72_Gw",
  "name": "Enterprise Critical",
  "resiliency_mode": "enforced",
  "resource_limits": {
    "cpu": 0.5,
    "memory": 0.5
  },
  "updated_at": 1726270204642
}
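The request bodies for all four tiers can be generated from the tier table rather than written by hand. The following Python sketch only builds and prints the bodies; it does not call a cluster. The post shows workload groups for the Enterprise Critical and Business Critical tiers only, so the group names and the "enforced" resiliency mode used here for the Business Standard and Basic tiers are assumptions.

```python
import json

# (cpu, memory) shares per tier, taken from the tier table.
# The names for the last two groups are assumed from the tier names.
TIER_SHARES = {
    "Enterprise Critical": (0.5, 0.5),
    "Business Critical": (0.3, 0.25),
    "Business Standard": (0.1, 0.2),
    "Basic": (0.1, 0.05),
}


def workload_group_body(name: str, cpu: float, memory: float,
                        resiliency_mode: str = "enforced") -> dict:
    """Build the JSON body for a PUT _wlm/workload_group request."""
    return {
        "name": name,
        "resiliency_mode": resiliency_mode,
        "resource_limits": {"cpu": cpu, "memory": memory},
    }


def total_share(bodies: list, resource: str) -> float:
    """Sum one resource limit across all workload groups."""
    return sum(body["resource_limits"][resource] for body in bodies)


bodies = [workload_group_body(name, cpu, memory)
          for name, (cpu, memory) in TIER_SHARES.items()]

for body in bodies:
    print("PUT _wlm/workload_group")
    print(json.dumps(body, indent=2))
```

Checking total_share for "cpu" and "memory" before sending the requests is a cheap guard against over-allocating the cluster.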

To use a workload group, pass its ID in the workloadGroupId HTTP header of the request:

GET finindex/_search
Host: localhost:9200
Content-Type: application/json
workloadGroupId: preXpc67RbKKeCyka72_Gw
{
  "query": {
    "match": {
      "field_name": "value"
    }
  }
}
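A client or proxy can attach the same header programmatically. The following Python sketch uses only the standard library and builds the request without sending it. The build_search_request helper and the localhost URL are illustrative assumptions, and the sketch uses POST (which the search API also accepts) to avoid sending a body with GET. Requests to an Amazon OpenSearch Service domain typically also need authentication, which is omitted here.

```python
import json
import urllib.request


def build_search_request(base_url: str, index: str,
                         workload_group_id: str, query: dict) -> urllib.request.Request:
    """Build a search request tagged with a workload group ID."""
    return urllib.request.Request(
        url=f"{base_url}/{index}/_search",
        data=json.dumps(query).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Associates the query with the workload group created earlier.
            "workloadGroupId": workload_group_id,
        },
        method="POST",
    )


request = build_search_request(
    "http://localhost:9200",           # hypothetical cluster endpoint
    "finindex",
    "preXpc67RbKKeCyka72_Gw",
    {"query": {"match": {"field_name": "value"}}},
)
# To send it against a reachable cluster: urllib.request.urlopen(request)
```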

Real-world use cases

In this section, we discuss two scenarios where GlobalLog’s workload management system helped the company overcome various challenges.

Scenario 1: Security incident response

During a critical security incident, GlobalLog faced a complex challenge of managing simultaneous log access requests from multiple business units, each with different priority levels. At the highest tier were security and financial operations (Tier 1), followed by healthcare operations (Tier 2), retail operations (Tier 3), and internal operations (Tier 4).

At the proxy layer, GlobalLog gave precedence to security and financial tenant queries while implementing specific limitations for other units. Healthcare operations were capped at 15 concurrent queries, retail operations were restricted to 5 queries per minute, and internal operations had their date ranges narrowed.
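The two kinds of limits in this scenario, a cap on concurrent queries and a cap on queries per minute, can be sketched as follows. This is a simplified, single-process illustration; the class names are hypothetical and the OpenSearch Traffic Gateway may implement these limits differently.

```python
from collections import deque


class ConcurrencyCap:
    """Allows at most max_in_flight simultaneous queries."""

    def __init__(self, max_in_flight: int):
        self.max_in_flight = max_in_flight
        self.in_flight = 0

    def try_acquire(self) -> bool:
        if self.in_flight >= self.max_in_flight:
            return False
        self.in_flight += 1
        return True

    def release(self) -> None:
        self.in_flight = max(0, self.in_flight - 1)


class SlidingWindowRateLimiter:
    """Allows at most max_requests within any window_seconds interval."""

    def __init__(self, max_requests: int, window_seconds: float = 60.0):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have aged out of the window.
        while self._timestamps and now - self._timestamps[0] >= self.window_seconds:
            self._timestamps.popleft()
        if len(self._timestamps) >= self.max_requests:
            return False
        self._timestamps.append(now)
        return True


healthcare_cap = ConcurrencyCap(15)          # healthcare: 15 concurrent queries
retail_limiter = SlidingWindowRateLimiter(5)  # retail: 5 queries per minute
```

A concurrency cap bounds the load on the cluster at any instant, whereas a rate limit bounds how often a tenant can start new work, which is why the scenario applies them to different tenants.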

OpenSearch workload management and the proxy layer played a crucial role by maintaining the security team’s query priority while managing resource pressure, including the cancellation of complex retail queries during high CPU usage.

Scenario 2: End-of-month reporting

During month-end reporting periods, GlobalLog successfully handled intensive analytical workloads from multiple tenants. Time-based rules proved particularly effective: they prioritized Tier 4 tenants for batch reporting during off-peak hours at the end of the month. The following code shows an example of GlobalLog rules in this context. The first rule allows Tier 4 tenants to run reports during off-peak hours, and the second rule denies Tier 4 tenants’ requests during business hours:

"monthlyReportAllowRule",
"ruleConfig": {
  "tenantTier": "^tier4$",
  "timeWindow": {
    "dayOfMonth": "25-30",
    "hours": "18:00-8:00"
  }
}

"monthlyReportDenyRule",
"ruleConfig": {
  "tenantTier": "^tier4$",
  "timeWindow": {
    "dayOfMonth": "25-30",
    "hours": "9:00-18:00"
  }
}
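One detail worth noting in these rules is that the allow window (18:00-8:00) wraps past midnight. The following Python sketch shows one way such a rule could be evaluated. It is an assumption about the matching semantics, not the gateway's actual logic: the function names are hypothetical, tenantTier is treated as a regular expression, and the dayOfMonth range is checked against the calendar day of the request.

```python
import re
from datetime import datetime, time


def _parse_hhmm(text: str) -> time:
    hours, minutes = text.split(":")
    return time(int(hours), int(minutes))


def in_time_window(window: dict, moment: datetime) -> bool:
    """Check the day-of-month range and the (possibly overnight) hour range."""
    first_day, last_day = (int(day) for day in window["dayOfMonth"].split("-"))
    if not first_day <= moment.day <= last_day:
        return False
    start, end = (_parse_hhmm(part) for part in window["hours"].split("-"))
    current = moment.time()
    if start <= end:
        # Same-day window, for example 9:00-18:00.
        return start <= current < end
    # Overnight window, for example 18:00-8:00.
    return current >= start or current < end


def rule_matches(rule_config: dict, tenant_tier: str, moment: datetime) -> bool:
    """A rule applies when both the tier pattern and the time window match."""
    tier_ok = re.search(rule_config["tenantTier"], tenant_tier) is not None
    return tier_ok and in_time_window(rule_config["timeWindow"], moment)


ALLOW_RULE = {"tenantTier": "^tier4$",
              "timeWindow": {"dayOfMonth": "25-30", "hours": "18:00-8:00"}}
DENY_RULE = {"tenantTier": "^tier4$",
             "timeWindow": {"dayOfMonth": "25-30", "hours": "9:00-18:00"}}
```

Anchoring the pattern with both ^ and $ matters here: an unanchored or half-anchored pattern could also match a tier identifier that merely contains or ends with "tier4".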

The system dynamically adjusted resource allocation for Tier 4 tenants for the off-peak hours (6:00 PM – 8:00 AM) using the OpenSearch workload management API.

This comprehensive approach proved highly successful in managing peak reporting periods, facilitating both system stability and optimal performance across all tenant tiers.

Conclusion

The integration of proxy-layer traffic shaping with the OpenSearch workload management plugin in a continuous feedback loop architecture achieved resiliency, stable performance, and fair resource allocation while supporting diverse business priorities. The implementation discussed in this post demonstrates that large-scale, multi-tenant logging environments can effectively serve diverse business needs on shared infrastructure while maintaining performance and cost-efficiency.

Try out these workload management techniques for your own use case and share your feedback and questions in the comments.


About the Authors

Ezat Karimi is a Senior Solutions Architect at AWS, based in Austin, TX. Ezat specializes in designing and delivering modernization solutions and strategies for database applications. Working closely with multiple AWS teams, Ezat helps customers migrate their database workloads to the AWS Cloud.

Jon Handler is a Senior Principal Solutions Architect at AWS based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have vector, search, and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included 4 years of coding a large-scale, ecommerce search engine. Jon holds a Bachelor’s of the Arts from the University of Pennsylvania, and a Master’s of Science and a PhD in Computer Science and Artificial Intelligence from Northwestern University.

Optimizing vector search using Amazon S3 Vectors and Amazon OpenSearch Service

Post Syndicated from Sohaib Katariwala original https://aws.amazon.com/blogs/big-data/optimizing-vector-search-using-amazon-s3-vectors-and-amazon-opensearch-service/

NOTE: As of July 15, the Amazon S3 Vectors Integration with Amazon OpenSearch Service is in preview release and is subject to change.

The way we store and search through data is evolving rapidly with the advancement of vector embeddings and similarity search capabilities. Vector search has become essential for modern applications such as generative AI and agentic AI, but managing vector data at scale presents significant challenges. Organizations often struggle with the trade-offs between latency, cost, and accuracy when storing and searching through millions or billions of vector embeddings. Traditional solutions either require substantial infrastructure management or come with prohibitive costs as data volumes grow.

We now have a public preview of two integrations between Amazon Simple Storage Service (Amazon S3) Vectors and Amazon OpenSearch Service that give you more flexibility in how you store and search vector embeddings:

  1. Cost-optimized vector storage: OpenSearch Service managed clusters using service-managed S3 Vectors for cost-optimized vector storage. This integration supports OpenSearch workloads that can accept higher latency in exchange for ultra-low cost and still want to use advanced OpenSearch capabilities (such as hybrid search, advanced filtering, and geo filtering).
  2. One-click export from S3 Vectors: One-click export from an S3 vector index to OpenSearch Serverless collections for high-performance vector search. Customers who build natively on S3 Vectors will benefit from being able to use OpenSearch for faster query performance.

By using these integrations, you can optimize cost, latency, and accuracy by distributing your vector workloads intelligently: keep infrequently queried vectors in S3 Vectors and use OpenSearch for your most time-sensitive operations that require advanced search capabilities such as hybrid search and aggregations. Further, OpenSearch performance tuning capabilities (that is, quantization, k-nearest neighbor (knn) algorithms, and method-specific parameters) help improve performance with little compromise on cost or accuracy.

In this post, we walk through this integration and the options it gives you for implementing vector search. You’ll learn how to use the new S3 Vectors engine type in OpenSearch Service managed clusters for cost-optimized vector storage and how to use one-click export from S3 Vectors to OpenSearch Serverless collections for high-performance scenarios requiring sustained queries with latency as low as 10 ms. By the end of this post, you’ll understand how to choose and implement the right integration pattern based on your specific requirements for performance, cost, and scale.

Service overview

Amazon S3 Vectors is the first cloud object store with native support to store and query vectors with sub-second search capabilities, requiring no infrastructure management. It combines the simplicity, durability, availability, and cost-effectiveness of Amazon S3 with native vector search functionality, so you can store and query vector embeddings directly in S3. Amazon OpenSearch Service provides two complementary deployment options for vector workloads: Managed Clusters and Serverless Collections. Both harness Amazon OpenSearch’s powerful vector search and retrieval capabilities, though each excels in different scenarios. For OpenSearch users, the integration between S3 Vectors and Amazon OpenSearch Service offers unprecedented flexibility in optimizing your vector search architecture. Whether you need ultra-fast query performance for real-time applications or cost-effective storage for large-scale vector datasets, this integration lets you choose the approach that best fits your specific use case.

Understanding Vector Storage Options

OpenSearch Service provides multiple options for storing and searching vector embeddings, each optimized for different use cases. The Lucene engine, which is OpenSearch’s native search library, implements the Hierarchical Navigable Small World (HNSW) method, offering efficient filtering capabilities and strong integration with OpenSearch’s core functionality. For workloads requiring additional optimization options, the Faiss engine (Facebook AI Similarity Search) provides implementations of both HNSW and IVF (Inverted File Index) methods, along with vector compression capabilities. HNSW creates a hierarchical graph structure of connections between vectors, enabling efficient navigation during search, while IVF organizes vectors into clusters and searches only relevant subsets during query time. With the introduction of the S3 engine type, you now have a cost-effective option that uses Amazon S3’s durability and scalability while maintaining sub-second query performance. With this variety of options, you can choose the most suitable approach based on your specific requirements for performance, cost, and accuracy. For instance, if your application requires sub-50 ms query responses with efficient filtering, Faiss’s HNSW implementation is the best choice. Alternatively, if you need to optimize storage costs while maintaining reasonable performance, the new S3 engine type would be more appropriate.
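To make the IVF idea concrete, the following pure-Python sketch assigns vectors to their nearest centroid and, at query time, scans only the lists of the closest centroids. It is a toy illustration and not the Faiss API: real IVF indexes learn their centroids (typically with k-means), whereas the centroids, the sample vectors, and the function names here are made up for the example.

```python
import math


def l2(a, b):
    """Euclidean distance between two vectors of equal length."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def build_ivf(vectors, centroids):
    """Assign each vector ID to the inverted list of its nearest centroid."""
    lists = {i: [] for i in range(len(centroids))}
    for vec_id, vec in vectors.items():
        nearest = min(range(len(centroids)), key=lambda i: l2(vec, centroids[i]))
        lists[nearest].append(vec_id)
    return lists


def ivf_search(query, k, vectors, centroids, lists, nprobe=1):
    """Search only the nprobe inverted lists whose centroids are closest to the query."""
    probe = sorted(range(len(centroids)), key=lambda i: l2(query, centroids[i]))[:nprobe]
    candidates = [vec_id for i in probe for vec_id in lists[i]]
    return sorted(candidates, key=lambda vec_id: l2(query, vectors[vec_id]))[:k]


# Hypothetical sample data: two well-separated groups of 2-dimensional vectors.
VECTORS = {"a": [1.0, 1.0], "b": [1.5, 2.0], "c": [8.0, 8.0], "d": [9.0, 7.5]}
CENTROIDS = [[1.0, 1.5], [8.5, 8.0]]
LISTS = build_ivf(VECTORS, CENTROIDS)
```

Raising nprobe scans more lists, which improves recall at the cost of more distance computations. That is the latency and accuracy trade-off the method-specific parameters control.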

Solution overview

In this post, we explore two primary integration patterns:

OpenSearch Service managed clusters using service-managed S3 Vectors for cost-optimized vector storage.

For customers already using OpenSearch Service domains who want to optimize costs while maintaining sub-second query performance, the new Amazon S3 engine type offers a compelling solution. OpenSearch Service automatically manages vector storage in Amazon S3, data retrieval, and cache optimization, eliminating operational overhead.

One-click export from an S3 vector index to OpenSearch Serverless collections for high-performance vector search.

For use cases requiring faster query performance, you can migrate your vector data from an S3 vector index to an OpenSearch Serverless collection. This approach is ideal for applications that require real-time response times and gives you the benefits that come with Amazon OpenSearch Serverless, including advanced query capabilities and filters, automatic scaling and high availability, and no administration. The export process automatically handles schema mapping, vector data transfer, index optimization, and connection configuration.

The following illustration shows the two integration patterns between Amazon OpenSearch Service and S3 Vectors.

Prerequisites

Before you begin, make sure you have:

  • An AWS account
  • Access to Amazon S3 and Amazon OpenSearch Service
  • An OpenSearch Service domain (for the first integration pattern)
  • Vector data stored in S3 Vectors (for the second integration pattern)

Integration pattern 1: OpenSearch Service managed cluster using S3 Vectors

To implement this pattern:

  1. Create an OpenSearch Service Domain using OR1 instances on OpenSearch version 2.19.
    1. While creating the OpenSearch Service domain, choose the Enable S3 Vectors as an engine option in the Advanced features section.
  2. Sign in to OpenSearch Dashboards and open Dev tools. Then create your knn index and specify s3vector as the engine.
PUT my-first-s3vector-index
{
  "settings": {
    "index": {
      "knn": true
    }
  },
  "mappings": {
    "properties": {
        "my_vector1": {
          "type": "knn_vector",
          "dimension": 2,
          "space_type": "l2",
          "method": {
            "engine": "s3vector"
          }
        },
        "price": {
          "type": "float"
        }
    }
  }
} 
  3. Index your vectors using the Bulk API:
POST _bulk
{ "index": { "_index": "my-first-s3vector-index", "_id": "1" } }
{ "my_vector1": [2.5, 3.5], "price": 7.1 }
{ "index": { "_index": "my-first-s3vector-index", "_id": "3" } }
{ "my_vector1": [3.5, 4.5], "price": 12.9 }
{ "index": { "_index": "my-first-s3vector-index", "_id": "4" } }
{ "my_vector1": [5.5, 6.5], "price": 1.2 }
{ "index": { "_index": "my-first-s3vector-index", "_id": "5" } }
{ "my_vector1": [4.5, 5.5], "price": 3.7 }
{ "index": { "_index": "my-first-s3vector-index", "_id": "6" } }
{ "my_vector1": [1.5, 2.5], "price": 12.2 }
  4. Run a knn query as usual:
GET my-first-s3vector-index/_search
{
  "size": 2,
  "query": {
    "knn": {
      "my_vector1": {
        "vector": [2.5, 3.5],
        "k": 2
      }
    }
  }
}
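When you load more than a handful of documents, it helps to generate the Bulk API body rather than type it, and to keep an exact nearest-neighbor baseline for sanity-checking results. The following Python sketch does both for the sample documents above, using only the standard library. The helper names are hypothetical, and the brute-force search is a local reference, not how OpenSearch or S3 Vectors executes the query.

```python
import json
import math

# The sample documents from the bulk request above, keyed by document ID.
DOCS = {
    "1": {"my_vector1": [2.5, 3.5], "price": 7.1},
    "3": {"my_vector1": [3.5, 4.5], "price": 12.9},
    "4": {"my_vector1": [5.5, 6.5], "price": 1.2},
    "5": {"my_vector1": [4.5, 5.5], "price": 3.7},
    "6": {"my_vector1": [1.5, 2.5], "price": 12.2},
}


def to_bulk_ndjson(index: str, docs: dict) -> str:
    """Build a newline-delimited JSON body: one action line, then one source line."""
    lines = []
    for doc_id, source in docs.items():
        lines.append(json.dumps({"index": {"_index": index, "_id": doc_id}}))
        lines.append(json.dumps(source))
    # The Bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


def exact_knn(query, k: int, docs: dict, field: str = "my_vector1") -> list:
    """Return the IDs of the k documents closest to the query by L2 distance."""
    return sorted(docs, key=lambda doc_id: math.dist(query, docs[doc_id][field]))[:k]


bulk_body = to_bulk_ndjson("my-first-s3vector-index", DOCS)
nearest = exact_knn([2.5, 3.5], 2, DOCS)
```

For the query vector [2.5, 3.5], document 1 is an exact match. Documents 3 and 6 are equidistant from the query, so either one is a valid second result.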

The following animation demonstrates steps 2-4 above.

Integration pattern 2: Export S3 vector indexes to OpenSearch Serverless

To implement this pattern:

  1. Navigate to the AWS Management Console for Amazon S3 and select your S3 vector bucket.

  2. Select a vector index that you want to export. Under Advanced search export, select Export to OpenSearch.

Alternatively, you can:

  1. Navigate to the OpenSearch Service console.
  2. Select Integrations from the navigation pane.
  3. Here you will see a new Integration Template to Import S3 vectors to OpenSearch vector engine – preview. Select Import S3 vector index.

  3. You will now be brought to the Amazon OpenSearch Service integration console with the Export S3 vector index to OpenSearch vector engine template pre-selected and pre-populated with your S3 vector index Amazon Resource Name (ARN). Select an existing role that has the necessary permissions or create a new service role.

  4. Scroll down and choose Export to start the steps to create a new OpenSearch Serverless collection and copy data from your S3 vector index into an OpenSearch knn index.

  5. You will now be taken to the Import history page in the OpenSearch Service console. Here you will see the new job that was created to migrate your S3 vector index into the OpenSearch Serverless knn index. After the status changes from In Progress to Complete, you can connect to the new OpenSearch Serverless collection and query your new OpenSearch knn index.

The following animation demonstrates how to connect to the new OpenSearch Serverless collection and query your new OpenSearch knn index using Dev tools.

Cleanup

To avoid ongoing charges:

  1. For Pattern 1:
  2. For Pattern 2:
    • Delete the import task from the Import history section of the OpenSearch Service console. Deleting this task will remove both the OpenSearch vector collection and the OpenSearch Ingestion pipeline that was automatically created by the import task.

Conclusion

The integration between Amazon S3 Vectors and Amazon OpenSearch Service offers new flexibility and cost-effectiveness for vector search. It combines the durability and cost efficiency of Amazon S3 with the advanced AI search capabilities of OpenSearch. Organizations can scale their vector search solutions to billions of vectors while maintaining control over their latency, cost, and accuracy. Whether your priority is ultra-fast query performance with latency as low as 10 ms through OpenSearch Service, cost-optimized storage with sub-second performance using S3 Vectors, or advanced search capabilities in OpenSearch, this integration lets you choose the approach that fits your needs. We encourage you to get started today by trying the S3 Vectors engine in your OpenSearch managed clusters and testing the one-click export from S3 vector indexes to OpenSearch Serverless.



About the Authors

Sohaib Katariwala is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service based out of Chicago, IL. His interests are in all things data and analytics. More specifically he loves to help customers use AI in their data strategy to solve modern day challenges.

Mark Twomey is a Senior Solutions Architect at AWS focused on storage and data management. He enjoys working with customers to put their data in the right place, at the right time, for the right cost. Living in Ireland, Mark enjoys walking in the countryside, watching movies, and reading books.

Sorabh Hamirwasia is a senior software engineer at AWS working on the OpenSearch Project. His primary interests include building cost-optimized and performant distributed systems.

Pallavi Priyadarshini is a Senior Engineering Manager at Amazon OpenSearch Service leading the development of high-performing and scalable technologies for search, security, releases, and dashboards.

Bobby Mohammed is a Principal Product Manager at AWS leading the Search, GenAI, and Agentic AI product initiatives. Previously, he worked on products across the full lifecycle of machine learning, including data, analytics, and ML features on the SageMaker platform and deep learning training and inference products at Intel.

Unifying data insights with Amazon QuickSight and Amazon SageMaker

Post Syndicated from Ramon Lopez original https://aws.amazon.com/blogs/big-data/unifying-data-insights-with-amazon-quicksight-and-amazon-sagemaker/

Amazon SageMaker has announced an integration with Amazon QuickSight, bringing together data in SageMaker seamlessly with QuickSight capabilities like interactive dashboards, pixel-perfect reports, and generative business intelligence (BI), all in a governed and automated manner. With this integration, users can go from exploring data in SageMaker to visualizing it in QuickSight with a single click.

“The integration between Amazon SageMaker and Amazon QuickSight will help us streamline how our teams move from data exploration to insights. Our analysts can go from data discovery to building and sharing dashboards through a unified, governed experience. Dashboards are no longer siloed, one-off reports. They’re cataloged, discoverable assets that others can find and access. This has made insight delivery faster, more consistent, and far easier to scale across the business.”

– Lingam Chockalingam, Chief Data Architect, Maryland Department of Human Services – MD THINK

About QuickSight

QuickSight is a cloud-powered BI service that revolutionizes data analysis and visualization. It seamlessly integrates data from various sources, including AWS services, third-party applications, and software as a service (SaaS) platforms into a single, intuitive dashboard. As a fully managed service, QuickSight offers enterprise-grade security, global accessibility, and scalability without the hassle of infrastructure management. Amazon Q in QuickSight transforms access to data insights for the entire organization using generative AI. Using Amazon Q, business analysts can generate dashboards and reports using natural language prompts. With Amazon Q, business users can ask and answer questions of data using data Q&A, get natural language executive summaries of data to see trends and insights, and use the powerful new agentic data analysis experience of scenarios to discover patterns and outliers in data and perform what-if analysis.

About SageMaker

Amazon SageMaker Unified Studio provides a unified, end-to-end experience consisting of data, analytics, and AI capabilities. You can use familiar AWS services for model development, generative AI, data processing, and analytics—all within a single, governed environment. Users can now build, deploy, and execute end-to-end workflows from a single interface. SageMaker is built on the foundations of Amazon DataZone, where it uses domains to categorize and structure the data assets, while offering project-based collaboration features that teams can use to securely share artifacts and work together across various compute services. This experience allows multiple personas to seamlessly collaborate, while operating under appropriate access controls and governance policies.

Dashboard and insight workflows simplified

Today, administrators can configure SageMaker projects with QuickSight to streamline the flow of building insights from your data lake. After being set up, the integration automatically creates a restricted folder that provides a governed context for sharing assets and data sources, pre-configured with secure connections to data lake tables. This serves as the foundation for any project member securely building and sharing insights. When you explore data in your project, the integration provides one-click access to building a dashboard from any table. Behind the scenes, SageMaker creates a QuickSight dataset in the project’s restricted folder that’s accessible only to members within the project. Not only do dashboards you build in QuickSight stay within this folder, they’re also automatically added as assets to your SageMaker project. There, you can add custom metadata, publish to the SageMaker Catalog, and share with users or groups in your corporate directory for broader access, all within SageMaker Unified Studio. This keeps your dashboards organized, discoverable, shareable, and governed, making cross-team collaboration and asset reuse straightforward.

Configure SageMaker and QuickSight

To get started with SageMaker and QuickSight integration, you enable the QuickSight blueprint and create project profiles in the AWS Management Console.

Note that both your SageMaker Unified Studio domain and QuickSight account must be integrated with AWS IAM Identity Center using the same Identity Center instance. Additionally, your QuickSight account must exist in the same AWS account as your SageMaker Unified Studio domain.

  1. Go to the SageMaker console and choose Domain in the navigation pane.
  2. Select the Blueprints tab.
  3. To enable the QuickSight Blueprint, select it from the list, then choose Enable.
  4. On the Enable QuickSight page:
        1. For Provisioning role, select your provisioning role.
        2. For QuickSight VPC manager role, select the AmazonSageMakerQuickSightVPC role.
  5. Choose Enable blueprint.
  6. A confirmation message will appear after the blueprint is successfully enabled.
  7. Go back to the Domains page and select the Project profiles tab and then select the SQL analytics project profile.
  8. Choose Add blueprint deployment settings.
  9. Configure the blueprint deployment settings as follows:
    • Blueprint deployment settings name: Enter a name for your settings. For this post, we used QuickSight-BDS.
    • Blueprint: Select the QuickSight blueprint from the list.
    • Other parameters: Adjust these based on your use case. For this post, we kept the default values.
  10. Scroll down and choose Add blueprint deployment settings to save your configuration.
  11. You’ll receive a confirmation message, and you’ll see that the QuickSight Blueprint deployment setting (QuickSight-BDS) has been added to the list.

Create a SageMaker project with QuickSight enabled:

After the QuickSight integration has been set up by the administrator, data consumers such as analysts and data scientists can begin using it in the SageMaker portal by creating a new project.

  1. Go to the SageMaker portal.
  2. Choose Select a project, then choose Create project.
  3. On the Create project page:
    1. Project name: Enter the name of your project. For this post, we’re using KPI-Analysis.
    2. Project profile: Select the SQL Analytics project profile.
    3. Choose Continue.
  4. Leave the remaining parameters set to their default values and choose Continue.
  5. Review the information displayed, then choose Create project.
  6. You’ll be redirected to the Creating new project page. Wait for the process to complete.
  7. After the project creation process is complete, you’ll be taken to the Project overview page.

Create a data asset to build the analysis

  1. For this post, you’ll use the transactions.csv file, which contains financial transaction data from various departments.
  2. Choose Build in the top-right menu.
  3. Then select Query Editor from the dropdown.
  4. Choose the plus (+) icon.
  5. Select Create table, then choose Next.
  6. On the Set table properties page:
    1. Upload file: Upload the transactions.csv file.
    2. Table type: Select S3/external table.
    3. Leave the remaining parameters at the default values.
    4. Choose Next.
  7. On the Preview schema page, verify that the schema matches the expected structure, then choose Create table.
  8. The Transactions table has now been successfully created.

Create a dashboard using QuickSight

  1. Choose the KPI-Analysis project, then choose Data.
  2. On the Data page: Select the Transactions table, choose Actions, then select Open in QuickSight.
  3. This step redirects you to the QuickSight UI, specifically to the transactions dataset page.
  4. Choose USE IN ANALYSIS to begin exploring the data.
  5. Choose a folder to save your new analysis—for this post, we selected the Assets folder.
  6. Choose Add to save the analysis.
  7. On the New sheet page, leave all parameters at the default values, then choose CREATE.
  8. You’ll now be taken to the Analysis page. In this example, you analyze credit card spending at gas stations, focusing on identifying the most popular fuel type among your cardholders. The goal is to use this insight to design targeted promotions.
  9. Under Visuals, select Pie chart.
  10. Under GROUP/COLOR, select fuel_type.
  11. Under Value, select amount[Sum].
  12. You will see that credit card holders of AWSome-Bank prefer the Premium fuel type.
  13. Publish this new dashboard to the enterprise data catalog. To do that, choose PUBLISH located in the top right corner.
  14. On the Publish Dashboard page:
    1. Enter a name for the dashboard. For this post, we’re using gas_consumption_analysis.
    2. Leave the remaining parameters set to their default values.
    3. Choose PUBLISH DASHBOARD.
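The pie chart configured in the steps above is a grouped sum of amount by fuel_type. The following minimal Python sketch shows the same aggregation outside QuickSight. The sample rows are hypothetical stand-ins (the real transactions.csv contains more columns and data), and only the fuel_type and amount column names come from this walkthrough.

```python
import csv
import io
from collections import defaultdict


def sum_amount_by_fuel_type(csv_text):
    """Return {fuel_type: total amount}, the aggregation behind the pie chart."""
    totals = defaultdict(float)
    for row in csv.DictReader(io.StringIO(csv_text)):
        totals[row["fuel_type"]] += float(row["amount"])
    return dict(totals)


# Hypothetical sample rows, used only to illustrate the aggregation.
SAMPLE_CSV = """fuel_type,amount
Premium,60.0
Regular,35.5
Premium,42.5
Diesel,50.0
"""

totals = sum_amount_by_fuel_type(SAMPLE_CSV)
# The fuel type with the largest total is the biggest slice of the pie.
top_fuel_type = max(totals, key=totals.get)
print(totals, top_fuel_type)
```

Running a quick check like this against the source file can help confirm that the dashboard shows what you expect before you publish it.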

Documenting and publishing a QuickSight asset

After the dashboard is created, it’s automatically added to the SageMaker project. From there, analysts or BI engineers can enrich it with business metadata, make it discoverable across the organization, and share it with other users or groups in their corporate directory.

  1. Go back to the Amazon SageMaker portal.
  2. Select the Assets tab.
  3. On the Inventory tab, select the gas_consumption_analysis asset.
  4. This will take you to the main asset page, where you can add business metadata, view the lineage diagram, and review the asset history.
  5. For this post, you will only add a README section.
  6. Choose CREATE README to get started.
  7. Add a description for the asset. For this post, we used the following:
Overview
This Amazon QuickSight dashboard provides insights into the fuel type preferences of a bank’s credit card holders. It helps business stakeholders and analysts understand customer behavior at fuel stations, supporting data-driven marketing strategies and product personalization.
Purpose
The goal of this dashboard is to:
Analyze which fuel types (for example, Regular, Premium, Diesel, Electric) are most frequently purchased using the bank’s credit cards.
Identify customer segments (for example, age groups, locations, income brackets) that prefer specific fuel types.
Understand transaction patterns such as frequency, average spend per fuel type, and purchase timeframes.
  8. Choose SAVE README to save the description.
  9. On this page, you can also add glossary terms and metadata forms to provide additional business context to the asset. For this post, leave these fields empty.
  10. Now you’re ready to publish the QuickSight asset to the enterprise data catalog. To do this, choose PUBLISH ASSET.
  11. A confirmation prompt will appear. Choose PUBLISH ASSET again to complete the publishing process.

Search for a QuickSight asset

  1. For this post, we created a second project called Marketing, but you can use any other project within your domain or even reuse the one created in the earlier steps.
  2. Navigate to the SageMaker home page.
  3. In the catalog search field, enter gas to find the published asset.
  4. Select the relevant result for the published asset from the search results.
  5. This will take you to the asset’s main page, where you can view the metadata added by the producer.

Sharing a QuickSight asset

You can share the QuickSight dashboard with users and groups in your organization directly from within SageMaker.

  1. Go back to the KPI-Analysis project.
  2. Choose the Data tab.
  3. Then, select Assets from the Project catalog.
  4. Go to the PUBLISHED tab, then select the gas_consumption_analysis asset.
  5. Choose Actions, then select Share.
  6. You can share the asset with individual SSO users or with groups. For this post, we selected an SSO group named quicksight-users, but you can choose any user or group you have previously created.
  7. Choose Share.
  8. A confirmation message will appear after the asset has been successfully shared.

Clean up

When you’re done with these exercises, complete the following steps to delete your resources to avoid incurring costs:

  1. Delete the QuickSight assets that you created.
    1. If QuickSight is enabled solely for testing, make sure to cancel the QuickSight account.
  2. Delete the project created in SageMaker.
    1. If SageMaker is enabled solely for testing, make sure to cancel the SageMaker account.

Conclusion

This post walked through the complete process of integrating Amazon QuickSight with Amazon SageMaker Unified Studio, demonstrating how teams can move from raw data to published dashboards in a secure and governed environment. By combining the advanced analytics capabilities of QuickSight with the collaborative project-based structure of SageMaker, organizations can accelerate insight delivery while maintaining clear control over data access and governance.

The integration simplifies creating datasets directly from Amazon Athena or Amazon Redshift tables, enriching them with business metadata, and publishing dashboards to the SageMaker Catalog. After they’re published, these dashboards can be shared with users or groups across the organization, making insights both discoverable and actionable.

With the added power of Amazon Q in QuickSight and generative BI, users can ask questions in plain English and receive real-time visualizations and insights. This makes data exploration intuitive and inclusive, empowering more users to make informed decisions. Combined with the unified analytics and AI environment of SageMaker Unified Studio, this solution supports secure, scalable, and collaborative data-driven innovation.


About the authors

Ramon Lopez is a Principal Solutions Architect for Amazon QuickSight. With many years of experience building BI solutions and a background in accounting, he loves working with customers, creating solutions, and making world-class services. When not working, he prefers to be outdoors in the ocean or up on a mountain.

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Integrating Amazon OpenSearch Ingestion with Amazon RDS and Amazon Aurora

Post Syndicated from Michael Torio original https://aws.amazon.com/blogs/big-data/integrating-amazon-opensearch-ingestion-with-amazon-rds-and-amazon-aurora/

Unlocking powerful search capabilities for millions of items should be fast, accurate, and effortless while maintaining high relevance. Relational databases are a popular storage method for structured data, and organizations use them extensively to store their core business information. Although relational databases excel at storing and retrieving structured data, they often struggle with searching through large blocks of unstructured text and, for performance reasons, typically don’t index all columns.

In contrast, search engines such as OpenSearch index all fields, enabling rich search capabilities, including semantic search, and powerful aggregations for summarizing and analyzing numeric data. Traditionally, organizations have managed complex, inefficient, and expensive data synchronization processes, including extract, transform, and load (ETL) pipelines, to keep their search indices up to date with their databases. Those looking to enhance their applications with advanced search features need a simpler solution that can maintain search index synchronization with their databases without the overhead of managing custom data sync processes.

We are happy to announce the general availability of the integration of Amazon OpenSearch Service with Amazon Relational Database Service (Amazon RDS) and Amazon Aurora. This new integration eliminates complex data pipelines and enables near real-time data synchronization between Amazon Aurora (including Amazon Aurora MySQL-Compatible Edition and Amazon Aurora PostgreSQL-Compatible Edition) and Amazon RDS databases (including Amazon RDS for MySQL and Amazon RDS for PostgreSQL), and Amazon OpenSearch Service, unlocking advanced search capabilities such as hybrid search, ranked results, and faceted search on transactional databases. You can now deliver low-latency, high-throughput search results, live inventory updates, and personalized recommendations while focusing on creating exceptional customer experiences instead of managing data synchronization. This integration reduces the operational burden of maintaining complex ETL pipelines, reducing costs while providing instant data availability for search operations.

Amazon OpenSearch Ingestion provides near real-time data synchronization between Amazon Aurora or Amazon RDS and OpenSearch Service. Select your Aurora or RDS database, and OpenSearch Ingestion handles the rest, supporting both Aurora MySQL or RDS for MySQL (8.0 and above) and Aurora PostgreSQL or RDS for PostgreSQL (16 and above).

Solution overview

Here’s how these services work together:

  • Data ingestion – OpenSearch Ingestion first loads your database snapshot from Amazon Simple Storage Service (Amazon S3), where Aurora or Amazon RDS has exported the initial data. It then uses Aurora or Amazon RDS change data capture (CDC) streams to replicate further changes in near real time and indexes them into OpenSearch Service. This automated process keeps your data consistently up to date in OpenSearch, making it readily available for search and analysis without manual intervention.
  • Real-time querying – OpenSearch Service offers powerful query capabilities that enable you to perform complex searches and aggregations on your data. Whether you need to analyze trends, detect anomalies, or perform search queries to return relevant results for your application, OpenSearch Service provides the tools you need.
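The two-phase flow described above (an initial snapshot load followed by a stream of changes) can be pictured with a small in-memory simulation. This is only an illustrative sketch: the event format with op and row fields is a hypothetical stand-in, not the format OpenSearch Ingestion uses internally, and a plain dictionary stands in for the OpenSearch index.

```python
def load_snapshot(rows, key):
    """Initial load: index every exported row under its primary key."""
    return {row[key]: dict(row) for row in rows}


def apply_change(index, event, key):
    """Apply one change event (insert, update, or delete) to the index."""
    doc_id = event["row"][key]
    if event["op"] == "delete":
        index.pop(doc_id, None)
    else:
        # Inserts and updates are both upserts keyed by the primary key.
        index[doc_id] = {**index.get(doc_id, {}), **event["row"]}
    return index


# Phase 1: snapshot exported by the database (hypothetical rows).
snapshot = [
    {"EMPLOYEE_ID": 100, "SALARY": 24000},
    {"EMPLOYEE_ID": 101, "SALARY": 17000},
]
index = load_snapshot(snapshot, key="EMPLOYEE_ID")

# Phase 2: changes captured after the snapshot (hypothetical event shape).
changes = [
    {"op": "update", "row": {"EMPLOYEE_ID": 100, "SALARY": 26000}},
    {"op": "insert", "row": {"EMPLOYEE_ID": 102, "SALARY": 9000}},
    {"op": "delete", "row": {"EMPLOYEE_ID": 101}},
]
for event in changes:
    apply_change(index, event, key="EMPLOYEE_ID")

print(index)
```

Keying every document by the primary key is what lets an update or delete find the right document, which is one way to see why tables with primary keys synchronize best.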

The following diagram illustrates the solution architecture for Amazon Aurora as a source:


Getting Started

Configuring Your Database Source

Before setting up synchronization, you need to configure your source database’s logging settings. For Aurora MySQL, configure your cluster parameter group with enhanced binary log settings. For Amazon RDS, enable basic binary logging or logical replication through your instance parameter group settings. These logging configurations enable OpenSearch Ingestion to capture and replicate data changes from your database.

The sample HR database with Aurora MySQL is a good example to show how this integration works.

Before creating the view, let’s look at how OpenSearch represents this data. OpenSearch mappings define how documents and their fields are stored and indexed, similar to how a database schema defines tables and columns. The OpenSearch Ingestion pipeline uses dynamic mappings by default, automatically converting Aurora or Amazon RDS data types to appropriate OpenSearch field types. For example, database DATE fields become OpenSearch date types, and numeric fields are mapped to corresponding OpenSearch numeric types. Although you can customize these mappings using index templates, the default mappings typically handle common data types correctly, including dates, numbers, and text fields. You can inspect the resulting mapping with the following request:

GET employees/_mapping

To demonstrate the integration’s ability to handle complex data relationships, we now examine how OpenSearch Ingestion handles joined data. We create a view in the sample HR database that combines information from multiple related tables into a single, searchable document in OpenSearch. This approach shows how you can transform normalized database structures into denormalized documents that are optimized for search operations.

This employee_details view combines data from multiple tables, creating a rich, denormalized representation of employee information. When replicated to OpenSearch, this view becomes a single, comprehensive document for each employee. This structure is ideal for search operations, allowing for fast and complex queries across what were originally separate tables. For example, you could easily search for employees in a specific department and country or analyze salary distributions across regions—queries that would be more complex and potentially slower in the original normalized database structure.
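To make the idea of a denormalized document concrete, the following Python sketch joins rows from the four base tables into one flat document per employee. The column names and the direct link from locations to regions are simplifying assumptions for illustration and don't reproduce the actual view definition or the sample HR schema.

```python
def build_employee_document(employee, departments, locations, regions):
    """Follow the foreign keys and flatten the result into one document."""
    department = departments[employee["department_id"]]
    location = locations[department["location_id"]]
    region = regions[location["region_id"]]
    return {
        "employee_id": employee["employee_id"],
        "name": employee["name"],
        "salary": employee["salary"],
        "department": department["name"],
        "city": location["city"],
        "region": region["name"],
    }


# Hypothetical rows, keyed by primary key, standing in for the base tables.
departments = {90: {"name": "Executive", "location_id": 1700}}
locations = {1700: {"city": "Seattle", "region_id": 2}}
regions = {2: {"name": "Americas"}}
employee = {"employee_id": 100, "name": "Steven King", "salary": 24000, "department_id": 90}

document = build_employee_document(employee, departments, locations, regions)
print(document)
```

A search for everyone in a given department and region then becomes a filter on two fields of a single document instead of a multi-table join.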

In the pipeline configuration shown in the following screenshot, you can check how OpenSearch Ingestion connects to the HR database. The configuration identifies the source database and the specific tables we want to replicate. While we created a view to understand the data relationships, the pipeline tracks changes from the underlying base tables (employees, departments, locations, and regions). OpenSearch Ingestion automatically maintains these relationships, which means that changes to these tables are properly reflected in your OpenSearch index, keeping your search data consistent with your source database.

In the gif shown below, you can see a demo of setting up this integration using the visual editor of OpenSearch Ingestion.

You can also specify index mapping templates to map your Aurora or Amazon RDS fields to the correct fields in your OpenSearch Service indexes.
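As a rough illustration, the following Python sketch builds the body of an OpenSearch composable index template with explicit field types. The index pattern and field names are hypothetical, and how you attach a template to your pipeline depends on the pipeline configuration, so check the Data Prepper documentation for the exact settings.

```python
import json


def build_index_template(index_pattern, field_types):
    """Build an index template body that maps each field to an explicit type."""
    return {
        "index_patterns": [index_pattern],
        "template": {
            "mappings": {
                "properties": {
                    name: {"type": field_type}
                    for name, field_type in field_types.items()
                }
            }
        },
    }


# Hypothetical fields: override dynamic mapping for the ones you care about.
template = build_index_template(
    "employees*",
    {"hire_date": "date", "salary": "double", "first_name": "text"},
)
print(json.dumps(template, indent=2))
```

Fields that aren't listed in the template still fall back to dynamic mapping, so a template only needs to cover the fields whose default types don't suit your queries.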

For a comprehensive overview of configuration settings for the pipeline, refer to the OpenSearch Data Prepper documentation. You must set up AWS Identity and Access Management (IAM) roles for the pipeline. For instructions, refer to Configure the pipeline role.

After you configure the integration in OpenSearch Ingestion, the pipeline automatically creates indexes that you can view in OpenSearch Dashboards. OpenSearch Ingestion first triggers an automatic export of your Aurora or Amazon RDS database to Amazon S3, then loads this snapshot data from S3 into your OpenSearch cluster to create the initial indices. After this initial load, OpenSearch Ingestion continually captures changes using binary logs (binlog) for MySQL-based databases or write-ahead logs (WAL) for PostgreSQL-based databases. This way, your OpenSearch indices stay synchronized with your source database in near real time. You can view your indices in OpenSearch Dashboards by invoking:

GET _cat/indices

Example response:

Demonstrating near real-time data synchronization

Consider the first five entries in the employee table:

When you make changes to your database, OpenSearch Ingestion updates Amazon OpenSearch Service with the change data. For example, the following code updates an employee’s salary:

UPDATE hr.employees SET SALARY = 26000 WHERE EMPLOYEE_ID = 100;

Amazon Aurora sends out a change notice, your OpenSearch Ingestion pipeline picks it up, and OpenSearch Ingestion sends the changed record to OpenSearch in near real time. You can verify this with an OpenSearch query:

GET employees/_search
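To check one specific record rather than scanning all results, you can send a request body that filters on the employee ID. The following Python sketch builds such a body as a term query. The field name EMPLOYEE_ID is an assumption about how the column appears in the index, so confirm it against the index mapping first.

```python
import json


def employee_lookup_query(employee_id, id_field="EMPLOYEE_ID"):
    """Build a search body that matches a single employee by ID."""
    return {"query": {"term": {id_field: employee_id}}}


# Body to send with GET employees/_search to look up the updated employee.
body = employee_lookup_query(100)
print(json.dumps(body))
```

If the update has been replicated, the matching document should show the new salary value.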

Important details about this feature:

  • Monitoring – Track pipeline performance and data synchronization through CloudWatch metrics and the OpenSearch Ingestion dashboard
  • Limitations – Requires same-Region and same-account deployment, primary keys for optimal synchronization, and currently has no data definition language (DDL) statement support

Conclusion

Amazon Aurora or Amazon RDS integration with Amazon OpenSearch Service is now generally available in all AWS Regions where OpenSearch Ingestion is available.

To learn more, refer to the AWS documentation for Aurora or Amazon RDS integration with Amazon OpenSearch Service:


About the authors

Michael Torio is an Associate Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service based out of Mountain View, CA. Michael enjoys helping customers leverage cloud technologies to solve their business challenges.

Sohaib Katariwala is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service based out of Chicago, IL. His interests are in all things data and analytics. More specifically he loves to help customers use AI in their data strategy to solve modern day challenges.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-centered technologies, and is based out of Seattle, Washington.

Scale your AWS Glue for Apache Spark jobs with R type, G.12X, and G.16X workers

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/scale-your-aws-glue-for-apache-spark-jobs-with-r-type-g-12x-and-g-16x-workers/

With AWS Glue, organizations can discover, prepare, and combine data for analytics, machine learning (ML), AI, and application development. At its core, you run an AWS Glue for Apache Spark job by specifying your code and the number of Data Processing Units (DPUs) needed, with each DPU providing computing resources to power your data integration tasks. However, although the existing workers effectively serve most data integration needs, today’s data landscapes are becoming increasingly complex and larger in scale. Organizations are dealing with larger data volumes, more diverse data sources, and increasingly sophisticated transformation requirements.

Although horizontal scaling (adding more workers) effectively addresses many data processing challenges, certain workloads benefit significantly from vertical scaling (increasing the capacity of individual workers). These scenarios include processing large, complex query plans, handling memory-intensive operations, or managing workloads that require substantial per-worker resources for operations such as large join operations, complex aggregations, and data skew scenarios. The ability to scale both horizontally and vertically provides the flexibility needed to optimize performance across diverse data processing requirements.

Responding to these growing demands, today we are pleased to announce the general availability of AWS Glue R type, G.12X, and G.16X workers, the new AWS Glue worker types for the most demanding data integration workloads. G.12X and G.16X workers offer increased compute, memory, and storage, making it possible for you to vertically scale and run even more intensive data integration jobs. R type workers offer increased memory to meet even more memory-intensive requirements. Larger worker types benefit not only the Spark executors but also the Spark driver when it needs more capacity, for instance because the job query plan is large. To learn more about Spark driver and executors, see Key topics in Apache Spark.

This post demonstrates how AWS Glue R type, G.12X, and G.16X workers help you scale up your AWS Glue for Apache Spark jobs.

R type workers

AWS Glue R type workers are designed for memory-intensive workloads where you need more memory per worker than G worker types. G worker types run with a 1:4 vCPU to memory (GB) ratio, whereas R worker types run with a 1:8 vCPU to memory (GB) ratio. R.1X workers provide 1 DPU, with 4 vCPU, 32 GB memory, and 94 GB of disk per node. R.2X workers provide 2 DPU, with 8 vCPU, 64 GB memory, and 128 GB of disk per node. R.4X workers provide 4 DPU, with 16 vCPU, 128 GB memory, and 256 GB of disk per node. R.8X workers provide 8 DPU, with 32 vCPU, 256 GB memory, and 512 GB of disk per node. As with G worker types, you can choose R type workers with a single parameter change in the API, AWS Command Line Interface (AWS CLI), or AWS Glue Studio. Regardless of the worker used, the AWS Glue jobs have the same capabilities, including automatic scaling and interactive job authoring using notebooks. R type workers are available with AWS Glue 4.0 and 5.0.

The following table shows compute, memory, disk, and Spark configurations for each R worker type.

AWS Glue Worker Type | DPU per Node | vCPU | Memory (GB) | Disk (GB) | Approximate Free Disk Space (GB) | Number of Spark Executors per Node | Number of Cores per Spark Executor
R.1X | 1 | 4 | 32 | 94 | 44 | 1 | 4
R.2X | 2 | 8 | 64 | 128 | 78 | 1 | 8
R.4X | 4 | 16 | 128 | 256 | 230 | 1 | 16
R.8X | 8 | 32 | 256 | 512 | 485 | 1 | 32

To use R type workers on an AWS Glue job, change the setting of the worker type parameter. In AWS Glue Studio, you can choose R 1X, R 2X, R 4X, or R 8X under Worker type.

In the AWS API or AWS SDK, you can specify R worker types in the WorkerType parameter. In the AWS CLI, you can use the --worker-type parameter in a create-job command.
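As an example with the AWS SDK for Python (Boto3), the following sketch builds the arguments for the Glue create_job call with WorkerType set to an R type worker. The job name, role ARN, and script location are placeholders, and the create_job helper is defined but not called here, so you can inspect the definition without AWS access.

```python
def glue_job_definition(name, role_arn, script_location, worker_type, number_of_workers):
    """Build keyword arguments for the Glue create_job API call."""
    return {
        "Name": name,
        "Role": role_arn,
        "Command": {
            "Name": "glueetl",  # Spark ETL job
            "ScriptLocation": script_location,
            "PythonVersion": "3",
        },
        "GlueVersion": "5.0",
        "WorkerType": worker_type,  # the only setting that selects the worker type
        "NumberOfWorkers": number_of_workers,
    }


def create_job(definition):
    """Create the job in your account (requires boto3 and AWS credentials)."""
    import boto3  # imported lazily so the definition can be built offline

    return boto3.client("glue").create_job(**definition)


# Placeholder values; replace them with your own role and script location.
definition = glue_job_definition(
    name="example-memory-intensive-job",
    role_arn="arn:aws:iam::123456789012:role/ExampleGlueJobRole",
    script_location="s3://amzn-s3-demo-bucket/scripts/job.py",
    worker_type="R.2X",
    number_of_workers=10,
)
print(definition["WorkerType"], definition["NumberOfWorkers"])
```

Switching the same job to another worker type, such as G.12X, only requires a different worker_type value.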

To use R worker types in an AWS Glue Studio notebook or interactive session, set R.1X, R.2X, R.4X, or R.8X in the %worker_type magic, for example:

%worker_type R.1X

R type workers are priced at $0.52 per DPU-hour for each job, billed per second with a 1-minute minimum.

G.12X and G.16X workers

AWS Glue G.12X and G.16X workers give you more compute, memory, and storage to run your most demanding jobs. G.12X workers provide 12 DPU, with 48 vCPU, 192 GB memory, and 768 GB of disk per worker node. G.16X workers provide 16 DPU, with 64 vCPU, 256 GB memory, and 1024 GB of disk per node. G.16X provides double the resources of G.8X, previously the largest worker type. You can enable G.12X and G.16X workers with a single parameter change in the API, AWS CLI, or AWS Glue Studio. Regardless of the worker used, the AWS Glue jobs have the same capabilities, including automatic scaling and interactive job authoring using notebooks. G.12X and G.16X workers are available with AWS Glue 4.0 and 5.0.

The following table shows compute, memory, disk, and Spark configurations for each G worker type.

AWS Glue Worker Type | DPU per Node | vCPU | Memory (GB) | Disk (GB) | Approximate Free Disk Space (GB) | Number of Spark Executors per Node | Number of Cores per Spark Executor
G.025X | 0.25 | 2 | 4 | 84 | 34 | 1 | 2
G.1X | 1 | 4 | 16 | 94 | 44 | 1 | 4
G.2X | 2 | 8 | 32 | 138 | 78 | 1 | 8
G.4X | 4 | 16 | 64 | 256 | 230 | 1 | 16
G.8X | 8 | 32 | 128 | 512 | 485 | 1 | 32
G.12X (new) | 12 | 48 | 192 | 768 | 741 | 1 | 48
G.16X (new) | 16 | 64 | 256 | 1024 | 996 | 1 | 64
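The vCPU-to-memory ratios mentioned earlier can be checked directly against the two tables. The following Python sketch copies the vCPU and memory columns and computes memory per vCPU for each worker type. Note that, by the table values, G.025X works out to 2 GB per vCPU, so the 1:4 ratio applies to G.1X and larger.

```python
# (vCPU, memory in GB) per worker type, copied from the two tables.
WORKERS = {
    "G.025X": (2, 4),
    "G.1X": (4, 16),
    "G.2X": (8, 32),
    "G.4X": (16, 64),
    "G.8X": (32, 128),
    "G.12X": (48, 192),
    "G.16X": (64, 256),
    "R.1X": (4, 32),
    "R.2X": (8, 64),
    "R.4X": (16, 128),
    "R.8X": (32, 256),
}


def memory_per_vcpu(worker_type):
    """Return GB of memory available per vCPU for a worker type."""
    vcpu, memory_gb = WORKERS[worker_type]
    return memory_gb / vcpu


for worker_type in WORKERS:
    print(f"{worker_type}: {memory_per_vcpu(worker_type):.0f} GB per vCPU")
```

Because each worker runs a single Spark executor that uses all of the worker's cores, this ratio is also roughly the memory available per Spark core.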

To use G.12X and G.16X workers on an AWS Glue job, change the setting of the worker type parameter to G.12X or G.16X. In AWS Glue Studio, you can choose G 12X or G 16X under Worker type.

In the AWS API or AWS SDK, you can specify G.12X or G.16X in the WorkerType parameter. In the AWS CLI, you can use the --worker-type parameter in a create-job command.

To use G.12X and G.16X in an AWS Glue Studio notebook or interactive session, set G.12X or G.16X in the %worker_type magic, for example:

%worker_type G.12X

G type workers are priced at $0.44 per DPU-hour for each job, billed per second with a 1-minute minimum. This is the same pricing as the existing worker types.
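To compare the cost of the two worker families, the following Python sketch applies the rates quoted in this post ($0.44 per DPU-hour for G type and $0.52 for R type) with per-second billing and a 1-minute minimum. It's a simplified estimate that assumes a fixed number of workers for the whole run. Actual rates can vary by Region, so check the AWS Glue pricing page for current values.

```python
# Rates quoted in this post, in USD per DPU-hour.
RATE_PER_DPU_HOUR = {"G": 0.44, "R": 0.52}

# DPU per worker, from the worker type tables.
DPU_PER_WORKER = {
    "G.1X": 1, "G.2X": 2, "G.4X": 4, "G.8X": 8, "G.12X": 12, "G.16X": 16,
    "R.1X": 1, "R.2X": 2, "R.4X": 4, "R.8X": 8,
}


def estimate_job_cost(worker_type, number_of_workers, runtime_seconds):
    """Estimate the cost in USD of one job run with a fixed worker count."""
    billed_seconds = max(runtime_seconds, 60)  # 1-minute minimum
    dpu_hours = DPU_PER_WORKER[worker_type] * number_of_workers * billed_seconds / 3600
    return dpu_hours * RATE_PER_DPU_HOUR[worker_type[0]]


# The same 30-minute job on 10 workers of each family.
g_cost = estimate_job_cost("G.2X", 10, 1800)
r_cost = estimate_job_cost("R.2X", 10, 1800)
print(f"G.2X: ${g_cost:.2f}, R.2X: ${r_cost:.2f}")
```

In this estimate the R.2X run costs about 18 percent more than the G.2X run for the same duration, in exchange for twice the memory per worker.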

Choose the right worker type for your workload

To optimize job resource utilization, run your expected application workload to identify the ideal worker type that aligns with your application’s requirements. Start with general worker types like G.1X or G.2X, and monitor your job run from AWS Glue job metrics, observability metrics, and Spark UI. For more details about how to monitor the resource metrics for AWS Glue jobs, see Best practices for performance tuning AWS Glue for Apache Spark jobs.

When your data processing workload is well distributed across workers, G.1X or G.2X work very well. However, some workloads might require more resources per worker. You can use the new G.12X, G.16X, and R type workers to address them. In this section, we discuss typical use cases where vertical scaling is effective.

Large join operations

Some joins might involve large tables where one or both sides need to be broadcast. Multi-way joins require multiple large datasets to be held in memory. With skewed joins, certain partition keys have disproportionately large data volumes. Horizontal scaling doesn’t help when the entire dataset needs to be in memory on each node for broadcast joins.

High-cardinality group by operations

This use case includes aggregations on columns with many unique values, operations requiring maintenance of large hash tables for grouping, and distinct counts on columns with high uniqueness. High-cardinality operations often result in large hash tables that need to be maintained in memory on each node. Adding more nodes doesn’t reduce the size of these per-node data structures.

Window functions and complex aggregations

Some operations might require a large window frame, or involve computing percentiles, medians, or other rank-based analytics across large datasets, in addition to complex grouping sets or CUBE operations on high-cardinality columns. These operations often require keeping large portions of data in memory per partition. Adding more nodes doesn’t reduce the memory requirement for each individual window or grouping operation.

Complex query plans

Complex query plans can have many stages and deep dependency chains, operations requiring large shuffle buffers, or multiple transformations that need to maintain large intermediate results. These query plans often involve large amounts of intermediate data that need to be held in memory. More nodes don’t necessarily simplify the plan or reduce per-node memory requirements.

Machine learning and complex analytics

With ML and analytics use cases, model training might involve large feature sets, wide transformations requiring substantial intermediate data, or complex statistical computations requiring entire datasets in memory. Many ML algorithms and complex analytics require the entire dataset or large portions of it to be processed together, which can’t be effectively distributed across more nodes.

Data skew scenarios

In some data skew scenarios, you might have to process heavily skewed data where certain partitions are significantly larger, or perform operations on datasets with high-cardinality keys, leading to uneven partition sizes. Horizontal scaling can’t address the fundamental issue of data skew, where some partitions remain much larger than others regardless of the number of nodes.

State-heavy stream processing

State-heavy stream processing can include stateful operations with large state requirements, windowed operations over streaming data with large window sizes, or processing micro-batches with complex state management. Stateful stream processing often requires maintaining large amounts of state per key or window, which can’t be easily distributed across more nodes without compromising the integrity of the state.

In-memory caching

These scenarios might include large datasets that must be cached for repeated or fast access, or iterative algorithms requiring multiple passes over the same data. Both often require keeping substantial portions of data in each node’s memory. Horizontal scaling might not help if the entire dataset needs to be cached on each node for optimal performance.

Data skew example scenarios

Several common patterns can cause data skew, such as sorting or groupBy transformations on columns with non-uniform value distributions, and join operations where certain keys appear more frequently than other keys.
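A simple way to see why a non-uniform key distribution causes trouble is to simulate hash partitioning and compare the largest partition with the average. The following Python sketch does that with a hypothetical dataset in which one hot key accounts for 90 percent of the records. It uses CRC32 as a stand-in hash function, which is not the hash that Spark uses, so treat it as an illustration of the effect rather than a model of Spark's partitioner.

```python
import zlib


def partition_sizes(keys, num_partitions):
    """Count how many records land in each partition under hash partitioning."""
    sizes = [0] * num_partitions
    for key in keys:
        # All records with the same key always land in the same partition.
        sizes[zlib.crc32(str(key).encode()) % num_partitions] += 1
    return sizes


def skew_ratio(sizes):
    """Ratio of the largest partition to the average partition size."""
    return max(sizes) / (sum(sizes) / len(sizes))


# Hypothetical dataset: one hot key with 9,000 records plus 1,000 unique keys.
keys = ["hot"] * 9000 + [f"key-{i}" for i in range(1000)]
sizes = partition_sizes(keys, 10)
ratio = skew_ratio(sizes)
print(sizes, f"skew ratio: {ratio:.1f}")
```

Because every record with the hot key lands in one partition, increasing the number of partitions or workers doesn't shrink that partition. The executor that processes it needs enough memory and disk to hold it.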

In the following example, we compare how two worker types, G.2X and R.2X, behave when they process the same sample workload with skewed data.

With G.2X workers

With the G.2X worker type, an AWS Glue job with 10 workers failed due to a No space left on device error while writing records into Amazon Simple Storage Service (Amazon S3). This was mainly caused by large shuffling on a specific column. The following Spark UI view shows the job details.

The Jobs tab shows two completed jobs and one active job where 8 tasks failed out of 493 tasks. Let’s drill down to the details.

The Executors tab shows an uneven distribution of data processing across the Spark executors, which indicates data skew in this failed job. Executors with IDs 2, 7, and 10 have failed tasks and read approximately 64.5 GiB of shuffle data as shown in the Shuffle Read column. In contrast, the other executors show 0.0 B of shuffle data in the Shuffle Read column.

The G.2X worker type can handle most Spark workloads such as data transformations and join operations. However, in this example, there was significant data skew, which caused certain executors to fail due to exceeding the allocated memory.

With R.2X workers

With the R.2X worker type, an AWS Glue job with 10 workers successfully ran without any failures. The number of workers is the same as the previous example, and the only difference is the worker type. An R worker has twice the memory of the G worker with the same DPU count (64 GB for R.2X versus 32 GB for G.2X). The following Spark UI view shows more details.

The Jobs tab shows three completed jobs. No failures are shown on this page.

The Executors tab shows no failed tasks per executor even though there’s an uneven distribution of shuffle reads across executors.

The results showed that R.2X workers successfully completed the workload that failed on G.2X workers using the same number of executors but with the additional memory capacity to handle the skewed data distribution.

Conclusion

In this post, we demonstrated how AWS Glue R type, G.12X, and G.16X workers can help you vertically scale your AWS Glue for Apache Spark jobs. You can start using the new R type, G.12X, and G.16X workers to scale your workload today. For more information on these new worker types and AWS Regions where the new workers are available, visit the AWS Glue documentation.

To learn more, see Getting Started with AWS Glue.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect with AWS Analytics services. He’s responsible for building software artifacts to help customers. In his spare time, he enjoys cycling on his road bike.

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Peter Tsai is a Software Development Engineer at AWS, where he enjoys solving challenges in the design and performance of the AWS Glue runtime. In his leisure time, he enjoys hiking and cycling.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Sean McGeehan is a Software Development Engineer at AWS, where he builds features for the AWS Glue fulfillment system. In his leisure time, he explores his home of Philadelphia and work city of New York.

Streamline the path from data to insights with new Amazon SageMaker Catalog capabilities

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/streamline-the-path-from-data-to-insights-with-new-amazon-sagemaker-capabilities/

Modern organizations manage data across multiple disconnected systems, including structured databases, unstructured files, and separate visualization tools. This fragmentation slows analytics workflows and prevents teams from extracting comprehensive business insights.

These disconnected workflows prevent your organization from maximizing its data investments, creating delays in decision making and missed opportunities for comprehensive analysis that combines multiple data types.

Starting today, you can use three new capabilities in Amazon SageMaker to accelerate your path from raw data to actionable insights:

  • Amazon QuickSight integration – Launch Amazon QuickSight directly from Amazon SageMaker Unified Studio to build dashboards using your project data, then publish them to the Amazon SageMaker Catalog for broader discovery and sharing across your organization.
  • Amazon SageMaker adds support for Amazon S3 general purpose buckets and Amazon S3 Access Grants in SageMaker Catalog – Make data stored in Amazon S3 general purpose buckets, including unstructured data, easier for teams to find, access, and collaborate on, while maintaining fine-grained access control using Amazon S3 Access Grants.
  • Automatic data onboarding from your lakehouse – Automatic onboarding of existing AWS Glue Data Catalog (GDC) datasets from the lakehouse architecture into SageMaker Catalog, without manual setup.

These new SageMaker capabilities address the complete data lifecycle within a unified and governed experience. You get automatic onboarding of existing structured data from your lakehouse, seamless cataloging of unstructured data content in Amazon S3, and streamlined visualization through QuickSight—all with consistent governance and access controls.

Let’s take a closer look at each capability.

Amazon SageMaker and Amazon QuickSight Integration
With this integration, you can build dashboards in Amazon QuickSight using data from your Amazon SageMaker projects. When you launch QuickSight from Amazon SageMaker Unified Studio, Amazon SageMaker automatically creates the QuickSight dataset and organizes it in a secured folder accessible only to project members.

Furthermore, the dashboards you build stay within this folder and automatically appear as assets in your SageMaker project, where you can publish them to the SageMaker Catalog and share them with users or groups in your corporate directory. This keeps your dashboards organized, discoverable, and governed within SageMaker Unified Studio.

To use this integration, both your Amazon SageMaker Unified Studio domain and QuickSight account must be integrated with AWS IAM Identity Center using the same IAM Identity Center instance. Additionally, your QuickSight account must exist in the same AWS account where you want to enable the QuickSight blueprint. You can learn more about the prerequisites on the Documentation page.

After these prerequisites are met, you can enable the blueprint for Amazon QuickSight by navigating to the Amazon SageMaker console and choosing the Blueprints tab. Then find Amazon QuickSight and follow the instructions.

You also need to configure your SQL analytics project profile to include Amazon QuickSight in Add blueprint deployment settings.

To learn more about the onboarding setup, refer to the Documentation page.

Then, when you create a new project, you need to use the SQL analytics profile.

With your project created, you can start building visualizations with QuickSight. You can navigate to the Data tab, select the table or view to visualize, and choose Open in QuickSight under Actions.

This will redirect you to the Amazon QuickSight transactions dataset page and you can choose USE IN ANALYSIS to begin exploring the data.

When you create a project with the QuickSight blueprint, SageMaker Unified Studio automatically provisions a restricted QuickSight folder per project where SageMaker scopes all new assets—analyses, datasets, and dashboards. The integration maintains real-time folder permission sync, keeping QuickSight folder access permissions aligned with project membership.

Amazon Simple Storage Service (S3) general purpose buckets integration
Starting today, SageMaker supports S3 general purpose buckets in SageMaker Catalog to increase discoverability, and allows granular permissions through S3 Access Grants so that users can govern data, including sharing and managing permissions. Data consumers, such as data scientists, engineers, and business analysts, can now discover and access S3 assets through SageMaker Catalog. This expansion also enables data producers to govern security controls on any S3 data asset through a single interface.

To use this integration, you need appropriate S3 general purpose bucket permissions, and your SageMaker Unified Studio projects must have access to the S3 buckets containing your data. Learn more about the prerequisites on the Amazon S3 data in Amazon SageMaker Unified Studio Documentation page.

You can add a connection to an existing S3 bucket.

When it’s connected, you can browse accessible folders and create discoverable assets by choosing the bucket or a folder and selecting Publish to Catalog.

This action creates a SageMaker Catalog asset of type “S3 Object Collection” and opens an asset details page where users can augment business context to improve search and discoverability. Once published, data consumers can discover and subscribe to these cataloged assets. When data consumers subscribe to “S3 Object Collection” assets, SageMaker Catalog automatically grants access using S3 Access Grants upon approval, enabling cross-team collaboration while ensuring only the right users have the right access.

When you have access, you can process your unstructured data in an Amazon SageMaker Jupyter notebook. The following screenshot shows an example of processing an image in a medical use case.

If you have structured data, you can query your data using Amazon Athena or process using Spark in notebooks.

With this access granted through S3 Access Grants, you can seamlessly incorporate S3 data into your workflows: analyzing it in notebooks, and combining it with structured data in the lakehouse and Amazon Redshift for comprehensive analytics. You can access unstructured data such as documents and images in JupyterLab notebooks to train ML models or generate queryable insights.

Automatic data onboarding from your lakehouse
This integration automatically onboards all your lakehouse datasets into SageMaker Catalog. The key benefit for you is to bring AWS Glue Data Catalog (GDC) datasets into SageMaker Catalog, eliminating manual setup for cataloging, sharing, and governing them centrally.

This integration requires an existing lakehouse setup with Data Catalog containing your structured datasets.

When you set up a SageMaker domain, SageMaker Catalog automatically ingests metadata from all lakehouse databases and tables. This means you can immediately explore and use these datasets from within SageMaker Unified Studio without any configuration.

The integration helps you to start managing, governing, and consuming these assets from within SageMaker Unified Studio, applying the same governance policies and access controls you can use for other data types while unifying technical and business metadata.

Additional things to know
Here are a couple of things to note:

  • Availability – These integrations are available in all commercial AWS Regions where Amazon SageMaker is supported.
  • Pricing – Standard SageMaker Unified Studio, QuickSight, and Amazon S3 pricing applies. No additional charges for the integrations themselves.
  • Documentation – You can find complete setup guides in the SageMaker Unified Studio Documentation.

Get started with these new integrations through the Amazon SageMaker Unified Studio console.

Happy building!
Donnie

Introducing Jobs in Amazon SageMaker

Post Syndicated from Chiho Sugimoto original https://aws.amazon.com/blogs/big-data/introducing-jobs-in-amazon-sagemaker/

Processing large volumes of data efficiently is critical for businesses, and so data engineers, data scientists, and business analysts need reliable and scalable ways to run data processing workloads. The next generation of Amazon SageMaker is the center for all your data, analytics, and AI. Amazon SageMaker Unified Studio is a single data and AI development environment where you can find and access all of the data in your organization and act on it using the best tools across any use case.

We’re excited to announce a new data processing job experience for Amazon SageMaker. Jobs are a common concept widely used in existing AWS services such as Amazon EMR and AWS Glue. With this launch, you can now build jobs in SageMaker to process large volumes of data. Jobs can be built using your preferred tool. For example, you can create jobs from extract, transform, and load (ETL) scripts coded in the Unified Studio code editor, code interactively in Unified Studio notebooks, or create jobs visually using the Unified Studio Visual ETL editor. After being created, data processing jobs can be set to run on demand, scheduled using the built-in scheduler, or orchestrated with SageMaker workflows. You can monitor the status of your data processing jobs and view run history showing status, logs, and performance metrics. When jobs encounter failures, you can use generative AI troubleshooting to automatically analyze errors and receive detailed recommendations to resolve issues quickly. Together, you can use these capabilities to author, manage, operate, and monitor data processing workloads across your organization. The new experience is consistent with other AWS analytics services such as AWS Glue.

This post demonstrates how the new jobs experience works in SageMaker Unified Studio.

Prerequisites

To get started, you must have the following prerequisites in place:

  • An AWS account
  • A SageMaker Unified Studio domain
  • A SageMaker Unified Studio project with a Data analytics and AI-ML model development project profile

Example use case

A global apparel ecommerce retailer processes thousands of customer reviews daily across multiple marketplaces. They need to transform their raw review data into actionable insights to improve their product offerings and customer experience. Using SageMaker Unified Studio visual ETL editor, we’ll demonstrate how to transform raw review data into structured analytical datasets that enable market-specific performance analysis and product quality monitoring.

Create and run a visual job

In this section, you’ll create a Visual ETL Job that processes the review data from a Parquet file in Amazon Simple Storage Service (Amazon S3). The job transforms the data using SQL queries and saves the results back to S3 buckets. Complete the following steps to create a Visual ETL Job:

  1. On the SageMaker Unified Studio console, on the top menu, choose Build.
  2. Under DATA ANALYSIS & INTEGRATION, choose Data processing jobs.
  3. Choose Create Visual ETL Job.

You’ll be directed to the Visual ETL editor, where you can create ETL jobs. You can use this editor to design data transformation pipelines by connecting source nodes, transformation nodes, and target nodes.

  1. On the top left, choose the plus (+) icon in the circle. Under Data sources, select Amazon S3.
  2. Select the Amazon S3 source node and enter the following values:
    1. S3 URI: s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/
    2. Format: Parquet
  3. Select Update node.
  4. Choose the plus (+) icon in the circle to the right of the Amazon S3 source node. Under Transforms, select SQL query.
  5. Enter the following query statement and select Update node.
SELECT
    marketplace,
    star_rating,
    DATE_FORMAT(review_date, 'yyyy-MM-dd') as review_date,
    COUNT(*) as review_count,
    AVG(CAST(helpful_votes as DOUBLE) / NULLIF(total_votes, 0)) as helpfulness_ratio,
    COUNT(CASE WHEN insight = 'Y' THEN 1 END) as insight_count
FROM {myDataSource}
GROUP BY
    marketplace,
    star_rating,
    DATE_FORMAT(review_date, 'yyyy-MM-dd')
  1. Choose the plus (+) icon to the right of the SQL Query node. Under Data target, select Amazon S3.
  2. Select the Amazon S3 target node and enter the following values:
    1. S3 URI: Choose the Amazon S3 location from the project overview page and add the suffix “/output/rating_analysis/”. For example, s3://<bucket-name>/<domainId>/<projectId>/output/rating_analysis/
    2. Format: Parquet
    3. Compression: Snappy
    4. Partition keys: review_date
    5. Mode: Append
  3. Select Update node.
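
To see what the helpfulness_ratio and insight_count expressions in the preceding query compute, you can try them on a few rows locally. The following sketch uses SQLite from the Python standard library as a stand-in for Spark SQL. The sample rows are made up, the column names are assumed from the query, and DATE_FORMAT is left out because it is a Spark SQL function that SQLite doesn’t provide.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reviews (marketplace TEXT, star_rating INTEGER, "
    "helpful_votes INTEGER, total_votes INTEGER, insight TEXT)"
)
conn.executemany(
    "INSERT INTO reviews VALUES (?, ?, ?, ?, ?)",
    [
        ("US", 5, 3, 4, "Y"),  # ratio 0.75
        ("US", 5, 1, 4, "N"),  # ratio 0.25
        ("US", 5, 0, 0, "Y"),  # no votes: NULLIF makes the ratio NULL
    ],
)

row = conn.execute(
    """
    SELECT
        marketplace,
        star_rating,
        COUNT(*) AS review_count,
        AVG(CAST(helpful_votes AS REAL) / NULLIF(total_votes, 0)) AS helpfulness_ratio,
        COUNT(CASE WHEN insight = 'Y' THEN 1 END) AS insight_count
    FROM reviews
    GROUP BY marketplace, star_rating
    """
).fetchone()

print(row)  # ('US', 5, 3, 0.5, 2)
```

NULLIF(total_votes, 0) turns a zero denominator into NULL, so reviews without votes produce a NULL ratio that AVG ignores. The review with no votes still counts toward review_count and insight_count.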

Next, add another SQL query node connected to the same Amazon S3 data source. This node performs a SQL query transformation and outputs the results to a separate S3 location.

  1. On the top left, choose the plus (+) icon in the circle. Under Transforms, select SQL query, and connect the Amazon S3 source node.
  2. Enter the following query statement and select Update node.
SELECT 
    marketplace,
    product_id,
    product_title,
    COUNT(*) as review_count,
    AVG(star_rating) as avg_rating,
    SUM(helpful_votes) as total_helpful_votes,
    COUNT(DISTINCT customer_id) as unique_reviewers,
    COUNT(CASE WHEN insight = 'Y' THEN 1 END) as insight_count
FROM {myDataSource}
GROUP BY 
    marketplace,
    product_id,
    product_title
  1. Choose the plus (+) icon to the right of the SQL Query node. Under Data target, select Amazon S3.
  2. Select the Amazon S3 target node and enter the following values:
    1. S3 URI: Choose the Amazon S3 location from the project overview page and add the suffix “/output/product_analysis/”. For example, s3://<bucket-name>/<domainId>/<projectId>/output/product_analysis/
    2. Format: Parquet
    3. Compression: Snappy
    4. Partition keys: marketplace
    5. Mode: Append
  3. Select Update node.

At this point, your end-to-end visual job should look like the following image. The next step is to save this job to the project and run the job.

  1. On the top right, choose Save to project to save the draft job. You can optionally change the name and add a description.
  2. Choose Save.
  3. On the top right, choose Run.

This will start running your Visual ETL job. You can monitor the list of job runs by selecting View runs in the top middle of the screen.

Create and run a code-based job

In addition to creating jobs through the Visual ETL Editor, you can create jobs using a code-based approach by specifying Python script or Notebook files. When you specify a Notebook file, it automatically converts to a Python script to create the job. Here, you’ll create a notebook in JupyterLab within SageMaker Unified Studio, save it to the project repository, and then create a code-based job from that notebook. First, create a Notebook.

  1. On the SageMaker Unified Studio console, on the top menu, choose Build.
  2. Under IDE & APPLICATIONS, select JupyterLab.
  3. Select Python 3 under Notebook.

  1. For the first cell, select Local Python, python, and enter the following code:
%%configure -n project.spark.compatibility
{
    "number_of_workers": 10,
    "session_type": "etl",
    "glue_version": "5.0",
    "worker_type": "G.1X",
    "idle_timeout": 10,
    "timeout": 1200
}
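  1. For the second cell, select PySpark, project.spark.compatibility, and enter the following code. This performs the same processing as the Visual ETL job you created earlier, except that the product analysis keeps only products with at least five reviews (the HAVING clause). Replace the S3 bucket and folder names in output_path with your own.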
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session
spark = SparkSession.builder.getOrCreate()

# Configure paths
input_path = "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Apparel/"
output_path = "s3://<bucket-name>/<domainId>/<projectId>/code-job-output/results"


# Read data from S3
df = spark.read.format("parquet").load(input_path)
df.createOrReplaceTempView("reviews")

# Transform 1: Rating Analysis
rating_analysis = spark.sql("""
    SELECT 
        marketplace,
        star_rating,
        DATE_FORMAT(review_date, 'yyyy-MM-dd') as review_date,
        COUNT(*) as review_count,
        AVG(CAST(helpful_votes as DOUBLE) / NULLIF(total_votes, 0)) as helpfulness_ratio,
        COUNT(CASE WHEN insight = 'Y' THEN 1 END) as insight_count
    FROM reviews
    GROUP BY 
        marketplace,
        star_rating,
        DATE_FORMAT(review_date, 'yyyy-MM-dd')
""")

# Transform 2: Product Analysis
product_analysis = spark.sql("""
    SELECT 
        marketplace,
        product_id,
        product_title,
        COUNT(*) as review_count,
        AVG(star_rating) as avg_rating,
        SUM(helpful_votes) as total_helpful_votes,
        COUNT(DISTINCT customer_id) as unique_reviewers,
        COUNT(CASE WHEN insight = 'Y' THEN 1 END) as insight_count
    FROM reviews
    GROUP BY 
        marketplace,
        product_id,
        product_title
    HAVING 
        COUNT(*) >= 5
""")

# Write results to S3
rating_analysis.write.format("parquet") \
    .option("compression", "snappy") \
    .partitionBy("review_date") \
    .mode("append") \
    .save(f"{output_path}/rating_analysis")

product_analysis.write.format("parquet") \
    .option("compression", "snappy") \
    .partitionBy("marketplace") \
    .mode("append") \
    .save(f"{output_path}/product_analysis")
  1. Choose the File icon to save the notebook file. Enter the name of your notebook.
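
The write calls in the second cell use partitionBy, which makes Spark write one directory per distinct partition value, named column=value, and leave the partition column out of the data files. The following pure-Python sketch mimics that layout so you know what to expect under output_path. The plan_partitions helper, the bucket name, and the sample rows are hypothetical.

```python
from collections import defaultdict


def plan_partitions(rows, partition_key, base_path):
    """Group rows by partition value into Hive-style directory paths."""
    layout = defaultdict(list)
    for row in rows:
        directory = f"{base_path}/{partition_key}={row[partition_key]}/"
        # The partition value is encoded in the path, not stored in the data file.
        layout[directory].append(
            {name: value for name, value in row.items() if name != partition_key}
        )
    return dict(layout)


# Hypothetical aggregated rows, shaped like the rating analysis output.
rows = [
    {"marketplace": "US", "star_rating": 5, "review_date": "2024-01-01", "review_count": 12},
    {"marketplace": "JP", "star_rating": 4, "review_date": "2024-01-01", "review_count": 7},
    {"marketplace": "US", "star_rating": 3, "review_date": "2024-01-02", "review_count": 2},
]

layout = plan_partitions(rows, "review_date", "s3://example-bucket/results/rating_analysis")
for directory in sorted(layout):
    print(directory, len(layout[directory]))
```

Because both writes use append mode, each run adds new files under these directories. If you rerun the job against the same input, expect duplicate rows unless you clear the output location first.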

Save the notebook to the project’s repository.

  1. Choose the Git icon in the left navigation. This opens a panel where you can view the commit history and perform Git operations.
  2. Choose the plus (+) icon next to the files you want to commit.
  3. Enter a brief summary of the commit in the Summary text entry field. Optionally, enter a longer description of the commit in the Description text entry field.
  4. Choose Commit.
  5. Choose the Push committed changes icon to do a git push.

Create the Code-based Job from the Notebook file in the project repository.

  1. On the SageMaker Unified Studio console, on the top menu, choose Build.
  2. Under DATA ANALYSIS & INTEGRATION, choose Data processing jobs.
  3. Choose Create job from files.
  4. Choose Choose project files and choose Browse files.
  5. Select the Notebook file you created and choose Select.

Here, the Python script automatically converted from your notebook file will be displayed. Review the content.
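
If you want a rough idea of what such a conversion involves before you review the result, the following sketch flattens a notebook file into a script using only the Python standard library. This is not the conversion that SageMaker Unified Studio performs. The notebook_to_script helper is hypothetical, and skipping cells that start with a magic such as %%configure is an assumption made for illustration; the service might handle those cells differently.

```python
import json


def notebook_to_script(notebook_json):
    """Concatenate the code cells of an .ipynb document into one script."""
    notebook = json.loads(notebook_json)
    chunks = []
    for cell in notebook.get("cells", []):
        if cell.get("cell_type") != "code":
            continue  # ignore markdown and raw cells
        source = cell.get("source", "")
        if isinstance(source, list):  # the format allows a list of lines
            source = "".join(source)
        if source.lstrip().startswith("%"):
            continue  # assumption: drop cells that begin with a magic
        chunks.append(source.rstrip())
    return "\n\n".join(chunks) + "\n"


# A tiny, made-up notebook with one markdown cell and two code cells.
sample = json.dumps({
    "cells": [
        {"cell_type": "markdown", "source": ["# Review analysis"]},
        {"cell_type": "code", "source": ["%%configure -n project.spark.compatibility\n", "{}"]},
        {"cell_type": "code", "source": ["input_path = 's3://example-bucket/in/'\n", "print(input_path)"]},
    ],
    "nbformat": 4,
    "nbformat_minor": 5,
})

script = notebook_to_script(sample)
print(script)
```

When you review the displayed script, check in particular how session settings from the first notebook cell were carried over.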

  1. Choose Next.
  2. For Job name, enter the name of your job.
  3. Choose Submit to create your job.
  4. Choose the job you created.
  5. Choose Run job.

Convert existing Visual ETL flows to jobs

You can convert an existing visual ETL flow to a job by saving your existing Visual ETL flow to the project repository. Use the following steps to create a job from your existing visual ETL flow:

  1. On the SageMaker Unified Studio console, on the top menu, choose Build.
  2. Under DATA ANALYSIS & INTEGRATION, select Visual ETL editor.
  3. Select the existing Visual ETL flow.
  4. On the top right, choose Save to project to save the draft flow. You can optionally change the name and add a description.
  5. Choose Save.

View jobs

You can view the list of jobs in your project on the Data processing jobs page. Jobs can be filtered by mode (Visual ETL or Code).

Monitor job runs

On each job’s detail page, you can view a list of job runs in the Job runs tab. You can filter activities by job run ID, status, start time, and end time. The Job runs list shows basic attributes such as duration, resources consumed, and instance type, along with log group names and various job parameters. You can list, compare, and explore job run history based on various attributes.

On the individual job run details page, you can view job properties and output logs from the run. When a job fails because of an error, you can see the error message at the top of the page and examine detailed error information in the output logs.

Intelligent troubleshooting with generative AI: When jobs fail, you can take advantage of generative AI troubleshooting to resolve issues quickly. SageMaker Unified Studio’s AI-powered troubleshooting automatically analyzes job metadata, Spark event logs, error stack traces, and runtime metrics to identify root causes and provide actionable solutions. It handles both simple scenarios like missing S3 buckets, and complex performance issues such as out-of-memory exceptions. The analysis explains not just what failed, but why it failed and how to fix it, reducing troubleshooting time from hours or days to minutes.

To start the analysis, choose Troubleshoot with AI at the top right. The troubleshooting analysis provides Root Cause Analysis identifying the specific issue, Analysis Insights explaining the error context and failure patterns, and Recommendations with step-by-step remediation actions. This expert-level analysis makes complex Spark debugging accessible to all team members, regardless of their Spark expertise.

Clean up

To avoid incurring future charges, delete the resources you created during this walkthrough:

  1. Delete Visual ETL flows in Visual ETL editor.
  2. Delete Data processing jobs, including Visual ETL and Code-based jobs.
  3. Delete Output files in the S3 bucket.

Conclusion

In this post, we explored the new job experience in Amazon SageMaker Unified Studio, which brings a familiar and consistent experience for data processing and data integration tasks. This new capability streamlines your workflows by providing enhanced visibility, cost management, and seamless migration paths from AWS Glue.

With the ability to create both visual and code-based jobs, monitor job runs, and set up scheduling, the new jobs experience helps you build and manage data processing and data integration tasks efficiently. Whether you’re a data engineer working on ETL processes or a data scientist preparing datasets for machine learning, the job experience in SageMaker Unified Studio provides the tools you need in a unified environment.

Start exploring the new job experience today to simplify your data processing workflows and make the most of your data in Amazon SageMaker Unified Studio.


About the authors

Chiho Sugimoto is a Cloud Support Engineer on the AWS Big Data Support team. She is passionate about helping customers build data lakes using ETL workloads. She loves planetary science and enjoys studying the asteroid Ryugu on weekends.

Noritaka Sekiyama is a Principal Big Data Architect at the AWS Analytics product team. He’s responsible for designing new features in AWS products, building software artifacts, and providing architecture guidance to customers. In his spare time, he enjoys cycling on his road bike.

Matt Su is a Senior Product Manager on the AWS Glue team. He enjoys helping customers uncover insights and make better decisions using their data with AWS Analytics services. In his spare time, he enjoys skiing and gardening.

Revenue NSW modernises analytics with AWS, enabling unified and scalable data management, processing, and access

Post Syndicated from Saeed Barghi original https://aws.amazon.com/blogs/big-data/revenue-nsw-modernises-analytics-with-aws-enabling-unified-and-scalable-data-management-processing-and-access/

Revenue NSW, in Australia, is New South Wales (NSW) state’s principal revenue management agency and aspires to be the world’s most innovative and customer-centric revenue agency. Revenue NSW exists to administer grants, resolve fines, and collect revenue to fund essential state services for the over 8 million people of NSW in a fair, efficient, and timely manner.

Analytics at Revenue NSW plays a key role in enabling the organization’s goals and purpose by delivering reliable, secure, and authoritative insights. These insights are key to:

  • Understanding customer attributes to enable empathetic and informed actions
  • Supporting policy development
  • Assisting in the sequencing of millions of decisions
  • Maintaining compliance and education
  • Fostering transparency by providing open data and insights directly to the public

The challenge

Revenue NSW Analytics consumes data from a multitude of operational databases and real-time interfaces and through internally generated reports and files received from external data partners such as other government departments and agencies. The varying technologies, formats, and complexities of these data sources created friction and inefficiencies in data transformation, consolidation, and analysis in an environment that is often time-critical. In addition, these analytics systems were previously hosted on dedicated hardware on-premises that was nearing end-of-life and wasn’t easy to scale efficiently. To address these challenges, Revenue NSW Analytics used their partnership with AWS to build a strategic, unified, scalable, frictionless and modern data environment to help them standardize data transformation and consolidation pipelines from the hundreds of data sources. Additionally, the modern data environment must provide a single source of truth and enable secure and seamless access to the data through a unified SQL interface regardless of the data’s original format or technology.

After evaluating other offerings, Revenue NSW Analytics decided on a proof of concept (PoC) using Amazon Web Services (AWS) cloud-based services, including Amazon Redshift. The key goals of the PoC were to assess the completeness of the solution, its performance, and the potential change in total cost of ownership compared to their on-premises setup.

Amazon Redshift, with its integration options, columnar storage, and massively parallel processing (MPP) architecture, offered the desired end-state solution. Tests demonstrated a typical speed increase between 5- and 50-fold in query execution, with many results 100 times faster than the existing on-premises solution. Amazon Redshift also performed significantly better compared with other cloud-based solutions, offering up to 6 times better performance. The success of the initial PoC led Revenue NSW Analytics to further collaborate with AWS, working towards developing a prototype that incorporated Amazon Redshift alongside various data ingestion patterns.

The solution

To simplify data ingestion from the operational databases—which run on different database engines including Oracle, PostgreSQL, and Microsoft SQL—Revenue NSW Analytics used AWS Database Migration Service (AWS DMS) to perform a bulk initial load, followed by capturing ongoing changes from these databases into Amazon Redshift in near real time.

For data from Salesforce’s real-time API, Revenue NSW Analytics used Amazon AppFlow to automate the continuous pulling and ingesting of data into Amazon Redshift.

The hundreds of structured and semi-structured data files were handled using AWS Glue. These files are regularly uploaded to Amazon Simple Storage Service (Amazon S3), triggering the relevant AWS Glue extract, transform, and load (ETL) jobs in an event-based architecture to transfer the data into Amazon Redshift.
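
The post doesn’t describe how the S3 uploads trigger the AWS Glue jobs. One common pattern is an S3 event notification that invokes an AWS Lambda function, which then starts the matching job. The following sketch shows that routing under stated assumptions: the prefixes, job names, bucket name, and the --source_path argument are hypothetical, and a stub stands in for boto3.client("glue") so the example runs without AWS access.

```python
from urllib.parse import unquote_plus

# Hypothetical mapping from S3 key prefix to AWS Glue job name.
JOB_BY_PREFIX = {
    "partners/fines/": "load-fines-to-redshift",
    "partners/grants/": "load-grants-to-redshift",
}


def start_jobs_for_event(event, glue_client):
    """Start one AWS Glue job run per uploaded object that matches a known prefix."""
    started = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Object keys arrive URL-encoded in S3 event notifications.
        key = unquote_plus(record["s3"]["object"]["key"])
        for prefix, job_name in JOB_BY_PREFIX.items():
            if key.startswith(prefix):
                response = glue_client.start_job_run(
                    JobName=job_name,
                    Arguments={"--source_path": f"s3://{bucket}/{key}"},
                )
                started.append(response["JobRunId"])
                break
    return started


class StubGlueClient:
    """Stand-in for boto3.client("glue"); records calls instead of contacting AWS."""

    def __init__(self):
        self.calls = []

    def start_job_run(self, JobName, Arguments):
        self.calls.append((JobName, Arguments))
        return {"JobRunId": f"jr_{len(self.calls)}"}


# A trimmed-down S3 event with a single uploaded object.
event = {
    "Records": [
        {"s3": {"bucket": {"name": "example-landing"},
                "object": {"key": "partners/fines/2024+01.csv"}}}
    ]
}

client = StubGlueClient()
started = start_jobs_for_event(event, client)
print(started, client.calls)
```

In a real Lambda function, you would pass boto3.client("glue") instead of the stub. Keeping the prefix-to-job mapping in one place means that adding a new file type is a single entry rather than a new function.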

To facilitate repeatability and enable iteration, Revenue NSW Analytics used infrastructure-as-code (IaC) and continuous integration and delivery (CI/CD) pipelines to deploy the different components of the solution.

The following is a high-level architecture demonstrating how these different components and services fit together.

Along with standardization and unified access, the success criteria of the new data environment included the ease of transition, consolidation of processes to the new standardised pipelines, scalability, language uniformity, and availability. The combination of standard SQL support, the low-code capabilities of AWS DMS and Amazon AppFlow, and AWS Glue support for Python, a popular programming language, played a crucial role in facilitating the successful transition and adoption of the cloud-based data environment.

Other success factors of this environment include the ability to work within current budgets, and the extensibility and modularity of the solution. As shown in the preceding high-level architecture, the solution runs on multiple building blocks that are decoupled, modular, and either serverless (like AWS Glue) or managed services that support seamless scalable configurations that don’t require rebuilds. This allowed Revenue NSW Analytics to start small with each use case, expand and grow as required, and pay only for what they need.

Moreover, with the new cloud-based data environment, Revenue NSW Analytics can access up-to-date data in near real time, which is essential to fulfilling critical use cases such as information requests and assisting with compliance case identification. The automated data ingestion pipelines removed much of the boilerplate and heavy lifting, allowing Revenue NSW teams to work more efficiently and focus on the differentiators of their business, and in some cases, shorten workflow times from months to weeks or days.

Another significant factor contributing to the project’s success is the people at the heart of Revenue NSW Analytics. The teams allocated to own and deliver this platform are cross-functional, with adjoining responsibilities and skills, and were prepared through multiple in-person and online training sessions. The teams were empowered to trial individual services to deliver new use cases and iterate on the solution to learn from successes and innovate progressively. This approach, together with support Revenue NSW received from AWS specialist solution architects, helped to minimize the risk of knowledge gaps that often arise when separate teams are responsible for building and operating a system.

The hard work of the Analytics team, the investment of Revenue NSW Analytics leadership in its people, and the continuous support from AWS can truly be seen throughout the delivery of the data environment, resulting in the achievement of the intended outcomes.

Conclusion and call to action

Since going live with their cloud-based data environment on AWS, Revenue NSW has onboarded dozens of analysts who can get more done in less time. This is a result of establishing a single source of truth from different data sources in Amazon Redshift, so that analysts and data consumers don’t need to shop around to find the data that they need to complete their tasks. This new data environment also provides Revenue NSW with the ability to create improved conditions for:

  • Increasing agility by exposing reusable, trusted data services for people and AI
  • Empowering operational systems with services best provided by analytical approaches
  • Decommissioning heritage, costly infrastructure and data practices.

Successful delivery of the cloud-based data environment on AWS has led to further collaboration between AWS and Revenue NSW. This includes exploring the adoption of AI and machine learning (AI/ML) and generative AI to further improve the delivery of services for the people of NSW.

To learn more about customer success stories like this or how to get started with building a data environment on AWS, contact your AWS account team. You can read about similar customers by browsing Customer Success Stories on our website.


About the authors

Saeed Barghi is a Sr. Specialist Solutions Architect at Amazon Web Services (AWS) specializing in architecting enterprise data platforms and AI solutions. Based in Melbourne, Australia, Saeed works with public sector customers in Australia and New Zealand and helps his customers build fit-for-purpose and future-proof data platforms and AI solutions.

Miroslaw (Mick) Mioduszewski is the Director of Analytics at Revenue NSW, Department of Customer Service, in NSW. He has held multiple C-level roles, such as COO and CIO, in private and public companies as well as in government, and has served as a company director. Mick holds computer science and business degrees, is a fellow of the Australian Institute of Company Directors, and is an industry fellow at the University of Technology Sydney.

Moha Alsouli is a Public Sector Solutions Architect at Amazon Web Services (AWS) in Sydney. He is dedicated to helping state and local government customers deliver citizen services through solution design, reviews, optimisation, and architecture guidance. Moha also specialises in generative artificial intelligence (AI) on AWS.

Harnessing the Power of Nested Materialized Views and exploring Cascading Refresh

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/harnessing-the-power-of-nested-materialized-views-and-exploring-cascading-refresh/

Amazon Redshift materialized views enable you to significantly improve the performance of complex queries. Materialized views store precomputed query results that future similar queries can utilize, offering a powerful solution for data warehouse environments where applications often need to execute resource-intensive queries against large tables. This optimization technique enhances query speed and efficiency by allowing many computation steps to be skipped, with precomputed results returned directly. Materialized views are particularly useful for speeding up predictable and repeated queries, such as those used to populate dashboards or generate reports. Instead of repeatedly performing resource-intensive operations, applications can query a materialized view and retrieve precomputed results, leading to significant performance gains and improved user experience. Additionally, materialized views can be incrementally refreshed, applying logic only to changed data when data manipulation language (DML) changes are made to the underlying base tables, further optimizing performance and maintaining data consistency.

This post demonstrates how to maximize your Amazon Redshift query performance by effectively implementing materialized views. We’ll explore creating materialized views and implementing nested refresh strategies, where materialized views are defined in terms of other materialized views to expand their capabilities. This approach is particularly powerful for reusing precomputed joins with different aggregate options, significantly reducing processing time for complex ETL and BI workloads. Let’s explore how to implement this powerful feature in your data warehouse environment.

Introduction to Nested Materialized Views

Nested materialized views in Amazon Redshift allow you to create materialized views based on other materialized views. This capability enables a hierarchical structure of precomputed results, significantly enhancing query performance and data processing efficiency. With nested materialized views, you can build multi-layered data abstractions, creating increasingly complex and specialized views tailored to specific business needs. This layered approach offers several advantages:

  • Improved Query Performance: Each level of the nested materialized view hierarchy serves as a cache, allowing queries to quickly access pre-computed data without the need to traverse the underlying base tables.
  • Reduced Computational Load: By offloading the computational work to the materialized view refresh process, you can significantly reduce the runtime and resource utilization of your day-to-day queries.
  • Simplified Data Modeling: Nested materialized views enable you to create a more modular and extensible data model, where each layer represents a specific business concept or use case.
  • Incremental Refreshes: Redshift materialized views support incremental refreshes, allowing you to update only the changed data within the nested hierarchy, further optimizing the refresh process.
  • Cascading Materialized Views: Redshift materialized views support automatic handling of Extract, Load, and Transform (ELT) style workloads, minimizing the need for manual creation and management of these processes.

You can implement nested materialized views using the CREATE MATERIALIZED VIEW statement, which allows referencing other materialized views in the definition. Common use cases include:

  • Modular data transformation pipelines
  • Hierarchical aggregations for progressive analysis
  • Multi-level data validation pipelines
  • Historical data snapshot management
  • Optimized BI reporting with precomputed results

Architecture

Architectural diagram depicting Amazon Redshift’s nested materialized view structure. Shows multiple base tables (orange) connecting to materialized views (red), with connections to a nested view layer and data sharing table (green). Includes integration points for users and QuickSight visualization.

  1. Base Table(s): These are the underlying base tables that contain the raw data for your data warehouse. They can be local tables or data sharing tables.
  2. Base Materialized View(s): These are the first-level materialized views that are created directly on top of the base tables. These views encapsulate common data transformations and aggregations. They can serve as the base for nested materialized views and can also be accessed by users directly.
  3. Nested Materialized View(s): These are the second level (or higher) materialized views that are created based on the base materialized views. The nested materialized view can further aggregate, filter, or transform the data from the base materialized views.
  4. Application/Users/BI Reporting: The application or business intelligence (BI) tools interact with the nested materialized views to generate reports and dashboards. The nested views provide a more optimized and precomputed data structure for efficient querying and reporting.

Creating and using nested materialized views

To demonstrate how nested materialized views work in Amazon Redshift, we’ll use the TPC-DS dataset. We’ll create three queries using the STORE, STORE_SALES, CUSTOMER, and CUSTOMER_ADDRESS tables to simulate data warehouse reports. This example will illustrate how multiple reports can share result sets and how materialized views can improve both resource efficiency and query performance. Let’s consider the following queries as dashboard queries:

SELECT cust.c_customer_id,
cust.c_first_name, 
cust.c_last_name, 
sales.ss_item_sk, 
sales.ss_quantity, 
cust.c_current_addr_sk 
FROM store_sales sales INNER JOIN customer cust
ON sales.ss_customer_sk = cust.c_customer_sk;

SELECT cust.c_customer_id,
cust.c_first_name, 
cust.c_last_name, 
sales.ss_item_sk, 
sales.ss_quantity, 
cust.c_current_addr_sk, 
store.s_store_name
FROM store_sales sales INNER JOIN customer cust
ON sales.ss_customer_sk = cust.c_customer_sk
INNER JOIN store store
ON sales.ss_store_sk = store.s_store_sk;

SELECT cust.c_customer_id, 
cust.c_first_name, cust.c_last_name, 
sales.ss_item_sk, 
sales.ss_quantity, 
addr.ca_state
FROM store_sales sales INNER JOIN customer cust
ON sales.ss_customer_sk = cust.c_customer_sk
INNER JOIN store store
ON sales.ss_store_sk = store.s_store_sk
INNER JOIN customer_address addr
ON cust.c_current_addr_sk = addr.ca_address_sk;

Notice that the join between the STORE_SALES and CUSTOMER tables appears in all three queries (dashboards).

The second query adds a join with the STORE table, and the third query is the second one with an extra join with the CUSTOMER_ADDRESS table. This pattern is common in business intelligence scenarios. As mentioned earlier, using a materialized view can speed up queries because the result set is stored and ready to be delivered to the user, avoiding reprocessing of the same data. In cases like this, we can use nested materialized views to reuse already processed data. Transforming our queries into a set of nested materialized views gives the following result:

CREATE MATERIALIZED VIEW StoreSalesCust as
SELECT cust.c_customer_id, 
cust.c_first_name, 
cust.c_last_name, 
sales.ss_item_sk, 
sales.ss_store_sk, 
sales.ss_quantity, 
cust.c_current_addr_sk
FROM store_sales sales INNER JOIN customer cust
ON sales.ss_customer_sk = cust.c_customer_sk;

CREATE MATERIALIZED VIEW StoreSalesCustStore as
SELECT storesalescust.c_customer_id, 
storesalescust.c_first_name, 
storesalescust.c_last_name, 
storesalescust.ss_item_sk, 
storesalescust.ss_quantity, 
storesalescust.c_current_addr_sk, 
store.s_store_name
FROM StoreSalesCust storesalescust INNER JOIN store store
ON storesalescust.ss_store_sk = store.s_store_sk;

CREATE MATERIALIZED VIEW StoreSalesCustAddress as
SELECT storesalescuststore.c_customer_id, 
storesalescuststore.c_first_name, 
storesalescuststore.c_last_name, 
storesalescuststore.ss_item_sk, 
storesalescuststore.ss_quantity, 
addr.ca_state
FROM StoreSalesCustStore storesalescuststore INNER JOIN customer_address addr
ON storesalescuststore.c_current_addr_sk = addr.ca_address_sk;
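
As an illustration of how a report might consume the last level of this hierarchy, the following query aggregates quantities by state directly from StoreSalesCustAddress. It is a minimal sketch: the aggregate and the total_quantity alias are illustrative choices and don't come from the original dashboards.

```sql
-- Dashboard-style aggregate served from the third-level materialized view.
-- total_quantity is a hypothetical alias chosen for this example.
SELECT ca_state,
       SUM(ss_quantity) AS total_quantity
FROM StoreSalesCustAddress
GROUP BY ca_state
ORDER BY total_quantity DESC;
```

Because the joins are already materialized, a query like this reads the stored result set of the view instead of joining the four base tables again.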

Nested materialized views can improve performance and resource efficiency by reusing initial view results, minimizing redundant joins, and working with smaller result sets. This creates a hierarchical structure where materialized views depend on one another. Due to these dependencies, you must refresh the views in a specific order.

SQL query result indicating dependency issue for REFRESH MATERIALIZED VIEW StoreSalesCustAddress.

With the new “REFRESH MATERIALIZED VIEW mv_name CASCADE” option, you can refresh the entire dependency chain of a materialized view in a single statement. In this example, we run it against the third materialized view, StoreSalesCustAddress, which refreshes all three materialized views because each one depends on the one before it.

SQL query showing successful CASCADE refresh of StoreSalesCustAddress materialized view in Amazon Redshift.
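
For reference, the cascade refresh described here can be written out as follows. This is a sketch that uses the view names created earlier in this post; nothing else is assumed.

```sql
-- Refresh StoreSalesCustAddress together with the materialized views it
-- depends on (StoreSalesCust and StoreSalesCustStore).
REFRESH MATERIALIZED VIEW StoreSalesCustAddress CASCADE;
```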

If we use the second materialized view with the CASCADE option, we will refresh only the first and second materialized views, leaving the third unchanged. This may be useful when we need to keep some materialized views with less current data than others.

The SVL_MV_REFRESH_STATUS system view reveals the refresh sequence of materialized views. When triggering a cascade refresh on StoreSalesCustAddress, the system follows the dependency chain we established: StoreSalesCust refreshes first, followed by StoreSalesCustStore, and finally StoreSalesCustAddress. This demonstrates how the refresh operation respects the hierarchical structure of our materialized views.

SQL query result from SVL_MV_REFRESH_STATUS showing successful recomputation of three materialized views.
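
A query along the following lines can be used to read the refresh sequence from SVL_MV_REFRESH_STATUS. It is a sketch rather than the exact query behind the result: the column selection is our choice, and the lowercase names in the filter assume the cluster uses the default case-insensitive identifiers.

```sql
-- Show refresh activity for the three views, oldest first, so the
-- cascade order can be read from top to bottom.
SELECT TRIM(mv_name) AS mv_name,
       status,
       refresh_type,
       starttime,
       endtime
FROM svl_mv_refresh_status
WHERE LOWER(TRIM(mv_name)) IN ('storesalescust',
                               'storesalescuststore',
                               'storesalescustaddress')
ORDER BY starttime;
```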

Considerations

Consider a dependency chain where StoreSalesCust (A) → StoreSalesCustStore (B) → StoreSalesCustAddress (C).

  • The CASCADE refresh behavior works as follows:
    • When refreshing C with CASCADE: A, B, and C will all be refreshed.
    • When refreshing B with CASCADE: Only A and B will be refreshed.
    • When refreshing A with CASCADE: Only A will be refreshed.
    • If you specifically need to refresh A and C but not B, you must perform separate refresh operations without using CASCADE—first refresh A, then refresh C directly.
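
The cases in this list can be written as the following statements. This is a sketch that follows the behavior described above and uses the view names from this post; it has not been verified against a cluster.

```sql
-- Refresh B and its upstream dependency A; C is left unchanged.
REFRESH MATERIALIZED VIEW StoreSalesCustStore CASCADE;

-- Refresh only A.
REFRESH MATERIALIZED VIEW StoreSalesCust CASCADE;

-- Refresh A and C but not B: two separate statements without CASCADE.
REFRESH MATERIALIZED VIEW StoreSalesCust;
REFRESH MATERIALIZED VIEW StoreSalesCustAddress;
```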

Best Practices for Materialized View

  • Improve the source query: Start with a well-optimized SELECT statement for your materialized view. This is especially important for views that need full rebuilds during each refresh.
  • Plan refresh strategies: When creating materialized views that depend on other materialized views, you cannot use AUTO REFRESH YES. Instead, implement orchestrated refresh mechanisms using the Amazon Redshift Data API with Amazon EventBridge for scheduling and AWS Step Functions for workflow management.
  • Leverage distribution and sort keys: Properly configure distribution and sort keys on materialized views based on their query patterns to optimize performance. Well-chosen keys improve query speed and reduce I/O operations.
  • Consider incremental refresh capability: When possible, design materialized views to support incremental refresh, which only updates changed data rather than rebuilding the entire view, greatly improving refresh performance.
  • Consider automated materialized views: The automated materialized view (auto-MV) feature monitors your workload and automatically creates materialized views to enhance overall performance. For more information, refer to Automated materialized views.
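
To make the points about keys and incremental refresh more concrete, the following sketch creates a variant of the first-level view with explicit distribution and sort keys and then checks how Redshift classifies the views. The view name StoreSalesCustKeyed and the key choices are hypothetical: ss_store_sk is picked only because the next level joins on it, and c_customer_id only as an example filter column. Choose keys from your own query patterns.

```sql
-- Hypothetical variant of StoreSalesCust with explicit keys.
CREATE MATERIALIZED VIEW StoreSalesCustKeyed
DISTKEY (ss_store_sk)
SORTKEY (c_customer_id)
AS
SELECT cust.c_customer_id,
       cust.c_first_name,
       cust.c_last_name,
       sales.ss_item_sk,
       sales.ss_store_sk,
       sales.ss_quantity,
       cust.c_current_addr_sk
FROM store_sales sales INNER JOIN customer cust
ON sales.ss_customer_sk = cust.c_customer_sk;

-- Inspect staleness and refresh state of the materialized views.
SELECT TRIM(name) AS mv_name,
       state,
       is_stale
FROM stv_mv_info
ORDER BY mv_name;
```

In the Redshift documentation for STV_MV_INFO, a state of 1 indicates an incrementally refreshed view and 0 a view that is fully recomputed on refresh. Confirm the meaning of these values against the current documentation before relying on them.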

Clean up

Complete the following steps to clean up your resources:

  • Delete the Redshift provisioned replica cluster or the Redshift serverless endpoints created for this exercise

or

  • Drop only the materialized views that you created for testing
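
If you choose the second option, the views from this post can be dropped as sketched below. Dropping dependent views before the views they are built on is our assumption about the safest order.

```sql
-- Drop dependents first, then the views they are built on.
DROP MATERIALIZED VIEW IF EXISTS StoreSalesCustAddress;
DROP MATERIALIZED VIEW IF EXISTS StoreSalesCustStore;
DROP MATERIALIZED VIEW IF EXISTS StoreSalesCust;
```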

Conclusion

This post showed how to create nested Amazon Redshift materialized views and refresh the child materialized views using the new REFRESH CASCADE option. You can quickly build and maintain efficient data processing pipelines and seamlessly extend the low latency query execution benefits of materialized views to data analysis.


About the authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Raza Hafeez is a Senior Product Manager at Amazon Redshift. He has over 13 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Ricardo Serafim is a Senior Analytics Specialist Solutions Architect at AWS. He has been helping companies with Data Warehouse solutions since 2007.

Realizing ocean data democratization: Furuno Electric’s initiatives using Amazon DataZone

Post Syndicated from Akira Mikami original https://aws.amazon.com/blogs/big-data/realizing-ocean-data-democratization-furuno-electrics-initiatives-using-amazon-datazone/

This is a guest post authored by Akira Mikami, a technical expert at Furuno Electric. The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

Since successfully commercializing the world’s first fish finder in 1948, Furuno Electric has been developing unique ultrasonic and electronic technologies in the marine electronics field. Under the company motto of “making the invisible visible”, they’ve expanded their business centered on marine sensing technology and are now extending into subscription-based data businesses using Internet of Things (IoT) data. They’re actively promoting the planning and development of data businesses to realize their new management vision outlined in FURUNO GLOBAL VISION NAVI NEXT 2030.

Like many manufacturing companies, Furuno Electric faced significant changes in revenue structure and technical architecture as they transitioned from traditional business to data-driven business. To succeed in this transformation, it was essential to build a foundation that promotes data utilization across the entire organization.

This post demonstrates how Furuno Electric built their system using Amazon DataZone and other Amazon Web Services (AWS) services to address technical infrastructure fragmentation, establish proper security governance, and develop an effective data business promotion system as part of their journey transitioning from a traditional manufacturing company to a data-driven business.

Challenges

Furuno Electric faced three specific challenges in promoting their data business: technical infrastructure fragmentation and duplication, lack of security governance, and underdeveloped data business promotion system.

Project managers in the data business were independently designing and building data infrastructure, resulting in duplicated components for data collection, processing, and storage. This led to wasteful development investment, hindered effective use of common data, and slowed business launches. Marine data services, including the fishing vessel data collection and sharing system, FWC, and Furuno Open Platform (FOP), had similar functions for data collection, processing, visualization, and analysis implemented separately for each project, resulting in unnecessary workload across the organization.

Security measures were considered and implemented separately by each department, and although checklists existed, they weren’t applied uniformly. This resulted in a lack of consistency in security measures, duplicate consideration costs for each department, and uncertainty in the comprehensiveness of measures. Integrated risk management was also difficult.

The organizational structure wasn’t prepared for the iterative development processes and long-term revenue models specific to data businesses, and there was a lack of mechanisms for cross-departmental data utilization and joint development. The distributed operational structure across departments made it difficult to rapidly deploy and continuously improve data businesses. In the process of creating data businesses, it became necessary to build entirely different customer relationships compared to traditional product sales businesses. In terms of organizational management and talent strategy, there was a need to transition from a top-down, risk-averse, specialized skill-focused structure to a bottom-up, challenge-oriented structure that emphasizes communication skills and diversity.

Solution overview

Furuno Electric built a data management foundation centered on Amazon DataZone, Amazon Simple Storage Service (Amazon S3), AWS Glue, and AWS Control Tower, a comprehensive solution designed to address each of the three challenges mentioned in the preceding section.

Building the integrated data platform JuBuRaw

To address technical infrastructure fragmentation and duplication, they built Junction Architecture of Business Raw Data (JuBuRaw), a platform that consolidates common components for data collection, storage, management, and authentication. Using AWS Cloud Development Kit (AWS CDK) to code the infrastructure, they achieved standardization and automation of environment construction. This provides consistency and reproducibility, making it easier to add new systems and migrate existing systems to the common platform. Simply by deploying the CDK code, a standard data pipeline (using AWS IoT Core, Amazon S3, AWS Glue, Amazon Kinesis, Amazon API Gateway, and AWS Lambda) for a specific system is built automatically. This eliminates duplicate design and development within the organization, reducing business launch time and improving fixed cost management. By standardizing common functions, they reduced the management and operation costs of existing systems and enabled the launch of new systems in half the time compared to before.

The following diagram is the overall JuBuRaw architecture.

Security control with AWS Control Tower

To address the lack of security governance, they implemented a comprehensive security framework centered on AWS Control Tower to apply consistent security policies across multiple accounts. With automated monitoring systems using AWS Security Hub, AWS Config, and AWS CloudTrail and an integrated authentication system using AWS IAM Identity Center, they provide security consistency while reducing operational costs and management burden.

With the organization’s management account at the top, they placed AWS Control Tower, AWS Organizations, and AWS IAM Identity Center to achieve hierarchical security management. By adopting a multi-layered defense structure consisting of account baselines with AWS CloudTrail and AWS Config enabled, log archive environments, and audit and security operation environments, consistent security policies are applied to all accounts, enabling early detection and response to security incidents. This integrated approach has reduced the workload for security responses. This configuration is shown in the following diagram.

Establishing a data democratization foundation with Amazon DataZone

To address the underdeveloped data business promotion system, they introduced Amazon DataZone to streamline data discovery, sharing, and governance across the organization. They clarified the role division between the infrastructure management team and the data management team, centralizing data security policies, quality management, and metadata standardization. With a project-based collaboration environment, they promoted cross-departmental data utilization, establishing a foundation to support the creation and continuous monetization of data businesses.

Organizational reform and operational structure establishment

In parallel with the introduction of technical solutions, they implemented organizational reforms to support medium- to long-term data utilization. The new organizational structure consists of three main roles: the infrastructure management team, the data management team, and the chief data officer. The following chart shows this organizational structure.

The infrastructure management team is responsible for maintaining and developing the technical foundation of the platform, managing multiple accounts using AWS Control Tower, applying and monitoring security baselines, and tracking infrastructure versions and change history. By specializing in common technologies, they can provide a stable platform.

The data management team is responsible for data quality management and continual improvement using AWS Glue Data Quality, standardization and maintenance of metadata, definition and application of data security policies, management of the Amazon DataZone data catalog, and data governance using Amazon DataZone. To maximize the value of data, they focus on deeply understanding business requirements and data characteristics and performing appropriate data management.

The chief data officer is responsible for formulating data business strategies and determining direction, promoting coordination and collaboration between teams, making decisions regarding the evolution of the data management foundation, and fostering a data utilization culture throughout the organization. From a strategic perspective, they oversee the overall effort and bridge business goals and technology.

This clear division of roles has established an operational structure for effective data utilization, accelerating the data business creation process. Additionally, clarifying data ownership has improved data quality and reliability, promoting data utilization across the organization. This structure is sustainable and can flexibly respond to technological changes and changes in the business environment.

Benefits of the modernized platform

As a concrete application example of the integrated data platform JuBuRaw and organizational structure explained in the previous section, we introduce the migration project of the existing service SHIPS. This use case is a comprehensive migration case that uses all three solution elements of data collection, management, and utilization mentioned earlier.

Furuno Electric provides a system called SHIPS that plots ship position information and monitors the status of equipment installed on ships. By migrating this existing service to the JuBuRaw foundation, several functional enhancements are expected.

In terms of data integration enhancement, using the data catalog function of Amazon DataZone makes it easier to integrate not only ship position information but also various data sources such as internal systems, IoT devices, other company systems, automatic identification system (AIS) data, and weather and sea condition data. This enables swift data analysis and comprehensive ship management, which means operators can detect potential issues and implement preventive measures before they develop into serious problems. Particularly important is that by storing this data in a common data lake and retaining it as master data, they create an environment where the data can be easily used by other applications.

For security enhancement, organizations can use Amazon DataZone federated governance with publish-subscribe (pubsub) workflow mechanism and fine-grained access control capabilities. This means they can implement detailed permissions management specifically for data assets, rows, and columns while maintaining unified access control and data governance across multiple AWS accounts and organizational boundaries.

In this case, by using the new integrated data management foundation, it becomes possible to integrate individually designed and built data foundations, improving both efficiency and functionality. A consistent data flow from data sources to the data platform and then to individual applications is realized, enabling flexible data utilization centered on the data lake. Linkage with each application can also be easily realized from the data lake, providing expandability for future data utilization.

This SHIPS migration case is a comprehensive approach using the solution elements of the JuBuRaw foundation and is expected to serve as a reference model for future system migrations. It’s expected to achieve both service quality improvement and operational cost reduction.

Future vision and next steps

Based on the data management foundation they’ve built, Furuno Electric aims to further expand and deepen data utilization. As part of their plan to continue and expand digital transformation, they’re currently starting with the migration of SHIPS, but plan to gradually migrate other IoT-related services (such as FOP, FWC, and Ichidake) to the new data management foundation in the future. This is expected to further strengthen the foundation for company-wide data utilization and enhance synergies between services.

Continuous enhancement of secure data sharing and access control is also essential. With the increase in data and expansion of utilization scope, the importance of security and access control will further increase. They’ll optimize the balance between data protection and utilization while incorporating practices accumulated through operations.

Additionally, Furuno Electric is exploring the expansion of their data management capabilities to Amazon SageMaker, specifically using Amazon SageMaker Catalog integrated with Amazon DataZone. This integration will enable them to seamlessly extend their existing data analytics governance workflows into artificial intelligence and machine learning (AI/ML) workloads. By applying the same data discovery, data sharing, and access control foundation across both data analytics and AI model development, they can accelerate the development of new AI-powered services. The unified governance framework will also provide secure and efficient AI adoption throughout the organization.

Through these initiatives, Furuno Electric is realizing their company motto of “making the invisible visible” in the field of data business as well. The integrated data platform JuBuRaw isn’t just an integration of technical foundations but serves as a foundation to support organizational culture transformation and the creation of new business models. As seen in the SHIPS migration case, using this foundation not only enhances existing services but also expands possibilities for new data utilization.

Through building a data foundation that can flexibly respond to business growth and changes while using a cloud-based environment, Furuno Electric has successfully led their digital transformation. They’ll continue to provide new value to customers through the democratization of marine data and accelerate the transition to data-driven business.

This case serves as a reference for many manufacturing companies promoting data utilization, showing that approaches from both technical and organizational perspectives are key to success. As Furuno Electric’s initiatives demonstrate, data democratization and effective utilization play an important role in the digital transformation of manufacturing.


About the Authors

Akira Mikami is a technical expert who played a central role in the FURUNO Data Platform (JuBuRaw) Construction Project at Furuno Electric Co., Ltd. Specializing in data platform construction and architecture, he led the implementation of cloud solutions utilizing AWS. He contributed to achieving efficient data management and strengthening team collaboration, leading the project to success.

Junpei Ozono is a Sr. Go-to-market (GTM) Data & AI solutions architect at Amazon Web Services (AWS) in Japan. He drives technical market creation for data and AI solutions while collaborating with global teams to develop scalable GTM motions. He guides organizations in designing and implementing innovative data-driven architectures powered by AWS services, helping customers accelerate their cloud transformation journey through modern data and AI solutions. His expertise spans modern data architectures including data mesh, data lakehouse, and generative AI, so customers can build scalable and innovative solutions on AWS.

Mitsuhiko Nishida is an Enterprise Solutions Architecture Automotive & Manufacturing Group Solutions Architect at Amazon Web Services (AWS) in Japan. He serves as a field Solutions Architect for manufacturing customers, helping them solve their business challenges. With expertise in generative AI and manufacturing IT, he guides the design and implementation of innovative solutions leveraging cutting-edge technologies. He supports manufacturing customers in building efficient architecture powered by AWS services to accelerate their cloud transformation journey and contribute to their digital transformation initiatives.

Develop and monitor a Spark application using existing data in Amazon S3 with Amazon SageMaker Unified Studio

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/develop-and-monitor-a-spark-application-using-existing-data-in-amazon-s3-with-amazon-sagemaker-unified-studio/

Organizations face significant challenges managing their big data analytics workloads. Data teams struggle with fragmented development environments, complex resource management, inconsistent monitoring, and cumbersome manual scheduling processes. These issues lead to lengthy development cycles, inefficient resource utilization, reactive troubleshooting, and difficult-to-maintain data pipelines. These challenges are especially critical for enterprises processing terabytes of data daily for business intelligence (BI), reporting, and machine learning (ML). Such organizations need unified solutions that streamline their entire analytics workflow.

The next generation of Amazon SageMaker with Amazon EMR in Amazon SageMaker Unified Studio addresses these pain points through an integrated development environment (IDE) where data workers can develop, test, and refine Spark applications in one consistent environment. Amazon EMR Serverless alleviates cluster management overhead by dynamically allocating resources based on workload requirements, and built-in monitoring tools help teams quickly identify performance bottlenecks. Integration with Apache Airflow through Amazon Managed Workflows for Apache Airflow (Amazon MWAA) provides robust scheduling capabilities, and the pay-only-for-resources-used model delivers significant cost savings.

In this post, we demonstrate how to develop and monitor a Spark application using existing data in Amazon Simple Storage Service (Amazon S3) using SageMaker Unified Studio.

Solution overview

This solution uses SageMaker Unified Studio to execute and oversee a Spark application, highlighting its integrated capabilities. We cover the following key steps:

  1. Create an EMR Serverless compute environment for interactive applications using SageMaker Unified Studio.
  2. Create and configure a Spark application.
  3. Use TPC-DS data to build and run the Spark application using a Jupyter notebook in SageMaker Unified Studio.
  4. Monitor application performance and schedule recurring runs with the Amazon MWAA integration.
  5. Analyze results in SageMaker Unified Studio to optimize workflows.

Prerequisites

For this walkthrough, you must have the following prerequisites:

Add EMR Serverless as compute

Complete the following steps to create an EMR Serverless compute environment to build your Spark application:

  1. In SageMaker Unified Studio, open the project you created as a prerequisite and choose Compute.
  2. Choose Data processing, then choose Add compute.
  3. Choose Create new compute resources, then choose Next.

  4. Choose EMR Serverless, then choose Next.

  5. For Compute name, enter a name.
  6. For Release label, choose emr-7.5.0.
  7. For Permission mode, choose Compatibility.
  8. Choose Add compute.

It takes a few minutes to spin up the EMR Serverless application. After it’s created, you can view the compute in SageMaker Unified Studio.

The preceding steps demonstrate how you can set up an Amazon EMR Serverless application in SageMaker Unified Studio to run interactive PySpark workloads. In subsequent steps, we build and monitor Spark applications in an interactive JupyterLab workspace.

Develop, monitor, and debug a Spark application in a Jupyter notebook within SageMaker Unified Studio

In this section, we build a Spark application using the TPC-DS dataset within SageMaker Unified Studio. With Amazon SageMaker Data Processing, you can focus on transforming and analyzing your data without managing compute capacity or open source applications, saving you time and reducing costs. SageMaker Data Processing provides a unified developer experience from Amazon EMR, AWS Glue, Amazon Redshift, Amazon Athena, and Amazon MWAA in a single notebook and query interface. You can automatically provision your capacity on Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) or EMR Serverless. Scaling rules manage changes to your compute demand to optimize performance and runtimes. Integration with Amazon MWAA simplifies workflow orchestration by alleviating infrastructure management needs. For this post, we use EMR Serverless to read and query the TPC-DS dataset within a notebook and run it using Amazon MWAA.

Complete the following steps:

  1. After you complete the previous steps and prerequisites, navigate to SageMaker Unified Studio and open your project.
  2. Choose Build and then JupyterLab.

The notebook takes about 30 seconds to initialize and connect to the space.

  3. Under Notebook, choose Python 3 (ipykernel).
  4. In the first cell, next to Local Python, choose the dropdown menu and choose PySpark.
  5. Choose the dropdown menu next to Project.Spark and choose EMR-S Compute.
  6. Run the following code to develop your Spark application. This example reads a 3 TB TPC-DS dataset in Parquet format from a publicly accessible S3 bucket:
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/store/").createOrReplaceTempView("store")

After the Spark session starts and execution logs start to populate, you can explore the Spark UI and driver logs to further debug and troubleshoot Spark programs. The Executors tab of the Spark UI provides access to the driver and executor logs.

  7. Use the following code to read more TPC-DS datasets. You can create temporary views and use the Spark UI to see the files being read. Refer to the appendix at the end of this post for details on using the TPC-DS dataset within your buckets.
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/item/").createOrReplaceTempView("item")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/store_sales/").createOrReplaceTempView("store_sales")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/date_dim/").createOrReplaceTempView("date_dim")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/customer/").createOrReplaceTempView("customer")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/catalog_sales/").createOrReplaceTempView("catalog_sales")
spark.read.parquet("s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/web_sales/").createOrReplaceTempView("web_sales")
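
The repeated read calls above all follow one pattern, so they could also be written as a loop. The following is a minimal sketch, not part of the original walkthrough: the helper names tpcds_path and register_views are hypothetical, and the code assumes the notebook's existing spark session is passed in.

```python
# Base prefix of the public TPC-DS dataset used in this post.
TPCDS_BASE = "s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned"

# Tables read in this walkthrough.
TPCDS_TABLES = [
    "store", "item", "store_sales", "date_dim",
    "customer", "catalog_sales", "web_sales",
]


def tpcds_path(table, base=TPCDS_BASE):
    """Build the S3 prefix for one TPC-DS table (hypothetical helper)."""
    return f"{base.rstrip('/')}/{table}/"


def register_views(spark_session, tables=TPCDS_TABLES, base=TPCDS_BASE):
    """Read each table as Parquet and register it as a temporary view."""
    for table in tables:
        df = spark_session.read.parquet(tpcds_path(table, base))
        df.createOrReplaceTempView(table)
    return list(tables)
```

In a PySpark cell, calling register_views(spark) would create the same temporary views as the individual statements. If you copied the dataset to your own bucket, pass that prefix as base.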

In each cell of your notebook, you can expand Spark Job Progress to view the stages of the job submitted to EMR Serverless for a specific cell. You can see the time taken to complete each stage. In addition, if a failure occurs, you can examine the logs, making troubleshooting a seamless experience.

Because the files are partitioned on the date key column, you can observe that Spark runs parallel tasks for the reads.

  8. Next, get the row count for each date key (the column the data is partitioned on) using the following code:
select count(1), ss_sold_date_sk from store_sales group by ss_sold_date_sk order by ss_sold_date_sk
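
If you prefer to run this statement from a PySpark cell rather than a SQL cell, one option is to pass it to spark.sql. The following sketch assumes the store_sales view registered earlier; the function name run_count_by_date and the row_count alias are illustrative additions. The call to explain prints the physical plan, where Exchange operators mark the shuffle boundaries that separate stages.

```python
COUNT_BY_DATE_SQL = """
select count(1) as row_count, ss_sold_date_sk
from store_sales
group by ss_sold_date_sk
order by ss_sold_date_sk
"""


def run_count_by_date(spark_session, rows_to_show=10):
    """Run the per-date count and print its plan and first rows."""
    df = spark_session.sql(COUNT_BY_DATE_SQL)
    df.explain()           # physical plan; look for Exchange (shuffle) nodes
    df.show(rows_to_show)  # triggers the job you then see in the Spark UI
    return df
```

In the notebook, run_count_by_date(spark) would submit the job to the attached EMR Serverless compute.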

Monitor jobs in the Spark UI

On the Jobs tab of the Spark UI, you can see a list of complete or actively running jobs, with the following details:

  • The action that triggered the job
  • The time it took (for this example, 41 seconds, but timing will vary)
  • The number of stages (2) and tasks (3,428); these values are specific to this example and are provided for reference only

You can choose the job to view more details, particularly around the stages. Our job has two stages; a new stage is created whenever there is a shuffle. We have one stage for the initial reading of each dataset, and one for the aggregation. In the following example, we run some TPC-DS SQL statements that are used for performance and benchmarks:

 with frequent_ss_items as
 (select substr(i_item_desc,1,30) itemdesc,i_item_sk item_sk,d_date solddate,count(*) cnt
  from store_sales, date_dim, item
  where ss_sold_date_sk = d_date_sk
    and ss_item_sk = i_item_sk
    and d_year in (2000, 2000+1, 2000+2,2000+3)
  group by substr(i_item_desc,1,30),i_item_sk,d_date
  having count(*) >4),
 max_store_sales as
 (select max(csales) tpcds_cmax
  from (select c_customer_sk,sum(ss_quantity*ss_sales_price) csales
        from store_sales, customer, date_dim
        where ss_customer_sk = c_customer_sk
         and ss_sold_date_sk = d_date_sk
         and d_year in (2000, 2000+1, 2000+2,2000+3)
        group by c_customer_sk) x),
 best_ss_customer as
 (select c_customer_sk,sum(ss_quantity*ss_sales_price) ssales
  from store_sales, customer
  where ss_customer_sk = c_customer_sk
  group by c_customer_sk
  having sum(ss_quantity*ss_sales_price) > (95/100.0) *
    (select * from max_store_sales))
 select sum(sales)
 from (select cs_quantity*cs_list_price sales
       from catalog_sales, date_dim
       where d_year = 2000
         and d_moy = 2
         and cs_sold_date_sk = d_date_sk
         and cs_item_sk in (select item_sk from frequent_ss_items)
         and cs_bill_customer_sk in (select c_customer_sk from best_ss_customer)
      union all
      (select ws_quantity*ws_list_price sales
       from web_sales, date_dim
       where d_year = 2000
         and d_moy = 2
         and ws_sold_date_sk = d_date_sk
         and ws_item_sk in (select item_sk from frequent_ss_items)
         and ws_bill_customer_sk in (select c_customer_sk from best_ss_customer))) x

You can monitor your Spark job in SageMaker Unified Studio using two methods. Jupyter notebooks provide basic monitoring, showing real-time job status and execution progress. For more detailed analysis, use the Spark UI. You can examine specific stages, tasks, and execution plans. The Spark UI is particularly useful for troubleshooting performance issues and optimizing queries. You can track estimated stages, running tasks, and task timing details. This comprehensive view helps you understand resource utilization and track job progress in depth.

In this section, we explained how you can use EMR Serverless compute in SageMaker Unified Studio to build an interactive Spark application. Through the Spark UI, the interactive application provides fine-grained task-level status, I/O, and shuffle details, as well as links to the corresponding task logs for each stage directly from your notebook, enabling a seamless troubleshooting experience.

Clean up

To avoid ongoing charges in your AWS account, delete the resources you created during this tutorial:

  1. Delete the connection.
  2. Delete the EMR job.
  3. Delete the EMR output S3 buckets.
  4. Delete the Amazon MWAA resources, such as workflows and environments.

Conclusion

In this post, we demonstrated how the next generation of SageMaker, combined with EMR Serverless, provides a powerful solution for developing, monitoring, and scheduling Spark applications using data in Amazon S3. The integrated experience significantly reduces complexity by offering a unified development environment, automatic resource management, and comprehensive monitoring capabilities through Spark UI, while maintaining cost-efficiency through a pay-as-you-go model. For businesses, this means faster time-to-insight, improved team collaboration, and reduced operational overhead, so data teams can focus on analytics rather than infrastructure management.

To get started, explore the Amazon SageMaker Unified Studio User Guide, set up a project in your AWS environment, and discover how this solution can transform your organization’s data analytics capabilities.

Appendix

In the following sections, we discuss how to run a workload on a schedule and provide details about the TPC-DS dataset for building the Spark application using EMR Serverless.

Run a workload on a schedule

In this section, we deploy a JupyterLab notebook and create a workflow using Amazon MWAA. You can use workflows to orchestrate notebooks, querybooks, and more in your project repositories. With workflows, you can define a collection of tasks organized as a directed acyclic graph (DAG) that can run on a user-defined schedule. Complete the following steps:

  1. In SageMaker Unified Studio, choose Build, and under Orchestration, choose Workflows.

  2. Choose Create Workflow in Editor.

You will be redirected to the JupyterLab notebook with a new DAG called untitled.py created under the /src/workflows/dag folder.

  3. Rename this file to tpcds_data_queries.py.
  4. You can reuse the existing template with the following updates:
    1. Update line 17 with the schedule on which you want your code to run.
    2. Update line 26 with your NOTEBOOK_PATH. This should be in the form src/<notebook_name>.ipynb. Note the name of the automatically generated dag_id; you can rename it based on your requirements.

  5. Choose File and Save notebook.

To test, you can trigger a manual run of your workload.

  1. In SageMaker Unified Studio, choose Build, and under Orchestration, choose Workflows.
  2. Choose your workflow, then choose Run.

You can monitor the success of your job on the Runs tab.

To debug your notebook job by accessing the Spark UI within your Airflow job console, you must use EMR Serverless Airflow Operators to submit your job. The link is available on the Details tab of your query.

This option has the following key limitations: it’s not available for Amazon EMR on EC2, and SageMaker notebook job operators don’t work.

You can configure the operator to generate one-time links to the application UIs and Spark stdout logs by passing enable_application_ui_links=True as a parameter. After the job starts running, these links are available on the Details tab of the relevant task. If enable_application_ui_links=False, then the links will be present but grayed out.
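
As a rough illustration of this parameter, the following sketch defines a DAG that submits a Spark job with EmrServerlessStartJobOperator from the Amazon provider package for Airflow. The DAG ID, task ID, application ID, role ARN, and script location are placeholders that you would replace with your own values, and the helper function names are hypothetical.

```python
from datetime import datetime


def emr_serverless_task_kwargs(application_id, execution_role_arn, entry_point):
    """Arguments for EmrServerlessStartJobOperator (values are placeholders)."""
    return {
        "task_id": "run_tpcds_queries",
        "application_id": application_id,
        "execution_role_arn": execution_role_arn,
        "job_driver": {"sparkSubmit": {"entryPoint": entry_point}},
        # Generates one-time links to the Spark UI and stdout logs.
        "enable_application_ui_links": True,
    }


KWARGS = emr_serverless_task_kwargs(
    application_id="<your-emr-serverless-application-id>",
    execution_role_arn="<your-job-execution-role-arn>",
    entry_point="s3://<your-bucket>/scripts/tpcds_data_queries.py",
)


def build_dag():
    """Create the DAG; Airflow is imported here so the module loads without it."""
    from airflow import DAG
    from airflow.providers.amazon.aws.operators.emr import (
        EmrServerlessStartJobOperator,
    )

    with DAG(
        dag_id="tpcds_emr_serverless_example",
        start_date=datetime(2025, 1, 1),
        schedule=None,   # trigger manually; set a cron expression to schedule
        catchup=False,
    ) as dag:
        EmrServerlessStartJobOperator(**KWARGS)
    return dag
```

In a DAG file, you would assign the result at module level (for example, dag = build_dag()) so the Airflow scheduler can discover it.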

Make sure you have the emr-serverless:GetDashboardForJobRun AWS Identity and Access Management (IAM) permissions to generate the dashboard link.
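
A policy statement granting this permission might look like the following sketch. The Region, account ID, and application ID are placeholders, and the resource ARN pattern is an assumption you should verify against the EMR Serverless documentation before use.

```python
import json


def dashboard_policy(region, account_id, application_id):
    """Build an IAM policy document allowing dashboard link generation."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["emr-serverless:GetDashboardForJobRun"],
                # Assumed ARN pattern covering all job runs of one application.
                "Resource": (
                    f"arn:aws:emr-serverless:{region}:{account_id}:"
                    f"/applications/{application_id}/jobruns/*"
                ),
            }
        ],
    }


policy = dashboard_policy("us-east-1", "111122223333", "<application-id>")
print(json.dumps(policy, indent=2))
```

You would attach a statement like this to the execution role used by your Amazon MWAA environment.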

Open the Airflow UI for your job. The Spark UI and history server dashboard options are visible on the Details tab, as shown in the following screenshot.

The following screenshot shows the Jobs tab of the Spark UI.

Use the TPC-DS dataset to build the Spark application using EMR Serverless

To use the TPC-DS dataset to run the Spark application against a dataset in an S3 bucket, you need to copy the TPC-DS dataset into your S3 bucket:

  1. Create a new S3 bucket in your test account if needed. In the following code, replace $YOUR_S3_BUCKET with your S3 bucket name. We suggest you export YOUR_S3_BUCKET as an environment variable:
export YOUR_S3_BUCKET=<Your bucket name>
  2. Copy the TPC-DS source data as input to your S3 bucket. If it’s not exported as an environment variable, replace $YOUR_S3_BUCKET with your S3 bucket name:
aws s3 sync s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/ s3://$YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned/

About the Authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Abhilash is a senior specialist solutions architect at Amazon Web Services (AWS), helping public sector customers on their cloud journey with a focus on AWS Data and AI services. Outside of work, Abhilash enjoys learning new technologies, watching movies, and visiting new places.

Perform per-project cost allocation in Amazon SageMaker Unified Studio

Post Syndicated from Enrique Salgado Hernández original https://aws.amazon.com/blogs/big-data/perform-per-project-cost-allocation-in-amazon-sagemaker-unified-studio/

Amazon SageMaker Unified Studio is a single data and AI development environment where you can find and access your data and act on it using AWS resources for SQL analytics, data processing, model development, and generative AI application development.

SageMaker Unified Studio is part of the next generation of Amazon SageMaker. SageMaker brings together AWS artificial intelligence and machine learning (AI/ML) and analytics capabilities and delivers an integrated experience for analytics and AI with unified access to data.

With SageMaker Unified Studio, you can create domains and projects, providing a single interface to build, deploy, execute, and monitor end-to-end workflows. This approach helps drive collaboration across teams and facilitates agile development.

SageMaker Unified Studio implements resource tagging when AWS resources are provisioned. You can use these tags to track and allocate costs for the various resources created as part of the domains and projects within SageMaker Unified Studio.

This post demonstrates how to perform cost allocation using these resource tags, so finance analysts and business analysts can implement and follow Financial Operations (FinOps) best practices to control and track cloud infrastructure costs.

Solution overview

The following diagram illustrates how tagging works within SageMaker domains.

High-level diagram that illustrates how SageMaker Unified Studio entities (domains, projects, and environments) are organized and how tags are applied to each of them

Before reviewing the implementation details, let’s explore several key SageMaker concepts: domain, project, project profile, and environment blueprint. For more information, refer to the SageMaker Unified Studio Administrator Guide.

  • Domain – A domain is an organizing entity created by an administrator. Administrators assign users to domains to enable collaboration using similar tools, assets, and resources. A domain can represent a business organization or a business unit containing people who collaborate and share resources. After creating a domain, administrators share the URL with users to access the portal.
  • Projects – Projects exist within each domain. A project provides a boundary where users can collaborate on a business use case. Users can create and share data, computing, and other resources within projects.
  • Project profile – When you create a project, you must select a project profile. A project profile is a template that governs infrastructure for the project, simplifying project creation with preconfigured settings and resources ready for use.
  • Environment blueprints – Environment blueprints are reusable templates for creating environments. They define settings for resource deployment and provide information for provisioning. Each blueprint uses an AWS CloudFormation template to create resources in a repeatable and scalable manner.

For effective cost tracking and allocation, make sure your SageMaker resources have proper tags. You can configure these as cost allocation tags to group and filter across AWS Billing and Cost Management tools (such as AWS Cost Explorer and AWS Data Exports).

As of this writing, SageMaker domains support tagging at the blueprint, domain, project, and environment level. When you create projects or add resources within an existing project, the following tags are automatically added to resources through CloudFormation resource tags, configured for each blueprint stack:

  • AmazonDataZoneBlueprint – Type of blueprint corresponding to this blueprint’s CloudFormation template (for example, Tooling)
  • AmazonDataZoneDomain – Amazon DataZone domain associated with this CloudFormation template
  • AmazonDataZoneEnvironment – Amazon DataZone environment ID associated with this CloudFormation template
  • AmazonDataZoneProject – Amazon DataZone project associated with this CloudFormation template

To track costs in SageMaker Unified Studio, you will perform the following steps:

  1. Create a SageMaker domain and project.
  2. Configure cost and billing settings by enabling cost allocation tags.
  3. (Optional) Generate costs for your project.
  4. Track costs using Cost Explorer and Data Exports.

Prerequisites

This post requires the following configurations in your AWS account:

  • AWS IAM Identity Center enabled in your organization management account (preferred) or in the member account where you will use SageMaker Unified Studio. For instructions on enabling IAM Identity Center, refer to Enable IAM Identity Center.
  • Cost Explorer enabled in your organization management account (preferred) or in the member account where you will use SageMaker Unified Studio. For configuration steps, refer to Enabling Cost Explorer.

  • Either legacy AWS Cost and Usage Reports (AWS CUR) with Amazon Athena integration or Data Exports configured and integrated with Athena for queries. For setup instructions, refer to Creating Data Exports.

Create a SageMaker Unified Studio domain and project

Complete the following steps to set up your domain and project:

  1. Create a SageMaker Unified Studio domain using the Quick setup option (recommended for new users) or manual setup.

After domain creation, you will be redirected to the domain overview page.

  2. Choose Open Unified Studio.
  3. On the SageMaker Unified Studio console, choose Create project.
  4. For Project profile, choose SQL analytics, then choose Continue.

SageMaker Unified Studio create project workflow (configuration page)

  5. Choose Continue to keep the default blueprint parameters.
  6. Review the configuration summary, then choose Create project.

SageMaker Unified Studio create project workflow (confirmation page)

After the project is created, you will be redirected to the project overview page. Record the project ID and domain ID.

Project details page showing various details such as project id, project name and project IAM role ARN

Cost and billing configuration

As mentioned earlier, to track costs in SageMaker Unified Studio, you must configure cost allocation tags. Refer to Organizing and tracking costs using AWS cost allocation tags for more information about this feature.

Complete the following steps:

  1. On the AWS Billing and Cost Management console, under Cost organization in the navigation pane, choose Cost allocation tags.
  2. Select the following tags and choose Activate:
    1. AmazonDataZoneDomain
    2. AmazonDataZoneProject
    3. AmazonDataZoneEnvironment
    4. AmazonDataZoneBlueprint

The AmazonDataZoneProject and AmazonDataZoneDomain tags correspond to the project and domain ID values you recorded earlier.

AWS cost allocation tags interface showing the AWS tags that are currently configured as cost allocation tags

Cost allocation tags configuration doesn’t apply retroactively. If you want to monitor costs associated with these tags in the AWS Billing and Cost Management tools before the activation date, you must request a cost allocation tag backfill. The backfill operation can take several hours to complete.
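
If you prefer to script the activation instead of using the console, the Cost Explorer API exposes an operation for it. The following is a minimal sketch using the boto3 ce client; the helper names are hypothetical, and it assumes your credentials belong to the account that manages billing and are allowed to update cost allocation tags.

```python
SMUS_TAG_KEYS = [
    "AmazonDataZoneDomain",
    "AmazonDataZoneProject",
    "AmazonDataZoneEnvironment",
    "AmazonDataZoneBlueprint",
]


def activation_request(tag_keys=SMUS_TAG_KEYS):
    """Build the status entries that mark each tag key as active."""
    return [{"TagKey": key, "Status": "Active"} for key in tag_keys]


def activate_tags(ce_client, tag_keys=SMUS_TAG_KEYS):
    """Activate the tags through a boto3 Cost Explorer ('ce') client."""
    return ce_client.update_cost_allocation_tags_status(
        CostAllocationTagsStatus=activation_request(tag_keys)
    )
```

For example, activate_tags(boto3.client("ce")) would send the request. The response includes an Errors list that is worth checking for tag keys that could not be updated.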

Generate costs for the project

This section explains how to generate costs associated with the underlying data backend (Amazon Redshift in this case) to examine them using AWS billing tools. You can skip this section if you’re tracking costs on an active project.

To generate costs, we use the table structure used in the Redshift Immersion Labs. Refer to Create Tables for more details.

To run queries in SageMaker Unified Studio, follow these steps:

  1. In your project, choose New and then Query.

Image that shows the query button within the SageMaker Unified Studio project overview page allowing users to open the query editor tool

  2. Use the Amazon Redshift Serverless compute configured for the project to generate the costs:
    1. Choose the Redshift (Lakehouse) connection.
    2. Choose the dev database.
    3. Choose the project schema.
    4. Choose Choose.

Image that shows the connection selector available in SageMaker Unified Studio. In this case, the Redshift (Lakehouse) connection is selected, with the dev database and project schema selected underneath

  3. Copy the SQL statements provided in the following GitHub repo into the SageMaker Unified Studio query editor and execute them to create, load, and validate data in the tables.

View of the Query editor within the SageMaker Unified Studio portal. Image contains two SQL queries (create tables and COPY data operation)

After running these steps, you will have generated some Amazon Redshift costs that you can analyze in the AWS Billing and Cost Management tools. However, these tools (Cost Explorer and Data Exports) are refreshed at least once every 24 hours, so you might need to wait up to 24 hours before proceeding to the next section.

Tracking costs in AWS Billing and Cost Management tools

With the cost allocation tags enabled, you can use AWS Billing and Cost Management tools to analyze and track costs, including Cost Explorer and Data Exports. For more information about using these tools, refer to the AWS Billing and Cost Management User Guide.

Check costs in Cost Explorer

You can check your SageMaker Unified Studio costs using Cost Explorer. With this tool, you can view and analyze your costs and usage through an interface with pre-built filters and aggregation capabilities for various metrics. For more information, refer to Analyzing your costs and usage with AWS Cost Explorer.

To access Cost Explorer, complete the following steps:

  1. On the AWS Management Console, choose your account name in the top right corner and choose Billing Dashboard, or search for “Cost Explorer” in the console search bar.
  2. On the Billing Dashboard, choose Cost Explorer in the navigation pane.
  3. For first-time users, choose Launch Cost Explorer to enable the service.

AWS can take up to 24 hours to prepare your cost data.

  4. To view overall costs per project, configure the following report parameters:
    1. For Date Range, enter your range.
    2. For Granularity, choose Monthly.
    3. For Dimension, choose Tag.
    4. For Tag, enter your tag (AmazonDataZoneProject).

Image that shows how to group by a particular dimension (tag) in cost explorer

The following screenshot shows a sample report.

AWS cost explorer report showing costs by SageMaker Unified Studio project

  5. To view different service costs for a specific project, update the following parameters:
    1. For Dimension, choose Service.

Image that shows how to group by a particular dimension (service) in cost explorer

    2. For Tag, choose AmazonDataZoneProject and choose the value of the project you want to inspect (in this case, 4z9d694nbsnyqx).

Image that illustrates how to filter by a specific dimension (tag) and value in cost explorer

The results should look similar to the following screenshot.

AWS cost explorer report showing service costs for a particular SageMaker Unified Studio project
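
The same two reports can be retrieved programmatically with the Cost Explorer GetCostAndUsage API. The following sketch builds the request parameters for both cases; the function name cost_request and the choice of the UnblendedCost metric are assumptions made for illustration.

```python
def cost_request(start, end, project_id=None):
    """Build GetCostAndUsage parameters.

    Without project_id: monthly costs grouped by the project tag.
    With project_id: monthly costs for that project grouped by service.
    Dates use YYYY-MM-DD; the end date is exclusive.
    """
    request = {
        "TimePeriod": {"Start": start, "End": end},
        "Granularity": "MONTHLY",
        "Metrics": ["UnblendedCost"],
    }
    if project_id is None:
        request["GroupBy"] = [{"Type": "TAG", "Key": "AmazonDataZoneProject"}]
    else:
        request["GroupBy"] = [{"Type": "DIMENSION", "Key": "SERVICE"}]
        request["Filter"] = {
            "Tags": {"Key": "AmazonDataZoneProject", "Values": [project_id]}
        }
    return request
```

With boto3, you could call boto3.client("ce").get_cost_and_usage(**cost_request("2025-01-01", "2025-02-01", "4z9d694nbsnyqx")) and read the grouped amounts from ResultsByTime in the response.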

Check costs using Data Exports

With Data Exports, you can query your AWS cost and usage data with more flexibility than tools such as Cost Explorer offer. It provides a comprehensive set of measures and dimensions that you can include in the export to create a personalized report. The report is delivered to Amazon Simple Storage Service (Amazon S3), where you can integrate it with Athena and query it using SQL or business intelligence (BI) tools such as Amazon QuickSight.

This post assumes you have already configured a data export and you have it integrated with Athena (refer to Processing data exports for more information). For instructions on setting up CUR and Athena integration, refer to Creating reports.

Check costs by project

Use the following query to check costs by project:

SELECT product_servicecode,
    product_product_family,
    resource_tags[ 'user_amazon_data_zone_project' ] as user_amazon_data_zone_project,
    round(sum(line_item_unblended_cost), 2) costs,
    line_item_line_item_description 
FROM "data_exports"."data_exportdata"
where resource_tags [ 'user_amazon_data_zone_project' ] != ''
group by product_product_family,
    product_servicecode,
    resource_tags[ 'user_amazon_data_zone_project' ],
    line_item_line_item_description
order by round(sum(line_item_unblended_cost), 2) DESC;

Results will look similar to the following screenshot on the Athena console.

Athena SQL query results when querying cost and usage data from data exports

The preceding query shows your costs grouped by:

  • Project (using tags)
  • Service
  • Product family, which corresponds to the subtype for a given product usage charge (for example, ML Instance for SageMaker, or Managed Storage for Amazon Redshift)
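
Note that the query refers to the AmazonDataZoneProject tag through the key user_amazon_data_zone_project in the resource_tags map. The following sketch reproduces that naming pattern for the tags used in this post. It is an illustration based only on the keys that appear in these queries, not a description of how Data Exports normalizes every possible tag key, and the function name export_tag_key is hypothetical.

```python
import re


def export_tag_key(tag_key, prefix="user_"):
    """Convert a CamelCase tag key to the snake_case key used in the queries."""
    # Insert an underscore before every capital letter except the first.
    snake = re.sub(r"(?<!^)(?=[A-Z])", "_", tag_key).lower()
    return prefix + snake


for key in ["AmazonDataZoneProject", "AmazonDataZoneEnvironment"]:
    print(key, "->", export_tag_key(key))
```

If a query returns no rows for a tag, inspect the keys actually present in your resource_tags column before relying on this pattern.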

Check costs for individual projects

To check costs for a specific SageMaker Unified Studio project (for example, the sample project 4z9d694nbsnyqx created during this walkthrough), you can use the following query:

SELECT product_servicecode,
    product_product_family,
    resource_tags[ 'user_amazon_data_zone_project' ] as user_amazon_data_zone_project,
    round(sum(line_item_unblended_cost), 2) costs,
    line_item_line_item_description 
FROM "data_exports"."data_exportdata"
where resource_tags [ 'user_amazon_data_zone_project' ] != ''
and resource_tags [ 'user_amazon_data_zone_project' ] = '<provide the project id here>'
group by product_product_family,
    product_servicecode,
    resource_tags[ 'user_amazon_data_zone_project' ],
    line_item_line_item_description
order by round(sum(line_item_unblended_cost), 2) DESC;
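
To run this per-project query outside the console, you could build the SQL in Python and submit it with the Athena API. The following sketch validates the project ID before inserting it into the statement; the function names, the validation pattern, and the output location parameter are assumptions for illustration, while the database and table names are the ones used in this post.

```python
import re

PROJECT_COST_SQL = """
SELECT product_servicecode,
    product_product_family,
    resource_tags['user_amazon_data_zone_project'] AS user_amazon_data_zone_project,
    round(sum(line_item_unblended_cost), 2) costs,
    line_item_line_item_description
FROM "data_exports"."data_exportdata"
WHERE resource_tags['user_amazon_data_zone_project'] = '{project_id}'
GROUP BY product_product_family,
    product_servicecode,
    resource_tags['user_amazon_data_zone_project'],
    line_item_line_item_description
ORDER BY round(sum(line_item_unblended_cost), 2) DESC
"""


def project_cost_query(project_id):
    """Return the cost query for one project, rejecting unexpected characters."""
    if not re.fullmatch(r"[A-Za-z0-9_-]+", project_id):
        raise ValueError(f"Unexpected project ID format: {project_id!r}")
    return PROJECT_COST_SQL.format(project_id=project_id)


def run_in_athena(athena_client, sql, output_location):
    """Submit the query with a boto3 Athena client; returns the execution info."""
    return athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": "data_exports"},
        ResultConfiguration={"OutputLocation": output_location},
    )
```

For example, run_in_athena(boto3.client("athena"), project_cost_query("4z9d694nbsnyqx"), "s3://<your-results-bucket>/athena/") would start the query, and you could then poll get_query_execution with the returned QueryExecutionId.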

Monitor costs with Data Exports and QuickSight

If you enabled Athena to work with Data Exports, you can also configure QuickSight to query this data source. With QuickSight, you can create interactive dashboards to track SageMaker costs in SageMaker Unified Studio at scale.

Configure access and permissions

To create CUR dashboards in QuickSight, first complete the following steps:

  1. Subscribe to QuickSight and have an author user account. For instructions on subscribing to QuickSight, refer to Signing up for an Amazon QuickSight subscription.
  2. Enable access to Athena and your CUR S3 bucket in the Security & permissions section of the QuickSight administration console. You need QuickSight administrator permissions to access this console.

Image shows QuickSight administration console where administrators can edit the AWS services (Athena in this case) that QuickSight is allowed to access

  3. If you’re using AWS Lake Formation, make sure your QuickSight user is authorized to query the CUR database and table. For more information about granting access in Lake Formation, refer to Granting permissions on Data Catalog resources.

Create a QuickSight dataset

The next step is to create a dataset in QuickSight using a SQL query. For instructions on creating a dataset with SQL, refer to Using SQL to customize data. Use the following SQL expression:

SELECT product_servicecode,
    product_product_family,
    resource_tags[ 'user_amazon_data_zone_environment' ] as user_amazon_data_zone_environment,
    resource_tags[ 'user_amazon_data_zone_project' ] as user_amazon_data_zone_project,
    resource_tags[ 'user_amazon_data_zone_domain' ] as user_amazon_data_zone_domain,
    line_item_unblended_cost,
    line_item_usage_start_date,
    line_item_line_item_description
FROM "data_exports"."data_exportdata"
where resource_tags [ 'user_amazon_data_zone_environment' ] != '' or resource_tags [ 'user_amazon_data_zone_project' ] != ''

Image of QuickSight dataset preparation page. Shows a SQL query that is used to extract data from the data exports previously configured.

The preceding query includes only cost and usage data that’s tagged with either user_amazon_data_zone_environment or user_amazon_data_zone_project to focus on SageMaker associated costs. To include other AWS costs, you must modify these filters.

Create QuickSight dashboards

Using the authoring capabilities of QuickSight, you can create interactive dashboards where business stakeholders can explore and track costs associated with SageMaker Unified Studio projects. You can use these dashboards to review relevant cost metrics at a glance that are derived from the Data Exports dimensions and metrics included in your dataset, as shown in the following screenshot. For more information about adding visuals to analyses, refer to Adding visuals to Amazon QuickSight analyses.

Example of a QuickSight dashboard consuming data exports cost and usage data. Dashboard contains multiple visuals that illustrate SageMaker Unified Studio costs by project and service

The preceding example shows a dashboard built using QuickSight connected to a Data Exports dataset. The dashboard contains the following visuals:

  • KPI visual showing the current monthly costs for SageMaker Unified Studio along with the month over month (MoM) variation and history
  • Autonarrative visual analyzing SageMaker Unified Studio costs (highest) by month
  • Vertical stacked bar chart showing SageMaker Unified Studio costs by month (grouped by project)
  • Donut chart showing SageMaker Unified Studio cost by service
  • Heat map visual correlating costs by project ID and service

Using this approach (QuickSight and Data Exports), you can create highly customizable dashboards to explore and monitor your SageMaker Unified Studio costs. Furthermore, you can create automated reports using the QuickSight reporting feature to send these by email to the relevant stakeholders.

Clean up

Delete the resources you created as part of this post when you’re done with them to avoid monthly charges. This includes the SageMaker resources, the Data Exports reports, and the QuickSight subscription (if you created it to visualize costs).

  1. Delete SageMaker resources
    1. Log in to the SageMaker domain using an admin role.
    2. Delete the project you created.
    3. Delete the SageMaker domain.
  2. Delete Data Exports reports
    1. On the AWS Billing console, in the navigation pane, choose Cost & Usage Reports.
    2. Select the report you want to delete.
    3. Choose Delete.
    4. Confirm the deletion by choosing Delete report.

For more information about managing Data Exports, refer to Deleting exports.

  3. Unsubscribe from QuickSight
    1. On the QuickSight console, choose your profile name in the top right corner.
    2. Choose Manage QuickSight.
    3. Choose Account settings.
    4. At the bottom of the page, choose Delete your QuickSight account.
    5. Review the information about data deletion.
    6. Enter delete to confirm.
    7. Choose Delete.

IMPORTANT NOTE: Before unsubscribing, make sure you have backed up any dashboards or analyses you want to keep. After deletion, you can’t recover your QuickSight assets. For more information about managing your QuickSight subscription, refer to Deleting your Amazon QuickSight subscription and closing the account.

Conclusion

Managing costs on a unified platform like SageMaker can seem challenging because it aggregates many tools and services with different cost models. In this post, we showed how to use AWS Billing and Cost Management tools to aggregate and categorize costs across the various services used within SageMaker. With this approach, you can monitor and track respective service costs, either in aggregate or focusing on a particular project.

Start taking control of your analytics and ML costs today. With AWS Billing and Cost Management tools with SageMaker, you can:

  • Track and monitor your service costs
  • Break down expenses by project or service
  • Implement efficient back charging mechanisms to the different business units or organizations using SageMaker within your organization

For further reading, refer to Analyzing your costs and usage with AWS Cost Explorer and Processing Data Exports (using Athena).


About the authors

Enrique Salgado Hernández is a Senior Specialist Solutions Architect at AWS with more than 10 years of experience working in the cloud. He specializes in designing and implementing large-scale analytics architectures across various industry sectors. He is passionate about working with customers to solve their problems by supporting them during their cloud journey.

Angel Conde Manjon is a Senior EMEA Data & AI PSA, based in Madrid. He previously worked on research related to data analytics and AI in diverse European research projects. In his current role, Angel helps partners develop businesses centered on data and AI.

Overcome your Kafka Connect challenges with Amazon Data Firehose

Post Syndicated from Swapna Bandla original https://aws.amazon.com/blogs/big-data/overcome-your-kafka-connect-challenges-with-amazon-data-firehose/

Apache Kafka is a popular open source distributed streaming platform that is widely used in the AWS ecosystem. It’s designed to handle real-time, high-throughput data streams, making it well-suited for building real-time data pipelines to meet the streaming needs of modern cloud-based applications.

For AWS customers who want to run Apache Kafka without the undifferentiated heavy lifting of self-managing their Kafka clusters, Amazon Managed Streaming for Apache Kafka (Amazon MSK) offers fully managed Apache Kafka. This means Amazon MSK provisions your servers, configures your Kafka clusters, replaces servers when they fail, orchestrates server patches and upgrades, makes sure clusters are architected for high availability, makes sure data is durably stored and secured, sets up monitoring and alarms, and runs scaling to support load changes. With a managed service, you can spend your time developing and running streaming event applications.

For applications to use data sent to Kafka, you need to write, deploy, and manage application code that consumes data from Kafka.

Kafka Connect is an open-source component of the Kafka project that provides a framework for connecting with external systems such as databases, key-value stores, search indexes, and file systems from your Kafka clusters. On AWS, our customers commonly write and manage connectors using the Kafka Connect framework to move data out of their Kafka clusters into persistent storage, like Amazon Simple Storage Service (Amazon S3), for long-term storage and historical analysis.

At scale, customers need to programmatically manage their Kafka Connect infrastructure for consistent deployments when updates are required, as well as the code for error handling, retries, compression, or data transformation as it is delivered from your Kafka cluster. However, this introduces a need for investment into the software development lifecycle (SDLC) of this management software. Although the SDLC is a cost-effective and time-efficient process to help development teams build high-quality software, for many customers, this process is not desirable for their data delivery use case, particularly when they could dedicate more resources towards innovating for other key business differentiators. Beyond SDLC challenges, many customers face fluctuating data streaming throughput. For instance:

  • Online gaming businesses experience throughput variations based on game usage
  • Video streaming applications see changes in throughput depending on viewership
  • Traditional businesses have throughput fluctuations tied to consumer activity

Striking the right balance between resources and workload can be challenging. Under-provisioning can lead to consumer lag, processing delays, and potential data loss during peak loads, hampering real-time data flows and business operations. On the other hand, over-provisioning results in underutilized resources and unnecessary high costs, making the setup economically inefficient for customers. Even the action of scaling up your infrastructure introduces additional delays because resources need to be provisioned and acquired for your Kafka Connect cluster.

Even when you can estimate aggregated throughput, predicting throughput per individual stream remains difficult. As a result, to achieve smooth operations, you might resort to over-provisioning your Kafka Connect resources (CPU) for your streams. This approach, though functional, might not be the most efficient or cost-effective solution.

Customers have been asking for a fully serverless solution that not only manages resource allocation, but also shifts the cost model so they pay only for the data they deliver from the Kafka topic, instead of for underlying resources that require constant monitoring and management.

In September 2023, we announced a new integration between Amazon MSK and Amazon Data Firehose, allowing builders to deliver data from their MSK topics to their destination sinks with a fully managed, serverless solution. With this integration, you no longer need to develop and manage your own code to read, transform, and write your data to your sink using Kafka Connect. Data Firehose abstracts away the retry logic required when reading data from your MSK cluster and delivering it to the desired sink, as well as infrastructure provisioning, because it can scale out and scale in automatically to adjust to the volume of data to transfer. There are no provisioning or maintenance operations required on your side.

At release, the checkpoint time to start consuming data from the MSK topic was the creation time of the Firehose stream. Data Firehose couldn’t start reading from other points on the data stream. This caused challenges for several different use cases.

For customers that are setting up a mechanism to sink data from their cluster for the first time, all data in the topic older than the timestamp of Firehose stream creation would need another way to be persisted. For example, customers using Kafka Connect connectors were limited in using Data Firehose because they wanted to sink all the data from the topic to their sink, but Data Firehose couldn’t read data from earlier than the timestamp of Firehose stream creation.

For other customers that were running Kafka Connect and needed to migrate from their Kafka Connect infrastructure to Data Firehose, this required some extra coordination. At release, you couldn’t point your Firehose stream to a specific point on the source topic, so a migration required stopping data ingest to the source MSK topic and waiting for Kafka Connect to sink all the data to the destination. Only then could you create the Firehose stream and restart the producers so that the Firehose stream could consume new messages from the topic. This added non-trivial overhead to the migration effort when cutting over from an existing Kafka Connect infrastructure to a new Firehose stream.

To address these challenges, we’re happy to announce a new feature in the Data Firehose integration with Amazon MSK. You can now configure the Firehose stream to begin reading from either the earliest position on your MSK topic or a custom timestamp.

In the first post of this series, we focused on managed data delivery from Kafka to your data lake. In this post, we extend the solution to choose a custom timestamp for your MSK topic to be synced to Amazon S3.

Overview of Data Firehose integration with Amazon MSK

Data Firehose integrates with Amazon MSK to offer a fully managed solution that simplifies the processing and delivery of streaming data from Kafka clusters into data lakes stored on Amazon S3. With just a few clicks, you can continuously load data from your desired Kafka clusters to an S3 bucket in the same account, eliminating the need to develop or run your own connector applications. The following are some of the key benefits to this approach:

  • Fully managed service – Data Firehose is a fully managed service that handles the provisioning, scaling, and operational tasks, allowing you to focus on configuring the data delivery pipeline.
  • Simplified configuration – With Data Firehose, you can set up the data delivery pipeline from Amazon MSK to your sink with just a few clicks on the AWS Management Console.
  • Automatic scaling – Data Firehose automatically scales to match the throughput of your Amazon MSK data, without the need for ongoing administration.
  • Data transformation and optimization – Data Firehose offers features like JSON to Parquet/ORC conversion and batch aggregation to optimize the delivered file size, simplifying data analytical processing workflows.
  • Error handling and retries – Data Firehose automatically retries data delivery in case of failures, with configurable retry durations and backup options.
  • Offset select option – With Data Firehose, you can select the starting position for the MSK delivery stream to be delivered within a topic from three options:
    • Firehose stream creation time – This allows you to deliver data starting from Firehose stream creation time. When migrating from Kafka Connect to Data Firehose, if you have an option to pause the producer, you can consider this option.
    • Earliest – This allows you to deliver data starting from MSK topic creation time. You can choose this option if you’re setting a new delivery pipeline with Data Firehose from Amazon MSK to Amazon S3.
    • At timestamp – This option allows you to provide a specific start date and time in the topic from where you want the Firehose stream to read data. The time is in your local time zone. You can choose this option if you prefer not to stop your producer applications while migrating from Kafka Connect to Data Firehose. You can refer to the Python script and steps provided later in this post to derive the timestamp for the latest events in your topic that were consumed by Kafka Connect.

The following are benefits of the new timestamp selection feature with Data Firehose:

  • You can select the starting position of the MSK topic, not just from the point that the Firehose stream is created, but from any point from the earliest timestamp of the topic.
  • You can replay the MSK stream delivery if required, for example in testing scenarios where you want to start delivery from different timestamps.
  • When migrating from Kafka Connect to Data Firehose, gaps or duplicates can be managed by selecting the starting timestamp for Data Firehose delivery from the point where Kafka Connect delivery ended. Because the new custom timestamp feature isn’t monitoring Kafka consumer offsets per partition, the timestamp you select for your Kafka topic should be a few minutes before the timestamp at which you stopped Kafka Connect. The earlier the timestamp you select, the more duplicate records you will have downstream. The closer the timestamp to the time of Kafka Connect stopping, the higher the likelihood of data loss if certain partitions have fallen behind. Be sure to select a timestamp appropriate to your requirements.

Overview of solution

We discuss two scenarios to stream data.

In Scenario 1, we migrate to Data Firehose from Kafka Connect with the following steps:

  1. Derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3.
  2. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as At timestamp.
  3. Query Amazon S3 to validate the data loaded.

In Scenario 2, we create a new data pipeline from Amazon MSK to Amazon S3 with Data Firehose:

  1. Create a Firehose delivery stream with Amazon MSK as the source and Amazon S3 as the destination with the topic starting position as Earliest.
  2. Query Amazon S3 to validate the data loaded.

The solution architecture is depicted in the following diagram.

Prerequisites

You should have the following prerequisites:

  • An AWS account and access to the following AWS services:
  • An MSK provisioned or MSK serverless cluster with topics created and data streaming to it. The sample topic used in this post is order.
  • An EC2 instance configured to use as a Kafka admin client. Refer to Create an IAM role for instructions to create the client machine and IAM role that you will need to run commands against your MSK cluster.
  • An S3 bucket for delivering data from Amazon MSK using Data Firehose.
  • Kafka Connect to deliver data from Amazon MSK to Amazon S3 if you want to migrate from Kafka Connect (Scenario 1).

Migrate to Data Firehose from Kafka Connect

To reduce duplicates and minimize data loss, you need to configure your custom timestamp so that Data Firehose starts reading events as close as possible to the timestamp of the oldest committed offset that Kafka Connect reported. You can follow the steps in this section to see how the timestamp of each committed offset varies by partition across the topic you want to read from. This is for demonstration purposes and doesn’t scale as a solution for workloads with a large number of partitions.

Sample data was generated for demonstration purposes by following the instructions referenced in the following GitHub repo. We set up a sample producer application that generates clickstream events to simulate users browsing and performing actions on an imaginary ecommerce website.

To derive the latest timestamp from MSK events that Kafka Connect delivered to Amazon S3, complete the following steps:

  1. From your Kafka client, query Amazon MSK to retrieve the Kafka Connect consumer group ID:
    ./kafka-consumer-groups.sh --bootstrap-server $bs --list --command-config client.properties

  2. Stop Kafka Connect.
  3. Query Amazon MSK for the latest offset and associated timestamp for the consumer group belonging to Kafka Connect.

You can use the get_latest_offsets.py Python script from the following GitHub repo as a reference to get the timestamp associated with the latest offsets for your Kafka Connect consumer group. To enable authentication and authorization for a non-Java client with an IAM authenticated MSK cluster, refer to the following GitHub repo for instructions on installing the aws-msk-iam-sasl-signer-python package for your client.

python3 get_latest_offsets.py --broker-list $bs --topic-name "order" --consumer-group-id "connect-msk-serverless-connector-090224" --aws-region "eu-west-1"

Note the earliest timestamp across all the partitions.
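Once you have a timestamp per partition, choosing the value to enter in Data Firehose is a small calculation: take the earliest timestamp and move it back by a safety margin. The following is a minimal sketch of that step. The function name choose_start_timestamp, the sample values, and the 5-minute default margin are assumptions for illustration; the post only recommends starting a few minutes earlier, so pick a margin that suits your tolerance for duplicates.

```python
from datetime import datetime, timedelta, timezone


def choose_start_timestamp(partition_timestamps_ms, safety_margin=timedelta(minutes=5)):
    """Pick a Firehose start time from per-partition Kafka record timestamps.

    partition_timestamps_ms maps partition number -> epoch milliseconds of the
    last record Kafka Connect committed on that partition.
    """
    if not partition_timestamps_ms:
        raise ValueError("no partition timestamps supplied")
    # The slowest partition decides the start time, so no partition has a gap.
    earliest_ms = min(partition_timestamps_ms.values())
    earliest = datetime.fromtimestamp(earliest_ms / 1000, tz=timezone.utc)
    # Starting earlier trades a few duplicate records for a lower risk of loss.
    return earliest - safety_margin


# Hypothetical values for a three-partition topic.
last_committed = {0: 1_725_264_000_000, 1: 1_725_263_940_000, 2: 1_725_264_030_000}
start = choose_start_timestamp(last_committed)
print(start.isoformat())
```

The result is expressed in UTC. Convert it if the place where you enter the timestamp expects a different time zone.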

Create a data pipeline from Amazon MSK to Amazon S3 with Data Firehose

The steps in this section are applicable to both scenarios. Complete the following steps to create your data pipeline:

  1. On the Data Firehose console, choose Firehose streams in the navigation pane.
  2. Choose Create Firehose stream.
  3. For Source, choose Amazon MSK.
  4. For Destination, choose Amazon S3.
  5. For Source settings, browse to the MSK cluster and enter the topic name you created as part of the prerequisites.
  6. Configure the Firehose stream starting position based on your scenario:
    1. For Scenario 1, set Topic starting position as At Timestamp and enter the timestamp you noted in the previous section.
    2. For Scenario 2, set Topic starting position as Earliest.
  7. For Firehose stream name, leave the default generated name or enter a name of your preference.
  8. For Destination settings, browse to the S3 bucket created as part of the prerequisites to stream data.

Within this S3 bucket, a folder structure of the form YYYY/MM/dd/HH is created automatically by default. Data is delivered to the HH subfolder that corresponds to the time at which Data Firehose ingested it into Amazon S3.

  9. Under Advanced settings, you can choose to create the default IAM role for all the permissions that Data Firehose needs or choose an existing IAM role that has the policies that Data Firehose needs.
  10. Choose Create Firehose stream.
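The console steps above could also be scripted. The following is a minimal sketch, assuming the boto3 Firehose client and its create_delivery_stream parameters (MSKSourceConfiguration with ReadFromTimestamp) as described in the API reference. Every ARN and name is a hypothetical placeholder, and PRIVATE connectivity is an assumption about the cluster. To our understanding, the API expresses the Earliest option as a ReadFromTimestamp set to the Unix epoch; verify this against the current API reference before relying on it.

```python
from datetime import datetime, timezone

# Assumption: the API treats the Unix epoch as "Earliest".
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def build_stream_request(name, cluster_arn, topic, source_role_arn,
                         bucket_arn, delivery_role_arn, start_from=None):
    """Build keyword arguments for create_delivery_stream with an MSK source.

    start_from=None leaves the default (Firehose stream creation time).
    """
    source = {
        "MSKClusterARN": cluster_arn,
        "TopicName": topic,
        "AuthenticationConfiguration": {
            "RoleARN": source_role_arn,
            "Connectivity": "PRIVATE",  # assumption about the cluster setup
        },
    }
    if start_from is not None:
        source["ReadFromTimestamp"] = start_from
    return {
        "DeliveryStreamName": name,
        "DeliveryStreamType": "MSKAsSource",
        "MSKSourceConfiguration": source,
        "ExtendedS3DestinationConfiguration": {
            "RoleARN": delivery_role_arn,
            "BucketARN": bucket_arn,
        },
    }


def create_stream(request):
    """Send the request; needs boto3 and AWS credentials, so it is not called here."""
    import boto3
    return boto3.client("firehose").create_delivery_stream(**request)


# Scenario 1: start from a timestamp. All identifiers below are placeholders.
request = build_stream_request(
    name="msk-to-s3-order",
    cluster_arn="arn:aws:kafka:eu-west-1:111122223333:cluster/example/abc",
    topic="order",
    source_role_arn="arn:aws:iam::111122223333:role/example-firehose-source",
    bucket_arn="arn:aws:s3:::example-destination-bucket",
    delivery_role_arn="arn:aws:iam::111122223333:role/example-firehose-delivery",
    start_from=datetime(2024, 9, 2, 7, 54, tzinfo=timezone.utc),
)
print(request["MSKSourceConfiguration"]["ReadFromTimestamp"].isoformat())
```

For Scenario 2, pass start_from=EPOCH instead of a specific timestamp.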

On the Amazon S3 console, you can verify the data streamed to the S3 folder according to your chosen offset settings.
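To find the folder to inspect, it can help to compute the prefix that corresponds to a given delivery time. The following sketch assumes the default YYYY/MM/dd/HH layout is evaluated in UTC, which is our understanding of the Data Firehose default; hourly_prefix is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def hourly_prefix(delivery_time):
    """Return the YYYY/MM/dd/HH/ prefix for a timezone-aware delivery time, in UTC."""
    if delivery_time.tzinfo is None:
        raise ValueError("delivery_time must be timezone-aware")
    return delivery_time.astimezone(timezone.utc).strftime("%Y/%m/%d/%H/")


# A delivery at 10:15 in a UTC+2 time zone lands in the 08 folder.
local_time = datetime(2024, 9, 2, 10, 15, tzinfo=timezone(timedelta(hours=2)))
print(hourly_prefix(local_time))
```

Because the prefix follows the delivery time rather than the event time, records replayed from an earlier point in the topic appear under the folder for the hour in which they were delivered.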

Clean up

To avoid incurring future charges, delete the resources you created as part of this exercise if you’re not planning to use them further.

Conclusion

Data Firehose provides a straightforward way to deliver data from Amazon MSK to Amazon S3, enabling you to save costs and reduce latency to seconds. To try Data Firehose with Amazon S3, refer to the Delivery to Amazon S3 using Amazon Data Firehose lab.


About the Authors

Swapna Bandla is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. Swapna has a passion for understanding customers’ data and analytics needs and empowering them to develop cloud-based well-architected solutions. Outside of work, she enjoys spending time with her family.

Austin Groeneveld is a Streaming Specialist Solutions Architect at Amazon Web Services (AWS), based in the San Francisco Bay Area. In this role, Austin is passionate about helping customers accelerate insights from their data using the AWS platform. He is particularly fascinated by the growing role that data streaming plays in driving innovation in the data analytics space. Outside of his work at AWS, Austin enjoys watching and playing soccer, traveling, and spending quality time with his family.

How Stifel built a modern data platform using AWS Glue and an event-driven domain architecture

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/how-stifel-built-a-modern-data-platform-using-aws-glue-and-an-event-driven-domain-architecture/

Stifel Financial Corp. is an American multinational independent investment bank and financial services company, founded in 1890 and headquartered in downtown St. Louis, Missouri. Stifel offers securities-related financial services in the United States and Europe through several wholly owned subsidiaries. Stifel provides both equity and fixed income research and is the largest provider of US equity research.

In this post, we show you how Stifel implemented a modern data platform using AWS services and open data standards, building an event-driven architecture for domain data products while centralizing the metadata to facilitate discovery and sharing of data products.

Stifel’s modern data platform use case

Stifel envisioned a data platform that delivers accurate, timely, and properly governed data, providing consistency throughout the organization whenever users access the information. The existing approach showed limitations as data complexity increased, data volumes grew, and demand for quick, business-driven insights rose. These challenges are encountered by financial institutions worldwide, leading to a reassessment of traditional data management practices. Under the federated governance model, Stifel developed a modern data strategy based on the following objectives:

  • Managing ingestion and metadata
  • Creating source-aligned data products complying with Stifel business streams
  • Integrating source-aligned data products from other domains (Stifel business units)
  • Producing consumer-aligned data products for specific business purposes
  • Publishing data products to a centralized data catalog

Some of the Stifel challenges highlighted in the preceding list required building a data platform that can:

  • Boost agility by democratizing data, thus reducing time to market and enhancing the customer experience
  • Improve data quality and trust in the data
  • Standardize tools and eliminate the shadow information technology (IT) culture to increase scalability, reduce risk, and minimize operational inefficiencies

Following the federated governance model, Stifel has organized its domain structure to provide autonomy to various functional teams while preserving the core values of data mesh. The following diagram depicts a high-level architecture of the data mesh implementation at Stifel.

Each data domain has the flexibility to create data products that can be published to the centralized catalog, while maintaining the autonomy for teams to develop data products that are exclusively accessible to teams within the domain. These products aren’t available to others until they are deemed ready for broader enterprise use. Domains have the freedom to decide which data they want to share. They can either:

  • Make their data products visible to everyone through the central catalog
  • Keep their data products visible only within their own domain

By implementing an event-driven domain architecture, organizations can achieve significant business advantages while positioning themselves for future growth and innovation. Stifel’s data product refreshes depended on data assets that were updated at varying cadences. Event-driven architecture enables real-time or near real-time updates by allowing data products to automatically respond to changes in underlying data assets as they occur, rather than relying on fixed batch schedules that might miss critical updates or waste resources on unnecessary refreshes. The key is to plan the implementation carefully and align it with business objectives while considering both technical and organizational factors. This architecture style particularly suits organizations that:

  • Need real-time processing capabilities
  • Have complex domain interactions
  • Require high scalability
  • Want to improve business agility
  • Need better system integration
  • Are pursuing digital transformation

The following are some of the key AWS services that helped Stifel build their modern data platform:

  • AWS Glue is a serverless data integration service that’s used for data processing to build data assets and data products in the domains. Data is also cataloged in the AWS Glue Data Catalog, making it straightforward to discover and query with supported engines.
  • Amazon EventBridge provides a scalable and flexible serverless event bus that facilitates seamless communication between different domains and services. By using EventBridge, Stifel was able to implement a publish-subscribe model where domain events can be emitted, filtered, and routed to appropriate consumers based on configurable rules. EventBridge supports custom event buses for domain-specific events, enabling clear separation of concerns and improved manageability.
  • AWS Lake Formation helped in providing centralized security, governance, and catalog capabilities while preserving domain autonomy in data product creation and management. With Lake Formation, data domains were able to maintain their independent data products within a federated structure while enforcing consistent access controls, data quality standards, and metadata management across the organization.
  • Apache Hudi on Amazon Simple Storage Service (Amazon S3) offers an optimized way to store data assets and products and promotes interoperability across other services.

Stifel’s solution architecture

The following diagram illustrates the data mesh architecture that Stifel uses to build a domain-driven architecture. In this system, various domains create data products and share them with other domains through a central governance account that uses Lake Formation.

Let’s look at some of the key design components that are used to implement the data mesh and event-driven design.

Data ingestion framework

The data ingestion framework consists of several processor modules that are built using several AWS services and a metadata-driven architecture. The following diagram shows the architecture of the raw data ingestion framework.

The framework gets raw data files from both internal Stifel systems and third-party data sources. These files are processed and stored in a raw data ingestion account on Amazon S3 in the Apache Hudi open table format. This stored data is then shared with different parts of the organization, called data domains. Each domain can use this shared data to create their own data products.

As a file (in CSV, XML, JSON, or a custom format) lands in the landing bucket, an Amazon S3 event notification is created and placed in an Amazon Simple Queue Service (Amazon SQS) queue. The SQS queue triggers an AWS Lambda function, which saves the file metadata (such as the file name, the date and time the file was received, and the file size) to a file audit data store (Amazon Aurora PostgreSQL-Compatible Edition).
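To illustrate this step, the following is a minimal sketch of a Lambda handler that unpacks S3 event notifications delivered through Amazon SQS and extracts the file metadata. It is not Stifel’s implementation: the function names, the output field names, and the sample event are assumptions, and the write to the audit store is omitted because the post doesn’t describe the table layout.

```python
import json
from urllib.parse import unquote_plus


def extract_file_metadata(sqs_event):
    """Return one metadata dict per S3 object referenced in an SQS-delivered batch."""
    rows = []
    for message in sqs_event.get("Records", []):
        s3_event = json.loads(message["body"])
        # S3 test events carry no "Records" key, so default to an empty list.
        for record in s3_event.get("Records", []):
            obj = record["s3"]["object"]
            rows.append({
                "bucket": record["s3"]["bucket"]["name"],
                # Object keys arrive URL-encoded in S3 notifications.
                "file_name": unquote_plus(obj["key"]),
                "size_bytes": obj.get("size", 0),
                "received_at": record["eventTime"],
            })
    return rows


def handler(event, context):
    rows = extract_file_metadata(event)
    # Persisting rows to the audit data store is intentionally left out here.
    return {"files_recorded": len(rows)}


# Hypothetical SQS event wrapping a single S3 notification.
sample_event = {
    "Records": [{
        "body": json.dumps({
            "Records": [{
                "eventTime": "2024-09-02T08:00:00.000Z",
                "s3": {
                    "bucket": {"name": "landing-bucket"},
                    "object": {"key": "vendor/trades+2024.csv", "size": 2048},
                },
            }]
        })
    }]
}
print(extract_file_metadata(sample_event))
```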

An EventBridge time scheduler invokes an AWS Step Functions workflow at pre-determined intervals. The Step Functions workflow orchestrates the batch ingestion from raw to staging layer.

  1. The Step Functions workflow orchestrates a set of Lambda functions to get the list of unprocessed raw files from the audit data store and create batches of raw files to process them in parallel. The Step Functions workflow then triggers parallel AWS Glue jobs that process each batch of raw files.
  2. Each raw file is validated against data quality checks and the data is saved to staging tables in Hudi format. Any errors encountered are logged into an audit table and a notification is generated for the support team. For all successfully processed raw files, the file status is updated to PROCESSED and logged into an audit table.
  3. After the Hudi table is updated, a data refresh event is sent to EventBridge and then passed to the Central Mesh Account. The Central Mesh Account forwards these events to the data domains to notify them that the raw tables are refreshed, allowing the data domains to use this data for creating their own data products.
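Two pieces of the workflow above lend themselves to a short illustration: splitting the unprocessed files into batches for parallel AWS Glue jobs, and shaping the data refresh event for EventBridge. The following is a sketch under stated assumptions. The function names, the source and detail-type values, and the detail fields are hypothetical; only the entry keys (Source, DetailType, Detail, EventBusName) follow the EventBridge PutEvents API.

```python
import json


def make_batches(file_keys, batch_size):
    """Split the list of unprocessed files into fixed-size batches."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [file_keys[i:i + batch_size] for i in range(0, len(file_keys), batch_size)]


def build_refresh_entry(database, table, bus_name):
    """Build one PutEvents entry announcing that a raw table was refreshed."""
    return {
        "Source": "raw.ingestion",       # hypothetical source name
        "DetailType": "DataRefresh",     # hypothetical detail type
        "Detail": json.dumps({"database": database, "table": table}),
        "EventBusName": bus_name,
    }


batches = make_batches(["a.csv", "b.csv", "c.csv", "d.csv", "e.csv"], 2)
entry = build_refresh_entry("raw", "trades", "example-domain-bus")
print(batches)
print(entry)
```

An entry built this way could be sent with the boto3 events client, for example put_events(Entries=[entry]).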

Event-driven data product refresh

The Stifel data lake is based on a data mesh architecture where several data producers share data across data domains. A mechanism is needed to alert consumers who depend on other data producers’ data products when those source data products are refreshed, so that the consumers can update their own data products accordingly. The following diagram describes the technical architecture of event-based data processing. The central governance account acts as the central event bus, which receives all data refresh events from all data producers. The central event bus forwards the events to consumer accounts. Each consumer account filters the incoming events, keeping only those from the data producers that are relevant to its own data processing needs.
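The filtering in consumer accounts can be pictured as matching each event against a pattern. The following sketch implements only a small subset of EventBridge pattern semantics (nested keys and lists of allowed values) to show the idea; in practice the rule is evaluated by EventBridge itself, which supports many more operators. The source, detail-type, and table names are hypothetical.

```python
def matches(pattern, event):
    """Return True if the event satisfies the pattern.

    Supports nested dicts and lists of allowed exact values only.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


# A consumer domain that only cares about refreshes of two raw tables.
pattern = {
    "source": ["raw.ingestion"],
    "detail-type": ["DataRefresh"],
    "detail": {"table": ["trades", "positions"]},
}

event = {
    "source": "raw.ingestion",
    "detail-type": "DataRefresh",
    "detail": {"database": "raw", "table": "trades"},
}
print(matches(pattern, event))
```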

Orchestration design

Stifel designed and implemented an event-based data pipeline orchestration system that triggers data pipelines when specific events occur. This system processes data immediately after receiving all required dependency events, enabling efficient workflow management.

The following diagram describes the logical architecture of the domain data pipeline orchestration framework.

The orchestration framework includes the components described in the following list. The data dependencies and data pipeline state management metadata are hosted in an Aurora PostgreSQL database.

  1. Data refresh processor: Receives data refresh events from the central mesh and the local data domain and evaluates whether the data dependencies of the domain’s data products are met.
  2. Data product dependency processor: Retrieves metadata for the product, kicks off the corresponding data domain AWS Glue job, and updates the metadata with the job information.
  3. Data pipeline state change processor: Monitors the domain data jobs, takes action based on each job’s final status (SUCCEEDED or FAILED), and creates incident tickets for failed jobs.
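The dependency check performed by the data refresh processor can be sketched as follows. This is an in-memory stand-in for the metadata that the post says is kept in Aurora PostgreSQL; the class name, method name, and product and table names are assumptions for illustration, not Stifel’s implementation.

```python
class DependencyTracker:
    """Track refresh events and report data products whose dependencies are met."""

    def __init__(self, dependencies):
        # dependencies maps a data product to the upstream tables it needs.
        self._required = {p: set(d) for p, d in dependencies.items()}
        self._received = {p: set() for p in dependencies}

    def record_refresh(self, table):
        """Record a refresh event; return products that are now ready to run."""
        ready = []
        for product, required in self._required.items():
            if table in required:
                self._received[product].add(table)
                if self._received[product] == required:
                    ready.append(product)
                    # Reset so the next cycle waits for fresh events again.
                    self._received[product].clear()
        return ready


tracker = DependencyTracker({"risk_report": {"trades", "positions"}})
print(tracker.record_refresh("trades"))
print(tracker.record_refresh("positions"))
```

A product is reported as ready only once every upstream table has been refreshed, which mirrors the description of processing data after all required dependency events have been received.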

Conclusion

Stifel has improved its data management and reduced data silos by adopting a data product approach. This strategy has positioned Stifel to become a data-driven, customer-centric organization. The company combines federated platform practices with AWS and open standards. As a result, Stifel is achieving its decentralization objectives through a scalable data platform. This platform empowers domain teams to make informed decisions, drive innovation, and maintain a competitive edge. The following are some of the advantages Stifel gained from an event-driven domain architecture (EDDA):

  • Business agility: Rapid market response, new business capability integration, scalable domains, quicker feature deployment, and flexible process modification
  • Customer experience: Real-time processing, responsive interactions, personalized services, consistent omnichannel presence, and enhanced service availability
  • Operational efficiency: Reduced system coupling, optimal resource use, scalable systems, lower maintenance overhead, and efficient data processing
  • Cost benefits: Lower development costs, reduced infrastructure expenses, decreased maintenance costs, efficient resource usage, and a better ROI on technology investments

In this post, we demonstrated how Stifel is building a modern data platform by recognizing the critical importance of data in today’s financial landscape. This strategic approach not only enhances operational efficiency but also positions Stifel at the forefront of technological innovation in the financial services industry. To learn more and get started, see the following resources:


About the authors

Amit Maindola is a Senior Data Architect focused on data engineering, analytics, and AI/ML at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Srinivas Kandi is a Senior Architect at Stifel focusing on delivering the next generation of cloud data platform on AWS. Prior to joining Stifel, Srini was a delivery specialist in cloud data analytics at AWS helping several customers in their transformational journey into AWS cloud. In his free time, Srini likes to explore cooking, travel and learn new trends and innovations in AI and cloud computing.

Hossein Johari is a seasoned data and analytics leader with over 25 years of experience architecting enterprise-scale platforms. As Lead and Senior Architect at Stifel Financial Corp. in St. Louis, Missouri, he spearheads initiatives in Data Platforms and Strategic Solutions, driving the design and implementation of innovative frameworks that support enterprise-wide analytics, strategic decision-making, and digital transformation. Known for aligning technical vision with business objectives, he works closely with cross-functional teams to deliver scalable, forward-looking solutions that advance organizational agility and performance.

Ahmad Rawashdeh is a Senior Architect at Stifel Financial. He supports Stifel and its clients in designing, implementing, and building scalable and reliable data architectures on Amazon Web Services (AWS), with a strong focus on data lake strategies, database services, and efficient data ingestion and transformation pipelines.

Lei Meng is a data architect at Stifel. He focuses on designing and implementing scalable and secure data solutions on AWS and on helping Stifel migrate from on-premises systems to the cloud.

Build conversational AI search with Amazon OpenSearch Service

Post Syndicated from Bharav Patel original https://aws.amazon.com/blogs/big-data/build-conversational-ai-search-with-amazon-opensearch-service/

Retrieval Augmented Generation (RAG) is a well-known approach to creating generative AI applications. RAG combines large language models (LLMs) with external world knowledge retrieval and is increasingly popular for adding accuracy and personalization to AI. It retrieves relevant information from external sources, augments the input with this data, and generates responses based on both. This approach reduces hallucinations, improves fact accuracy, and allows for up-to-date, efficient, and explainable AI systems. RAG’s ability to break through classical language model limitations has made it applicable to broad AI use cases.

Amazon OpenSearch Service is a versatile search and analytics tool. It is capable of performing security analytics, searching data, analyzing logs, and many other tasks. It can also work with vector data with a k-nearest neighbors (k-NN) plugin, which makes it helpful for more complex search strategies. Because of this feature, OpenSearch Service can serve as a knowledge base for generative AI applications that integrate language generation with search results.

Conversational search enhances RAG by preserving context over several exchanges, honing responses, and providing a more seamless user experience. It helps with complex information needs, resolves ambiguities, and manages multi-turn reasoning. Although standard RAG performs well for single queries, conversational search provides a more natural and personalized interaction and yields more accurate and pertinent results.

In this post, we explore conversational search, its architecture, and various ways to implement it.

Solution overview

Let’s walk through the solution to build conversational search. The following diagram illustrates the solution architecture.

Conversational search is built with the OpenSearch feature known as agents and tools. Agents coordinate a variety of machine learning (ML) tasks to support sophisticated AI applications. Every agent has a number of tools, each intended for a particular function. To use agents and tools, you need OpenSearch version 2.13 or later.

Prerequisites

To implement this solution, you need an AWS account. If you don’t have one, you can create an account. You also need an OpenSearch Service domain with OpenSearch version 2.13 or later. You can use an existing domain or create a new domain.

To use the Amazon Titan Text Embedding and Anthropic Claude V1 models in Amazon Bedrock, you need to enable access to these foundation models (FMs). For instructions, refer to Add or remove access to Amazon Bedrock foundation models.

Configure IAM permissions

Complete the following steps to set up an AWS Identity and Access Management (IAM) role and user with appropriate permissions:

  1. Create an IAM role with the following policy that will allow the OpenSearch Service domain to invoke the Amazon Bedrock API:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": [
                    "bedrock:InvokeAgent",
                    "bedrock:InvokeModel"
                ],
                "Resource": [
                    "arn:aws:bedrock:${Region}::foundation-model/amazon.titan-embed-text-v1",
                    "arn:aws:bedrock:${Region}::foundation-model/anthropic.claude-instant-v1"
                ]
            }
        ]
    }
    

    Depending on the AWS Region and model you use, specify those in the Resource section.

  2. Add opensearchservice.amazonaws.com as a trusted entity.
  3. Make a note of the IAM role Amazon Resource Name (ARN).
  4. Assign the preceding policy to the IAM user that will create a connector.
  5. Create a passRole policy and assign it to the IAM user that will create the connector using Python:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": "arn:aws:iam::${AccountId}:role/OpenSearchBedrock"
            }
        ]
    }
  6. Map the IAM role you created to the OpenSearch Service domain role using the following steps:
    • Log in to the OpenSearch Dashboard and open the Security page from the navigation menu.
    • Choose Roles and select ml_all_access.
    • Choose Mapped Users and Manage Mapping.
    • Under Users, add the ARN of the IAM user you created.
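
If you prefer to generate the policy documents rather than edit JSON by hand, the following minimal sketch builds the permissions policy for a given Region and list of model IDs, plus a trust policy that names opensearchservice.amazonaws.com as the trusted entity. The function names are hypothetical, and the sts:AssumeRole trust statement is the standard form for a service principal, not text taken from this post.

```python
import json


def bedrock_invoke_policy(region: str, model_ids: list[str]) -> dict:
    """Build the permissions policy that lets the role invoke specific Bedrock models."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Statement1",
                "Effect": "Allow",
                "Action": ["bedrock:InvokeAgent", "bedrock:InvokeModel"],
                # Foundation model ARNs leave the account ID field empty.
                "Resource": [
                    f"arn:aws:bedrock:{region}::foundation-model/{model_id}"
                    for model_id in model_ids
                ],
            }
        ],
    }


def opensearch_trust_policy() -> dict:
    """Build a trust policy that lets OpenSearch Service assume the role."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "opensearchservice.amazonaws.com"},
                "Action": "sts:AssumeRole",
            }
        ],
    }


policy = bedrock_invoke_policy(
    "us-east-1", ["amazon.titan-embed-text-v1", "anthropic.claude-instant-v1"]
)
print(json.dumps(policy, indent=2))
```

Generating the ARNs from a single Region value avoids small formatting mistakes, such as a stray space inside an ARN, that would silently stop the policy from matching the model.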

Establish a connection to the Amazon Bedrock model using the MLCommons plugin

In order to identify patterns and relationships, an embedding model transforms input data—such as words or images—into numerical vectors in a continuous space. Similar objects are grouped together to make it easier for AI systems to comprehend and respond to intricate user enquiries.

Semantic search concentrates on the purpose and meaning of a query. OpenSearch stores data in a vector index for retrieval and transforms it into dense vectors (lists of numbers) using text embedding models. We are using amazon.titan-embed-text-v1 hosted on Amazon Bedrock, but you will need to evaluate and choose the right model for your use case. The amazon.titan-embed-text-v1 model maps sentences and paragraphs to a 1,536-dimensional dense vector space and is optimized for the task of semantic search.
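
To make the idea of "similar objects are grouped together" concrete, here is a small sketch of nearest-neighbor lookup using L2 (Euclidean) distance. The three-dimensional vectors and document names are made up for illustration; real Titan embeddings have 1,536 dimensions, and OpenSearch uses an approximate algorithm rather than the exhaustive scan shown here.

```python
import math


def l2_distance(a: list[float], b: list[float]) -> float:
    """Euclidean (L2) distance between two vectors of equal length."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimension")
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def nearest(query: list[float], docs: dict[str, list[float]], k: int = 1) -> list[str]:
    """Return the IDs of the k documents closest to the query vector."""
    return sorted(docs, key=lambda doc_id: l2_distance(query, docs[doc_id]))[:k]


# Made-up 3-dimensional vectors standing in for real embeddings.
docs = {
    "test_batting": [0.9, 0.1, 0.0],
    "odi_batting": [0.8, 0.3, 0.1],
    "weather_report": [0.0, 0.2, 0.9],
}
query = [0.85, 0.15, 0.05]
top_two = nearest(query, docs, k=2)
```

Documents whose vectors lie close to the query vector are treated as semantically related, which is why the unrelated weather_report entry is not among the two closest results.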

Complete the following steps to establish a connection to the Amazon Bedrock model using the MLCommons plugin:

  1. Establish a connection by using the Python client with the connection blueprint.
  2. Modify the values of the host and region parameters in the provided code block. For this example, we’re running the program in Visual Studio Code with Python version 3.9.6, but newer versions should also work.
  3. For the role ARN, use the ARN you created earlier, and run the following script using the credentials of the IAM user you created:
    import boto3
    import requests 
    from requests_aws4auth import AWS4Auth
    
    host = 'https://search-test.us-east-1.es.amazonaws.com/'
    region = 'us-east-1'
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
    
    path = '_plugins/_ml/connectors/_create'
    url = host + path
    
    payload = {
      "name": "Amazon Bedrock Connector: embedding",
      "description": "The connector to bedrock Titan embedding model",
      "version": 1,
      "protocol": "aws_sigv4",
      "parameters": {
        "region": "us-east-1",
        "service_name": "bedrock",
        "model": "amazon.titan-embed-text-v1"
      },
      "credential": {
        "roleArn": "arn:aws:iam::<accountID>:role/opensearch_bedrock_external"
      },
      "actions": [
        {
          "action_type": "predict",
          "method": "POST",
          "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/${parameters.model}/invoke",
          "headers": {
            "content-type": "application/json",
            "x-amz-content-sha256": "required"
          },
          "request_body": "{ \"inputText\": \"${parameters.inputText}\" }",
          "pre_process_function": "connector.pre_process.bedrock.embedding",
          "post_process_function": "connector.post_process.bedrock.embedding"
        }
      ]
    }
    
    headers = {"Content-Type": "application/json"}
    
    r = requests.post(url, auth=awsauth, json=payload, headers=headers, timeout=15)
    print(r.status_code)
    print(r.text)
    
  4. Run the Python program. This will return connector_id.
    python3 connect_bedrocktitanembedding.py
    200
    {"connector_id":"nbBe65EByVCe3QrFhrQ2"}
  5. Create a model group against which this model will be registered in the OpenSearch Service domain:
    POST /_plugins/_ml/model_groups/_register
    {
      "name": "embedding_model_group",
      "description": "A model group for bedrock embedding models"
    }

    You get the following output:

    {
      "model_group_id": "1rBv65EByVCe3QrFXL6O",
      "status": "CREATED"
    }
  6. Register a model using connector_id and model_group_id:
    POST /_plugins/_ml/models/_register
    {
        "name": "titan_text_embedding_bedrock",
        "function_name": "remote",
        "model_group_id": "1rBv65EByVCe3QrFXL6O",
        "description": "test model",
        "connector_id": "nbBe65EByVCe3QrFhrQ2",
       "interface": {}
    }

    You get the following output:

    {
      "task_id": "2LB265EByVCe3QrFAb6R",
      "status": "CREATED",
      "model_id": "2bB265EByVCe3QrFAb60"
    }
  7. Deploy a model using the model ID:
    POST /_plugins/_ml/models/2bB265EByVCe3QrFAb60/_deploy

    You get the following output:

    {
      "task_id": "bLB665EByVCe3QrF-slA",
      "task_type": "DEPLOY_MODEL",
      "status": "COMPLETED"
    }

Now the model is deployed, and you will see that in OpenSearch Dashboards on the OpenSearch Plugins page.
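
Deployment does not always finish immediately, so a script may need to poll the task until it completes. The following sketch shows one way to do that. It assumes that you supply a get_task function that sends GET /_plugins/_ml/tasks/<task_id> and returns the parsed JSON, and that the task document reports progress in a state field; both are assumptions to verify against your OpenSearch version.

```python
import time
from typing import Callable


def wait_for_task(
    get_task: Callable[[str], dict],
    task_id: str,
    timeout_s: float = 60.0,
    poll_interval_s: float = 2.0,
) -> dict:
    """Poll a task until it reaches a terminal state or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        task = get_task(task_id)
        # Assumption: the task document exposes its progress in a "state" field.
        if task.get("state") in ("COMPLETED", "FAILED", "COMPLETED_WITH_ERROR"):
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} did not finish in {timeout_s}s")
        time.sleep(poll_interval_s)


# Stand-in for a real HTTP call, used here only to demonstrate the loop.
responses = iter([{"state": "RUNNING"}, {"state": "COMPLETED"}])
result = wait_for_task(
    lambda _task_id: next(responses), "bLB665EByVCe3QrF-slA", poll_interval_s=0
)
```

Passing the HTTP call in as a function keeps the polling logic independent of how requests are signed, so the same helper works with the AWS4Auth session used in the connector script.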

Create an ingestion pipeline for data indexing

Use the following code to create an ingestion pipeline for data indexing. For each incoming document, the pipeline calls the embedding model, retrieves the embedding, and stores it in the index. Set model_id to the ID of the embedding model you deployed in the previous section.

PUT /_ingest/pipeline/cricket_data_pipeline
{
    "description": "batting score summary embedding pipeline",
    "processors": [
        {
            "text_embedding": {
                "model_id": "GQOsUJEByVCe3QrFfUNq",
                "field_map": {
                    "cricket_score": "cricket_score_embedding"
                }
            }
        }
    ]
}

Create an index for storing data

Create an index for storing data (for this example, the cricket achievements of batsmen). This index stores the raw summary text and its embedding with 1,536 dimensions. The ingest pipeline from the previous step populates the embedding field when you index documents.

PUT cricket_data
{
    "mappings": {
        "properties": {
            "cricket_score": {
                "type": "text"
            },
            "cricket_score_embedding": {
                "type": "knn_vector",
                "dimension": 1536,
                "space_type": "l2",
                "method": {
                    "name": "hnsw",
                    "engine": "faiss"
                }
            }
        }
    },
    "settings": {
        "index": {
            "knn": "true"
        }
    }
}

Ingest sample data

Use the following code to ingest the sample data for four batsmen:

POST _bulk?pipeline=cricket_data_pipeline
{"index": {"_index": "cricket_data"}}
{"cricket_score": "Sachin Tendulkar, often hailed as the 'God of Cricket,' amassed an extraordinary batting record throughout his 24-year international career. In Test cricket, he played 200 matches, scoring a staggering 15,921 runs at an average of 53.78, including 51 centuries and 68 half-centuries, with a highest score of 248 not out. His One Day International (ODI) career was equally impressive, spanning 463 matches where he scored 18,426 runs at an average of 44.83, notching up 49 centuries and 96 half-centuries, with a top score of 200 not out – the first double century in ODI history. Although he played just one T20 International, scoring 10 runs, his overall batting statistics across formats solidified his status as one of cricket's all-time greats, setting numerous records that stand to this day."}
{"index": {"_index": "cricket_data"}}
{"cricket_score": "Virat Kohli, widely regarded as one of the finest batsmen of his generation, has amassed impressive statistics across all formats of international cricket. As of April 2024, in Test cricket, he has scored over 8,000 runs with an average exceeding 50, including numerous centuries. His One Day International (ODI) record is particularly stellar, with more than 12,000 runs at an average well above 50, featuring over 40 centuries. In T20 Internationals, Kohli has maintained a high average and scored over 3,000 runs. Known for his exceptional ability to chase down targets in limited-overs cricket, Kohli has consistently ranked among the top batsmen in ICC rankings and has broken several batting records throughout his career, cementing his status as a modern cricket legend."}
{"index": {"_index": "cricket_data"}}
{"cricket_score": "Adam Gilchrist, the legendary Australian wicketkeeper-batsman, had an exceptional batting record across formats during his international career from 1996 to 2008. In Test cricket, Gilchrist scored 5,570 runs in 96 matches at an impressive average of 47.60, including 17 centuries and 26 half-centuries, with a highest score of 204 not out. His One Day International (ODI) record was equally remarkable, amassing 9,619 runs in 287 matches at an average of 35.89, with 16 centuries and 55 half-centuries, and a top score of 172. Gilchrist's aggressive batting style and ability to change the course of a game quickly made him one of the most feared batsmen of his era. Although his T20 International career was brief, his overall batting statistics, combined with his wicketkeeping skills, established him as one of cricket's greatest wicketkeeper-batsmen."}
{"index": {"_index": "cricket_data"}}
{"cricket_score": "Brian Lara, the legendary West Indian batsman, had an extraordinary batting record in international cricket during his career from 1990 to 2007. In Test cricket, Lara amassed 11,953 runs in 131 matches at an impressive average of 52.88, including 34 centuries and 48 half-centuries. He holds the record for the highest individual score in a Test innings with 400 not out, as well as the highest first-class score of 501 not out. In One Day Internationals (ODIs), Lara scored 10,405 runs in 299 matches at an average of 40.48, with 19 centuries and 63 half-centuries. His highest ODI score was 169. Known for his elegant batting style and ability to play long innings, Lara's exceptional performances, particularly in Test cricket, cemented his status as one of the greatest batsmen in the history of the game."}
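
If you ingest from a script instead of the Dev Tools console, the bulk body has to be newline-delimited JSON with an action line before every document. The following helper is a minimal sketch of that serialization; the function name and the placeholder documents are hypothetical.

```python
import json


def build_bulk_body(index: str, docs: list[dict]) -> str:
    """Serialize documents into the newline-delimited JSON that _bulk expects."""
    lines = []
    for doc in docs:
        lines.append(json.dumps({"index": {"_index": index}}))  # action line
        lines.append(json.dumps(doc))                            # document source
    # The bulk API requires the body to end with a newline.
    return "\n".join(lines) + "\n"


body = build_bulk_body(
    "cricket_data",
    [
        {"cricket_score": "Placeholder summary for player A."},
        {"cricket_score": "Placeholder summary for player B."},
    ],
)
```

Send the resulting string as the raw request body to _bulk?pipeline=cricket_data_pipeline so that each document passes through the embedding pipeline.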

Deploy the LLM for response generation

Use the following code to deploy the LLM for response generation. Modify the values of host, region, and roleArn in the provided code block.

  1. Create a connector by running the following Python program. Run the script using the credentials of the IAM user created earlier.
    import boto3
    import requests 
    from requests_aws4auth import AWS4Auth
    
    host = 'https://search-test.us-east-1.es.amazonaws.com/'
    region = 'us-east-1'
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
    
    path = '_plugins/_ml/connectors/_create'
    url = host + path
    
    payload = {
      "name": "BedRock Claude instant-v1 Connector ",
      "description": "The connector to BedRock service for claude model",
      "version": 1,
      "protocol": "aws_sigv4",
      "parameters": {
        "region": "us-east-1",
        "service_name": "bedrock",
        "anthropic_version": "bedrock-2023-05-31",
        "max_tokens_to_sample": 8000,
        "temperature": 0.0001,
        "response_filter": "$.completion"
      },
       "credential": {
            "roleArn": "arn:aws:iam::accountId:role/opensearch_bedrock_external"
        },
      "actions": [
        {
          "action_type": "predict",
          "method": "POST",
          "url": "https://bedrock-runtime.${parameters.region}.amazonaws.com/model/anthropic.claude-instant-v1/invoke",
          "headers": {
            "content-type": "application/json",
            "x-amz-content-sha256": "required"
          },
          "request_body": "{\"prompt\":\"${parameters.prompt}\", \"max_tokens_to_sample\":${parameters.max_tokens_to_sample}, \"temperature\":${parameters.temperature},  \"anthropic_version\":\"${parameters.anthropic_version}\" }"
        }
      ]
     }
        
    
    headers = {"Content-Type": "application/json"}
    
    r = requests.post(url, auth=awsauth, json=payload, headers=headers, timeout=15)
    print(r.status_code)
    print(r.text)

    If the script runs successfully, it returns a 200 response code and a connector_id:

    200
    {"connector_id":"LhLSZ5MBLD0avmh1El6Q"}
  2. Create a model group for this model:
    POST /_plugins/_ml/model_groups/_register
    {
        "name": "claude_model_group",
        "description": "This is an example description"
    }

    This returns a model_group_id; make a note of it:

    {
      "model_group_id": "LxLTZ5MBLD0avmh1wV4L",
      "status": "CREATED"
    }
  3. Register a model using connector_id and model_group_id:
    POST /_plugins/_ml/models/_register
    {
        "name": "anthropic.claude-v1",
        "function_name": "remote",
        "model_group_id": "LxLTZ5MBLD0avmh1wV4L",
        "description": "LLM model",
        "connector_id": "LhLSZ5MBLD0avmh1El6Q",
        "interface": {}
    }

    This returns a model_id and a task_id:

    {
      "task_id": "YvbVZ5MBtVAPFbeA7ou7",
      "status": "CREATED",
      "model_id": "Y_bVZ5MBtVAPFbeA7ovb"
    }
  4. Finally, deploy the model using the model ID:
    POST /_plugins/_ml/models/Y_bVZ5MBtVAPFbeA7ovb/_deploy

    A status of COMPLETED means that the model is successfully deployed:

    {
      "task_id": "efbvZ5MBtVAPFbeA7otB",
      "task_type": "DEPLOY_MODEL",
      "status": "COMPLETED"
    }

Create an agent in OpenSearch Service

An agent orchestrates and runs ML models and tools. A tool performs a set of specific tasks. For this post, we use the following tools:

  • VectorDBTool – The agent uses this tool to retrieve OpenSearch documents relevant to the user question
  • MLModelTool – This tool generates user responses based on prompts and OpenSearch documents

Use the embedding model_id in VectorDBTool and the LLM model_id in MLModelTool, replacing the example IDs with the IDs returned in your own domain:

POST /_plugins/_ml/agents/_register
{
    "name": "cricket score data analysis agent",
    "type": "conversational_flow",
    "description": "This is a demo agent for cricket data analysis",
    "app_type": "rag",
    "memory": {
        "type": "conversation_index"
    },
    "tools": [
        {
            "type": "VectorDBTool",
            "name": "cricket_knowledge_base",
            "parameters": {
                "model_id": "2bB265EByVCe3QrFAb60",
                "index": "cricket_data",
                "embedding_field": "cricket_score_embedding",
                "source_field": [
                    "cricket_score"
                ],
                "input": "${parameters.question}"
            }
        },
        {
            "type": "MLModelTool",
            "name": "bedrock_claude_model",
            "description": "A general tool to answer any question",
            "parameters": {
                "model_id": "gbcfIpEByVCe3QrFClUp",
                "prompt": "\n\nHuman:You are a professional data analysist. You will always answer question based on the given context first. If the answer is not directly shown in the context, you will analyze the data and find the answer. If you don't know the answer, just say don't know. \n\nContext:\n${parameters.cricket_knowledge_base.output:-}\n\n${parameters.chat_history:-}\n\nHuman:${parameters.question}\n\nAssistant:"
            }
        }
    ]
}

This returns an agent ID; take note of the agent ID, which will be used in subsequent APIs.

Query the index

We have batting scores of four batsmen in the index. For the first query, let’s specify the player name:

POST /_plugins/_ml/agents/<agent ID>/_execute
{
    "parameters": {
        "question": "What is the batting score of Sachin Tendulkar?"
    }
}

Based on context and available information, it returns the batting score of Sachin Tendulkar. Note the memory_id from the response; you will need it for subsequent questions in the next steps.
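
When you call the agent from code, you need to pull memory_id out of the response before sending the follow-up question. The sketch below assumes the response has the shape shown in sample_response (a list of inference_results, each with named output entries); treat that shape as an assumption and check it against the response from your own domain. The answer text is a placeholder, not real model output.

```python
def named_outputs(response: dict) -> dict:
    """Flatten an agent response into a {name: result} dictionary."""
    outputs = {}
    for inference in response.get("inference_results", []):
        for item in inference.get("output", []):
            outputs[item.get("name")] = item.get("result")
    return outputs


# Illustrative response only; the structure is an assumption.
sample_response = {
    "inference_results": [
        {
            "output": [
                {"name": "memory_id", "result": "so-vAJMByVCe3QrFYO7j"},
                {"name": "bedrock_claude_model", "result": "<answer text>"},
            ]
        }
    ]
}

memory_id = named_outputs(sample_response)["memory_id"]
```

Looking outputs up by name rather than by position keeps the code working if the agent returns additional entries.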

We can ask a follow-up question. This time, we don’t specify the player name and expect it to answer based on the earlier question:

POST /_plugins/_ml/agents/<agent ID>/_execute
{
    "parameters": {
        "question": "How many T20 international matches did he play?",
        "next_action": "then compare with Virat Kohli's score",
        "memory_id": "so-vAJMByVCe3QrFYO7j",
        "message_history_limit": 5,
        "prompt": "\n\nHuman:You are a professional data analysist. You will always answer question based on the given context first. If the answer is not directly shown in the context, you will analyze the data and find the answer. If you don't know the answer, just say don't know. \n\nContext:\n${parameters.cricket_knowledge_base.output:-}\n\n${parameters.chat_history:-}\n\nHuman:always learn useful information from chat history\nHuman:${parameters.question}, ${parameters.next_action}\n\nAssistant:"
    }
}

In the preceding API, we use the following parameters:

  • question and next_action – The follow-up question, plus a next action that asks the agent to compare Sachin’s score with Virat’s score.
  • memory_id – The ID of the memory assigned to this conversation. Use the same memory_id for subsequent questions.
  • prompt – The prompt you give to the LLM. It includes the user’s question and the next action. The prompt instructs the LLM to answer using the data indexed in OpenSearch and not to invent information, which helps reduce hallucinations.

Refer to ML Model tool for more details about setting up these parameters and the GitHub repo for blueprints for remote inferences.

The agent stores the conversation history of questions and answers in an OpenSearch index (the conversation_index memory type), which lets it refine answers when you ask follow-up questions.

In real-world scenarios, you can map memory_id against the user’s profile to preserve the context and isolate the user’s conversation history.
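
One simple way to keep conversations isolated per user is to store each user’s memory_id and attach it to later requests. The following sketch uses an in-memory dictionary for brevity; the class name and the user IDs alice and bob are hypothetical, and a real application would persist the mapping alongside the user profile.

```python
class ConversationRegistry:
    """Maps a user ID to the agent memory_id of that user's conversation."""

    def __init__(self) -> None:
        self._memory_by_user: dict[str, str] = {}

    def execute_parameters(self, user_id: str, question: str) -> dict:
        """Build the "parameters" object for an agent _execute call."""
        params = {"question": question}
        memory_id = self._memory_by_user.get(user_id)
        if memory_id is not None:
            # Continue this user's existing conversation.
            params["memory_id"] = memory_id
        return params

    def remember(self, user_id: str, memory_id: str) -> None:
        """Store the memory_id returned by the agent for this user."""
        self._memory_by_user[user_id] = memory_id


registry = ConversationRegistry()
first = registry.execute_parameters("alice", "What is the batting score of Sachin Tendulkar?")
registry.remember("alice", "so-vAJMByVCe3QrFYO7j")
follow_up = registry.execute_parameters("alice", "How many T20 international matches did he play?")
other_user = registry.execute_parameters("bob", "What is the batting score of Brian Lara?")
```

The first request for a user carries no memory_id, so the agent starts a new conversation; later requests reuse the stored ID, and a different user never sees it.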

We have demonstrated how to create a conversational search application using the built-in features of OpenSearch Service.

Clean up

To avoid incurring future charges, delete the resources created while building this solution:

  1. Delete the OpenSearch Service domain.
  2. Delete the connector.
  3. Delete the index.
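
If you want to keep the domain and remove only the resources created in this walkthrough, the requests can be issued in dependency order. The sketch below only builds the list of requests; the ordering (agent first, then undeploy and delete each model, then connectors, then the pipeline and index) is an assumption based on which resources reference which, and the IDs in the usage example are placeholders.

```python
def cleanup_requests(
    agent_id: str,
    model_ids: list[str],
    connector_ids: list[str],
    pipeline: str,
    index: str,
) -> list[tuple[str, str]]:
    """Return (HTTP method, path) pairs for removing the walkthrough resources."""
    steps = [("DELETE", f"/_plugins/_ml/agents/{agent_id}")]
    for model_id in model_ids:
        # A deployed model is undeployed before it is deleted.
        steps.append(("POST", f"/_plugins/_ml/models/{model_id}/_undeploy"))
        steps.append(("DELETE", f"/_plugins/_ml/models/{model_id}"))
    for connector_id in connector_ids:
        steps.append(("DELETE", f"/_plugins/_ml/connectors/{connector_id}"))
    steps.append(("DELETE", f"/_ingest/pipeline/{pipeline}"))
    steps.append(("DELETE", f"/{index}"))
    return steps


steps = cleanup_requests(
    agent_id="agent-123",
    model_ids=["model-a"],
    connector_ids=["connector-a"],
    pipeline="cricket_data_pipeline",
    index="cricket_data",
)
```

Deleting the OpenSearch Service domain removes all of these resources at once, so this list is only needed when the domain is shared with other workloads.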

Conclusion

In this post, we demonstrated how to use OpenSearch agents and tools to create a RAG pipeline with conversational search. By integrating with ML models, vectorizing questions, and interacting with LLMs to improve prompts, this configuration oversees the entire process. This method allows you to quickly develop AI assistants that are ready for production without having to start from scratch.

If you’re building a RAG pipeline with conversational history to let users ask follow-up questions for more refined answers, give it a try and share your feedback or questions in the comments!


About the author

Bharav Patel is a Specialist Solution Architect, Analytics at Amazon Web Services. He primarily works on Amazon OpenSearch Service and helps customers with key concepts and design principles of running OpenSearch workloads on the cloud. Bharav likes to explore new places and try out different cuisines.

Enhance stability with dedicated cluster manager nodes using Amazon OpenSearch Service

Post Syndicated from Chinmayi Narasimhadevara original https://aws.amazon.com/blogs/big-data/enhance-stability-with-dedicated-cluster-manager-nodes-using-amazon-opensearch-service/

Amazon OpenSearch Service is a managed service that you can use to secure, deploy, and operate OpenSearch clusters at scale in the AWS Cloud. With OpenSearch Service, you can configure clusters with different types of node options such as data nodes, dedicated cluster manager nodes, dedicated coordinator nodes, and UltraWarm nodes. When configuring your OpenSearch Service domain, you can exercise different node options to manage your cluster’s overall stability, performance, and resiliency.

In this post, we show how to enhance the stability of your OpenSearch Service domain with dedicated cluster manager nodes and how using these in deployment enhances your cluster’s stability and reliability.

The benefit of dedicated cluster manager nodes

A dedicated cluster manager node handles the behind-the-scenes work of running an OpenSearch Service cluster, but it doesn’t store data or process search requests. Without dedicated cluster manager nodes, OpenSearch Service uses data nodes for cluster management. Combining these responsibilities on the data nodes can impact performance and stability because data operations (like indexing and searching) compete with critical cluster management tasks for computing resources.

The cluster manager node is responsible for several key tasks: tracking all the data nodes in the cluster, knowing how many indexes and shards there are and where they’re located, and routing data to the correct places. It also updates and shares the cluster state whenever something changes, such as when you create an index or add or remove nodes.

When traffic gets heavy on a cluster without dedicated cluster manager nodes, the node acting as cluster manager can get overloaded and become unresponsive. If this happens, your cluster will not respond to write requests until it elects a new cluster manager, at which point the cycle might repeat itself. You can alleviate this issue by deploying dedicated cluster manager instances; separating duties between the manager nodes and the data nodes results in a much more stable cluster.

Calculating the number of dedicated cluster manager nodes

In OpenSearch Service, a single node is elected as the cluster manager from all eligible nodes through a quorum-based voting process, which confirms consensus before the node takes on the responsibility of coordinating cluster-wide operations and maintaining the cluster’s state. Quorum is the minimum number of nodes that need to agree before the cluster makes important decisions. It helps keep your data consistent and your cluster running smoothly. When you use dedicated cluster manager nodes, only those nodes are eligible for election, and OpenSearch Service sets the quorum to half of the nodes, rounded down to the nearest whole number, plus one.

OpenSearch Service explicitly prohibits a single dedicated cluster manager node because you would have no backup in the event of a failure. With three dedicated cluster manager nodes, even if one node fails, the remaining two can still reach the necessary quorum (two) and elect a new manager. We recommend three dedicated cluster manager nodes for production use cases.

Multi-AZ with standby is an OpenSearch Service feature designed to deliver four 9s of availability using a third AWS Availability Zone as a standby. When you use Multi-AZ with standby, the service requires three dedicated cluster manager nodes. If you deploy with Multi-AZ without standby or Single-AZ, you can choose three or five dedicated cluster manager nodes, and we still recommend three.

Having five dedicated cluster manager nodes works as well as three, and you can lose two nodes while maintaining a quorum. But because only one dedicated cluster manager node is active at any given time, this configuration means you pay for four idle nodes.
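
The quorum rule (half of the nodes, rounded down, plus one) is easy to check with a few lines of arithmetic. The following sketch computes the quorum and the number of node failures a cluster can tolerate for several node counts; the function names are illustrative.

```python
def quorum(manager_nodes: int) -> int:
    """Quorum size: half of the nodes, rounded down, plus one."""
    return manager_nodes // 2 + 1


def tolerated_failures(manager_nodes: int) -> int:
    """How many manager nodes can fail while a quorum is still reachable."""
    return manager_nodes - quorum(manager_nodes)


# Node count -> (quorum, tolerated failures)
table = {n: (quorum(n), tolerated_failures(n)) for n in (1, 2, 3, 5)}
```

The table shows why two nodes are no better than one (neither tolerates a failure), why three nodes tolerate one failure, and why five nodes tolerate two.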

Cluster manager node configurations for different domain creation methods

This section explains the resources each domain creation method and template deploy when you set up an OpenSearch Service domain.

With the Easy create option, you can quickly create a domain that uses Multi-AZ with standby for high availability, with three dedicated cluster manager nodes distributed across three Availability Zones. The following table summarizes the configuration.

| Domain creation method | Output |
| --- | --- |
| Easy create | Dedicated cluster manager node: Yes; Number of cluster manager nodes: 3; Availability Zones: 3; Standby: Yes |

The Standard create option provides templates for Production and Dev/test workloads. Both templates come with a Domain with standby and a Domain without standby deployment choice. The following table summarizes these configuration options.

| Domain creation method | Template | Deployment option | Output |
| --- | --- | --- | --- |
| Standard create | Production | Domain with standby | Requires dedicated cluster manager node; Number of cluster manager nodes: 3; Availability Zones: 3; Standby: Yes; Instance type choice: Yes |
| Standard create | Production | Domain without standby | Requires dedicated cluster manager node; Number of cluster manager nodes: 3 or 5; Availability Zones: 3; Standby: No; Instance type choice: Yes |
| Standard create | Dev/test | Domain with standby | Requires dedicated cluster manager node; Number of cluster manager nodes: 3; Availability Zones: 3; Standby: Yes; Instance type choice: Yes |
| Standard create | Dev/test | Domain without standby | Does not require dedicated cluster manager node |

Choosing a dedicated cluster manager instance type

Dedicated cluster manager instances handle critical cluster operations like shard distribution and index management, and they track cluster state changes. Because they don’t store data or serve search requests, a smaller instance type than the one used for your data nodes is usually sufficient. Refer to Choosing instance types for dedicated master nodes for more information on instance types for dedicated cluster manager nodes.

You should expect to occasionally adjust cluster manager instance size and type as your workload evolves over time. As with all scale questions, you need to monitor performance and make sure you have enough CPU and Java virtual machine (JVM) heap for your dedicated cluster managers. We recommend using Amazon CloudWatch alarms to monitor the following CloudWatch metrics, and adjust according to the alarm state:

  • MasterCPUUtilization – Maximum is greater than or equal to 50% for 15 minutes, three consecutive times
  • MasterJVMMemoryPressure – Maximum is greater than or equal to 95% for 1 minute, three consecutive times
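As a rough sketch of the two alarms above, the following Python builds the arguments you could pass to the CloudWatch PutMetricAlarm API (for example, through boto3's put_metric_alarm). It assumes the metrics are published in the AWS/ES namespace with DomainName and ClientId dimensions, under the CloudWatch names MasterCPUUtilization and MasterJVMMemoryPressure. The domain name and account ID are placeholders.

```python
def manager_alarm(domain, account_id, metric, threshold, period_seconds):
    """Build PutMetricAlarm arguments for one cluster manager metric."""
    return {
        "AlarmName": f"{domain}-{metric}",
        "Namespace": "AWS/ES",            # assumed namespace for OpenSearch Service metrics
        "MetricName": metric,
        "Dimensions": [
            {"Name": "DomainName", "Value": domain},
            {"Name": "ClientId", "Value": account_id},
        ],
        "Statistic": "Maximum",
        "Period": period_seconds,          # length of one evaluation window
        "EvaluationPeriods": 3,            # "three consecutive times"
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
    }


# Placeholder domain and account ID, for illustration only.
cpu_alarm = manager_alarm("my-domain", "123456789012", "MasterCPUUtilization", 50.0, 900)
jvm_alarm = manager_alarm("my-domain", "123456789012", "MasterJVMMemoryPressure", 95.0, 60)

# With boto3 installed and credentials configured, each alarm could be created with:
#   boto3.client("cloudwatch").put_metric_alarm(**cpu_alarm)
```

Building the arguments as plain dictionaries keeps the thresholds reviewable in code before anything is created in your account.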

Conclusion

Dedicated cluster manager nodes provide added stability and protection against split-brain situations, can be of a different instance type than data nodes, and are an obvious benefit when OpenSearch Service is backing mission-critical applications for production workloads. They are typically not required for development workloads like proof of concept because the cost of running a dedicated cluster manager node exceeds the tangible benefits of keeping the cluster up and running. To learn more about OpenSearch best practices, see link.


About the authors

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached through LinkedIn.

Chinmayi Narasimhadevara is a Senior Solutions Architect focused on Data Analytics and AI at AWS. She helps customers build advanced, highly scalable, and performant solutions.

Kaltura reduces observability operational costs by 60% with Amazon OpenSearch Service

Post Syndicated from Ido Ziv original https://aws.amazon.com/blogs/big-data/kaltura-reduces-observability-operational-costs-by-60-with-amazon-opensearch-service/

This post is co-written with Ido Ziv from Kaltura.

As organizations grow, managing observability across multiple teams and applications becomes increasingly complex. Logs, metrics, and traces generate vast amounts of data, making it challenging to maintain performance, reliability, and cost-efficiency.

At Kaltura, an AI-infused video-first company serving millions of users across hundreds of applications, observability is mission-critical. Understanding system behavior at scale isn’t just about troubleshooting—it’s about providing seamless experiences for customers and employees alike. But achieving effective observability at this scale comes with challenges: managing spans; correlating logs, traces, and events across distributed systems; and maintaining visibility without overwhelming teams with noise. Balancing granularity, cost, and actionable insights requires constant tuning and thoughtful architecture.

In this post, we share how Kaltura transformed its observability strategy and technological stack by migrating from a software as a service (SaaS) logging solution to Amazon OpenSearch Service—achieving higher log retention, a 60% reduction in cost, and a centralized platform that empowers multiple teams with real-time insights.

Observability challenges at scale

Kaltura ingests over 8 TB of logs and traces daily, processing more than 20 billion events across 6 production AWS Regions and over 200 applications, with log spikes reaching up to 6 GB per second. This immense data volume, combined with a highly distributed architecture, created significant challenges in observability. Historically, Kaltura relied on a SaaS-based observability solution that met initial requirements but became increasingly difficult to scale. As the platform evolved, teams generated disparate log formats, applied retention policies that no longer reflected data value, and operated more than 10 organically grown observability sources. The lack of standardization and visibility required extensive manual effort to correlate data, maintain pipelines, and troubleshoot issues, leading to rising operational complexity and fixed costs that didn’t scale efficiently with usage.

Kaltura’s DevOps team recognized the need to reassess their observability solution and began exploring a variety of options, from self-managed platforms to fully managed SaaS offerings. After a comprehensive evaluation, they made the strategic decision to migrate to OpenSearch Service, using its advanced features such as Amazon OpenSearch Ingestion, the Observability plugin, UltraWarm storage, and Index State Management.

Solution overview

Kaltura created a new AWS account that would be a dedicated observability account, where OpenSearch Service was deployed. Logs and traces were collected from different accounts and producers such as microservices on Amazon Elastic Kubernetes Service (Amazon EKS) and services running on Amazon Elastic Compute Cloud (Amazon EC2).

By using AWS services such as AWS Identity and Access Management (IAM), AWS Key Management Service (AWS KMS), and Amazon CloudWatch, Kaltura was able to meet the standards to create a production-grade system while keeping security and reliability in mind. The following figure shows a high-level design of the environment setup.

Ingestion

As seen in the following diagram, logs are shipped using log shippers, also known as collectors. In Kaltura’s case, they used Fluent Bit. A log shipper is a tool designed to collect, process, and transport log data from various sources to a centralized location, such as log analytics platforms, management systems, or an aggregator system. Fluent Bit was used in all sources and also provided light processing abilities. Fluent Bit was deployed as a daemonset in Kubernetes. The application development teams didn’t change their code, because the Fluent Bit pods were reading the stdout of the application pods.

The following code is an example of a Fluent Bit configuration for Amazon EKS:

[INPUT]
   Name                tail
   Path                /var/log/containers/*.log
   Tag                 kube.*
   Skip_Long_Lines     On
   multiline.parser    docker, cri
[FILTER]
   alias               k8s
   # kubernetes filter to parse all logs
   Name                kubernetes
   Match               kube.*
   Kube_Tag_Prefix     kube.var.log.containers.
   Annotations         On
   Labels              Off
   Merge_Log           On
   Keep_Log            Off
   Kube_URL            https://kubernetes.default.svc.cluster.local:443 
[FILTER]
   alias               apps
   Name                rewrite_tag
   Match               kube.*
   Rule                $kubernetes['annotations']['kaltura.com/observability'] ^apps$ 
[OUTPUT]
   Name                http
   Match               apps.*
   Alias               apps
   Host                xxxxx.us-east-1.osis.amazonaws.com
   Port                443
   URI                 /log/apps
   Format              json
   aws_auth            true
   aws_region          us-east-1
   aws_service         osis
   aws_role_arn        arn:aws:iam::xxxxx:role/osis-ingestion-role
   Log_Level           trace
   tls                 On

Spans and traces were collected directly from the application layer using a seamless integration approach. To facilitate this, Kaltura deployed an OpenTelemetry Collector (OTEL) using the OpenTelemetry Operator for Kubernetes. Additionally, the team developed a custom OTEL code library, which was incorporated into the application code to efficiently capture and log traces and spans, providing comprehensive observability across their system.

Data from Fluent Bit and the OpenTelemetry Collector was sent to OpenSearch Ingestion, a fully managed, serverless data collector that delivers real-time log, metric, and trace data to OpenSearch Service domains and Amazon OpenSearch Serverless collections. Each producer sent data to a specific pipeline, one for logs and one for traces, where data was transformed, aggregated, enriched, and normalized before being sent to OpenSearch Service. The trace pipeline used the otel_trace and service_map processors and was based on the OpenSearch Ingestion OpenTelemetry trace analytics blueprint.

The following code is an example of the OpenSearch Ingestion pipeline for logs:

version: "2"
entry-pipeline:
 source:
   http:
     path: "/log/apps"

 processor:
   - add_entries:
       entries:
       - key: "log_type"
         value: "default"
       - key: "log_type"
         value: "api"
         add_when: 'contains(/filename, "api.log")'
         overwrite_if_key_exists: true
       - key: "log_type"
         value: "stats"
         add_when: 'contains(/filename, "stats.log")'
         overwrite_if_key_exists: true
       - key: "log_type"
         value: "event"
         add_when: 'contains(/filename, "event.log")'
         overwrite_if_key_exists: true
       - key: "log_type"
         value: "login"
         add_when: 'contains(/filename, "login.log")'
         overwrite_if_key_exists: true

   - grok:
       grok_when: '/log_type == "api"'
       match:
         log: ['^\[%%{DATA:timestamp}] \[%%{DATA:logIp}\] \[%%{DATA:host}\] \[%%{WORD:id}\] %%{WORD:priorityName}\(%%{NUMBER:priority}\): \[memory: %%{DATA:memory} MB, real: %%{DATA:real}MB\] %%{GREEDYDATA:message}']

   - date:
       match:
         - key: timestamp
           patterns: ["dd-MMM-yyyy HH:mm:ss", "dd/MMM/yyyy:HH:mm:ss Z", "EEE MMM dd HH:mm:ss.SSSSSS yyyy"]

       destination: "@timestamp"
       output_format: "yyyy-MM-dd'T'HH:mm:ss"

   - rename_keys:
       entries:
       - from_key: "timestamp"
         to_key: "@timestamp"
         overwrite_if_to_key_exists: false
       - from_key: "date"
         to_key: "@timestamp"
         overwrite_if_to_key_exists: false

   - drop_events:
       drop_when: 'contains(/filename, "simplesamlphp.log")'


 sink:
   - opensearch:
       hosts: ["${opensearch_host}"]
       index: '$${/env}-api-$${/log_type}-app-logs'
       index_type: custom
       action: create
       bulk_size: 20
       aws:
         sts_role_arn: ${sts_role_arn}
         region:  ${region}
       dlq:
         s3:
           bucket: "${bucket}"
           key_path_prefix: 'my-app-dlq-files'
           region: "${region}"
           sts_role_arn: "${sts_role_arn}"

The preceding example shows the use of processors such as grok, date, add_entries, rename_keys, and drop_events:

  • add_entries:
    • Adds a new field log_type based on filename
    • Default: “default”
    • If the filename contains specific substrings (such as api.log or stats.log), it assigns a more specific type
  • grok:
    • Applies Grok parsing to logs of type “api”
    • Extracts fields like timestamp, logIp, host, priorityName, priority, memory, real, and message using a custom pattern
  • date:
    • Parses timestamp strings into a standard datetime format
    • Stores it in a field called @timestamp based on ISO8601 format
    • Handles multiple timestamp patterns
  • rename_keys:
    • timestamp or date are renamed into @timestamp
    • Does not overwrite if @timestamp already exists
  • drop_events:
    • Drops logs where filename contains simplesamlphp.log
    • This is a filtering rule to ignore noisy or irrelevant logs

The following is an example of the input of a log line:

   "log": "[25-Mar-2025 18:23:18] [127.0.0.1] [the-most-awesome-server-in-kaltura] [67e2f496cc321] INFO(6): [memory: 4.51 MB, real: 6MB] [request: 1] [time: 0.0263s / total: 0.0263s]",

After processing, we get the following output:

    "log_type": "api",
    "priorityName": "INFO",
    "memory": "4.51",
    "host": "the-most-awesome-server-in-kaltura",
    "real": "6",
    "priority": "6",
    "message": "[request: 1] [time: 0.0263s / total: 0.0263s]",
    "logIp": "127.0.0.1",
    "id": "67e2f496cc321",
    "@timestamp": "2025-03-25T18:23:18"
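To make the transformation easier to follow, here is a minimal Python sketch that mimics the add_entries, grok, date, and drop_events steps on the sample line. It is not how OpenSearch Ingestion runs internally: the regular expression is our hand translation of the grok pattern (reading the doubled %% as template escaping for %), only the first date pattern is handled, and the filename value is hypothetical because the sample input doesn't show it.

```python
import re
from datetime import datetime

# Hand translation of the grok pattern: DATA -> .*?, WORD -> \w+,
# NUMBER simplified to digits and dots, GREEDYDATA -> .*
API_LINE = re.compile(
    r"^\[(?P<timestamp>.*?)\] \[(?P<logIp>.*?)\] \[(?P<host>.*?)\] \[(?P<id>\w+)\] "
    r"(?P<priorityName>\w+)\((?P<priority>[0-9.]+)\): "
    r"\[memory: (?P<memory>.*?) MB, real: (?P<real>.*?)MB\] (?P<message>.*)"
)


def log_type_for(filename):
    """add_entries: start with "default", then overwrite on a filename match."""
    log_type = "default"
    for candidate in ("api", "stats", "event", "login"):
        if f"{candidate}.log" in filename:
            log_type = candidate
    return log_type


def process(event):
    """Return the processed event, or None if the event is dropped."""
    out = {"log_type": log_type_for(event["filename"])}
    match = API_LINE.match(event["log"]) if out["log_type"] == "api" else None
    if match:
        out.update(match.groupdict())
        # date: parse "dd-MMM-yyyy HH:mm:ss" and write it to @timestamp.
        parsed = datetime.strptime(out.pop("timestamp"), "%d-%b-%Y %H:%M:%S")
        out["@timestamp"] = parsed.strftime("%Y-%m-%dT%H:%M:%S")
    else:
        out["log"] = event["log"]  # keep the raw line when nothing was extracted
    # drop_events: discard noisy files.
    if "simplesamlphp.log" in event["filename"]:
        return None
    return out


sample = {
    "filename": "/var/log/app/api.log",  # hypothetical path
    "log": "[25-Mar-2025 18:23:18] [127.0.0.1] [the-most-awesome-server-in-kaltura] "
           "[67e2f496cc321] INFO(6): [memory: 4.51 MB, real: 6MB] "
           "[request: 1] [time: 0.0263s / total: 0.0263s]",
}
print(process(sample))
```

Like grok, the sketch leaves every captured value as a string; numeric typing happens later, when the index mapping is applied.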

Kaltura followed some OpenSearch Ingestion best practices, such as:

  • Including a dead-letter queue (DLQ) in pipeline configuration. This can significantly help troubleshoot pipeline issues.
  • Starting and stopping pipelines to optimize cost-efficiency, when possible.
  • During the proof of concept stage:
    • Installing Data Prepper locally for faster development iterations.
    • Disabling persistent buffering to expedite blue-green deployments.

Achieving operational excellence with efficient log and trace management

Logs and traces play a vital role in identifying operational issues, but they come with unique challenges. First, they represent time series data, which inherently evolves over time. Second, their value typically diminishes as time passes, making efficient management crucial. Third, they are append-only in nature. With OpenSearch, Kaltura faced distinct trade-offs between cost, data retention, and latency. The goal was to make sure valuable data remained accessible to engineering teams with minimal latency, but the solution also needed to be cost-effective. Balancing these factors required thoughtful planning and optimization.

Data was ingested to OpenSearch data streams, which simplifies the process of ingesting append-only time series data. Several Index State Management (ISM) policies were applied to different data streams, which were dependent on log retention requirements. ISM policies handled moving indexes from hot storage to UltraWarm, and eventually deleting the indexes. This allowed a customizable and cost-effective solution, with low latency for querying new data and reasonable latency for querying historical data.

The following example ISM policy rolls indexes over when a primary shard reaches 30 GB, moves them to UltraWarm after 2 days and to cold storage after 14 days, and deletes them after 60 days. If an action fails, it is retried with an exponential backoff strategy. In case of failures, notifications are sent to relevant teams to keep them informed (the notification settings are not shown in this listing).

{
    "id": "retention",
    "policy": {
        "description": "production ISM",
        "default_state": "hot",
        "states": [
            {
                "name": "hot",
                "actions": [
                    {
                        "retry": {
                            "count": 5,
                            "backoff": "exponential",
                            "delay": "1h"
                        },
                        "rollover": {
                            "min_primary_shard_size": "30gb",
                            "copy_alias": false
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "warm",
                        "conditions": {
                            "min_index_age": "2d"
                        }
                    }
                ]
            },
            {
                "name": "warm",
                "actions": [
                    {
                        "retry": {
                            "count": 5,
                            "backoff": "exponential",
                            "delay": "1h"
                        },
                        "warm_migration": {}
                    }
                ],
                "transitions": [
                    {
                        "state_name": "cold",
                        "conditions": {
                            "min_index_age": "14d"
                        }
                    }
                ]
            },
            {
                "name": "cold",
                "actions": [
                    {
                        "retry": {
                            "count": 5,
                            "backoff": "exponential",
                            "delay": "1h"
                        },
                        "cold_migration": {
                            "start_time": null,
                            "end_time": null,
                            "timestamp_field": "@timestamp",
                            "ignore": "none"
                        }
                    }
                ],
                "transitions": [
                    {
                        "state_name": "delete",
                        "conditions": {
                            "min_index_age": "60d"
                        }
                    }
                ]
            },
            {
                "name": "delete",
                "actions": [
                    {
                        "retry": {
                            "count": 3,
                            "backoff": "exponential",
                            "delay": "1m"
                        },
                        "cold_delete": {}
                    }
                ],
                "transitions": []
            }
        ],
        "ism_template": [
            {
                "index_patterns": [
                    "*-logs"
                ],
                "priority": 50
            }
        ]
    }
}
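The age-based transitions in this policy can be summarized with a small Python sketch. It is a simplification that looks only at the min_index_age conditions; in practice ISM evaluates policies periodically and an action such as a migration must finish before the next transition is considered.

```python
# (from_state, to_state, min_index_age in days), taken from the policy above.
TRANSITIONS = [
    ("hot", "warm", 2),
    ("warm", "cold", 14),
    ("cold", "delete", 60),
]


def state_for_age(age_days):
    """Return the state an index of the given age would have reached."""
    state = "hot"  # default_state
    for source, target, min_age_days in TRANSITIONS:
        if state == source and age_days >= min_age_days:
            state = target
    return state


for age in (1, 2, 14, 59, 60):
    print(age, state_for_age(age))
```

Reading the thresholds this way makes the retention trade-off explicit: 2 days in hot storage, 12 more in UltraWarm, and the remainder of the 60 days in cold storage.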

To create a data stream in OpenSearch, an index template is required, which configures how the data stream and its backing indexes behave. In the following example, the index template specifies key index settings such as the number of shards, the number of replicas, and the refresh interval, which control how data is distributed, replicated, and refreshed across the cluster. It also defines the mappings, which describe the structure of the data: which fields exist, their types, and how they should be indexed. These mappings make sure the data stream knows how to interpret and store incoming log data efficiently. Finally, the template declares @timestamp as the time-based field required for a data stream.

{
  "index_patterns": [
    "*my-app-logs"
  ],
  "template": {
    "settings": {
      "index.number_of_shards": "32",
      "index.number_of_replicas": "0",
      "index.refresh_interval": "60s"
    },
    "mappings": {
      "properties": {
        "priorityName": {
          "type": "keyword"
        },
        "log_type": {
          "type": "keyword"
        },
        "@timestamp": {
          "type": "date"
        },
        "memory": {
          "type": "float"
        },
        "host": {
          "type": "keyword"
        },
        "pid": {
          "type": "keyword"
        },
        "real": {
          "type": "float"
        },
        "env": {
          "type": "keyword"
        },
        "message": {
          "type": "text"
        },
        "priority": {
          "type": "integer"
        },
        "logIp": {
          "type": "ip"
        }
      }
    }
  },
  "composed_of": [],
  "priority": "100",
  "_meta": {
    "flow": "simple"
  },
  "data_stream": {
    "timestamp_field": {
      "name": "@timestamp"
    }
  },
  "name": "my-app-logs"
}
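To illustrate what the mappings mean for an incoming document, the following Python sketch casts the string values produced by the ingestion pipeline to the types declared in the template. This is only a client-side illustration under our own assumptions; OpenSearch performs its own parsing and coercion at index time, and the CASTS table is a simplified stand-in for that behavior.

```python
import ipaddress

# Field types copied from the template's "properties" section (the date field is omitted).
MAPPING = {
    "priorityName": "keyword",
    "log_type": "keyword",
    "memory": "float",
    "host": "keyword",
    "pid": "keyword",
    "real": "float",
    "env": "keyword",
    "message": "text",
    "priority": "integer",
    "logIp": "ip",
}

# Simplified stand-ins for how each mapped type interprets a value.
CASTS = {
    "keyword": str,
    "text": str,
    "float": float,
    "integer": int,
    "ip": lambda value: str(ipaddress.ip_address(value)),  # raises on invalid IPs
}


def coerce(event):
    """Cast mapped fields to their declared types; leave unmapped fields as-is."""
    typed = {}
    for field, value in event.items():
        cast = CASTS.get(MAPPING.get(field))
        typed[field] = cast(value) if cast else value
    return typed


print(coerce({"memory": "4.51", "real": "6", "priority": "6", "logIp": "127.0.0.1"}))
```

A value that cannot be cast, such as a malformed IP address, raises an error here; a comparable mismatch at index time is the kind of failure the pipeline's dead-letter queue helps you investigate.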

Implementing role-based access control and user access

The new observability platform is accessed by many types of users; internal users log in to OpenSearch Dashboards using SAML-based federation with Okta. The following diagram illustrates the user flow.

Each user accesses the dashboards to view observability items relevant to their role. Fine-grained access control (FGAC) is enforced in OpenSearch using built-in IAM role and SAML group mappings to implement role-based access control (RBAC). When users log in to the OpenSearch domain, they are automatically routed to the appropriate tenant based on their assigned role. This setup makes sure developers can create dashboards tailored to debugging within development environments, and support teams can build dashboards focused on identifying and troubleshooting production issues. The SAML integration alleviates the need to manage internal OpenSearch users entirely.

For each role in Kaltura, a corresponding OpenSearch role was created with only the necessary permissions. For instance, support engineers are granted access to the monitoring plugin to create alerts based on logs, whereas QA engineers, who don’t require this functionality, are not granted that access.

The following screenshot shows the role of the DevOps engineers defined with cluster permissions.

These users are routed to their own dedicated DevOps tenant, to which they only have write access. This makes it possible for different users from different roles in Kaltura to create the dashboard items that focus on their priorities and needs. OpenSearch supports backend role mapping; Kaltura mapped the Okta group to the role so when a user logs in from Okta, they automatically get assigned based on their role.

This also works with IAM roles to facilitate automations in the cluster using external services, such as OpenSearch Ingestion pipelines, as can be seen in the following screenshot.
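Conceptually, backend role mapping is a lookup from what the identity provider asserts (an Okta group, or an IAM role ARN for automation) to an OpenSearch role. The following Python sketch shows that lookup with hypothetical role names, group names, and account ID; it is not the security plugin's implementation.

```python
# Hypothetical mappings: OpenSearch role -> backend roles that grant it.
ROLE_MAPPINGS = {
    "devops": ["okta-devops"],
    "support": ["okta-support"],
    "ingestion_writer": ["arn:aws:iam::123456789012:role/osis-ingestion-role"],
}


def opensearch_roles(backend_roles):
    """Return the OpenSearch roles granted by the presented backend roles."""
    presented = set(backend_roles)
    return sorted(
        role for role, mapped in ROLE_MAPPINGS.items() if presented & set(mapped)
    )


# A user arriving from Okta with two groups, and a pipeline assuming an IAM role.
print(opensearch_roles(["okta-devops", "okta-everyone"]))
print(opensearch_roles(["arn:aws:iam::123456789012:role/osis-ingestion-role"]))
```

Because permissions hang off the group or IAM role rather than the individual, onboarding a new engineer only requires adding them to the right Okta group.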

Using observability features and service mapping for enhanced trace and log correlation

After a user is logged in, they can use the Observability plugins, view surrounding events in logs, correlate logs and traces, and use the Trace Analytics plugin. Users can inspect traces and spans, and group traces with latency information using built-in dashboards. Users can also drill down to a specific trace or span and correlate it back to log events. The service_map processor used in OpenSearch Ingestion sends OpenTelemetry data to create a distributed service map for visualization in OpenSearch Dashboards.

Using the combined signals of traces and spans, OpenSearch discovers the application connectivity and maps them to a service map.

After OpenSearch ingests the traces and spans from OpenTelemetry, they are aggregated into groups according to paths and trends. Durations are also calculated and presented to the user over time.

With a trace ID, it’s possible to filter out all the relevant spans by the service and see how long each took, identifying issues with external services such as MongoDB and Redis.
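The same drill-down can be sketched in Python over a handful of span documents. The field names (traceId, serviceName, durationInNanos) are assumed to follow the trace analytics span format, and the sample spans are invented for illustration.

```python
from collections import defaultdict


def service_durations_ms(spans, trace_id):
    """Total span duration per service for one trace, slowest first."""
    totals = defaultdict(float)
    for span in spans:
        if span["traceId"] == trace_id:
            totals[span["serviceName"]] += span["durationInNanos"] / 1_000_000
    return sorted(totals.items(), key=lambda item: item[1], reverse=True)


# Invented sample spans.
spans = [
    {"traceId": "t1", "serviceName": "api", "durationInNanos": 120_000_000},
    {"traceId": "t1", "serviceName": "mongodb", "durationInNanos": 95_000_000},
    {"traceId": "t1", "serviceName": "redis", "durationInNanos": 3_000_000},
    {"traceId": "t2", "serviceName": "api", "durationInNanos": 40_000_000},
]

for service, millis in service_durations_ms(spans, "t1"):
    print(f"{service}: {millis:.1f} ms")
```

In this invented trace, most of the API time is spent waiting on MongoDB, which is the kind of external dependency issue the span view is meant to surface.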

From the spans, users can discover the relevant logs.

Post-migration enhancements

After the migration, a strong developer community emerged within Kaltura that embraced the new observability solution. As adoption grew, so did requests for new features and enhancements aimed at improving the overall developer experience.

One key improvement was extending log retention. Kaltura achieved this by re-ingesting historical logs from Amazon Simple Storage Service (Amazon S3) using a dedicated OpenSearch Ingestion pipeline with Amazon S3 read permissions. With this enhancement, teams can access and analyze logs from up to a year ago using the same familiar dashboards and filters.

In addition to monitoring EKS clusters and EC2 instances, Kaltura expanded its observability stack by integrating more AWS services. Amazon API Gateway and AWS Lambda were introduced to support log ingestion from external vendors, allowing for seamless correlation with existing data and broader visibility across systems.

Finally, to empower teams and promote autonomy, data stream templates and ISM policies are managed directly by developers within their own repositories. By using infrastructure as code tools like Terraform, developers can define index mappings, alerts, and dashboards as code—versioned in Git and deployed consistently across environments.

Conclusion

Kaltura successfully implemented a smart log retention strategy, extending real-time retention from 5 days for all log types to 30 days for critical logs, while maintaining cost-efficiency through the use of UltraWarm nodes. This approach led to a 60% reduction in costs compared to their previous solution. Additionally, Kaltura consolidated their observability platform, streamlining operations by merging 10 separate systems into a unified, all-in-one solution. This consolidation not only improved operational efficiency but also sparked increased engagement from developer teams, driving feature requests, fostering internal design collaborations, and attracting early adopters for new enhancements. If Kaltura’s journey has inspired you and you’re thinking about implementing a similar solution in your organization, consider these steps:


About the authors

Ido Ziv is a DevOps team leader in Kaltura with over 6 years of experience. His hobbies include sailing and Kubernetes (but not at the same time).

Roi Gamliel is a Senior Solutions Architect helping startups build on AWS. He is passionate about the OpenSearch Project, helping customers fine-tune their workloads and maximize results.

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to use data, gain insights, and derive value.

Introducing GenAI-powered business description recommendations for custom assets in Amazon SageMaker Catalog

Post Syndicated from Ramesh H Singh original https://aws.amazon.com/blogs/big-data/introducing-genai-powered-business-description-recommendations-for-custom-assets-in-amazon-sagemaker-catalog/

An organization’s data can come from various sources, including cloud-based pipelines, partner ecosystems, open table formats like Apache Iceberg, software as a service (SaaS) platforms, and internal applications. Although much of this data is business-critical, documenting it and making it discoverable at scale continues to challenge teams, especially when assets don’t originate from pre-integrated AWS-based sources.

To help bridge this gap, Amazon SageMaker Catalog—part of the next generation of Amazon SageMaker—now supports generative AI-powered recommendations for business descriptions, including table summaries, use cases, and column-level descriptions for custom structured assets registered programmatically. This new capability, powered by large language models (LLMs) in Amazon Bedrock, extends automated metadata generation to the broader spectrum of enterprise data, including Iceberg tables in Amazon Simple Storage Service (Amazon S3) or datasets from third-party and internal applications.

With just a few clicks, you can create AI-generated suggestions, review and refine descriptions, and publish enriched asset metadata directly to the catalog. This helps reduce manual documentation effort, improves metadata consistency, and accelerates asset discoverability across organizations.

This launch is part of our broader investment in generative AI-powered cataloging and metadata intelligence across SageMaker Catalog. By combining machine learning (ML) with human oversight and governance controls, we’re making it straightforward for organizations to scale trusted, usable data across business units.

In this post, we demonstrate how to generate AI recommendations for business descriptions for custom structured assets in SageMaker Catalog.

Challenges when using incomplete metadata for custom and external data

SageMaker Catalog supports automated documentation for assets harvested from AWS-centered services like AWS Glue and Amazon Redshift. These built-in integrations automatically pull schema and generate contextual metadata, making it straightforward for data consumers to discover and understand what’s available.

However, many critical datasets originate outside of these services, such as:

  • Iceberg tables stored in Amazon S3
  • Structured datasets from third-party platforms like Snowflake or Databricks
  • Relational assets manually registered using APIs

As a result, customers had to manually enter business descriptions and column-level context—a process that delays publishing, introduces inconsistency, and undermines the discoverability of important assets.

With this launch, SageMaker Catalog adds support for generative AI-powered metadata generation for custom schema-based data assets registered programmatically through APIs. We use large language models (LLMs) in Amazon Bedrock to automatically generate key elements for custom structured assets. This includes providing a comprehensive table summary, detailed column-level descriptions, and suggesting potential analytical use cases. These automated capabilities help streamline the documentation process, ensuring consistency and efficiency across data assets.

Customer Spotlight

Across industries, customers are managing thousands of structured datasets that don’t originate from AWS-native pipelines. These datasets often lack documentation—not because they’re unimportant, but because documenting them is time-consuming, repetitive, and often deprioritized.

How Amazon’s Finance is revolutionizing data management with AI-powered metadata generation

As a large-scale organization with diverse data needs, Amazon’s Finance team manages thousands of data assets. Within the Finance organization, numerous datasets often lack proper documentation, creating bottlenecks that hinder critical financial analysis and decision-making.

Balaji Kumar Gopalakrishnan, Principal Engineer at Amazon Finance, shares how the AI-powered metadata generation capability is transforming their data management approach:

“As a finance organization, we manage numerous datasets that lack proper documentation, creating bottlenecks for critical financial analysis. The AI-powered auto-documentation capability would be transformative for our team—alleviating the manual documentation effort that delays asset discovery and usability. This would dramatically reduce our time-to-insight for reporting while enforcing consistent metadata standards across all our manually registered assets.”

This empowers teams like Amazon Finance to streamline metadata generation and documentation, making critical financial data easier to access and work with. By automating metadata creation, teams can focus on high-impact analysis, accelerating their decision-making process and improving the overall efficiency of the organization.

Key Benefits

This new feature directly addresses key challenges faced by cataloging teams by enabling them to:

  • Accelerate time to publish: Minimize the delay between data availability and catalog readiness.
  • Improve metadata quality: Ensure consistent, LLM-generated context, regardless of schema authors.
  • Enhance discoverability: Enable quick and easy access to data through rich, searchable descriptions.
  • Build trust: Provide transparent, editable AI suggestions to ensure metadata aligns with organizational needs and domain accuracy.

For data producers, this capability eliminates the need for repetitive, manual documentation, saving valuable time. By automating metadata generation, it also standardizes how metadata is written and structured across assets, resulting in faster publishing and quicker data access for consumers.

On the consumer side, the enhanced metadata offers greater clarity, allowing users to understand the data and its usage at a glance. With complete and curated metadata, they can trust the source, while working more independently and reducing reliance on subject matter experts (SMEs) and data stewards for interpretation.

Solution overview

In this post, we demonstrate how to manually create a structured asset and use the new AI-powered capability to generate business metadata to improve asset usability. The asset we add is a product inventory table with the following columns:

Table : ProductInventory
   Columns :
        productID : string
        name: string
        price: double
        stock_quantity : integer
        shipped_from : integer

Prerequisites

To follow this post, you must have an Amazon SageMaker Unified Studio domain set up with domain owner or domain unit owner privileges. You also need a project, which we use to publish assets. For instructions, refer to the SageMaker Unified Studio Getting started guide.

Create an asset

Complete the following steps to manually create the asset:

  1. Manually registered asset types need to use the amazon.datazone.RelationalTableFormType form type. To get its latest revision in your domain, run the following command, replacing the domain identifier with your own:
aws datazone get-form-type --domain-identifier dzd_xxxxf --form-type-identifier amazon.datazone.RelationalTableFormType

The latest revision returned is 7, which we use in the next steps:

{
    "createdAt": "2024-12-23T21:12:50.484000+00:00",
    "createdBy": "SYSTEM",
    "domainId": "dzd_xxxxf",
    "imports": [
        {
            "name": "amazon.datazone.RelationalColumnMixin",
            "revision": "5"
        },
        {
            "name": "amazon.datazone.RelationalTableMixin",
            "revision": "5"
        }
    ],
    "model": {
        "smithy": "$version: \"2.0\"\n\nnamespace amazon.datazone\n\nstructure RelationalColumn with [ RelationalColumnMixin ] {\n\n}\n\nlist RelationalColumns {\n    member: RelationalColumn\n}\n\n@documentation(\"A generic form-type to capture relational table details\")\nstructure RelationalTableFormType with [ RelationalTableMixin ] {\n\n    columns: RelationalColumns\n}"
    },
    "name": "amazon.datazone.RelationalTableFormType",
    "originDomainId": "dzd_amazon_datazone_domain",
    "originProjectId": "dzd_amazon_datazone_domain_project",
    "owningProjectId": "dzd_amazon_datazone_domain_project",
    "revision": "7",
    "status": "ENABLED"
}
  2. Create a new asset type that uses the amazon.datazone.RelationalTableFormType revision returned in the previous step:
aws datazone create-asset-type \
  --domain-identifier dzd_xxxxf \
  --name MyAssetType \
  --description "Manually registered custom asset type" \
  --owning-project-identifier 4zxxxx3r \
  --forms-input '{"MyCustomForm": {"required": true, "typeIdentifier": "amazon.datazone.RelationalTableFormType","typeRevision":"7"}}'

You will receive a success response similar to the following:

{
    "description": "Manually registered custom asset type",
    "domainId": "dzd_xxxxf",
    "formsOutput": {
        "AssetCommonDetailsForm": {
            "required": false,
            "typeName": "amazon.datazone.AssetCommonDetailsFormType",
            "typeRevision": "6"
        },
        "MyCustomForm": {
            "required": true,
            "typeName": "amazon.datazone.RelationalTableFormType",
            "typeRevision": "7"
        }
    },
    "name": "MyAssetType",
    "revision": "1"
}
  3. Create the asset for the table using the new asset type, replacing the domain and project identifiers with your own. For this example, we also enable businessNameGeneration:
aws datazone create-asset --domain-identifier dzd_xxxxf \
--name ProductInventory \
--owning-project-identifier 4zxxxx3r \
--type-identifier MyAssetType \
--prediction-configuration '{"businessNameGeneration": {"enabled": true}}' \
--forms-input '[{
    "content": "{\r\n  \"tableName\": \"ProductInventory\",\r\n  \"columns\": [\r\n    {\r\n      \"columnName\": \"productID\",\r\n      \"dataType\": \"string\"\r\n    },\r\n    {\r\n      \"columnName\": \"name\",\r\n      \"dataType\": \"string\"\r\n    },\r\n    {\r\n      \"columnName\": \"price\",\r\n      \"dataType\": \"double\"\r\n    },\r\n    {\r\n      \"columnName\": \"stock_quantity\",\r\n      \"dataType\": \"integer\"\r\n    },\r\n    {\r\n      \"columnName\": \"shipped_from\",\r\n      \"dataType\": \"string\"\r\n    }\r\n  ]\r\n}",
    "formName": "MyCustomForm",
    "typeIdentifier": "amazon.datazone.RelationalTableFormType"}]'

The following is an example success response after the asset is created:

{
    "createdAt": "2025-06-24T23:47:51.734000+00:00",
    "createdBy": "9665be38-c692-4474-a41f-5d9793040f08",
    "domainId": "dzd_xxxxf",
    "firstRevisionCreatedAt": "2025-06-24T23:47:51.734000+00:00",
    "firstRevisionCreatedBy": "9665be38-c692-4474-a41f-5d9793040f08",
    "formsOutput": [
        {
            "content": "{\"tableName\":\"ProductInventory\",\"columns\":[{\"columnName\":\"productID\",\"dataType\":\"string\"},{\"columnName\":\"name\",\"dataType\":\"string\"},{\"columnName\":\"price\",\"dataType\":\"double\"},{\"columnName\":\"stock_quantity\",\"dataType\":\"integer\"},{\"columnName\":\"shipped_from\",\"dataType\":\"string\"}]}",
            "formName": "MyCustomForm",
            "typeName": "amazon.datazone.RelationalTableFormType"
        }
    ],
    "id": "4e4w5chq6lf3tz",
    "name": "ProductInventory",
    "owningProjectId": "4zxxxx3r",
    "predictionConfiguration": {
        "businessNameGeneration": {
            "enabled": true
        }
    },
    "readOnlyFormsOutput": [],
    "revision": "1",
    "typeIdentifier": "MyAssetType",
    "typeRevision": "1"
}
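
The escaped content string in the create-asset command is easy to get wrong by hand. As a minimal sketch, the same --forms-input value could be generated with Python's json module. The helper name build_forms_input is hypothetical, and the column list mirrors the values used in the command above:

```python
import json

# Column names and types as used in the create-asset command in this post.
COLUMNS = [
    ("productID", "string"),
    ("name", "string"),
    ("price", "double"),
    ("stock_quantity", "integer"),
    ("shipped_from", "string"),
]


def build_forms_input(table_name, columns, form_name="MyCustomForm"):
    """Return JSON text suitable for --forms-input (hypothetical helper)."""
    content = {
        "tableName": table_name,
        "columns": [
            {"columnName": name, "dataType": data_type}
            for name, data_type in columns
        ],
    }
    forms = [
        {
            # The form content is itself passed as a JSON-encoded string.
            "content": json.dumps(content),
            "formName": form_name,
            "typeIdentifier": "amazon.datazone.RelationalTableFormType",
        }
    ]
    return json.dumps(forms)


forms_input = build_forms_input("ProductInventory", COLUMNS)
print(forms_input)
```

The printed string can be pasted between single quotes after --forms-input, which avoids maintaining the nested escaping manually.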

When an asset is created with businessNameGeneration enabled, the business name predictions are generated asynchronously. After they are generated, they are returned as suggestions under the asset’s readOnlyFormsOutput.
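
Because the predictions arrive asynchronously, a script that creates assets may need to wait before reading them. The following is a rough sketch of a polling loop. It assumes a client that behaves like boto3.client("datazone") and treats a non-empty readOnlyFormsOutput as the signal that suggestions are available; that signal, the helper name, and the retry settings are assumptions, not behavior confirmed by this post:

```python
import time


def wait_for_read_only_forms(client, domain_id, asset_id,
                             attempts=10, delay_seconds=5.0):
    """Poll get_asset until readOnlyFormsOutput is non-empty, or give up.

    `client` is expected to behave like boto3.client("datazone").
    Returns the list of read-only forms, or an empty list on timeout.
    """
    for attempt in range(attempts):
        response = client.get_asset(
            domainIdentifier=domain_id, identifier=asset_id
        )
        forms = response.get("readOnlyFormsOutput", [])
        if forms:
            return forms
        # Sleep between attempts, but not after the last one.
        if attempt < attempts - 1:
            time.sleep(delay_seconds)
    return []
```

With a real client, the call might look like wait_for_read_only_forms(boto3.client("datazone"), "dzd_xxxxf", "4e4w5chq6lf3tz"), using the identifiers from the example response.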

Generate business metadata

Complete the following steps to generate metadata:

  1. Log in to the SageMaker Unified Studio portal, open the project that you used, and choose Assets in the navigation pane.

The business name is already generated for the asset and columns.

  2. To generate descriptions, choose Generate descriptions.

The following screenshot shows the generated names on the Schema tab.

  3. If you approve of the generated names, choose Accept all.

  4. Choose Accept all again to confirm.

  5. Choose Generate descriptions to create suggested table and column descriptions.

  6. Review the generated recommendations and choose Accept all if they look accurate.

The following screenshot shows the generated descriptions.

Even when assets are registered as custom, you can use this feature to generate business context and publish it to SageMaker Catalog.

Conclusion

As enterprise data environments become increasingly distributed and sourced from diverse platforms, maintaining metadata quality at scale presents a challenge. This feature uses generative AI to automate the creation of business descriptions, including table summaries, use cases, and column-level metadata, reducing manual effort while preserving alignment with governance requirements.

The feature is available in the next generation of SageMaker through SageMaker Catalog for custom structured assets (with schema) registered programmatically using an API. For implementation details, refer to the product documentation.


About the authors

Ramesh H Singh is a Senior Product Manager Technical (External Services) at AWS in Seattle, Washington, currently with the Amazon SageMaker team. He is passionate about building high-performance ML/AI and analytics products that enable enterprise customers to achieve their critical goals using cutting-edge technology. Connect with him on LinkedIn.

Pradeep Misra is a Principal Analytics Solutions Architect at AWS. He works across Amazon to architect and design modern distributed analytics and AI/ML platform solutions. He is passionate about solving customer challenges using data, analytics, and AI/ML. Outside of work, Pradeep likes exploring new places, trying new cuisines, and playing board games with his family. He also likes doing science experiments, building LEGOs and watching anime with his daughters.

Balaji Kumar Gopalakrishnan is a Principal Engineer at Amazon Finance Technology. He has been with Amazon since 2013, solving real-world challenges through technology that directly impact the lives of Amazon customers. Outside of work, Balaji enjoys hiking, painting, and spending time with his family. He is also a movie buff!

Mohit Dawar is a Senior Software Engineer at AWS working on DataZone and SageMaker Unified Studio. Over the past three years, he has led efforts around the core metadata catalog, generative AI-powered metadata curation, and lineage visualization. He enjoys working on large-scale distributed systems, experimenting with AI to improve user experience, and building tools that make data governance feel effortless. Connect with him on LinkedIn.

Mark Horta is a Software Development Manager at AWS working on DataZone and SageMaker Unified Studio. He is responsible for leading the engineering efforts for SageMaker Catalog focusing on generative-AI metadata generation and curation and data lineage.

Building serverless event streaming applications with Amazon MSK and AWS Lambda

Post Syndicated from Tarun Rai Madan original https://aws.amazon.com/blogs/big-data/building-serverless-event-streaming-applications-with-amazon-msk-and-aws-lambda/

As organizations build modern applications with event-driven architectures (EDA), they often seek solutions that minimize infrastructure management overhead while maximizing developer productivity. Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS Lambda together provide a serverless, scalable, and cost-efficient platform for real-time event-driven processing.

In this post, we describe how you can simplify your event-driven application architecture using AWS Lambda with Amazon MSK. We demonstrate how to configure Lambda as a consumer for Kafka topics, including a cross-account setup and how to optimize price and performance for these applications.

Why use Lambda with Amazon MSK?

Customers building event-driven applications have several key priorities when it comes to their architecture choices. They typically seek to reduce operational overhead by using Amazon Web Services (AWS) to handle the complex, underlying infrastructure components so their teams can focus on core business logic. Developers also prefer a streamlined experience that minimizes repetitive boilerplate code, so they can be more productive and focus on creating value. Finally, these customers want both scalability and cost-effectiveness without the burden of managing compute infrastructure directly. Lambda integration with Amazon MSK addresses these requirements by combining the benefits of serverless computing with managed Kafka services.

For example, an ecommerce company can use Amazon MSK to collect real-time clickstream data from its website and process those events using AWS Lambda. With this integration, the company can trigger Lambda functions to update recommendation models, send personalized offers, or analyze user behavior instantly, without provisioning or managing servers.

The key benefits of using Lambda with Amazon MSK include:

  1. Simplicity through native integration – AWS Lambda offers native integration with Amazon MSK through a connector resource called event source mapping. You can use this integration to directly associate a Kafka topic—whether it’s on Amazon MSK or a self-managed Kafka cluster—as an event source for a Lambda function without writing custom consumer logic. With just a few configuration steps, event source mapping handles partition assignment, offset tracking, and parallelized batch processing under the hood. It uses the Kafka consumer group protocol to distribute topic partitions across multiple concurrent Lambda invocations, supports batch windowing, and enables at-least-once delivery semantics. Moreover, it automatically commits offsets upon successful function execution while handling retries and dead-letter queue (DLQ) routing for failed records, significantly reducing the operational overhead traditionally associated with Kafka consumers.
  2. Auto scaling and throughput controls – When using AWS Lambda with Amazon MSK through event source mapping, Lambda automatically scales by assigning a dedicated event poller per Kafka partition, enabling parallel, partition-based processing. This allows the system to elastically handle varying traffic without manual intervention. For advanced control, provisioned concurrency pre-initializes Lambda execution environments, eliminating cold starts and delivering consistent low-latency performance. Additionally, with provisioned event source mapping, you can configure the minimum and maximum number of Kafka pollers, providing precise control over throughput and concurrency. This is ideal for applications with unpredictable traffic patterns or strict latency requirements.
  3. Cost-effectiveness – AWS Lambda uses a pay-per-use model in which you only pay for compute time and number of invocations. When integrated with Amazon MSK, there are no charges for idle time, making it ideal for bursty or low-frequency Kafka workloads. You can further optimize costs by tuning batch size and batch window settings. For mission-critical workloads, provisioned concurrency provides consistent performance with controlled pricing.
  4. Event filtering – AWS Lambda supports event filtering for Amazon MSK event sources, which means you can process only the Kafka records that match specific criteria. This reduces unnecessary function invocations and optimizes your function costs. You can define up to five filters per event source mapping (with the option to request an increase to ten). Each filter uses a JSON-based pattern to specify the conditions a record must meet to be processed. Filters can be applied using the AWS Management Console, AWS Command Line Interface (AWS CLI), or AWS Serverless Application Model (AWS SAM) templates. For more details and examples, refer to the AWS Lambda documentation on event filtering with Amazon MSK.
  5. Handling Availability Zone outage for your consumer – Amazon MSK enables high availability for your Kafka brokers by distributing them across multiple Availability Zones within a Region. To maintain high availability across your application, you similarly need a consumer that offers high availability. AWS Lambda offers high availability and resilience by running your consumer functions across multiple Availability Zones in a Region. This means that even if one Availability Zone experiences an outage, your Lambda function will continue to operate in other healthy Availability Zones. While Lambda manages security patching and Availability Zone failure scenarios, you can focus on your application logic.
  6. Cross-account event processing – Cross-account connectivity between AWS Lambda and Amazon MSK allows a Lambda function in one AWS account to consume data from an MSK cluster in another account using MSK multi-VPC private connectivity powered by AWS PrivateLink. This setup is particularly beneficial for organizations that centralize Kafka infrastructure while maintaining separate accounts for different applications or teams.
  7. Support for JSON, Avro, Protobuf, and schema registries – AWS Lambda supports Kafka events in JSON, Avro, and Protobuf formats through event source mapping. It integrates with AWS Glue Schema Registry, Confluent Cloud Schema Registry, and self-managed Confluent Schema Registry, enabling native schema validation, filtering, and deserialization without custom code.
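
To make the event filtering benefit more concrete, the following sketch builds a FilterCriteria structure of the kind an event source mapping accepts, where each filter is a JSON pattern serialized as a string. The eventType field and the "purchase" value are hypothetical, and the limit check uses the default of five filters quoted above:

```python
import json

MAX_FILTERS_DEFAULT = 5  # default limit per event source mapping, per this post


def build_filter_criteria(patterns):
    """Wrap filter patterns (dicts) in a FilterCriteria structure."""
    if len(patterns) > MAX_FILTERS_DEFAULT:
        raise ValueError(
            f"at most {MAX_FILTERS_DEFAULT} filters by default, got {len(patterns)}"
        )
    # Each pattern is serialized to a JSON string inside the structure.
    return {"Filters": [{"Pattern": json.dumps(p)} for p in patterns]}


# Hypothetical pattern: match records whose JSON value has eventType "purchase".
purchase_only = {"value": {"eventType": ["purchase"]}}
filter_criteria = build_filter_criteria([purchase_only])
print(json.dumps(filter_criteria, indent=2))
```

Records that do not match any pattern are dropped before invocation, so the function only pays for the events it cares about.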

How Lambda processes messages from your Kafka topic

Lambda uses event source mappings to process records from Amazon MSK by actively polling Kafka topics through event pollers that invoke Lambda functions with batches of records. These mappings are Lambda managed resources designed for high-throughput, stream-based processing. By default, Lambda detects the OffsetLag for all partitions in your Kafka topic and automatically scales pollers based on traffic. For high-throughput applications, you can enable provisioned mode to define minimum and maximum pollers, and your event source mapping auto scales between the minimum and maximum defined values. In the provisioned mode, each poller can process up to 5 MBps and supports concurrent Lambda invocations.

After Lambda processes each batch, it commits the offsets of the messages in that batch. If your function returns an error for a message in a batch, Lambda retries the whole batch of messages until processing succeeds or the messages expire. You can send records that fail all retry attempts to an on-failure destination for later processing. To maintain ordered processing within a partition, Lambda limits the maximum event pollers to the number of partitions in the topic. When setting up Kafka as a Lambda event source, you can specify a consumer group ID to let Lambda join an existing Kafka consumer group. If other consumers are active in that group, Lambda will receive only part of the topic’s messages. If the group exists, Lambda starts from the group’s committed offset, ignoring the StartingPosition. The following diagram illustrates this flow.
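
As an illustration of what the function receives, here is a minimal handler sketch. It assumes the batch arrives under a records key grouped by topic and partition, with base64-encoded values, and it assumes the producer wrote JSON payloads; the processing itself (printing) is a placeholder:

```python
import base64
import json


def handler(event, context):
    """Decode every Kafka record in the batch and return the count processed."""
    processed = 0
    # Records are grouped under keys of the form "<topic>-<partition>".
    for topic_partition, records in event.get("records", {}).items():
        for record in records:
            # Kafka values arrive base64-encoded.
            payload = base64.b64decode(record["value"]).decode("utf-8")
            message = json.loads(payload)  # assumes JSON-encoded values
            print(topic_partition, record["offset"], message)
            processed += 1
    # Raising an exception here, instead of returning, would cause Lambda
    # to retry the whole batch, as described above.
    return {"processed": processed}
```

Because a failure retries the entire batch, processing logic of this kind is usually written to be idempotent.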

Walkthrough: Build a serverless Kafka app with AWS Lambda

Follow these steps to build a serverless application that consumes messages from an MSK cluster using AWS Lambda:

  1. Create an Amazon MSK cluster. Use the AWS Management Console or AWS CLI to create your MSK cluster. When the cluster is up, create your Kafka topic(s). For detailed instructions, refer to the Amazon MSK documentation.
  2. Create a Lambda function using the AWS Management Console or the AWS CLI. To learn more about creating a Lambda function, refer to Create your first Lambda function. The Lambda function’s execution role needs to have the following permissions:
    1. Access to connect to your MSK cluster
    2. Permissions to manage elastic network interfaces in your VPC
  3. To connect Lambda to Amazon MSK as a consumer, set up event source mapping to link your MSK topic with the Lambda function. This allows Lambda to automatically poll for new messages and process them. Follow the guide on how to configure event source mapping.

For reference, configuring event source mapping involves three steps:

  1. Network setup – In the default event source mapping mode, you need to configure a networking setup using a PrivateLink endpoint or NAT gateway for event source mapping to invoke Lambda functions. In provisioned mode, no networking configuration is needed (and you don’t incur the cost of networking components).
  2. Event source mapping parameter configuration – This involves setting the parameters that the event source mapping needs to poll messages from your Kafka cluster: the MSK cluster, topic name, consumer group ID, authentication method, and, optionally, a schema registry and scaling mode. You can set the scaling mode to provisioned for throughput control, and you can also configure batch size, batch window, and event filtering for your event source mapping.
  3. Access permissions – This involves configuring required permissions to access the required AWS resources, and includes configuring permissions for the function to execute the code, permissions for the event source mapping to access your MSK cluster, and permissions for Lambda to access your VPC resources.
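
The parameter configuration step can also be scripted. The sketch below assembles keyword arguments in the shape expected by the Lambda CreateEventSourceMapping API and validates the batch size and batch window ranges mentioned in this post. The helper name, the ARN, the function name, the topic, and the default values are all hypothetical, and authentication settings are omitted:

```python
def build_esm_kwargs(cluster_arn, function_name, topic, consumer_group_id,
                     batch_size=100, batch_window_seconds=0,
                     min_pollers=None, max_pollers=None):
    """Assemble arguments for create_event_source_mapping (hypothetical helper)."""
    if not 1 <= batch_size <= 10_000:
        raise ValueError("batch_size must be between 1 and 10,000 records")
    if not 0 <= batch_window_seconds <= 300:
        raise ValueError("batch_window_seconds must be between 0 and 300")

    kwargs = {
        "EventSourceArn": cluster_arn,
        "FunctionName": function_name,
        "Topics": [topic],
        "StartingPosition": "LATEST",
        "BatchSize": batch_size,
        "MaximumBatchingWindowInSeconds": batch_window_seconds,
        "AmazonManagedKafkaEventSourceConfig": {
            "ConsumerGroupId": consumer_group_id
        },
    }

    # Provisioned mode is only requested when poller bounds are supplied.
    poller_config = {}
    if min_pollers is not None:
        poller_config["MinimumPollers"] = min_pollers
    if max_pollers is not None:
        poller_config["MaximumPollers"] = max_pollers
    if poller_config:
        kwargs["ProvisionedPollerConfig"] = poller_config
    return kwargs


example_kwargs = build_esm_kwargs(
    "arn:aws:kafka:us-east-1:111122223333:cluster/demo/example",  # placeholder
    "clickstream-consumer",
    "clickstream",
    "clickstream-lambda-group",
    batch_size=500,
    batch_window_seconds=5,
    min_pollers=2,
    max_pollers=8,
)
# With boto3 installed and credentials configured, this could be passed as:
#   boto3.client("lambda").create_event_source_mapping(**example_kwargs)
```

Keeping the arguments in a plain dictionary makes it easy to review or diff the configuration before anything is created.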

The following screenshot shows the console setup for configuring Amazon MSK event source mapping, including the Amazon MSK trigger related fields.

The following screenshot shows event poller configuration.

The following screenshot shows additional settings you can use, depending on your use case.

Optimizing AWS Lambda for stream processing with Amazon MSK

When building real-time data processing pipelines with Amazon MSK and AWS Lambda, it’s important to tune your setup for both performance and cost-efficiency. Lambda offers powerful serverless compute capabilities, but to get the most out of it in a streaming context, you need to make a few key optimizations:

  1. Enable provisioned concurrency for low-latency processing – For latency-sensitive workloads, cold starts can introduce unwanted delays. By enabling provisioned concurrency, you can pre-warm a specified number of Lambda instances so they’re always ready to handle traffic immediately. This eliminates cold starts and provides consistent response times, which is crucial for latency-critical use cases.
  2. Enable provisioned mode for event source mapping for high-throughput processing – For Kafka workloads with stringent throughput requirements, activate provisioned mode. The optimal configuration of minimum and maximum event pollers for your Kafka event source mapping depends on your application’s performance requirements. Start with the default minimum event pollers to baseline the performance profile, then adjust the event pollers based on observed message processing patterns. For workloads with spiky traffic and strict performance needs, increase the minimum event pollers to handle sudden surges. You can fine-tune the minimum event pollers by comparing your desired throughput with your observed throughput (which depends on factors such as ingested messages per second and average payload size), using the throughput capacity of one event poller (up to 5 MBps) as a reference. To maintain ordered processing within a partition, Lambda caps the maximum event pollers at the number of partitions in the topic.
  3. Optimize message batching using size and windowing – By integrating Lambda with Amazon MSK, you can control how messages are batched before they’re sent to your function. Tuning parameters such as batch size (the number of records per invocation: 1–10,000 records) and maximum batching window (how long to wait for a full batch: 0–300 seconds) can significantly impact performance. Larger batches mean fewer invocations, which reduces overhead and improves throughput. However, it’s important to strike a balance—too large a batch or window might introduce unwanted processing delays. Monitor your stream’s behavior and adjust these settings based on throughput requirements and acceptable latency.
  4. Apply filters to reduce unnecessary invocations – Not every record in your Kafka topic might require processing. To avoid unnecessary Lambda invocations (and associated costs), apply filtering logic directly when configuring the event source mapping. With Lambda, you can define filter criteria (up to five filters by default, with an increase to 10 available on request) so that only relevant records trigger your function. This helps reduce compute time, minimize noise, and optimize your budget, especially when dealing with high-throughput topics with mixed content. For Amazon MSK, Lambda commits offsets for matched and unmatched messages after successfully invoking the function.
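
As a rough starting point for the minimum event poller setting, the sizing guidance above can be turned into a small calculation. This sketch assumes 1 MB equals 1,000,000 bytes, treats 5 MBps as the per-poller ceiling, and caps the result at the partition count; the function name and the sample numbers are illustrative only:

```python
import math

POLLER_CAPACITY_MBPS = 5.0  # per-poller ceiling quoted in this post


def estimate_min_pollers(messages_per_second, avg_payload_bytes, partitions):
    """Estimate minimum event pollers for a target ingest rate."""
    # Assumes 1 MB = 1,000,000 bytes.
    throughput_mbps = messages_per_second * avg_payload_bytes / 1_000_000
    needed = max(1, math.ceil(throughput_mbps / POLLER_CAPACITY_MBPS))
    # Pollers never exceed the number of partitions in the topic.
    return min(needed, partitions)


# 20,000 messages/s at 1,000 bytes each is 20 MB/s, which needs 4 pollers.
print(estimate_min_pollers(20_000, 1_000, partitions=12))
```

Because 5 MBps is an upper bound per poller, an estimate like this is best treated as a baseline to validate against observed throughput rather than a final setting.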

Conclusion

By combining Amazon MSK with AWS Lambda, you can seamlessly build modern, serverless event-driven applications. This integration eliminates the need to manage consumer groups, compute infrastructure, or scaling logic so teams can focus on delivering business value faster.

Whether you’re integrating Kafka into microservices, transforming data pipelines, or building reactive applications, Lambda with Amazon MSK is a powerful and flexible serverless solution. For detailed documentation on how to configure Lambda with Amazon MSK, refer to the AWS Lambda Developer Guide. For more serverless learning resources, visit Serverless Land.


About the Authors

Tarun Rai Madan is a Principal Product Manager at Amazon Web Services (AWS). He specializes in serverless technologies and leads product strategy to help customers achieve accelerated business outcomes with event-driven applications, using services like AWS Lambda, AWS Step Functions, Apache Kafka, and Amazon SQS/SNS. Prior to AWS, he was an engineering leader in the semiconductor industry, and led development of high-performance processors for wireless, automotive, and data center applications.

Masudur Rahaman Sayem is a Streaming Data Architect at AWS with over 25 years of experience in the IT industry. He collaborates with AWS customers worldwide to architect and implement sophisticated data streaming solutions that address complex business challenges. As an expert in distributed computing, Sayem specializes in designing large-scale distributed systems architecture for maximum performance and scalability. He has a keen interest and passion for distributed architecture, which he applies to designing enterprise-grade solutions at internet scale.

Enhance data ingestion performance in Amazon Redshift with concurrent inserts

Post Syndicated from Raghu Kuppala original https://aws.amazon.com/blogs/big-data/enhance-data-ingestion-performance-in-amazon-redshift-with-concurrent-inserts/

Amazon Redshift is a fully managed, petabyte-scale data warehousing service in the cloud. Its massively parallel processing (MPP) architecture processes data by distributing queries across compute nodes. Each node executes identical query code on its data portion, enabling parallel processing.

Amazon Redshift employs columnar storage for database tables, reducing overall disk I/O requirements. This storage method significantly improves analytic query performance by minimizing data read during queries. Data has become many organizations’ most valuable asset, driving demand for real-time or near real-time analytics in data warehouses. This demand necessitates systems that support simultaneous data loading while maintaining query performance. This post showcases the key improvements in Amazon Redshift concurrent data ingestion operations.

Challenges and pain points for write workloads

In a data warehouse environment, managing concurrent access to data is crucial yet challenging. Customers using Amazon Redshift ingest data using various approaches. For example, you might commonly use INSERT and COPY statements to load data to a table, which are also called pure write operations. You might have requirements for low-latency ingestions to maximize data freshness. To achieve this, you can submit queries concurrently to the same table. To enable this, Amazon Redshift implements snapshot isolation by default. Snapshot isolation provides data consistency when multiple transactions are running simultaneously. Snapshot isolation guarantees that each transaction sees a consistent snapshot of the database as it existed at the start of the transaction, preventing read and write conflicts that could compromise data integrity. With snapshot isolation, read queries are able to execute in parallel, so you can take advantage of the full performance that the data warehouse has to offer.

However, pure write operations execute sequentially. Specifically, pure write operations need to acquire an exclusive lock during the entire transaction. They only release the lock when the transaction has committed the data. In these cases, the performance of the pure write operations is constrained by the speed of serial execution of the writes across sessions.

To understand this better, let’s look at how a pure write operation works. Every pure write operation includes pre-ingestion tasks such as scanning, sorting, and aggregation on the same table. After the pre-ingestion tasks are complete, the data is written to the table while maintaining data consistency. Because pure write operations run serially, even the pre-ingestion steps run serially. This means that when multiple pure write operations are submitted concurrently, they are processed one after another, with no parallelization even for the pre-ingestion steps. To improve the concurrency of ingestion to the same table and meet low-latency requirements, customers often use staging tables as a workaround. Specifically, you can submit INSERT ... VALUES(..) statements into staging tables. Then, you perform joins with other tables, such as fact and dimension tables, before appending data into your target tables using ALTER TABLE APPEND. This approach isn’t desirable because it requires you to maintain staging tables and can lead to a larger storage footprint due to data block fragmentation from the use of ALTER TABLE APPEND statements.

In summary, the sequential execution of concurrent INSERT and COPY statements, due to their exclusive locking behavior, creates challenges if you want to maximize the performance and efficiency of your data ingestion workflows in Amazon Redshift. To overcome these limitations, you must adopt workaround solutions, introducing additional complexity and overhead. The following section outlines how Amazon Redshift has addressed these pain points with improvements to concurrent inserts.

Concurrent inserts and its benefits

With Amazon Redshift patch 187, Amazon Redshift has introduced a significant improvement in concurrency for data ingestion with support for concurrent inserts. This improves concurrent execution of pure write operations such as COPY and INSERT statements, reducing the time it takes to load data into Amazon Redshift. Specifically, multiple pure write operations are able to progress simultaneously and complete pre-ingestion tasks such as scanning, sorting, and aggregation in parallel.

To visualize this improvement, let’s consider an example of two queries, executed concurrently from different transactions.

The following is query 1 in transaction 1:

INSERT INTO table_a SELECT * FROM table_b WHERE table_b.column_x = 'value_a';

The following is query 2 in transaction 2:

INSERT INTO table_a SELECT * FROM table_c WHERE table_c.column_y = 'value_b';

The following figure illustrates a simplified visualization of pure write operations without concurrent inserts.

Without concurrent inserts, the key components are as follows:

  • First, both pure write operations (INSERT) need to read data from table_b and table_c, respectively.
  • The segment in pink is the scan step (reading data) and the segment in green is the write step (actually inserting the data).
  • In the “Before concurrent inserts” state, both queries run sequentially. Specifically, the scan step in query 2 waits for the insert step in query 1 to complete before it begins.

For example, consider two identically sized queries across different transactions. Both queries need to scan the same amount of data and insert the same amount of data into the target table. Let’s say both are issued at 10:00 AM. First, query 1 would spend from 10:00 AM to 10:50 AM scanning the data and 10:50 AM to 11:00 AM inserting the data. Next, because query 2 is identical in scan and insertion volumes, query 2 would spend from 11:00 AM to 11:50 AM scanning the data and 11:50 AM to 12:00 PM inserting the data. Both transactions started at 10:00 AM. The end-to-end runtime is 2 hours (transaction 2 ends at 12:00 PM).

The following figure illustrates a simplified visualization of pure write operations with concurrent inserts, compared with the previous example.

With concurrent inserts enabled, the scan steps of query 1 and query 2 can progress simultaneously. When either query needs to insert data, it still does so serially. Let’s consider the same example, with two identically sized queries across different transactions. Both queries need to scan the same amount of data and insert the same amount of data into the target table. Again, let’s say both are issued at 10:00 AM. At 10:00 AM, query 1 and query 2 begin executing concurrently. From 10:00 AM to 10:50 AM, query 1 and query 2 are able to scan the data in parallel. From 10:50 AM to 11:00 AM, query 1 inserts the data into the target table. Next, from 11:00 AM to 11:10 AM, query 2 inserts the data into the target table. The total end-to-end runtime for both transactions is now reduced to 1 hour and 10 minutes, with query 2 completing at 11:10 AM.

In this scenario, the pre-ingestion steps (scanning the data) for both queries run concurrently, and each takes the same amount of time as in the previous example (50 minutes). The actual insertion of data into the target table is still executed serially, with query 1 completing the insertion first, followed by query 2. This demonstrates the performance benefits of the concurrent inserts feature in Amazon Redshift. By allowing the pre-ingestion steps to run concurrently, the overall runtime is improved by 50 minutes compared to the sequential execution before the feature was introduced.

With concurrent inserts, pre-ingestion steps are able to progress simultaneously. Pre-ingestion tasks could be one or a combination of tasks, such as scanning, sorting, and aggregation. There are significant performance benefits achieved in the end-to-end runtime of the queries.
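
The arithmetic in the two scenarios can be checked with a short calculation. This is a simplified model, not a description of Redshift internals: it assumes identical queries, scans that overlap fully without slowing each other down, and inserts that run strictly one after another:

```python
def sequential_runtime(scan_minutes, insert_minutes, queries):
    """Before concurrent inserts: each query scans and inserts in turn."""
    return queries * (scan_minutes + insert_minutes)


def concurrent_runtime(scan_minutes, insert_minutes, queries):
    """With concurrent inserts: scans overlap, inserts remain serial."""
    return scan_minutes + queries * insert_minutes


# Two queries, each with a 50-minute scan and a 10-minute insert.
before = sequential_runtime(50, 10, queries=2)
after = concurrent_runtime(50, 10, queries=2)
print(before, after, before - after)
```

For the example above, this gives 120 minutes before, 70 minutes after, and a saving of 50 minutes. Under the same model, the saving grows with the share of time spent scanning, which is why scan-heavy queries benefit most.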

Benefits

You can now benefit from these performance improvements without any additional configuration because the concurrent processing is handled automatically by the service. Concurrent inserts bring several benefits. First, they improve the end-to-end performance of ingestion workloads that write to the same table. Internal benchmarking shows that concurrent inserts can improve end-to-end runtime by up to 40% for concurrent insert transactions to the same tables. This feature is particularly beneficial for scan-heavy queries (queries that spend more time reading data than writing data). The higher the scan-to-insert ratio of a query, the greater the expected performance improvement.

This feature also improves the throughput and performance for multi-warehouse writes through data sharing. Multi-warehouse writes through data sharing helps you scale your write workloads across dedicated Redshift clusters or serverless workgroups, optimizing resource utilization and achieving more predictable performance for your extract, transform, and load (ETL) pipelines. Specifically, in multi-warehouse writes through data sharing, queries from different warehouses can write data to the same table. Concurrent inserts improve the end-to-end performance of these queries by reducing resource contention and enabling them to make progress simultaneously.

The following figure shows the performance improvements from concurrent inserts observed in internal tests, with the orange bar indicating the improvement for multi-warehouse writes through data sharing and the blue bar indicating the improvement for concurrent inserts on the same warehouse. As the graph indicates, queries with higher scan components relative to insert components benefit by up to 40% with this new feature.

You can also experience additional benefits when you use concurrent inserts to manage your ingestion pipelines. When you write data directly to the same tables with concurrent inserts instead of using workarounds with ALTER TABLE APPEND statements, you can reduce your storage footprint. This reduction comes in two forms: first from the elimination of temporary tables, and second from the reduction in table fragmentation caused by frequent ALTER TABLE APPEND statements. Additionally, you can avoid the operational overhead of managing complex workarounds and of relying on frequent background and customer-issued VACUUM DELETE operations to manage the fragmentation caused by appending temporary tables to your target tables.

Considerations

Although the concurrent insert enhancements in Amazon Redshift provide significant benefits, it’s important to be aware of potential deadlock scenarios. In a snapshot isolation environment, deadlocks can occur under certain conditions when running concurrent write transactions on the same table. The deadlock happens when concurrent INSERT and COPY statements are sharing a lock and making progress, and another statement needs to perform an operation (UPDATE, DELETE, MERGE, or DDL) that requires an exclusive lock on the same table.

Consider the following scenario:

  • Transaction 1:
    INSERT/COPY INTO table_A;

  • Transaction 2:
    INSERT/COPY INTO table_A;
    <UPDATE/DELETE/MERGE/DDL statement> table_A

A deadlock can occur when multiple transactions with INSERT and COPY operations are running concurrently on the same table with a shared lock, and one of those transactions follows its pure write operation with an operation that requires an exclusive lock, such as an UPDATE, MERGE, DELETE, or DDL statement. To avoid the deadlock in these situations, you can move the statements that require an exclusive lock (UPDATE, MERGE, DELETE, and DDL statements) to a different transaction so that the INSERT and COPY statements can progress simultaneously and the statements requiring exclusive locks can execute after them. Alternatively, for transactions that combine INSERT and COPY statements with MERGE, UPDATE, and DELETE statements on the same table, you can include retry logic in your applications to work around potential deadlocks. Refer to Potential deadlock situation for concurrent write transactions involving a single table for more information about deadlocks, and see Concurrent write examples for examples of concurrent transactions.
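As one possible shape for the retry logic mentioned above, the following Python sketch retries a whole transaction with exponential backoff. It makes several assumptions: DeadlockDetected is a hypothetical stand-in for whatever error your SQL driver raises when a transaction is aborted because of a deadlock, run_transaction is a callable you supply that opens, runs, and commits (or rolls back) the full transaction, and the attempt count and delays are arbitrary.

```python
import random
import time


class DeadlockDetected(Exception):
    """Hypothetical placeholder for the deadlock error raised by your SQL driver."""


def run_with_retry(run_transaction, max_attempts=4, base_delay=0.5, sleep=time.sleep):
    """Call run_transaction(), retrying the whole transaction on a deadlock.

    run_transaction must be safe to re-run from the start, because the
    aborted attempt is assumed to have been rolled back.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return run_transaction()
        except DeadlockDetected:
            if attempt == max_attempts:
                raise  # give up and surface the error to the caller
            # Exponential backoff with jitter so competing writers spread out.
            delay = base_delay * 2 ** (attempt - 1) * (1 + random.random())
            sleep(delay)


# Simulated transaction that deadlocks twice, then succeeds.
attempts = []


def flaky_transaction():
    attempts.append(1)
    if len(attempts) < 3:
        raise DeadlockDetected("simulated deadlock")
    return "committed"


result = run_with_retry(flaky_transaction, sleep=lambda seconds: None)
print(result, len(attempts))  # committed 3
```

In a real application, you would replace flaky_transaction with a function that runs your INSERT or COPY statements followed by the UPDATE, DELETE, or MERGE statements, and map your driver's deadlock error to the exception caught here.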

Conclusion

In this post, we demonstrated how Amazon Redshift has addressed a key challenge: improving concurrent data ingestion performance into a single table. This enhancement can help you meet your requirements for low latency and stricter SLAs when accessing the latest data. The update exemplifies our commitment to implementing critical features in Amazon Redshift based on customer feedback.


About the authors

Raghu Kuppala is an Analytics Specialist Solutions Architect with experience in the databases, data warehousing, and analytics space. Outside of work, he enjoys trying different cuisines and spending time with his family and friends.

Sumant Nemmani is a Senior Technical Product Manager at AWS. He is focused on helping customers of Amazon Redshift benefit from features that use machine learning and intelligent mechanisms to enable the service to self-tune and optimize itself, ensuring Redshift remains price-performant as they scale their usage.

Gagan Goel is a Software Development Manager at AWS. He ensures that Amazon Redshift features meet customer needs by prioritizing and guiding the team in delivering customer-centric solutions, and by monitoring and enhancing query performance for customer workloads.

Kshitij Batra is a Software Development Engineer at Amazon, specializing in building resilient, scalable, and high-performing software solutions.

Sanuj Basu is a Principal Engineer at AWS, driving the evolution of Amazon Redshift into a next-generation, exabyte-scale cloud data warehouse. He leads engineering for Redshift’s core data platform — including managed storage, transactions, and data sharing — enabling customers to power seamless multi-cluster analytics and modern data mesh architectures. Sanuj’s work helps Redshift customers break through th