Tag Archives: Analytics

Unlocking near real-time analytics with petabytes of transaction data using Amazon Aurora Zero-ETL integration with Amazon Redshift and dbt Cloud

Post Syndicated from BP Yau original https://aws.amazon.com/blogs/big-data/unlocking-near-real-time-analytics-with-petabytes-of-transaction-data-using-amazon-aurora-zero-etl-integration-with-amazon-redshift-and-dbt-cloud/

While customers can perform some basic analysis within their operational or transactional databases, many still need to build custom data pipelines that use batch or streaming jobs to extract, transform, and load (ETL) data into their data warehouse for more comprehensive analysis.

Zero-ETL integration with Amazon Redshift reduces the need for custom pipelines, preserves resources for your transactional systems, and gives you access to powerful analytics. Within seconds of transactional data being written into Amazon Aurora (a fully managed modern relational database service offering performance and high availability at scale), the data is seamlessly made available in Amazon Redshift for analytics and machine learning. The data in Amazon Redshift is transactionally consistent and updates are automatically and continuously propagated.

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL, business intelligence (BI), and reporting tools. Together with price-performance, Amazon Redshift offers capabilities such as serverless architecture, machine learning integration within your data warehouse and secure data sharing across the organization.

dbt helps manage data transformation by enabling teams to deploy analytics code following software engineering best practices such as modularity, continuous integration and continuous deployment (CI/CD), and embedded documentation.

dbt Cloud is a hosted service that helps data teams productionize dbt deployments. dbt Cloud offers turnkey support for job scheduling, CI/CD integrations; serving documentation, native git integrations, monitoring and alerting, and an integrated developer environment (IDE) all within a web-based UI.

In this post, we explore how to use Aurora MySQL-Compatible Edition Zero-ETL integration with Amazon Redshift and dbt Cloud to enable near real-time analytics. By using dbt Cloud for data transformation, data teams can focus on writing business rules to drive insights from their transaction data to respond effectively to critical, time sensitive events. This enables the line of business (LOB) to better understand their core business drivers so they can maximize sales, reduce costs, and further grow and optimize their business.

Solution overview

Let’s consider TICKIT, a fictional website where users buy and sell tickets online for sporting events, shows, and concerts. The transactional data from this website is loaded into an Aurora MySQL 3.05.0 (or a later version) database. The company’s business analysts want to generate metrics to identify ticket movement over time, success rates for sellers, and the best-selling events, venues, and seasons. Analysts can use this information to provide incentives to buyers and sellers who frequently use the site, to attract new users, and to drive advertising and promotions.

The Zero-ETL integration between Aurora MySQL and Amazon Redshift is set up by using a CloudFormation template to replicate raw ticket sales information to a Redshift data warehouse. After the data is in Amazon Redshift, dbt models are used to transform the raw data into key metrics such as ticket trends, seller performance, and event popularity. These insights help analysts make data-driven decisions to improve promotions and user engagement.

The following diagram illustrates the solution architecture at a high-level.

To implement this solution, complete the following steps:

  1. Set up Zero-ETL integration from the AWS Management Console for Amazon Relational Database Service (Amazon RDS).
  2. Create dbt models in dbt Cloud.
  3. Deploy dbt models to Amazon Redshift.

Prerequisites

Set up resources with CloudFormation

This post provides a CloudFormation template as a general guide. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

The CloudFormation template provisions the following components

  • An Aurora MySQL provisioned cluster (source)
  • An Amazon Redshift Serverless data warehouse (target)
  • Zero-ETL integration between the source (Aurora MySQL) and target (Amazon Redshift Serverless)

To create your resources:

  1. Sign in to the console.
  2. Choose the us-east-1 AWS Region in which to create the stack.
  3. Choose Launch Stack

       Launch Cloudformation Stack

  1. Choose Next.

This automatically launches CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the CloudFormation template from within the console.

  1. For Stack name, enter a stack name.
  2. Keep the default values for the rest of the Parameters and choose Next.
  3. On the next screen, choose Next.
  4. Review the details on the final screen and select I acknowledge that AWS CloudFormation might create IAM resources.
  5. Choose Submit.

Stack creation can take up to 30 minutes.

  1. After the stack creation is complete go to the Outputs tab of the stack and record the values of the keys for the following components, which you will use in a later step:
  • NamespaceName
  • PortNumber
  • RDSPassword
  • RDSUsername
  • RedshiftClusterSecurityGroupName
  • RedshiftPassword
  • RedshiftUsername
  • VPC
  • Workinggroupname
  • ZeroETLServicesRoleNameArn

  1. Configure your Amazon Redshift data warehouse security group settings to allow inbound traffic from dbt IP addresses.
  2. You’re now ready to sign in to both Aurora MySQL cluster and Amazon Redshift Serverless data warehouse and run some basic commands to test them.

Create a database from integration in Amazon Redshift

To create a target database using Redshift query editor V2:

  1. On the Amazon Redshift Serverless console, choose the zero-etl-destination workgroup.
  2. Choose Query data to open Query Editor v2.
  3. Connect to an Amazon Redshift Serverless data warehouse using the username and password from the CloudFormation resource creation step.
  4. Get the integration_id from the svv_integration system table.
select integration_id from svv_integration; ---- copy this result, use in the next sql
  1. Use the integration_id from the preceding step to create a new database from the integration.
CREATE DATABASE aurora_zeroetl_integration FROM INTEGRATION '<result from above>';

The integration between Aurora MYSQL and the Amazon Redshift Serverless data warehouse is now complete.

Populate source data in Aurora MySQL

You’re now ready to populate source data in Amazon Aurora MYSQL.

You can use your favorite query editor installed on either an Amazon Elastic Compute Cloud (Amazon EC2) instance or your local system to interact with Aurora MYSQL. However, you need to provide access to Aurora MYSQL from the machine where the query editor is installed. To achieve this, modify the security group inbound rules to allow the IP address of your machine and make Aurora publicly accessible.

To populate source data:

  1. Run the following script on Query Editor to create the sample database DEMO_DB and tables inside DEMO_DB.
create database demodb;

create table demodb.users(
userid integer not null primary key,
username char(8),
firstname varchar(30),
lastname varchar(30),
city varchar(30),
state char(2),
email varchar(100),
phone char(14),
likesports boolean,
liketheatre boolean,
likeconcerts boolean,
likejazz boolean,
likeclassical boolean,
likeopera boolean,
likerock boolean,
likevegas boolean,
likebroadway boolean,
likemusicals boolean);

create table demodb.venue(
venueid integer not null primary key,
venuename varchar(100),
venuecity varchar(30),
venuestate char(2),
venueseats integer);

create table demodb.category(
catid integer not null primary key,
catgroup varchar(10),
catname varchar(10),
catdesc varchar(50));

create table demodb.date (
dateid integer not null primary key,
caldate date not null,
day character(3) not null,
week smallint not null,
month character(5) not null,
qtr character(5) not null,
year smallint not null,
holiday boolean default FALSE );

create table demodb.event(
eventid integer not null primary key,
venueid integer not null,
catid integer not null,
dateid integer not null,
eventname varchar(200),
starttime timestamp);

create table demodb.listing(
listid integer not null primary key,
sellerid integer not null,
eventid integer not null,
dateid integer not null,
numtickets smallint not null,
priceperticket decimal(8,2),
totalprice decimal(8,2),
listtime timestamp);

create table demodb.sales(
salesid integer not null primary key,
listid integer not null,
sellerid integer not null,
buyerid integer not null,
eventid integer not null,
dateid integer not null,
qtysold smallint not null,
pricepaid decimal(8,2),
commission decimal(8,2),
saletime timestamp);
  1. Load data from Amazon Simple Storage Service (Amazon S3) to the corresponding table using the following commands:
LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/users/' 
INTO TABLE demodb.users FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/venue/' 
INTO TABLE demodb.venue FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/category/' 
INTO TABLE demodb.category FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/date/' 
INTO TABLE demodb.date FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/event/' 
INTO TABLE demodb.event FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/listing/' 
INTO TABLE demodb.listing FIELDS TERMINATED BY '|';

LOAD DATA FROM S3 PREFIX 's3-us-east-1://aws-bigdata-blog/artifacts/BDB-3864/data/tickit/sales/' 
INTO TABLE demodb.sales FIELDS TERMINATED BY '|';

The following are common errors associated with load from Amazon S3:

  • For the current version of the Aurora MySQL cluster, set the aws_default_s3_role parameter in the database cluster parameter group to the role Amazon Resource Name (ARN) that has the necessary Amazon S3 access permissions.
  • If you get an error for missing credentials, such as the following, you probably haven’t associated your IAM role to the cluster. In this case, add the intended IAM role to the source Aurora MySQL cluster.

Error 63985 (HY000): S3 API returned error: Missing Credentials: Cannot instantiate S3 Client),

Validate the source data in your Amazon Redshift data warehouse

To validate the source data

  1. Navigate to the Redshift Serverless dashboard, open Query Editor v2, and select the workgroup and database created from integration from the drop-down list. Expand the database aurora_zeroetl, schema demodb and you should see 7 tables being created.
  2. Wait a few seconds and run the following SQL query to see integration in action.
select * from aurora_zeroetl_integration.demodb.category;

Transforming data with dbtCloud

Connect dbt Cloud to Amazon Redshift

  1. Create a new project in dbt Cloud. From Account settings (using the gear menu in the top right corner), choose + New Project.
  2. Enter a project name and choose Continue.

  1. For Connection, select Add new connection from the drop-down list.
  2. Select Redshift and enter the following information:
    1. Connection name: The Name of the connection.
    2. Server Hostname: Your Amazon Redshift Serverless endpoint.
    3. Port: Redshift 5439.
    4. Database name: dev.
  3. Make sure you allowlist your dbt Cloud IP address in your Redshift data warehouse security group inbound traffic.
  4. Choose Save to set up your connection.

  1. Set your development credentials. These credentials will be used by dbt Cloud to connect to your Amazon Redshift data warehouse. See the CloudFormation template output for the credentials.
  2. Schemadbt_zetl. dbt Cloud automatically generates a schema name for you. By convention, this is dbt_<first-initial><last-name>. This is the schema connected directly to your development environment, and it’s where your models will be built when running dbt within the Cloud integrated development environment (IDE).

  1. Choose Test Connection. This verifies that dbt Cloud can access your Redshift data warehouse.
  2. Choose Next if the test succeeded. If it failed, check your Amazon Redshift settings and credentials.

Set up a dbt Cloud managed repository

When you develop in dbt Cloud, you can use git to version control your code. For the purposes of this post, use a dbt Cloud-hosted managed repository.

To set up a managed repository:

  1. Under Setup a repository, select Managed.
  2. Enter a name for your repo, such as dbt-zeroetl.
  3. Choose Create. It will take a few seconds for your repository to be created and imported.

Initialize your dbt project and start developing

Now that you have a repository configured, initialize your project and start developing in dbt Cloud.

To start development in dbt Cloud:

  1. In dbt Cloud, choose Start developing in the IDE. It might take a few minutes for your project to spin up for the first time as it establishes your git connection, clones your repo, and tests the connection to the warehouse.

  1. Above the file tree to the left, choose Initialize dbt project. This builds out your folder structure with example models.

  1. Make your initial commit by choosing Commit and sync. Use the commit message initial commit and choose Commit Changes. This creates the first commit to your managed repo and allows you to open a branch where you can add new dbt code.

To build your models

  1. Under Version Control on the left, choose Create branch. Enter a name, such as add-redshift-models. You need to create a new branch because the main branch is set to read-only mode.
  2. Choose dbt_project.yml.
  3. Update the models section of dbt_project.yml at the bottom of the file. Change example to staging and make sure the materialized value is set to table.

models:

my_new_project:

# Applies to all files under models/example/

staging:

materialized: table

  1. Choose the three-dot icon () next to the models directory, then select Create Folder.
  2. Name the folder staging, then choose Create.
  3. Choose the three-dot icon () next to the models directory, then select Create Folder.
  4. Name the folder dept_finance, then choose Create.
  5. Choose the three-dot icon () next to the staging directory, then select Create File.

  1. Name the file sources.yml, then choose Create.
  2. Copy the following query into the file and choose Save.
version: 2
sources:
- name: ops
database: aurora_zeroetl_integration
schema: demodb
tables:
- name: category
- name: date
- name: event
- name: listing
- name: users
- name: venue
- name: sales

Be aware that the operation database created on your Amazon Redshift data warehouse is a special read only database and you cannot directly connect to it to create objects. You need to connect to another regular database and use three-part notation as defined in sources.yml to query data from it.

  1. Choose the three-dot icon () directory, then select Create File.
  2. Name the file staging_event.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
with source as (
select * from {{ source('ops', 'event') }}
)
SELECT
eventid::integer AS eventid,
venueid::smallint AS venueid,
catid::smallint AS catid,
dateid::smallint AS dateid,
eventname::varchar(200) AS eventname,
starttime::timestamp AS starttime,
current_timestamp as etl_load_timestamp
from source
  1. Choose the three-dot icon ()  next to the staging directory, then select Create File.
  2. Name the file staging_sales.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
with store_source as (
select * from {{ source('ops', 'sales') }}
)
SELECT
salesid::integer AS salesid,
'store' as salestype,
listid::integer AS listid,
sellerid::integer AS sellerid,
buyerid::integer AS buyerid,
eventid::integer AS eventid,
dateid::smallint AS dateid,
qtysold::smallint AS qtysold,
pricepaid::decimal(8,2) AS pricepaid,
commission::decimal(8,2) AS commission,
saletime::timestamp AS saletime,
current_timestamp as etl_load_timestamp
from store_source
  1. Choose the three-dot icon ()  next to the dept_finance directory, then select Create File.
  2. Name the file rpt_finance_qtr_total_sales_by_event.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
select
date_part('year', a.saletime) as year,
date_part('quarter', a.saletime) as quarter,
b.eventname,
count(a.salesid) as sales_made,
sum(a.pricepaid) as sales_revenue,
sum(a.commission) as staff_commission,
staff_commission / sales_revenue as commission_pcnt
from {{ref('staging_sales')}} a
left join {{ref('staging_event')}} b on a.eventid = b.eventid
group by
year,
quarter,
b.eventname
order by
year,
quarter,
b.eventname
  1. Choose the three-dot icon () next to the dept_finance directory, then select Create File.
  2. Name the file rpt_finance_qtr_top_event_by_sales.sql, then choose Create.
  3. Copy the following query into the file and choose Save.
select *
from
(
select
*,
rank() over (partition by year, quarter order by sales_revenue desc) as row_num
from {{ref('rpt_finance_qtr_total_sales_by_event')}}
)
where row_num <= 3
  1. Choose the three-dot icon () next to the example directory, then select Delete.
  2. Enter dbt run in the command prompt at the bottom of the screen and press Enter.

  1. You should get a successful run and see the four models.

  1. Now that you have successfully run the dbt model, you should be able to find it in the Amazon Redshift data warehouse. Go to Redshift Query Editor v2, refresh the dev database, and verify that you have a new dbt_zetl schema with the staging_event and staging_sales tables and rpt_finance_qtr_top_event_by_sales and rpt_finance_qtr_total_sales_by_event views in it.

  1. Run the following SQL statement to verify that data has been loaded into your Amazon Redshift table.
    SELECT * FROM dbt_zetl.rpt_finance_qtr_total_sales_by_event;
    SELECT * FROM dbt_zetl.rpt_finance_qtr_top_event_by_sales;

Add tests to your models

Adding tests to a project helps validate that your models are working correctly.

To add tests to your project:

  1. Create a new YAML file in the models directory and name it models/schema.yml.
  2. Add the following contents to the file:
version: 2
models:
- name: rpt_finance_qtr_top_events_by_sales
columns:
- name: year
tests:
- not_null
- name: rpt_finance_qtr_total_sales_by_event
columns:
- name: year
tests:
- not_null
- name: staging_event
columns:
- name: eventid
tests:
- not_null
- name: staging_sales
columns:
- name: salesid
tests:
- not_null
  1. Run dbt test, and confirm that all your tests passed.
  2. When you run dbt test, dbt iterates through your YAML files and constructs a query for each test. Each query will return the number of records that fail the test. If this number is 0, then the test is successful.

Document your models

By adding documentation to your project, you can describe your models in detail and share that information with your team.

To add documentation:

  1. Run dbt docs generate to generate the documentation for your project. dbt inspects your project and your warehouse to generate a JSON file documenting your project.

  1. Choose the book icon in the Develop interface to launch documentation in a new tab.

Commit your changes

Now that you’ve built your models, you need to commit the changes you made to the project so that the repository has your latest code.

To commit the changes:

  1. Under Version Control on the left, choose Commit and sync and add a message. For example, Add Aurora zero-ETL integration with Redshift models.

  1. Choose Merge this branch to main to add these changes to the main branch on your repo.

Deploy dbt

Use dbt Cloud’s Scheduler to deploy your production jobs confidently and build observability into your processes. You’ll learn to create a deployment environment and run a job in the following steps.

To create a deployment environment:

  1. In the left pane, select Deploy, then choose Environments.

  1. Choose Create Environment.
  2. In the Name field, enter the name of your deployment environment. For example, Production.
  3. In the dbt Version field, select Versionless from the dropdown.
  4. In the Connection field, select the connection used earlier in development.
  5. Under Deployment Credentials, enter the credentials used to connect to your Redshift data warehouse. Choose Test Connection.

  1. Choose Save.

Create and run a job

Jobs are a set of dbt commands that you want to run on a schedule.

To create and run a job:

  1. After creating your deployment environment, you should be directed to the page for a new environment. If not, select Deploy in the left pane, then choose Jobs.
  2. Choose Create job and select Deploy job.
  3. Enter a Job name, such as,  Production run, and link to the environment you just created.
  4. Under Execution Settings, select Generate docs on run.
  5. Under Commands, add this command as part of your job if you don’t see them:
    • dbt build
  6. For this exercise, don’t set a schedule for your project to run—while your organization’s project should run regularly, there’s no need to run this example project on a schedule. Scheduling a job is sometimes referred to as deploying a project.

  1. Choose Save, then choose Run now to run your job.
  2. Choose the run and watch its progress under Run history.
  3. After the run is complete, choose View Documentation to see the docs for your project.

Clean up

When you’re finished, delete the CloudFormation stack since some of the AWS resources in this walkthrough incur a cost if you continue to use them. Complete the following steps:

  1. On the CloudFormation console, choose Stacks.
  2. Choose the stack you launched in this walkthrough. The stack must be currently running.
  3. In the stack details pane, choose Delete.
  4. Choose Delete stack.

Summary

In this post, we showed you how to set up Amazon Aurora MySQL Zero-ETL integration from Aurora MySQL to Amazon Redshift, which eliminates complex data pipelines and enables near real-time analytics on transactional and operational data. We also showed you how to build dbt models on Aurora MySQL Zero-ETL integration tables in Amazon Redshift to transform the data to get insight.

We look forward to hearing from you about your experience. If you have questions or suggestions, leave a comment.


About the authors

BP Yau is a Sr Partner Solutions Architect at AWS. His role is to help customers architect big data solutions to process data at scale. Before AWS, he helped Amazon.com Supply Chain Optimization Technologies migrate its Oracle data warehouse to Amazon Redshift and build its next generation big data analytics platform using AWS technologies.

Saman Irfan is a Senior Specialist Solutions Architect at Amazon Web Services, based in Berlin, Germany. She collaborates with customers across industries to design and implement scalable, high-performance analytics solutions using cloud technologies. Saman is passionate about helping organizations modernize their data architectures and unlock the full potential of their data to drive innovation and business transformation. Outside of work, she enjoys spending time with her family, watching TV series, and staying updated with the latest advancements in technology.

Raghu Kuppala is an Analytics Specialist Solutions Architect experienced working in the databases, data warehousing, and analytics space. Outside of work, he enjoys trying different cuisines and spending time with his family and friends.

Neela Kulkarni is a Solutions Architect with Amazon Web Services. She primarily serves independent software vendors in the Northeast US, providing architectural guidance and best practice recommendations for new and existing workloads. Outside of work, she enjoys traveling, swimming, and spending time with her family.

Intel Accelerators on Amazon OpenSearch Service improve price-performance on vector search by up to 51%

Post Syndicated from Mulugeta Mammo original https://aws.amazon.com/blogs/big-data/intel-accelerators-on-amazon-opensearch-service-improve-price-performance-on-vector-search-by-up-to-51/

This post is co-written with Mulugeta Mammo and Akash Shankaran from Intel.

Today, we’re excited to announce the availability of Intel Advanced Vector Extensions 512 (AVX-512) technology acceleration on vector search workloads when you run OpenSearch 2.17+ domains with the 4th generation Intel Xeon Intel instances on the Amazon OpenSearch Service. When you run OpenSearch 2.17 domains on C/M/R 7i instances, you can gain up to 51% in vector search performance at no additional cost compared to previous R5 Intel instances.

Increasingly, application builders are using vector search to improve the search quality of their applications. This modern technique involves encoding content into numerical representations (vectors) that can be used to find similarities between content. For instance, it’s used in generative AI applications to match user queries to semantically similar knowledge articles providing context and grounding for generative models to perform tasks. However, vector search is computationally intensive, and higher compute and memory requirements can lead to higher costs than traditional search. Therefore, cost optimization levers are important to achieve a favorable balance of cost vs. benefit.

OpenSearch Service is a managed service for the OpenSearch search and analytics suite, which includes support for vector search. By running your OpenSearch 2.17+ domains on C/M/R 7i instances, you can achieve up to a 51% price-performance gain compared to the past R5 instances on OpenSearch Service. As we discuss in this post, this launch offers improvements to your infrastructure total cost of ownership (TCO) and savings.

Accelerating generative AI applications with vectorization

Let’s understand how these technologies come together through the building of a simple generative AI application. First, you bring vector search online by using machine learning (ML) models to encode your content (such as text, image or audio) into vectors. You then index these vectors into an OpenSearch Service domain, enabling real-time content similarity search that can be scaled to search billions of vectors in milliseconds. These vector searches provide contextually relevant insights, which can be further enriched by AI for hyper-personalization and integrated with generative models to power chatbots.

Vector search use cases extend beyond generative AI applications. Use cases include image to semantic search, and recommendations such as the following real-world use case from Amazon Music. The Amazon Music application uses vectorization to encode 100 million songs into vectors that represent both music tracks and customer preferences. These vectors are then indexed in OpenSearch, which manages over a billion vectors and handles up to 7,100 vector queries per second to analyze user listening behavior and provide real-time recommendations.

The indexing and search processes are computationally intensive, requiring calculations between vectors that are typically represented as 128–2,048 dimensions (numerical values). The Intel Xeon Scalable processors found on the 7th generation Intel instances use Intel AVX-512 to increase the speed and efficiency of vector operations through the following features:

  • Data parallel processing – By processing 512 bits (twice the number of its predecessor) of data at once, Intel AVX-512 efficiently uses SIMD (single input multiple data) to run multiple operations simultaneously, which provides significant speed-up
  • Pathlength reduction – The speed-up is due to a significant improvement in pathlength, which is a measure of the number of instructions required to perform a unit of work in workloads
  • Power performance savings – You can lower power performance costs by processing more data and performing more operations in a shorter amount of time

Benchmarking vector search on OpenSearch

OpenSearch Services R7i Instances with Intel AVX-512 are an excellent choice for OpenSearch vector workloads. They offer a high CPU-to-memory ratio, which further maximizes the compute potential while providing ample memory.

To verify just how much faster the new R7i instances perform, you can run OpenSearch benchmarks firsthand. Using your OpenSearch 2.17 domain, create a k-NN index configured to use either the Lucene or FAISS engine. Use the OpenSearch Benchmark with the public Cohere 10M 768D dataset to replicate the benchmarks published in this post. Replicate these tests using the older R5 instances as the baseline.

In the following sections, we present the benchmarks that demonstrate the 51% price-performance gains between the R7i and the R5 instances.

Lucene engine results

In this post, we define price-performance as the number of documents that can be indexed or search queries executed given a fixed budget ($1), taking into account the instance cost. The following are results of price-performance with the Cohere 10M dataset.

Up to a 44% improvement in price-performance is observed when using the Lucene engine and upgrading from R5 to R7i instances. The difference between the blue and orange bars in the following graphs illustrates the gains contributed by AVX512 acceleration.

FAISS engine results

We also examine results from the same tests performed on k-NN indexes configured on the FAISS engine. Up to 51% price-performance gains is achieved on index performance simply by upgrading from r5 to r7i instances. Again, the difference between the blue and orange bar demonstrates the additional gains contributed by AVX512.

In addition to price-performance gains, search response times also improved by upgrading R5 to R7i instances with AVX512. P90 and P99 latencies were lower by 33% and 38%, respectively.

The FAISS engine has the added benefit of AVX-512 acceleration with FP16 quantized vectors. With FP16 quantization, vectors are compressed to half the size, reducing memory and storage requirements and in turn infrastructure costs. AVX-512 contributes to further price-performance gains.

Conclusion

If you’re looking to modernize search experiences on OpenSearch Service while potentially lowering costs, try out the OpenSearch vector engine on OpenSearch Service C7i, M7i, or R7i instances. Built on 4th Gen Intel Xeon processors, the latest Intel instances provide advanced features like Intel AVX-512 accelerators, improved CPU performance, and higher memory bandwidth than the previous generation, which makes them an excellent choice for optimizing your vector search workloads on OpenSearch Service.

Credits to: Vesa Pehkonen, Noah Staveley, Assane Diop, Naveen Tatikonda


About the Authors

Mulugeta Mammo is a Senior Software Engineer, and currently leads the OpenSearch Optimization team at Intel.

Vamshi Vijay Nakkirtha is a software engineering manager working on the OpenSearch Project and Amazon OpenSearch Service. His primary interests include distributed systems.

Akash Shankaran is a Software Architect and Tech Lead in the Xeon software team at Intel working on OpenSearch. He works on pathfinding opportunities and enabling optimizations within databases, analytics, and data management domains.

Dylan Tong is a Senior Product Manager at Amazon Web Services. He leads the product initiatives for AI and machine learning (ML) on OpenSearch including OpenSearch’s vector database capabilities. Dylan has decades of experience working directly with customers and creating products and solutions in the database, analytics and AI/ML domain. Dylan holds a BSc and MEng degree in Computer Science from Cornell University.


Notices and disclaimers

Performance varies by use, configuration, and other factors. Learn more on the Performance Index website.
Your costs and results may vary.
Intel technologies may require enabled hardware, software, or service activation.

Introducing generative AI troubleshooting for Apache Spark in AWS Glue (preview)

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-generative-ai-troubleshooting-for-apache-spark-in-aws-glue-preview/

Organizations run millions of Apache Spark applications each month to prepare, move, and process their data for analytics and machine learning (ML). Building and maintaining these Spark applications is an iterative process, where developers spend significant time testing and troubleshooting their code. During development, data engineers often spend hours sifting through log files, analyzing execution plans, and making configuration changes to resolve issues. This process becomes even more challenging in production environments due to the distributed nature of Spark, its in-memory processing model, and the multitude of configuration options available. Troubleshooting these production issues requires extensive analysis of logs and metrics, often leading to extended downtimes and delayed insights from critical data pipelines.

Today, we are excited to announce the preview of generative AI troubleshooting for Spark in AWS Glue. This is a new capability that enables data engineers and scientists to quickly identify and resolve issues in their Spark applications. This feature uses ML and generative AI technologies to provide automated root cause analysis for failed Spark applications, along with actionable recommendations and remediation steps. This post demonstrates how you can debug your Spark applications with generative AI troubleshooting.

How generative AI troubleshooting for Spark works

For Spark jobs, the troubleshooting feature analyzes job metadata, metrics and logs associated with the error signature of your job to generates a comprehensive root cause analysis. You can initiate the troubleshooting and optimization process with a single click on the AWS Glue console. With this feature, you can reduce your mean time to resolution from days to minutes, optimize your Spark applications for cost and performance, and focus more on deriving value from your data.

Manually debugging Spark applications can get challenging for data engineers and ETL developers due to a few different reasons:

  • Extensive connectivity and configuration options to a variety of resources with Spark while makes it a popular data processing platform, often makes it challenging to root cause issues when configurations are not correct, especially related to resource setup (S3 bucket, databases, partitions, resolved columns) and access permissions (roles and keys).
  • Spark’s in-memory processing model and distributed partitioning of datasets across its workers while good for parallelism, often make it difficult for users to identify root cause of failures resulting from resource exhaustion issues like out of memory and disk exceptions.
  • Lazy evaluation of Spark transformations while good for performance, makes it challenging to accurately and quickly identify the application code and logic which caused the failure from the distributed logs and metrics emitted from different executors.

Let’s look at a few common and complex Spark troubleshooting scenarios where Generative AI Troubleshooting for Spark can save hours of manual debugging time required to deep dive and come up with the exact root cause.

Resource setup or access errors

Spark applications allows to integrate data from a variety of resources like datasets with several partitions and columns on S3 buckets and Data Catalog tables, use the associated job IAM roles and KMS keys for correct permissions to access these resources, and require these resources to exist and be available in the right regions and locations referenced by their identifiers. Users can mis-configure their applications that result in errors requiring deep dive into the logs to understand the root cause being a resource setup or permission issue.

Manual RCA: Failure reason and Spark application Logs

Following example shows the failure reason for such a common setup issue for S3 buckets in a production job run. The failure reason coming from Spark does not help understand the root cause or the line of code that needs to be inspected for fixing it.

Exception in User Class: org.apache.spark.SparkException : Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3) (172.36.245.14 executor 1): com.amazonaws.services.glue.util.NonFatalException: Error opening file:

After deep diving into the logs of one of the many distributed Spark executors, it becomes clear that the error was caused due to a S3 bucket not existing, however the error stack trace is usually quite long and truncated to understand the precise root cause and location within Spark application where the fix is needed.

Caused by: java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: The specified bucket does not exist (Service: Amazon S3; Status Code: 404; Error Code: NoSuchBucket; Request ID: 80MTEVF2RM7ZYAN9; S3 Extended Request ID: AzRz5f/Amtcs/QatfTvDqU0vgSu5+v7zNIZwcjUn4um5iX3JzExd3a3BkAXGwn/5oYl7hOXRBeo=; Proxy: null), S3 Extended Request ID: AzRz5f/Amtcs/QatfTvDqU0vgSu5+v7zNIZwcjUn4um5iX3JzExd3a3BkAXGwn/5oYl7hOXRBeo=
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.list(Jets3tNativeFileSystemStore.java:423)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.isFolderUsingFolderObject(Jets3tNativeFileSystemStore.java:249)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.isFolder(Jets3tNativeFileSystemStore.java:212)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.getFileStatus(S3NativeFileSystem.java:518)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:935)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.open(S3NativeFileSystem.java:927)
at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:983)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.open(EmrFileSystem.java:197)
at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.initialize(TapeHadoopRecordReaderSplittable.scala:168)
... 29 more

With Generative AI Spark Troubleshooting: RCA and Recommendations

With Spark Troubleshooting, you simply click the Troubleshooting analysis button on your failed job run, and the service analyzes the debug artifacts of your failed job to identify the root cause analysis along with the line number in your Spark application that you can inspect to further resolve the issue.

Spark Out of Memory Errors

Let’s take a common but relatively complex error that requires significant manual analysis to conclude its because of a Spark job running out of memory on Spark driver (master node) or one of the distributed Spark executors. Usually, troubleshooting requires an experienced data engineer to manually go over the following steps to identify the root cause.

  • Search through Spark driver logs to find the exact error message
  • Navigate to the Spark UI to analyze memory usage patterns
  • Review executor metrics to understand memory pressure
  • Analyze the code to identify memory-intensive operations

This process often takes hours because the failure reason from Spark is usually not challenging to understand that it was a out of memory issue on the Spark driver and what is the remedy to fix it.

Manual RCA: Failure reason and Spark application Logs

Following example shows the failure reason for the error.

Py4JJavaError: An error occurred while calling o4138.collectToPython. java.lang.StackOverflowError

Spark driver logs require extensive search to find the exact error message. In this case, the error stack trace consisted of more than hundred function calls and is challenging to understand the precise root cause as the Spark application terminated abruptly.

py4j.protocol.Py4JJavaError: An error occurred while calling o4138.collectToPython.
: java.lang.StackOverflowError
 at org.apache.spark.sql.catalyst.trees.TreeNode$$Lambda$1942/131413145.get$Lambda(Unknown Source)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:798)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:459)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:781)
 at org.apache.spark.sql.catalyst.trees.TreeNode.clone(TreeNode.scala:881)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone(AnalysisHelper.scala:295)
 at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.clone$(AnalysisHelper.scala:294)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.clone(LogicalPlan.scala:30)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$clone$1(TreeNode.scala:881)
 at org.apache.spark.sql.catalyst.trees.TreeNode.applyFunctionIfChanged$1(TreeNode.scala:747)
 at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:783)
 at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:459)
 ... repeated several times with hundreds of function calls

With Generative AI Spark Troubleshooting: RCA and Recommendations

With Spark Troubleshooting, you can click the Troubleshooting analysis button on your failed job run and get a detailed root cause analysis with the line of code which you can inspect, and also recommendations on best practices to optimize your Spark application for fixing the problem.

Spark Out of Disk Errors

Another complex error pattern with Spark is when it runs out of disk storage on one of the many Spark executors in the Spark application. Similar to Spark OOM exceptions, manual troubleshooting requires extensive deep dive into distributed executor logs and metrics to understand the root cause and identify the application logic or code causing the error due to Spark’s lazy execution of its transformations.

Manual RCA: Failure Reason and Spark application Logs

The associated failure reason and error stack trace in the application logs is again quiet long requiring the user to gather more insights from Spark UI and Spark metrics to identify the root cause and identify the resolution.

An error occurred while calling o115.parquet. No space left on device
py4j.protocol.Py4JJavaError: An error occurred while calling o115.parquet.
: org.apache.spark.SparkException: Job aborted.
 at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:638)
 at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:279)
 at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:193)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
 at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
 ....

With Generative AI Spark Troubleshooting: RCA and Recommendations

With Spark Troubleshooting, it provides the RCA and the line number of code in the script where the data shuffle operation was lazily evaluated by Spark. It also points to best practices guide for optimizing the shuffle or wide transforms or using S3 shuffle plugin on AWS Glue.

Debug AWS Glue for Spark jobs

To use this troubleshooting feature for your failed job runs, complete following:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Choose your job.
  3. On the Runs tab, choose your failed job run.
  4. Choose Troubleshoot with AI to start the analysis.
  5. You will be redirected to the Troubleshooting analysis tab with generated analysis.

You will see Root Cause Analysis and Recommendations sections.

The service analyzes your job’s debug artifacts and provide the results. Let’s look at a real example of how this works in practice.

We show below an end-to-end example where Spark Troubleshooting helps a user with identification of the root cause for a resource setup issue and help fix the job to resolve the error.

Considerations

During preview, the service focuses on common Spark errors like resource setup and access issues, out of memory exceptions on Spark driver and executors, out of disk exceptions on Spark executors, and will clearly indicate when an error type is not yet supported. Your jobs must run on AWS Glue version 4.0.

The preview is available at no additional charge in all AWS commercial Regions where AWS Glue is available. When you use this capability, any validation runs triggered by you to test proposed solutions will be charged according to the standard AWS Glue pricing.

Conclusion

This post demonstrated how generative AI troubleshooting for Spark in AWS Glue helps your day-to-day Spark application debugging. It simplifies the debugging process for your Spark applications by using generative AI to automatically identify the root cause of failures and provides actionable recommendations to resolve the issues.

To learn more about this new troubleshooting feature for Spark, please visit Troubleshooting Spark jobs with AI.

A special thanks to everyone who contributed to the launch of generative AI troubleshooting for Apache Spark in AWS Glue: Japson Jeyasekaran, Rahul Sharma, Mukul Prasad, Weijing Cai, Jeremy Samuel, Hirva Patel, Martin Ma, Layth Yassin, Kartik Panjabi, Maya Patwardhan, Anshi Shrivastava, Henry Caballero Corzo, Rohit Das, Peter Tsai, Daniel Greenberg, McCall Peltier, Takashi Onikura, Tomohiro Tanaka, Sotaro Hikita, Chiho Sugimoto, Yukiko Iwazumi, Gyan Radhakrishnan, Victor Pleikis, Sriram Ramarathnam, Matt Sampson, Brian Ross, Alexandra Tello, Andrew King, Joseph Barlan, Daiyan Alamgir, Ranu Shah, Adam Rohrscheib, Nitin Bahadur, Santosh Chandrachood, Matt Su, Kinshuk Pahare, and William Vambenepe.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Vishal Kajjam is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and using ML/AI for designing and building end-to-end solutions to address customers’ data integration needs. In his spare time, he enjoys spending time with family and friends.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Wei Tang is a Software Development Engineer on the AWS Glue team. She is strong developer with deep interests in solving recurring customer problems with distributed systems and AI/ML.

XiaoRun Yu is a Software Development Engineer on the AWS Glue team. He is working on building new features for AWS Glue to help customers. Outside of work, Xiaorun enjoys exploring new places in the Bay Area.

Jake Zych is a Software Development Engineer on the AWS Glue team. He has deep interest in distributed systems and machine learning. In his spare time, Jake likes to create video content and play board games.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on distributed systems & new interfaces for data integration and efficiently managing data lakes on AWS.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team focuses on building distributed systems to enable customers with interactive and simple-to-use interfaces to efficiently manage and transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Introducing generative AI upgrades for Apache Spark in AWS Glue (preview)

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-generative-ai-upgrades-for-apache-spark-in-aws-glue-preview/

Organizations run millions of Apache Spark applications each month on AWS, moving, processing, and preparing data for analytics and machine learning. As these applications age, keeping them secure and efficient becomes increasingly challenging. Data practitioners need to upgrade to the latest Spark releases to benefit from performance improvements, new features, bug fixes, and security enhancements. However, these upgrades are often complex, costly, and time-consuming.

Today, we are excited to announce the preview of generative AI upgrades for Spark, a new capability that enables data practitioners to quickly upgrade and modernize their Spark applications running on AWS. Starting with Spark jobs in AWS Glue, this feature allows you to upgrade from an older AWS Glue version to AWS Glue version 4.0. This new capability reduces the time data engineers spend on modernizing their Spark applications, allowing them to focus on building new data pipelines and getting valuable analytics faster.

Understanding the Spark upgrade challenge

The traditional process of upgrading Spark applications requires significant manual effort and expertise. Data practitioners must carefully review incremental Spark release notes to understand the intricacies and nuances of breaking changes, some of which may be undocumented. They then need to modify their Spark scripts and configurations, updating features, connectors, and library dependencies as needed.

Testing these upgrades involves running the application and addressing issues as they arise. Each test run may reveal new problems, resulting in multiple iterations of changes. After the upgraded application runs successfully, practitioners must validate the new output against the expected results in production. This process often turns into year-long projects that cost millions of dollars and consume tens of thousands of engineering hours.

How generative AI upgrades for Spark works

The Spark upgrades feature uses AI to automate both the identification and validation of required changes to your AWS Glue Spark applications. Let’s explore how these capabilities work together to simplify your upgrade process.

AI-driven upgrade plan generation

When you initiate an upgrade, the service analyzes your application using AI to identify necessary changes across both PySpark code and Spark configurations. During preview, Spark Upgrades supports upgrading from Glue 2.0 (Spark 2.4.3, Python 3.7) to Glue 4.0 (Spark 3.3.0, Python 3.10), automatically handling changes that would typically require extensive manual review of public Spark, Python and Glue version migration guides, followed by development, testing, and verification. Spark Upgrades addresses four key areas of changes:

  • Spark SQL API methods and functions
  • Spark DataFrame API methods and operations
  • Python language updates (including module deprecations and syntax changes)
  • Spark SQL and Core configuration settings

The complexity of these upgrades becomes evident when you consider migrating from Spark 2.4.3 to Spark 3.3.0 involves over a hundred version-specific changes. Several factors contribute to the challenges of performing manual upgrades:

  • Highly expressive language with a mix of imperative and declarative programming styles, allows users to easily develop Spark applications. However, this increases the complexity of identifying impacted code during upgrades.
  • Lazy execution of transformations in a distributed Spark application improves performance but makes runtime verification of application upgrades challenging for users.
  • Spark configurations changes in default values or the introduction of new configurations across versions can impact application behavior in different ways, making it difficult for users to identify issues during upgrades.

For example, in Spark 3.2, Spark SQL TRANSFORM operator can’t support alias in inputs. In Spark 3.1 and earlier, you could write a script transform like SELECT TRANSFORM(a AS c1, b AS c2) USING 'cat' FROM TBL.

# Original code (Glue 2.0)
query = """
SELECT TRANSFORM(item as product_name, price as product_price, number as product_number)
   USING 'cat'
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

# Updated code (Glue 4.0)
query = """
SELECT TRANSFORM(item, price, number)
   USING 'cat' AS (product_name, product_price, product_number)
FROM goods
WHERE goods.price > 5
"""
spark.sql(query)

In Spark 3.1, loading and saving timestamps before 1900-01-01 00:00:00Z as INT96 in Parquet files causes errors. In Spark 3.0, this wouldn’t fail but could result in timestamp shifts due to calendar rebasing. To restore the old behavior in Spark 3.1, you would need to configure the Spark SQL configurations for spark.sql.legacy.parquet.int96RebaseModeInRead and spark.sql.legacy.parquet.int96RebaseModeInWrite to LEGACY.

# Original code (Glue 2.0)
data = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file") 

# Updated code (Glue 4.0)
qspark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInRead", "LEGACY") 
spark.conf.set("spark.sql.legacy.parquet.int96RebaseModeInWrite", "LEGACY")

data = [(1, "1899-12-31 23:59:59"), (2, "1900-01-01 00:00:00")]
schema = StructType([ StructField("id", IntegerType(), True), StructField("timestamp", TimestampType(), True) ])
df = spark.createDataFrame(data, schema=schema)
df.write.mode("overwrite").parquet("path/to/parquet_file")

Automated validation in your environment

After identifying the necessary changes, Spark Upgrades validates the upgraded application by running it as an AWS Glue job in your AWS account. The service iterates through multiple validation runs, up to 10, reviewing any errors encountered in each iteration and refining the upgrade plan until it achieves a successful run. You can run a Spark Upgrade Analysis in your development account using mock datasets supplied through Glue job parameters used for validation runs.

After Spark Upgrades has successfully validated the changes, it presents an upgrade plan for you to review. You can then accept and apply the changes to your job in the development account, before replicating them to your job in the production account. The Spark Upgrade plan includes the following:

  • An upgrade summary with an explanation of code updates made during the process
  • The final script that you can use in place of your current script
  • Logs from validation runs showing how issues were identified and resolved

You can review all aspects of the upgrade, including intermediate validation attempts and any error resolutions, before deciding to apply the changes to your production job. This approach ensures you have full visibility into and control over the upgrade process while benefiting from AI-driven automation.

Get started with generative AI Spark upgrades

Let’s walk through the process of upgrading an AWS Glue 2.0 job to AWS Glue 4.0. Complete the following steps:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select your AWS Glue 2.0 job, and choose Run upgrade analysis with AI.
  3. For Result path, enter s3://aws-glue-assets-<account-id>-<region>/scripts/upgraded/ (provide your own account ID and AWS Region).
  4. Choose Run.
  5. On the Upgrade analysis tab, wait for the analysis to be completed.

    While an analysis is running, you can view the intermediate job analysis attempts (up to 10) for validation under the Runs tab. Additionally, the Upgraded summary in S3 documents the upgrades made by the Spark Upgrade service so far, refining the upgrade plan with each attempt. Each attempt will display a different failure reason, which the service tries to address in the subsequent attempt through code or configuration updates.
    After a successful analysis, the upgraded script and a summary of changes will be uploaded to Amazon Simple Storage Service (Amazon S3).
  6. Review the changes to make sure they meet your requirements, then choose Apply upgraded script.

Your job has now been successfully upgraded to AWS Glue version 4.0. You can check the Script tab to verify the updated script and the Job details tab to review the modified configuration.

Understanding the upgrade process through an example

We now show a production Glue 2.0 job that we would like to upgrade to Glue 4.0 using the Spark Upgrade feature. This Glue 2.0 job reads a dataset, updated daily in an S3 bucket under different partitions, containing new book reviews from an online marketplace and runs SparkSQL to gather insights into the user votes for the book reviews.

Original code (Glue 2.0) – before upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
from collections import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .option("path", books_input_path)
    .parquet(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), "180.8") AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), "365") < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()

New code (Glue 4.0) – after upgrade

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from collections.abc import Sequence
from pyspark.sql.types import DecimalType
from pyspark.sql.functions import lit, to_timestamp, col

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.legacy.allowStarWithSingleTableIdentifierInCount", "true")
spark.conf.set("spark.sql.legacy.allowNegativeScaleOfDecimal", "true")
job = Job(glueContext)

def is_data_type_sequence(coming_dict):
    return True if isinstance(coming_dict, Sequence) else False

def dataframe_to_dict_list(df):
    return [row.asDict() for row in df.collect()]

books_input_path = (
    "s3://aws-bigdata-blog/generated_synthetic_reviews/data/product_category=Books/"
)
view_name = "books_temp_view"
static_date = "2010-01-01"
books_source_df = (
    spark.read.option("header", "true")
    .option("recursiveFileLookup", "true")
    .load(books_input_path)
)
books_source_df.createOrReplaceTempView(view_name)
books_with_new_review_dates_df = spark.sql(
    f"""
        SELECT 
        {view_name}.*,
            DATE_ADD(to_date(review_date), 180) AS next_review_date,
            CASE 
                WHEN DATE_ADD(to_date(review_date), 365) < to_date('{static_date}') THEN 'Yes' 
                ELSE 'No' 
            END AS Actionable
        FROM {view_name}
    """
)
books_with_new_review_dates_df.createOrReplaceTempView(view_name)
aggregate_books_by_marketplace_df = spark.sql(
    f"SELECT marketplace, count({view_name}.*) as total_count, avg(star_rating) as average_star_ratings, avg(helpful_votes) as average_helpful_votes, avg(total_votes) as average_total_votes  FROM {view_name} group by marketplace"
)
aggregate_books_by_marketplace_df.show()
data = dataframe_to_dict_list(aggregate_books_by_marketplace_df)
if is_data_type_sequence(data):
    print("data is valid")
else:
    raise ValueError("Data is invalid")

aggregated_target_books_df = aggregate_books_by_marketplace_df.withColumn(
    "average_total_votes_decimal", col("average_total_votes").cast(DecimalType(3, -2))
)
aggregated_target_books_df.show()

Upgrade summary

In Spark 3.2, spark.sql.adaptive.enabled is enabled by default. To restore the behavior before Spark 3.2, 
you can set spark.sql.adaptive.enabled to false.

No suitable migration rule was found in the provided context for this specific error. The change was made based on the error message, which indicated that Sequence could not be imported from collections module. In Python 3.10, Sequence has been moved to the collections.abc module.

In Spark 3.1, path option cannot coexist when the following methods are called with path parameter(s): DataFrameReader.load(), DataFrameWriter.save(), DataStreamReader.load(), or DataStreamWriter.start(). In addition, paths option cannot coexist for DataFrameReader.load(). For example, spark.read.format(csv).option(path, /tmp).load(/tmp2) or spark.read.option(path, /tmp).csv(/tmp2) will throw org.apache.spark.sql.AnalysisException. In Spark version 3.0 and below, path option is overwritten if one path parameter is passed to above methods; path option is added to the overall paths if multiple path parameters are passed to DataFrameReader.load(). To restore the behavior before Spark 3.1, you can set spark.sql.legacy.pathOptionBehavior.enabled to true.

In Spark 3.0, the `date_add` and `date_sub` functions accepts only int, smallint, tinyint as the 2nd argument; fractional and non-literal strings are not valid anymore, for example: `date_add(cast('1964-05-23' as date), '12.34')` causes `AnalysisException`. Note that, string literals are still allowed, but Spark will throw `AnalysisException` if the string content is not a valid integer. In Spark version 2.4 and below, if the 2nd argument is fractional or string value, it is coerced to int value, and the result is a date value of `1964-06-04`.

In Spark 3.2, the usage of count(tblName.*) is blocked to avoid producing ambiguous results. Because count(*) and count(tblName.*) will output differently if there is any null values. To restore the behavior before Spark 3.2, you can set spark.sql.legacy.allowStarWithSingleTableIdentifierInCount to true.

In Spark 3.0, negative scale of decimal is not allowed by default, for example, data type of literal like 1E10BD is DecimalType(11, 0). In Spark version 2.4 and below, it was DecimalType(2, -9). To restore the behavior before Spark 3.0, you can set spark.sql.legacy.allowNegativeScaleOfDecimal to true.

As seen in the updated Glue 4.0 (Spark 3.3.0) script diff compared to the Glue 2.0 (Spark 2.4.3) script and the resulting upgrade summary, a total of six different code and configuration updates were applied across the six attempts of the Spark Upgrade Analysis.

  • Attempt #1 included a Spark SQL configuration (spark.sql.adaptive.enabled) to restore the application behavior as a new feature for Spark SQL adaptive query execution is introduced starting Spark 3.2. Users can inspect this configuration change and can further enable or disable it as per their preference.
  • Attempt #2 resolved a Python language change between Python 3.7 and 3.10 with the introduction of a new abstract base class (abc) under the Python collections module for importing Sequence.
  • Attempt #3 resolved an error encountered due to a change in behavior of DataFrame API starting Spark 3.1 where path option cannot exist with other DataFrameReader operations.
  • Attempt #4 resolved an error caused by a change in the Spark SQL function API signature for DATE_ADD which now only accepts integers as the second argument starting from Spark 3.0.
  • Attempt #5 resolved an error encountered due to the change in behavior Spark SQL function API for count(tblName.*) starting Spark 3.2. The behavior was restored with the introduction of a new Spark SQL configuration spark.sql.legacy.allowStarWithSingleTableIdentifierInCount
  • Attempt #6 successfully completed the analysis and ran the new script on Glue 4.0 without any new errors. The final attempt resolved an error encountered due to the prohibited use of negative scale for cast(DecimalType(3, -6) in Spark DataFrame API starting Spark 3.0. The issue was addressed by enabling the new Spark SQL configuration spark.sql.legacy.allowNegativeScaleOfDecimal.

Important considerations for preview

As you begin using automated Spark upgrades during the preview period, there are several important aspects to consider for optimal usage of the service:

  • Service scope and limitations – The preview release focuses on PySpark code upgrades from AWS Glue versions 2.0 to version 4.0. At the time of writing, the service handles PySpark code that doesn’t rely on additional library dependencies. You can run automated upgrades for up to 10 jobs concurrently in an AWS account, allowing you to efficiently modernize multiple jobs while maintaining system stability.
  • Optimizing costs during the upgrade process – Because the service uses generative AI to validate the upgrade plan through multiple iterations, with each iteration running as an AWS Glue job in your account, it’s essential to optimize the validation job run configurations for cost-efficiency. To achieve this, we recommend specifying a run configuration when starting an upgrade analysis as follows:
    • Using non-production developer accounts and selecting sample mock datasets that represent your production data but are smaller in size for validation with Spark Upgrades.
    • Using right-sized compute resources, such as G.1X workers, and selecting an appropriate number of workers for processing your sample data.
    • Enabling Glue auto scaling when applicable to automatically adjust resources based on workload.

    For example, if your production job processes terabytes of data with 20 G.2X workers, you might configure the upgrade job to process a few gigabytes of representative data with 2 G.2X workers and auto scaling enabled for validation.

  • Preview best practices – During the preview period, we strongly recommend starting your upgrade journey with non-production jobs. This approach allows you to familiarize yourself with the upgrade workflow, and understand how the service handles different types of Spark code patterns.

Your experience and feedback are crucial in helping us enhance and improve this feature. We encourage you to share your insights, suggestions, and any challenges you encounter through AWS Support or your account team. This feedback will help us improve the service and add capabilities that matter most to you during preview.

Conclusion

This post demonstrates how automated Spark upgrades can assist with migrating your Spark applications in AWS Glue. It simplifies the migration process by using generative AI to automatically identify the necessary script changes across different Spark versions.

To learn more about this feature in AWS Glue, see Generative AI upgrades for Apache Spark in AWS Glue.

A special thanks to everyone who contributed to the launch of generative AI upgrades for Apache Spark in AWS Glue: Shuai Zhang, Mukul Prasad, Liyuan Lin, Rishabh Nair, Raghavendhar Thiruvoipadi Vidyasagar, Tina Shao, Chris Kha, Neha Poonia, Xiaoxi Liu, Japson Jeyasekaran, Suthan Phillips, Raja Jaya Chandra Mannem, Yu-Ting Su, Neil Jonkers, Boyko Radulov, Sujatha Rudra, Mohammad Sabeel, Mingmei Yang, Matt Su, Daniel Greenberg, Charlie Sim, McCall Petier, Adam Rohrscheib, Andrew King, Ranu Shah, Aleksei Ivanov, Bernie Wang, Karthik Seshadri, Sriram Ramarathnam, Asterios Katsifodimos, Brody Bowman, Sunny Konoplev, Bijay Bisht, Saroj Yadav, Carlos Orozco, Nitin Bahadur, Kinshuk Pahare, Santosh Chandrachood, and William Vambenepe.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his new road bike.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue, focusing on combining generative AI and data integration technologies to design and build comprehensive solutions for customers’ data and analytics needs.

Shubham Mehta is a Senior Product Manager at AWS Analytics. He leads generative AI feature development across services such as AWS Glue, Amazon EMR, and Amazon MWAA, using AI/ML to simplify and enhance the experience of data practitioners building data applications on AWS.

Pradeep Patel is a Software Development Manager on the AWS Glue team. He is passionate about helping customers solve their problems by using the power of the AWS Cloud to deliver highly scalable and robust solutions. In his spare time, he loves to hike and play with web applications.

Chuhan LiuChuhan Liu is a Software Engineer at AWS Glue. He is passionate about building scalable distributed systems for big data processing, analytics, and management. He is also keen on using generative AI technologies to provide brand-new experience to customers. In his spare time, he likes sports and enjoys playing tennis.

Vaibhav Naik is a software engineer at AWS Glue, passionate about building robust, scalable solutions to tackle complex customer problems. With a keen interest in generative AI, he likes to explore innovative ways to develop enterprise-level solutions that harness the power of cutting-edge AI technologies.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue and Amazon EMR team. His team focuses on building distributed systems to enable customers with simple-to-use interfaces and AI-driven capabilities to efficiently transform petabytes of data across data lakes on Amazon S3, and databases and data warehouses on the cloud.

Accelerate your data workflows with Amazon Redshift Data API persistent sessions

Post Syndicated from Dipal Mahajan original https://aws.amazon.com/blogs/big-data/accelerate-your-data-workflows-with-amazon-redshift-data-api-persistent-sessions/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that you can use to analyze your data at scale. Tens of thousands of customers use Amazon Redshift to process exabytes of data to power their analytical workloads.The Amazon Redshift Data API simplifies programmatic access to Amazon Redshift data warehouses by providing a secure HTTP endpoint for executing SQL queries, so that you don’t have to deal with managing drivers, database connections, network configurations, authentication flows, and other connectivity complexities.

Amazon Redshift has launched a session reuse capability for the Data API that can significantly streamline multi-step, stateful workloads such as exchange, transform, and load (ETL) pipelines, reporting processes, and other flows that involve sequential queries. This persistent session model provides the following key benefits:

  1. The ability to create temporary tables that can be referenced across the entire session lifespan.
  2. Maintaining reusable database sessions to help optimize the use of database connections, preventing the API server from exhausting the available connections and improving overall system scalability.
  3. Reusing database sessions to simplify the connection management logic in your API implementation, reducing the complexity of the code and making it more straightforward to maintain and scale.
  4. Redshift Data API provides a secure HTTP endpoint and integration with AWS SDKs. You can use the endpoint to run SQL statements without managing connections. Calls to the Data API are asynchronous. The Data API uses either credentials stored in AWS Secrets Manager or temporary database credentials

A common use case that can particularly benefit from session reuse is ETL pipelines in Amazon Redshift data warehouses. ETL processes often need to stage raw data extracts into temporary tables, run a series of transformations while referencing those interim datasets, and finally load the transformed results into production data marts. Before session reuse was available, the multi-phase nature of ETL workflows meant that data engineers had to persist the intermediate results and repeatedly re-establish database connections after each step, which resulted in continually tearing down sessions; recreating, repopulating, and truncating temporary tables; and incurring overhead from connection cycling. The engineers could also reuse the entire API call, but this could lead to a single point of failure for the entire script because it doesn’t support restarting from the point where it failed.

With Data API session reuse, you can use a single long-lived session at the start of the ETL pipeline and use that persistent context across all ETL phases. You can create temporary tables once and reference them throughout, without having to constantly refresh database connections and restart from scratch.

In this post, we’ll walk through an example ETL process that uses session reuse to efficiently create, populate, and query temporary staging tables across the full data transformation workflow—all within the same persistent Amazon Redshift database session. You’ll learn best practices for optimizing ETL orchestration code, reducing job runtimes by reducing connection overhead, and simplifying pipeline complexity. Whether you’re a data engineer, an analyst generating reports, or working on any other stateful data, understanding how to use Data API session reuse is worth exploring. Let’s dive in!

Scenario

Imagine you’re building an ETL process to maintain a product dimension table for an ecommerce business. This table needs to track changes to product details over time for analysis purposes.

The ETL will:

  1. Load data extracted from the source system into a temporary table
  2. Identify new and updated products by comparing them to the existing dimension
  3. Merge the staged changes into the product dimension using a slowly changing dimension (SCD) Type 2 approach

Prerequisites

To walk through the example in this post, you need:

  • An AWS Account
  • An Amazon Redshift Serverless workgroup or provisioned cluster

Redshift Data API Commands

This command executes a Redshift Data API query to create a temporary table called stage_stores in Redshift.

 aws redshift-data execute-statement 
       --session-keep-alive-seconds 30 
       --sql "CREATE TEMP TABLE stage_stores (LIKE stores)" 
       --database dev 
       --workgroup-name blog_test

This command performs a COUNT(*) operation on the newly created table from the previous command, using the –session-id returned in the response of the first command.

 aws redshift-data execute-statement
    --sql "select count(*) from dev.stage_stores"
    --session-id 5a254dc6-4fc2-4203-87a8-551155432ee4
    --session-keep-alive-seconds 10

Solution walkthrough

  1. You will use AWS Step Functions to call the Data API because this is one of the more straightforward ways to create a codeless ETL. The first step is to load the extracted data into a temporary table.
    • Start by creating a temporary table based on the same columns as the final table using CREATE TEMP TABLE stage_stores (LIKE stores)”.
    • When using Redshift Serverless you must use WorkgroupName. If using Redshift Provisioned cluster, you should use ClusterIdentifier.

Temporary table creation

  1. In the next step, copy data from Amazon Simple Storage Service (Amazon S3) to the temporary table. Instead of re-establishing the session, reuse it.
    • Use SessionId and Sql as parameters.
    • Database is a required parameter for Step Functions, but it doesn’t have to have a value when using the SessionId.

Copy data to Redshift

  1. Lastly, use Merge to merge the target and temporary (source) tables to insert or update data based on the new data from the files.

Merge to Redshift

As shown in the preceding figures, we used a wait component because the query was fast enough for the session not to be captured. If the session isn’t captured, you will receive a Session is not available error. If you encounter that or a similar error, try adding a 1-second wait component.

At the end, the Data API use case should be completed, as shown in the following figure.

Step Function

Other relevant use cases

The Amazon Redshift Data API isn’t a replacement for JDBC and ODBC drivers and is suitable for use cases where you don’t need a persistent connection to a cluster. It’s applicable in the following use cases:

  • Accessing Amazon Redshift from custom applications with any programming language supported by the AWS SDK. This enables you to integrate web-based applications to access data from Amazon Redshift using an API to run SQL statements. For example, you can run SQL from JavaScript.
  • Building a serverless data processing workflow.
  • Designing asynchronous web dashboards because the Data API lets you run long-running queries without having to wait for it to complete.
  • Running your query one time and retrieving the results multiple times without having to run the query again within 24 hours.
  • Building your ETL pipelines with Step Functions, AWS Lambda, and stored procedures.
  • Having simplified access to Amazon Redshift from Amazon SageMaker and Jupyter Notebooks.
  • Building event-driven applications with Amazon EventBridgeand Lambda.
  • Scheduling SQL scripts to simplify data load, unload, and refresh of materialized views.

Key considerations for using session reuse

When you make a Data API request to run a SQL statement, if the parameter SessionKeepAliveSeconds isn’t set, the session where the SQL runs is terminated when the SQL is finished. To keep the session active for a specified number of seconds you must set SessionKeepAliveSeconds in the Data API ExecuteStatement and BatchExecuteStatement. A SessionId field will be present in the response JSON containing the identity of the session, which can then be used in subsequent ExecuteStatement and BatchExecuteStatement operations. In subsequent calls you can specify another SessionKeepAliveSeconds to change the idle timeout time. If the SessionKeepAliveSeconds isn’t changed, the initial idle timeout setting remains. Consider the following when using session reuse:

  • The maximum value of SessionKeepAliveSeconds is 24 hours. After 24 hours the session is forcibly closed, and in-progress queries are terminated.
  • The maximum number of sessions per Amazon Redshift cluster or Redshift Serverless workgroup is 500. Please refer to Redshift Quotas and Limits here.
  • It’s not possible to run parallel executions of the same session. You need to wait until the query is finished to run the next query in the same session. That is, you cannot run queries in parallel in a single session.
  • The Data API can’t queue queries for a given session.

Best practices

We recommend the following best practices when using the Data API:

  • Federate your IAM credentials to the database to connect with Amazon Redshift. Amazon Redshift allows users to get temporary database credentials with GetClusterCredentials. We recommend scoping the access to a specific cluster and database user if you’re granting your users temporary credentials. For more information, see Example policy for using GetClusterCredentials.
  • Use a custom policy to provide fine-grained access to the Data API in the production environment if you don’t want your users to use temporary credentials. You can use AWS Secrets Manager to manage your credentials in such use cases.
  • The maximum record size to be retrieved is 64 KB. More than that will raise an error.
  • Don’t retrieve a large amount of data from your client and use the UNLOAD command to export the query results to Amazon S3. You’re limited to retrieving no more than 100 MB of data using the Data API.
  • Query results are stored by 24 hours and discarded after that. If you need the same result after 24 hours, you will need to rerun the script to obtain the result.
  • Remember that the session will be available for the amount of time specified by the SessionKeepAliveSeconds parameter in the Redshift Data API call. The session will terminate after the specified duration.Based on your security requirements, configure this value according to your ETL and ensure sessions are properly closed by setting SessionKeepAliveSeconds to 1 second to terminate them.
  • When invoking Redshift API commands, all activities, including the user who executed the command and those who reused the session, are logged in CloudWatch. Additionally, you can configure alerts for monitoring.
  • If a Redshift session is terminated or closed and you attempt to access it via the API, you will receive an error message stating, “Session is not available.”

Conclusion

In this post, we introduced you to the newly launched Amazon Redshift Data API session reuse functionality. We also demonstrated how to use the Data API from the Amazon Redshift console query editor and Python using the AWS SDK. We also provided best practices for using the Data API.

To learn more, see Using the Amazon Redshift Data API or visit the Data API GitHub repository for code examples. For serverless, see Use the Amazon Redshift Data API to interact with Amazon Redshift Serverless.

—————————————————————————————————————————————————–

About the Author

Dipal Mahajan is a Lead Consultant with Amazon Web Services based out of India, where he guides global customers to build highly secure, scalable, reliable, and cost-efficient applications on the cloud. He brings extensive experience on Software Development, Architecture and Analytics from industries like finance, telecom, retail and healthcare.

Anusha Challa is a Senior Analytics Specialist Solutions Architect focused on Amazon Redshift. She has helped many customers build large-scale data warehouse solutions in the cloud and on premises. She is passionate about data analytics and data science.

Debu Panda is a Senior Manager, Product Management at AWS. He is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world.

Ricardo Serafim is a Senior Analytics Specialist Solutions Architect at AWS.

Accelerate your migration to Amazon OpenSearch Service with Reindexing-from-Snapshot

Post Syndicated from Hang Zuo original https://aws.amazon.com/blogs/big-data/accelerate-your-migration-to-amazon-opensearch-service-with-reindexing-from-snapshot/

It is appealing to migrate from self-managed OpenSearch and Elasticsearch clusters in legacy versions to Amazon OpenSearch Service to enjoy the ease of use, native integration with AWS services, and rich features from the open-source environment (OpenSearch is now part of Linux Foundation). However, the data migration process can be daunting, especially when downtime and data consistency are critical concerns for your production workload.

In this post, we will introduce a new mechanism called Reindexing-from-Snapshot (RFS), and explain how it can address your concerns and simplify migrating to OpenSearch.

Key concepts

To understand the value of RFS and how it works, let’s look at a few key concepts in OpenSearch (and the same in Elasticsearch):

  1. OpenSearch index: An OpenSearch index is a logical container that stores and manages a collection of related documents. OpenSearch indices are composed of multiple OpenSearch shards, and each OpenSearch shard contains a single Lucene index.
  2. Lucene index and shard: OpenSearch is built as a distributed system on top of Apache Lucene, an open-source high-performance text search engine library. An OpenSearch index can contain multiple OpenSearch shards, and each OpenSearch shard maps to a single Lucene index. Each Lucene index (and, therefore, each OpenSearch shard) represents a completely independent search and storage capability hosted on a single machine. OpenSearch combines many independent Lucene indices into a single higher-level system to extend the capability of Lucene beyond what a single machine can support. OpenSearch provides resilience by creating and managing replicas of the Lucene indices as well as managing the allocation of data across Lucene indices and combining search results across all Lucene indices.
  3. Snapshots: Snapshots are backups of an OpenSearch cluster’s indexes and state in an off-cluster storage location (snapshot repository) such as Amazon Simple Storage Service (Amazon S3). As a backup strategy, snapshots can be created automatically in OpenSearch, or users can create a snapshot manually for restoring it on to a different domain or for data migration.

For example, when a document is added to the OpenSearch index, the distributed system layer picks a specific shard to host the document, and the document is ingested into that shard’s Lucene index. Operations on that document are then routed to the same shard (though the shard might have replicas). Search operations are performed across the shards in OpenSearch index individually and then a combined result is returned. A snapshot can be created to backup the cluster’s indexes and state, including cluster settings, node information, index settings and shard allocation, so that the snapshot can be used for data migration.

Why RFS?

RFS can transfer data from OpenSearch and Elasticsearch clusters at high throughput without impacting the performance of the source cluster. This is achieved by using the shard-level codependency and snapshots:

  1. Minimized performance impact to source clusters: Instead of retrieving data directly from the source cluster, RFS can use a snapshot of the source cluster for data migration. Documents are parsed from the snapshot and then reindexed to the target cluster, so that performance impact to the source clusters is minimized during migration. This maintains a smooth transition and minimal performance impact to end users, especially for production workloads.
  2. High throughput: Because shards are separate entities, RFS can retrieve, parse, extract and reindex the documents from each shard in parallel, to achieve high data throughput.
  3. Multi-version upgrades: RFS supports migrating data across multiple major versions (for example, from Elasticsearch 6.8 to OpenSearch 2.x), which can be a significant challenge with other data migration approaches. This is because the data indexed into OpenSearch (and Lucene) is only backward compatible for one major version. By incorporating reindexing as the core mechanism of the migration process, RFS can migrate data across multiple versions in one hop and make sure the data is fully updated and readable in the target cluster’s version, so that you don’t need to worry about the hidden technical debt imposed by having previous-version Lucene files in the new OpenSearch cluster.

How RFS works

OpenSearch and Elasticsearch snapshots are a directory tree that contains both data and metadata. Each index has its own sub-directory, and each shard has its own sub-directory under the directory of its parent index. The raw data for a given shard is stored in its corresponding shard sub-directory as a collection of Lucene files, which OpenSearch and Elasticsearch lightly obfuscates. Metadata files exist in the snapshot to provide details about the snapshot as a whole, the source cluster’s global metadata and settings, each index in the snapshot, and each shard in the snapshot.

The following is an example for the structure of an Elasticsearch 7.10 snapshot, along with a breakdown of its contents:

/snapshot/root
├── index-0 <-------------------------------------------- [1]
├── index.latest
├── indices
│   ├── DG4Ys006RDGOkr3_8lfU7Q <------------------------- [2]
│   │   ├── 0 <------------------------------------------ [3]
│   │   │   ├── __iU-NaYifSrGoeo_12o_WaQ <--------------- [4]
│   │   │   ├── __mqHOLQUtToG23W5r2ZWaKA <--------------- [4]
│   │   │   ├── index-gvxJ-ifiRbGfhuZxmVj9Hg 
│   │   │   └── snap-eBHv508cS4aRon3VuqIzWg.dat <-------- [5]
│   │   └── meta-tDcs8Y0BelM_jrnfY7OE.dat <-------------- [6]
│   └── _iayRgRXQaaRNvtfVfRdvg
│       ├── 0
│       │   ├── __DNRvbH6tSxekhRUifs35CA
│       │   ├── __NRek2UuKTKSBOGczcwftng
│       │   ├── index-VvqHYPQaRcuz0T_vy_bMyw
│       │   └── snap-eBHv508cS4aRon3VuqIzWg.dat
│       └── meta-tTcs8Y0BelM_jrnfY7OE.dat
├── meta-eBHv508cS4aRon3VuqIzWg.dat <-------------------- [7]
└── snap-eBHv508cS4aRon3VuqIzWg.dat <-------------------- [8]

The structure includes the following elements:

  1. Repository metadata file: JSON encoded and contains a mapping between the snapshots within the repository and the OpenSearch or Elasticsearch indices and shards stored within it.
  2. Index directory: Contains the data and metadata for a specific OpenSearch or Elasticsearch index.
  3. Shard directory: Contains the data and metadata for a specific shard of an OpenSearch or Elasticsearch index
  4. Lucene Files: Lucene index files, lightly obfuscated by the snapshotting process. Large files from the source file system are split into multiple parts.
  5. Shard metadata file: SMILE encoded and contains details about all the Lucene files in the shard and a mapping between their in-snapshot representation and their original representation on the source machine they were pulled from (including the original file name and other details).
  6. Index metadata file: SMILE encoded and contains things such as the index aliases, settings, mappings, and number of shards.
  7. Global metadata file: SMILE encoded and contains things such as the legacy, index, and component templates.
  8. Snapshot metadata file: SMILE encoded and contains things such as whether the snapshot succeeded, the number of shards, how many shards succeeded, the OpenSearch or Elasticsearch version, and the indices in the snapshot.

RFS works by retrieving a local copy of a shard-level directory, unpacking its contents and de-obfuscating them, reading them as a Lucene index, and extracting the documents within. This is enabled because OpenSearch and Elasticsearch store the original format of documents added to an OpenSearch or Elasticsearch index in Lucene using the _source field; this feature is enabled by default and is what allows the standard _reindex REST API to work (among other things).

The user workflow for performing a document migration with RFS using the Migration Assistant is shown in the following figure:

The workflow is:

  1. The operator shells into the Migration Assistant console
  2. The operator uses the console command line interface (CLI) to initiate a snapshot on their source cluster. The source cluster stores the snapshot in an S3 Bucket.
  3. The operator starts the document migration with RFS using the console CLI. This creates a single RFS Worker, which is a Docker container running in AWS Fargate.
  4. Each RFS worker provisioned pulls down an un-migrated shard from the snapshot bucket and reindexes its documents against the target cluster. Once finished, it proceeds to the next shard until all shards are completed.
  5. The operator monitors the progress of the migration using the console CLI, which reports both the number of shards yet to be migrated and the number that have been completed. The operator can scale the RFS worker fleet up or down to increase or reduce the rate of indexing on the target cluster.
  6. After all shards have been migrated to the target cluster, the operator scales the RFS worker fleet down to zero.

As previously mentioned, the RFS workers operate at the shard-level, so that you can provision one RFS worker for every shard in the snapshot to achieve maximum throughput. If a RFS worker stops unexpectedly in the middle of migrating a shard, another RFS worker will restart its migration from the beginning. The original document identifiers are preserved in the migration process, so that the restarted migration will be able to over-write the failed attempt. RFS workers coordinate amongst themselves using metadata that they store in an index on the target cluster.

How RFS performs

To highlight the performance of RFS, let’s consider the following scenario: you have an Elasticsearch 7.10 source cluster containing 5 TiB (3.9 billion documents) and wants to migrate to OpenSearch 2.15. With RFS, you can perform this migration in approximately 35 minutes, spending approximately $10 in Amazon Elastic Container Service (Amazon ECS) usage to run the RFS workers during the migration.

To demonstrate this capability, we created an Elasticsearch 7.10 source cluster in Amazon OpenSearch Service, with 1,024 shards and 0 replicas. We used AWS Glue to bulk-load sample data into the source cluster with the AWS Public Blockchain Dataset, and repeated the bulk-load process until 5 TiB of data (3.9 billion documents) was stored. We created an OpenSearch 2.15 cluster as the target cluster in Amazon OpenSearch Service, with 15 r7gd.16xlarge data nodes and 3 m7g.large master nodes, and used Sigv4 for authentication. Using the Migration Assistant solution, we created a snapshot of the source cluster, stored it in S3, and performed a metadata migration so that the indices on the source were recreated on the target cluster with the same shard and replica counts. We then ran console backfill start and console backfill scale 200 to begin the RFS migration with 200 workers. RFS indexed data into the target cluster at 2,497 MiB per second. The migration was completed in approximately 35 minutes. We metered approximately $10 in ECS cost for running the RFS workers.

To better highlight the performance, the following figures show metrics from the OpenSearch target cluster during this process (presented below).

In the preceding figures, you can see the cyclical variation in the document index rate and target cluster resource utilization as the 200 RFS workers pick up shards, complete a shard, and then pick up a new shard. At peak RFS indexing, we see the target cluster nodes maxing their CPU and begin queuing writes. The queue is cleared as shards complete and more workers transition to the downloading state. In general, we find that RFS performance is limited by the ability of the target cluster to absorb the traffic it generates. You can tune the RFS worker fleet to match what your target cluster can reliably ingest.

Conclusion

This blog post is designed to be a starting point for teams seeking guidance on how to use Reindexing-from-Snapshot as a straightforward, high throughput, and low-cost solution for data migration from self-managed OpenSearch and Elasticsearch clusters to Amazon OpenSearch Service. RFS is now part of the Migration Assistant solution and available from the AWS Solution Library. To use RFS to migrate to Amazon OpenSearch Service, try the Migration Assistant solution. To experience OpenSearch, try the OpenSearch Playground. To use the managed implementation of OpenSearch in the AWS Cloud, see Getting started with Amazon OpenSearch Service.


About the authors

Hang (Arthur) Zuo is a Senior Product Manager with Amazon OpenSearch Service. Arthur leads the core experience in the next-gen OpenSearch UI and data migration to Amazon OpenSearch Service. Arthur is passionate about cloud technologies and building data products that help users and businesses gain actionable insights and achieve operational excellence.

Chris Helma is a Senior Engineer at Amazon Web Services based in Austin, Texas. He is currently developing tools and techniques to enable users to shift petabyte-scale data workloads into OpenSearch. He has extensive experience building highly-scalable technologies in diverse areas such as search, security analytics, cryptography, and developer productivity. He has functional domain expertise in distributed systems, AI/ML, cloud-native design, and optimizing DevOps workflows. In his free time, he loves to explore specialty coffee and run through the West Austin hills.

Andre Kurait is a Software Development Engineer II at Amazon Web Services, based in Austin, Texas. He is currently working on Migration Assistant for Amazon OpenSearch Service. Prior to joining Amazon OpenSearch, Andre worked within Amazon Health Services. In his free time, Andre enjoys traveling, cooking, and playing in his church sport leagues. Andre holds Bachelor of the Science degrees from the University of Kansas in Computer Science and Mathematics.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

From data lakes to insights: dbt adapter for Amazon Athena now supported in dbt Cloud

Post Syndicated from Darshit Thakkar original https://aws.amazon.com/blogs/big-data/from-data-lakes-to-insights-dbt-adapter-for-amazon-athena-now-supported-in-dbt-cloud/

At AWS, we are committed to empowering organizations with tools that streamline data analytics and transformation processes. We are excited to announce that the dbt adapter for Amazon Athena is now officially supported in dbt Cloud. This integration enables data teams to efficiently transform and manage data using Athena with dbt Cloud’s robust features, enhancing the overall data workflow experience.

In this post, we discuss the advantages of dbt Cloud over dbt Core, common use cases, and how to get started with Amazon Athena using the dbt adapter.

The need for streamlined data transformations

As organizations increasingly adopt cloud-based data lakes and warehouses, the demand for efficient data transformation tools has grown. Athena plays a critical role in this ecosystem by providing a serverless, interactive query service that simplifies analyzing vast amounts of data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. This enables you to extract insights from your data without the complexity of managing infrastructure.

dbt has emerged as a leading framework, allowing data teams to transform and manage data pipelines effectively. With the dbt adapter for Athena adapter now supported in dbt Cloud, you can seamlessly integrate your AWS data architecture with dbt Cloud, taking advantage of the scalability and performance of Athena to simplify and scale your data workflows efficiently.

Benefits of the dbt adapter for Athena

We have collaborated with dbt Labs and the open source community on an adapter for dbt that enables dbt to interface directly with Athena. Previously, the dbt adapter for Athena was only compatible with dbt Core, requiring teams to manually manage configurations and execute transformations locally or through custom setups. Now, with support for dbt Cloud, you can access a managed, cloud-based environment that automates and enhances your data transformation workflows. This upgrade allows you to build, test, and deploy data models in dbt with greater ease and efficiency, using all the features that dbt Cloud provides.

The support of the dbt adapter for Athena in dbt Cloud offers several advantages over using it with dbt Core:

  • Managed infrastructure – dbt Cloud provides a fully managed environment for running dbt projects, eliminating the need for local setup, maintenance, and configuration. This saves time and effort, especially for teams looking to minimize infrastructure management and focus solely on data modeling.
  • Scheduling and automation – dbt Cloud comes with a job scheduler, allowing you to automate the execution of dbt models. This feature makes sure your datasets are always up to date without needing to set up and maintain external scheduling systems like Apache Airflow. You can also set up dependencies between jobs easily within dbt Cloud, making sure that transformations run in the correct sequence without manual oversight.
  • Enhanced collaboration and version control – You can use a web-based interface for editing and reviewing dbt models, enabling collaboration among data teams. You can review code changes directly on the platform, facilitating efficient teamwork. Additionally, dbt Cloud integrates with Git providers, making version control and code collaboration more streamlined. This makes sure your data models are well-documented, versioned, and straightforward to manage within a collaborative environment.
  • Monitoring and alerting – You get built-in tools for monitoring job executions and performance to set up alerts and notifications for job failures, providing quick response times and minimizing disruptions. Furthermore, you can gain insights into the performance of your data transformations with detailed execution logs and metrics, all accessible through the dbt Cloud interface.

Common use cases for using the dbt adapter with Athena

The following are common use cases for using the dbt adapter with Athena:

  • Building a data warehouse – Many organizations are moving towards a data warehouse architecture, combining the flexibility of data lakes with the performance and structure of data warehouses. Using Athena and the dbt adapter, you can transform raw data in Amazon S3 into well-structured tables suitable for analytics. This setup allows businesses to build a scalable and efficient data lakehouse where they can perform SQL-based transformations and make sure data is clean and ready for analytics without investing heavily in data warehouse infrastructure.
  • Incremental data processing – The adapter allows for incremental data processing, where only new or updated data is transformed and processed. This feature reduces the amount of data scanned by Athena, resulting in faster query performance and lower costs. For example, instead of processing an entire dataset daily, dbt can be configured to transform only the data ingested in the last 24 hours, making data operations more efficient and cost-effective.
  • Cost management and optimization – Because Athena charges based on the amount of data scanned by each query, cost optimization is critical. The adapter enables data teams to optimize transformations by creating efficient data models, such as partitioning and compressing data to minimize scan costs. Additionally, dbt’s automated scheduling in dbt Cloud can be used to manage the frequency of data transformations, making sure queries are run only when necessary, helping to control costs effectively.
  • Data archiving and tiered storage – Organizations with a large amount of historical data can use Athena to query archived data stored in the lower-cost storage classes of Amazon S3 (such as Amazon S3 Glacier). With the adapter, data teams can build models that segment and process data based on usage patterns, making sure frequently accessed data is optimized for quick queries while older data remains accessible but cost-efficient. Alternatively, you can use Amazon S3 Intelligent-Tiering to optimize storage costs by moving data between two access tiers when access patterns change. This approach helps in managing storage costs while maintaining the flexibility to analyze historical trends when needed.
  • Event-driven data transformations – In scenarios where organizations need to process data in near real time, such as for streaming event logs or Internet of Things (IoT) data, you can integrate the adapter into an event-driven architecture. For example, event data can be continuously loaded into Amazon S3, and dbt models can be configured to run incrementally, transforming the new data into structured formats for immediate analysis. This setup supports agile data processing while taking advantage of the serverless architecture of Athena to keep operational costs low.
  • Compliance and data governance – For organizations managing sensitive or regulated data, you can use Athena and the adapter to enforce data governance rules. With dbt, teams can define data quality checks and access controls as part of their transformation workflow. This makes sure that only compliant, high-quality data is made available for analytics, and costs are optimized by processing only the data that meets governance standards. Additionally, dbt’s documentation features help maintain a clear record of data transformations, supporting audit and compliance efforts.

How to use the dbt adapter for Athena

To get started, create a project and set up a connection with Athena in dbt Cloud. The following figure shows the steps to create a project using dbt Cloud and configure the Athena connection.

Next, use the dbt Cloud interactive development environment (IDE) to deploy your project. The following figure demonstrates how to build dbt runs and deploy changes to Athena using the dbt Cloud interface.

Conclusion

At AWS, we are committed to providing you with the best possible tools and services to help you succeed in the cloud. dbt has emerged as a leading data transformation platform, trusted by thousands of organizations worldwide. By partnering with dbt Labs, we are able to bring the power of dbt directly to the AWS Cloud, empowering you to seamlessly integrate your data transformation workflows into the broader cloud infrastructure. This partnership is a testament to our shared vision of making data more accessible, reliable, and valuable for organizations of all sizes.

We are excited to see how you will use the dbt Cloud compatible dbt adapter for Athena to drive your data-driven initiatives forward. The combination of dbt and Athena creates a powerful and efficient environment for transforming and analyzing data in a serverless architecture. This synergy allows you to take advantage of the strengths of both tools, making it straightforward to manage complex data pipelines, reduce costs, and scale your operations.


About the Authors

Darshit Thakkar is a Technical Product Manager with AWS and works with the Amazon Athena team.

Selman Ay is a Data Architect in the AWS Professional Services team.

BP Yau is a Sr Partner Solutions Architect at AWS helping customers architect big data solutions to process data at scale

Run high-availability long-running clusters with Amazon EMR instance fleets

Post Syndicated from Garima Arora original https://aws.amazon.com/blogs/big-data/run-high-availability-long-running-clusters-with-amazon-emr-instance-fleets/

AWS now supports high availability Amazon EMR on EC2 clusters with instance fleet configuration. With high availability instance fleet clusters, you now get the enhanced resiliency and fault tolerance of high availability architecture, along with the improved flexibility and intelligence in Amazon Elastic Compute Cloud (Amazon EC2) instance selection of instance fleets. Amazon EMR is a cloud big data platform for petabyte-scale data processing, interactive analysis, streaming, and machine learning (ML) using open source frameworks such as Apache Spark, Presto and Trino, and Apache Flink. Customers love the scalability and flexibility that Amazon EMR on EC2 offers. However, like most distributed systems running mission-critical workloads, high availability is a core requirement, especially for those with long-running workloads.

In this post, we demonstrate how to launch a high availability instance fleet cluster using the newly redesigned Amazon EMR console, as well as using an AWS CloudFormation template. We also go over the basic concepts of Hadoop high availability, EMR instance fleets, the benefits and trade-offs of high availability, and best practices for running resilient EMR clusters.

High availability in Hadoop

High availability (HA) provides continuous uptime and fault tolerance for a Hadoop cluster. The core components of Hadoop, like Hadoop Distributed File System (HDFS) NameNode and YARN ResourceManager, are single points of failure in clusters with a single primary node. In the event that any of them crash, the entire cluster goes down. High Availability removes this single point of failure by introducing redundant standby nodes that can quickly take over if the primary node fails.

In a high availability EMR cluster, one node serves as the active NameNode that handles client operations, and others act as standby NameNodes. The standby NameNodes constantly synchronize their state with the active one, enabling seamless failover to maintain service availability. To learn more, see Supported applications in an Amazon EMR Cluster with multiple primary nodes.

Key instance fleet differentiations

Amazon EMR recommends using the instance fleet configuration option for provisioning EC2 instances in EMR clusters because it offers a flexible and robust approach to cluster provisioning. Some key advantages include:

  • Flexible instance provisioning – Instance fleets provide a powerful and simple way to specify up to five EC2 instance types on the Amazon EMR console, or up to 30 when using the AWS Command Line Interface (AWS CLI) or API with an allocation strategy. This enhanced diversity helps optimize for cost and performance while increasing the likelihood of fulfilling capacity requirements.
  • Target capacity management – You can specify target capacities for On-Demand and Spot Instances for each fleet. Amazon EMR automatically manages the mix of instances to meet these targets, reducing operational overhead.
  • Improved availability – By spanning multiple instance types and purchasing options such as On-Demand and Spot, instance fleets are more resilient to capacity fluctuations in specific EC2 instance pools.
  • Enhanced Spot Instance handling – Instance fleets offer superior management of Spot Instances, including the ability to set timeouts and specify actions if Spot capacity can’t be provisioned.
  • Reliable cluster launches – You can configure your instance fleet to select multiple subnets for different Availability Zones, allowing Amazon EMR to find the best combination of instances and purchasing options across these zones to launch your cluster in. Amazon EMR will identify the best Availability Zone based on your configuration and available EC2 capacity and launch the cluster.

Prerequisites

Before you launch the high availability EMR instance fleet clusters, make sure you have the following:

  • Latest Amazon EMR release – We recommend that you use the latest Amazon EMR release to benefit from the highest level of resiliency and stability for your high availability clusters. High availability for instance fleets is supported with Amazon EMR releases 5.36.1, 6.8.1, 6.9.1, 6.10.1, 6.11.1, 6.12.0, and later.
  • Supported applications – High availability for instance fleets is supported for applications such as Apache Spark, Presto, Trino, and Apache Flink. Refer to Supported applications in an Amazon EMR Cluster with multiple primary nodes for the complete list of supported applications and their failover processes.

Launch a high availability instance fleet cluster using the Amazon EMR console

Complete the following steps on the Amazon EMR console to configure and launch a high availability EMR cluster with instance fleets:

  1. On the Amazon EMR console, create a new cluster.
  2. For Name, enter a name.
  3. For Amazon EMR release, choose the Amazon EMR release that supports high availability clusters with instance fleets. The setting will default to the latest available Amazon EMR release.

CreateHACluster-EMRRelease

  1. Under Cluster configuration, choose the desired instance types for the primary fleet. (You can select up to five when using the Amazon EMR console.)
  2. Select Use high availability to launch the cluster with three primary nodes.

CreateHACluster

  1. Choose the instance types and target On-Demand and Spot size for the core and task fleet according to your requirements.

InstanceFleet-CreateFleets

  1. Under Allocation strategy, select Apply allocation strategy.
    1. 1 We recommend that you select Price-capacity optimized for your allocation strategy for your cluster for faster cluster provisioning, more accurate Spot Instance allocation, and fewer Spot Instance interruptions.
  2. Under Networking, you can choose multiple subnets for different Availability Zones. This allows Amazon EMR to look across those subnets and launch the cluster in an Availability Zone that best suits your instance and purchasing option requirements.

allocationStrategy

  1. Review your cluster configuration and choose Create cluster.

Amazon EMR will launch your cluster in a few minutes. You can view the cluster details on the Amazon EMR console.
ClusterDetailPage

Launch a high availability cluster with AWS CloudFormation

To launch a high availability cluster using AWS CloudFormation, complete the following steps:

  1. Create a CloudFormation template with EMR resource type AWS::EMR::Cluster and JobFlowInstancesConfig property types MasterInstanceFleet, CoreInstanceFleet and (optional) TaskInstanceFleets. To launch a high availability cluster, configure TargetOnDemandCapacity=3, TargetSpotCapacity=0 for the primary instance fleet and weightedCapacity=1 for each instance type configured for the fleet. See the following code:
{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Resources": {
    "cluster": {
      "Type": "AWS::EMR::Cluster",
      "Properties": {
        "Instances": {
          "Ec2SubnetIds": [
            "subnet-003c889b8379f42d1",
            "subnet-0382aadd4de4f5da9",
            "subnet-078fbbb77c92ab099"
          ],
          "MasterInstanceFleet": {
            "Name": "HAPrimaryFleet",
            "TargetOnDemandCapacity": 3,
            "TargetSpotCapacity": 0,
            "InstanceTypeConfigs": [
              {
                "InstanceType": "m5.xlarge",
                "WeightedCapacity": 1
              },
              {
                "InstanceType": "m5.2xlarge",
                "WeightedCapacity": 1
              },
              {
                "InstanceType": "m5.4xlarge",
                "WeightedCapacity": 1
              }
            ]
          },
          "CoreInstanceFleet": {
            "Name": "cfnCore",
            "InstanceTypeConfigs": [
              {
                "InstanceType": "m5.xlarge",
                "WeightedCapacity": 1
              },
              {
                "InstanceType": "m5.2xlarge",
                "WeightedCapacity": 2
              },
              {
                "InstanceType": "m5.4xlarge",
                "WeightedCapacity": 4
              }
            ],
            "LaunchSpecifications": {
              "SpotSpecification": {
                "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                "TimeoutDurationMinutes": 20,
                "AllocationStrategy": "PRICE_CAPACITY_OPTIMIZED"
              }
            },
            "TargetOnDemandCapacity": "4",
            "TargetSpotCapacity": 0
          },
          "TaskInstanceFleets": [
            {
              "Name": "cfnTask",
              "InstanceTypeConfigs": [
                {
                  "InstanceType": "m5.xlarge",
                  "WeightedCapacity": 1
                },
                {
                  "InstanceType": "m5.2xlarge",
                  "WeightedCapacity": 2
                },
                {
                  "InstanceType": "m5.4xlarge",
                  "WeightedCapacity": 4
                }
              ],
              "LaunchSpecifications": {
                "SpotSpecification": {
                  "TimeoutAction": "SWITCH_TO_ON_DEMAND",
                  "TimeoutDurationMinutes": 20,
                  "AllocationStrategy": "PRICE_CAPACITY_OPTIMIZED"
                }
              },
              "TargetOnDemandCapacity": "0",
              "TargetSpotCapacity": 4
            }
          ]
        },
        "Name": "TestHACluster",
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
        "ReleaseLabel": "emr-6.15.0",
        "PlacementGroupConfigs": [
          {
            "InstanceRole": "MASTER",
            "PlacementStrategy": "SPREAD"
          }
        ]
      }
    }
  }
}

Make sure to use an Amazon EMR release that supports high availability clusters with instance fleets.

  1. Create a CloudFormation stack with the preceding template:
aws cloudformation create-stack --stack-name HAInstanceFleetCluster --template-body file://cfn-template.json --region us-east-1
  1. Retrieve the cluster ID from the list-clusters response to use in the following steps. You can further filter this list based on filters like cluster status, creation date, and time.
aws emr list-clusters --query "Clusters[?Name=='<YourClusterName>']"
  1. Run the following describe-cluster command:
aws emr describe-cluster --cluster-id j-XXXXXXXXXXX --region us-east-1

If the high availability cluster was launched successfully, the describe-cluster response will return the state of the primary fleet as RUNNING and provisionedOnDemandCapacity as 3. By this point, all three primary nodes have been started successfully.

DescribeClusterResponse

Primary node failover with High Availability clusters

To fetch information on all EC2 instances for an instance fleet, use the list-instances command:

aws emr list-instances --cluster-id j-XXXXXXXXXXX --instance-fleet-type MASTER --region us-east-1

For high availability clusters, it will return three instances in RUNNING state for the primary fleet and other attributes like public and private DNS names.

PrimaryInstance-DescribeCluster

The following screenshot shows the instance fleet status on the Amazon EMR console.

Instancefleet status

Let’s examine two cases for primary node failover.

Case 1: One of the three primary instances is accidentally stopped

When an EC2 instance is accidentally stopped by a user, Amazon EMR detects this and performs a failover for the stopped primary node. Amazon EMR also attempts to launch a new primary node with the same private IP and DNS name to recover back the quorum. During this failover, the cluster remains fully operational, providing true resiliency to single primary node failures.

The following screenshots illustrate the instance fleet details.

InstanceFleetDetail-PrimaryInstanceTerminated

instanceFleerRecovery

This automatic recovery for primary nodes is also reflected in the MultiMasterInstanceGroupNodesRunning or MultiMasterInstanceGroupNodesRunningPercentage Amazon CloudWatch metric emitted by Amazon EMR for your cluster. The following screenshot shows an example of these metrics.

CloudwatchMetrics

Case 2: One of the three primary instances becomes unhealthy

If Amazon EMR continuously receives failures when trying to connect to a primary instance, it is deemed as unhealthy and Amazon EMR will attempt to replace it. Similar to case 1, Amazon EMR will perform a failover for the stopped primary node and also attempt to launch a new primary node with the same private IP and DNS name to recover the quorum.

UnhealthyPrimaryInstance
PrimaryInstanceFailover-2

If you list the instances for the primary fleet, the response will include information for the EC2 instance that was stopped by the user and the new primary instance that replaced it with the same private IP and DNS name.
DescribeClusterResponse-instanceFailover

The following screenshot shows an example of the CloudWatch metrics.

An instance can have connection failures for multiple reasons, including but not limited to disk space unavailable on the instance, critical cluster daemons like instance controller shut down with errors, high CPU utilization, and more. Amazon EMR is continuously improving its health monitoring criteria to better identify unhealthy nodes on an EMR cluster.

Considerations and best practices

The following are some of the key considerations and best practices for using EMR instance fleets to launch a high availability cluster with multiple primary nodes:

  • Use the latest EMR release – With the latest EMR releases, you get the highest level of resiliency and stability for your high availability EMR clusters with multiple primary nodes.
  • Configure subnets for high availability – Amazon EMR can’t replace a failed primary node if the subnet is oversubscribed (there aren’t any available private IP addresses in the subnet). This results in a cluster failure as soon as the second primary node fails. Limited availability of IP addresses in a subnet can also result in cluster launch or scaling failures. To avoid such scenarios, we recommend that you dedicate an entire subnet to an EMR cluster.
  • Configure core nodes for enhanced data availability – To minimize the risk of local HDFS data loss on your production clusters, we recommend that you set the dfs.replication parameter to 3 and launch at least four core nodes. Setting dfs.replication to 1 on clusters with fewer than four core nodes can lead to data loss if a single core node goes down. For clusters with three or fewer core nodes, set dfs.replication parameter to at least 2 to achieve sufficient HDFS data replication. For more information, see HDFS configuration.
  • Use an allocation strategy – We recommend enabling an allocation strategy option for your instance fleet cluster to provide faster cluster provisioning, more accurate Spot Instance allocation, and fewer Spot Instance interruptions.
  • Set alarms for monitoring primary nodes – You should monitor the health and status of primary nodes of your long-running clusters to maintain smooth operations. Configure alarms using CloudWatch metrics such as MultiMasterInstanceGroupNodesRunning, MultiMasterInstanceGroupNodesRunningPercentage, or MultiMasterInstanceGroupNodesRequested.
  • Integrate with EC2 placement groups – You can also choose to protect primary instances against hardware failures by using a placement group strategy for your primary fleet. This will spread the three primary instances across separate underlying hardware to avoid loss of multiple primary nodes at the same time in the event of a hardware failure. See Amazon EMR integration with EC2 placement groups for more details.

When setting up a high availability instance fleet cluster with Amazon EMR on EC2, it’s important to understand that all EMR nodes, including the three primary nodes, are launched within a single Availability Zone. Although this configuration maintains high availability within that Availability Zone, it also means that the entire cluster can’t tolerate an Availability Zone outage. To mitigate the risk of cluster failures due to Spot Instance reclamation, Amazon EMR launches the primary nodes using On-Demand instances, providing an additional layer of reliability for these critical components of the cluster.

Conclusion

This post demonstrated how you can use high availability with EMR on EC2 instance fleets to enhance the resiliency and reliability of your big data workloads. By using instance fleets with multiple primary nodes, EMR clusters can withstand failures and maintain uninterrupted operations, while providing enhanced instance diversity and better Spot capacity management within a single Availability Zone. You can quickly set up these high availability clusters using the Amazon EMR console or AWS CloudFormation, and monitor their health using CloudWatch metrics.

To learn more about the supported applications and their failover process, see Supported applications in an Amazon EMR Cluster with multiple primary nodes. To get started with this feature and launch a high availability EMR on EC2 cluster, refer to Plan and configure primary nodes.


About the Authors

Garima Arora is a Software Development Engineer for Amazon EMR at Amazon Web Services. She specializes in capacity optimization and helps build services that allow customers to run big data applications and petabyte-scale data analytics faster. When not hard at work, she enjoys reading fiction novels and watching anime.

Ravi Kumar is a Senior Product Manager Technical-ES (PMT) at Amazon Web Services, specialized in building exabyte-scale data infrastructure and analytics platforms. With a passion for building innovative tools, he helps customers unlock valuable insights from their structured and unstructured data. Ravi’s expertise lies in creating robust data foundations using open-source technologies and advanced cloud computing, that powers advanced artificial intelligence and machine learning use cases. A recognized thought leader in the field, he advances the data and AI ecosystem through pioneering solutions and collaborative industry initiatives. As a strong advocate for customer-centric solutions, Ravi constantly seeks ways to simplify complex data challenges and enhance user experiences. Outside of work, Ravi is an avid technology enthusiast who enjoys exploring emerging trends in data science, cloud computing, and machine learning.

Tarun Chanana is a Software Development Manager for Amazon EMR at Amazon Web Services.

Enhance data governance with enforced metadata rules in Amazon DataZone

Post Syndicated from Ramesh H Singh original https://aws.amazon.com/blogs/big-data/enhance-data-governance-with-enforced-metadata-rules-in-amazon-datazone/

We’re excited to announce a new feature in Amazon DataZone that offers enhanced metadata governance for your subscription approval process. With this update, domain owners can define and enforce metadata requirements for data consumers when they request access to data assets. By making it mandatory for data consumers to provide specific metadata, domain owners can achieve compliance, meet organizational standards, and support audit and reporting needs.

Many organizations require additional metadata from data consumers during the subscription request process to align with internal workflows and regulatory requirements. With enforced metadata rules, domain unit owners can establish consistent governance practices across all data subscriptions. For example, financial services organizations can mandate specific compliance-related metadata when data consumers request access to sensitive financial data. Similarly, healthcare providers can enforce metadata requirements to align with regulatory standards for patient data access. This feature simplifies the approval process by guiding data consumers through completing mandatory fields and enabling data owners to make informed decisions, ensuring data access requests meet organizational policies.

By streamlining metadata governance, Amazon DataZone empowers customers to meet compliance standards, maintain audit readiness, and simplify access workflows for enhanced efficiency and control. For example, one of our customers, Bristol Myers Squibb (BMS), leverages Amazon DataZone to address their specific data governance needs. Sitikantha Sarangi, Director of Data Engineering and ML Ops Platform at BMS, says:

“At BMS, our teams have been leveraging Amazon DataZone’s comprehensive data governance solution to catalog and enable secure data subscriptions across the organization within governed project environments. With the new custom metadata enforcement feature, we now can more easily navigate our data catalog. This capability allows us to set specific requirements for data consumers, such as providing a compliance certification link or detailing data usage intentions, ensuring that access requests for sensitive data are thoroughly reviewed and approved in alignment with our standards. This customization helps us more efficiently ensure we are appropriately utilizing data while facilitating efficient, secure data sharing across teams.” 

Key benefits

The feature benefits multiple stakeholders. Domain unit owners can ensure compliance by enforcing metadata requirements, granting access only after thorough reviews. Data consumers benefit from a streamlined subscription request process, guided by metadata requirements that reduce complexity. Data producers gain clarity with detailed subscription requests, enabling informed decisions aligned with required standards. Overall, the key benefits are:

  • Enhanced control for domain owners – Admins and domain unit owners can now enforce additional metadata requirements on subscription requests, making sure that data consumers supply essential information for thorough review and compliance checks
  • Custom workflow support – Organizations can build custom workflows for assets by capturing critical metadata from data consumers, such as AWS account IDs or project-specific identifiers, to fulfill access requests

In this post, we walk you through setting up and using metadata enforcement to create seamless, compliant data access workflows.

Solution overview

The solution in this post is composed of two parts. In the first part, we walk through the steps necessary to enforce metadata for subscription requests for managed assets. In the second part, we walk through the steps necessary to request subscriptions for custom assets.

Prerequisites

To follow this post, user should already have Amazon DataZone setup with respective projects to publish and consume the assets. The publisher of the Retail project must have published a shipments data asset in Amazon DataZone. The domain owner or admin must have created a metadata form required for the subscription request.

This feature also supports metadata enforcement for subscription requests of a data product. For instructions on how to set this up, refer to Amazon DataZone data products.

Solution walkthrough: Enhance data governance with enforced metadata rules for Managed Assets

To perform the solution in this post, follow the steps in the next sections.

Metadata enforcement for subscription requests

To enforce metadata for subscription requests, use the following steps.

Step 1: Domain owner configures metadata requirements

Domain unit owners can configure metadata enforcement in Amazon DataZone as follows:

  1. On the Amazon DataZone console, choose Domain to open your domain or domain unit settings.
  2. Choose dataplatform, as shown in the following screenshot.
  3. To add metadata forms for subscription requests, on the RULES tab, choose ADD, as shown in the following screenshot.
  4. Provide the name to the metadata form rule.
  5. Choose ADD ANOTHER METADATA FORM.
  6. Choose from a list of available metadata forms within the domain or domain unit. Search options make navigation straightforward.

You can select multiple forms for enforcement on subscription requests.

  1. Choose Add, as shown in the following screenshot.

Create metadata form rule as below:

  1. In the next screen, you can specify additional settings. You can apply metadata forms across all asset types or limit them to specific asset types. Additionally, choose whether the rule applies to a specific project or all projects within the domain. After the scope is defined as shown in the screenshot, choose ADD RULE.

    Note: Enable metadata enforcement across child domains, with optional permissions allowing child domains to override the parent domain’s enforced forms. This option is available while defining the scope, if the domain owner chooses All projects, as shown in the following screenshot.

Step 2: Data consumer submits subscription request

After metadata enforcement is configured, data consumers follow these steps to request access:

  1. To find and select an asset in the Amazon DataZone catalog, choose MARKETING and then sign in to the Amazon DataZone console as a data consumer. On the search bar, enter the shipments data asset, as shown in following screenshot.
  2. Choose SUBSCRIBE to open the subscription request modal, as shown in the following screenshot.
  3. Choose a project and provide a Reason for request, as shown in the following screenshot.
  4. Fill in the required metadata fields as specified by the domain unit. If mandatory fields are incomplete, they will be highlighted, and the submission will be disabled until resolved. After all the mandatory fields are entered, choose APPLY, as shown in the following screenshot.
  5. Choose Request to submit the subscription request, as shown in the following screenshot.

After submitting, an event is generated in Amazon EventBridge, which can be used in custom workflows outside of Amazon DataZone as needed.

Step 3: Data producer (owner) approves the subscription

After a data consumer submits a subscription request, they review the metadata. The data producer receives the subscription request with all metadata provided by the data consumer.

  1. Sign in to the Amazon DataZone console as a data producer. Choose RETAIL as the
  2. In the navigation pane, choose Incoming requests and find the subscription request. Choose View request, as shown in the following screenshot.
  3. Data producers can review the metadata, including document links and account IDs, to determine if the request meets compliance and workflow requirements before granting access, as shown in the following screenshot.
  4. Under Approval access, choose Full access to provide full access to data. For fine-grain access control, choose Approve with row or column filters. For this post, we choose Full access.
  5. Provide the Decision comment.
  6. Choose APPROVE, as shown in the following screenshot.

Step 4: Data consumer consumes the data

Now, data consumers follow these steps:

  1. After the subscription grants are approved and fulfilled, sign in to the Amazon DataZone console as data consumer from MARKETING project to query the subscribed data.
  2. Choose MARKETING On the Environments tab, choose Query data through Amazon Athena, as shown in the following screenshot.
  3. Query the subscribed data asset shipments in Amazon Athena, with below query and as shown in the screenshot.
    SELECT * from “env_mkt_datalake_sub_db”.“shipments” limit 10;

Solution walkthrough: Enhance data governance with enforced metadata rules for Custom Assets

Customers can manage access grants for unmanaged assets using Amazon DataZone. When a subscription to an asset in the business data catalog is approved by the data owner, Amazon DataZone publishes an event in Amazon EventBridge in the account along with all the necessary information in the payload that you can use to create the access grants between the source and the target. Using metadata enforcement for unmanaged assets, customers can provide all context in the single request.

STEP 1: Create a custom asset type

To create a custom asset type Metrics with an attached metadata form to describe the metric asset type, follow these steps:

Below is an example of a custom asset type – “Metrics” which has two fields 1/Dashboard Link and 2/Calculation

Step 2: Data producer creates a custom asset using the “Metrics” asset type

The data producer creates a Conversion Rate Metric with all metadata along with associated metadata forms by following these steps:

Below is “Conversion Rate Metric” asset created in DataZone. The highlighted boxes show that is an Unmanaged asset and of type “Metrics” that was created in the previous step.

Step 3: Domain owner configures metadata requirements

Domain unit owners can configure metadata enforcement in Amazon DataZone as follows:

  1. On the Amazon DataZone console, choose Domain to open your domain or domain unit settings.
  2. To add metadata forms for subscription requests, on the RULES tab, choose ADD, as shown in the following screenshot.
  3. To select metadata forms, provide the Name to the metadata form rule.
  4. Choose ADD METADATA FORM, as shown in the following screenshot.
  5. Remaining fields can be left as default. For this blog, please set it as shown in below
  6. In the Add metadata form pop-up, enter MetricsRequestForm, as shown in the following screenshot.

  7. Choose ADD Rule as shown above to create the rule for all metrics assets. Below is the screenshot of the rule once created.

Step 4: Admins sets up an EventBridge rule

To set up an EventBridge rule, follow these steps:

  1. Create an EventBridge rule to capture all new subscription requests. Please see the documentation Amazon DataZone events and notifications for details to setup.
  2. Create an AWS Lambda function as a target to action on the event. Please see documentation – Event bus targets in Amazon EventBridge to setup targets.

For this blog, set the below event pattern that triggers the lambda only for new Subscription requests.

{
  "source": ["aws.datazone"],
  "detail-type": ["Subscription Request Created"]
}

Step 5: Data consumer submits subscription request

After metadata enforcement is configured, data consumers follow these steps to request access:

  1. To locate the asset in the Amazon DataZone catalog, sign in to the Amazon DataZone console as a data consumer from the marketing Use the search bar to find the Conversion Rate Metric asset. Choose SUBSCRIBE, as shown in the following screenshot.
  2. Provide details, including the Metrics Request Form associated with the Metrics asset type.
  3. Choose REQUEST, as shown in the following screenshot.

You will receive notification confirming that your subscription request is submitted, as shown in the following screenshot.

For the request, EventBridge will capture the following request event and send it to the setup target:

{
    'version': '0',
    'id': '3fdf59a2-f95c-192f-0901-4025dc6e6a61',
    'detail-type': 'Subscription Request Created',
    'source': 'aws.datazone',
    'account': '1234567890', 
    'time': '2024-11-15T18:57:16Z', 
    'region': 'us-east-1', 
    'resources': [], 
    'detail': 
        {
            'version': '283',
            'internal': None,
            'metadata': 
                {'
                    id': 'cwaxxxlj', 
                    'version': '1',
                    'typeName': 'SubscriptionRequestEntityType',
                    'domain': 'dzd_xxxxxxxxx1z',
                    'user': 'd1xxxxx-eexxx-xxxx-axxxx-0xxxxxxxx8ce',
                    'awsAccountId': '1234567890', 
                    'owningProjectId': '555xxxxxxrmv', 
                    'clientToken': '3bxxxxxxxxxxc91bb76d6'
                }, 
            'data': 
                {
                    'autoApproved': False, 
                    'requesterId': 'd1xxxxx848ce',
                    'reviewerId': '54uxxxxxxd3',
                    'status': 'PENDING',
                    'subscribedListings': [{'id': '6ixxgev', 'item': {'assetListing': {'entityId': 'xxxxxxxxx7', 'entityType': 'Metrics'}}, 'ownerProjectId': '5xxxxxx3', 'version': '2'}], 
                    'subscribedPrincipals': [{'id': '555xxxxxxrmv', 'type': 'PROJECT'}]
                }
            }
}

The data steward and asset owner can get details for the request with the  GetSubscriptionRequestDetails API and view the asset details and form associated with the request:

{
    "id": "cwxxxlj",
    "createdBy": "d17xxxxxxx848ce",
    "domainId": "dzd_xxxxxxz",
    "status": "PENDING",
    "createdAt": "2024-11-15T20:26:01.014000+00:00",
    "updatedAt": "2024-11-15T20:26:01.014000+00:00",
    "requestReason": "Marketing Analytics use case",
    "subscribedPrincipals": [
        {
            "project": {
                "id": "bxxxxx23hj",
                "name": "Marketing"
            }
        }
    ],
    "subscribedListings": [
        {
            "id": "6xxxxxxx1ev",
            "revision": "2",
            "name": "Conversion Rate Metric",
            "description": "Conversion rate calculates the percentage of web visitors who complete a desired action, such as creating an account, placing an order or clicking a link",
            "item": {
                "assetListing": {
                    "entityId": "b8xxxxxd7",
                    "entityRevision": "7",
                    "entityType": "Metrics",
                    "forms": "{\n  \"DZ_Internal_Basic_Form\" : {\n    \"name\" : \"Conversion Rate Metric\",\n    \"description\" : \"Conversion rate calculates the percentage of web visitors who complete a desired action, such as creating an account, placing an order or clicking a link\"\n  },\n  \"amazonstatus\" : {\n    \"publishingPrecedence\" : \"PUBLISHED_INDIVIDUALLY\",\n    \"status\" : \"ACTIVE\"\n  },\n  \"AssetCommonDetailsForm\" : {\n    \"readMe\" : \"Conversion Rate is a key performance metric used in marketing, e-commerce, and digital analytics. It measures the percentage of users or visitors who take a desired action out of the total number of users or visitors. This desired action, known as a \\\"conversion,\\\" can vary depending on the specific goals of a business or campaign.\\n\\n\\nApplications:\\n\\n- E-commerce: Percentage of website visitors who make a purchase\\n- Marketing: Percentage of leads who become customers\\n- Digital Advertising: Percentage of ad viewers who click on an ad or complete a form\\n- Email Marketing: Percentage of email recipients who click a link or perform a desired action\\n\\n\\nImportance:\\n\\n- Measures effectiveness of marketing efforts and user experience\\n- Helps in understanding customer behavior and preferences\\n- Guides optimization efforts for websites, ads, and marketing campaigns\\n- Often used as a key metric for ROI (Return on Investment) calculations\"\n  },\n  \"MarketingMetrics\" : {\n    \"DashboardLink\" : \"www.anycompany.com/marketing/conversion_rate\",\n    \"Calculation\" : \"Conversion rate = Conversions / Total visitors x 100\"\n  },\n  \"amazonmetadata\" : {\n    \"entityVersion\" : \"7\",\n    \"createdAt\" : \"2024-11-15T16:43:15.325935428Z\",\n    \"typeNamespace\" : \"dzd_6xxxxxx1z\",\n    \"sourceCategory\" : \"asset\",\n    \"typeName\" : \"Metrics\",\n    \"entityId\" : \"byxxxxxdolk7\",\n    \"sourceEntityFormDetails\" : [ {\n      \"typeNamespace\" : \"dzd_xxxxx1z\",\n      \"typeVersion\" : \"15\",\n      \"formName\" : \"MarketingMetrics\",\n      \"typeName\" : \"MarketingMetrics\"\n    }, {\n      \"typeNamespace\" : \"amazon.datazone\",\n      \"typeVersion\" : \"10\",\n      \"formName\" : \"DZ_Internal_Basic_Form\",\n      \"typeName\" : \"NamedDataZoneBasicFormType\"\n    }, {\n      \"typeNamespace\" : \"amazon.datazone\",\n      \"typeVersion\" : \"6\",\n      \"formName\" : \"AssetCommonDetailsForm\",\n      \"typeName\" : \"AssetCommonDetailsFormType\"\n    }, {\n      \"typeNamespace\" : \"amazon.datazone.internal\",\n      \"typeVersion\" : \"1\",\n      \"formName\" : \"DZ_Internal_Rendering_Config_Form\",\n      \"typeName\" : \"RenderingConfigFormType\"\n    } ]\n  },\n  \"DZ_Internal_Rendering_Config_Form\" : {\n    \"metadataFormItems\" : [ {\n      \"formName\" : \"MarketingMetrics\",\n      \"collapse\" : false\n    }, {\n      \"formName\" : \"AssetCommonDetailsForm\",\n      \"collapse\" : false\n    } ]\n  }\n}",
                    "glossaryTerms": []
                }
            },
            "ownerProjectId": "54xxxxxd3",
            "ownerProjectName": "Custom-Metrics-Assets"
        }
    ],
    "metadataForms": [
        {
            "formName": "MetricsRequestForm",
            "typeName": "MetricsRequestForm",
            "typeRevision": "5",
            "content": "{\"BusinessUnit\": \"AWS\",\"ContactEmail\": \"[email protected]\",\"Team\": \"DataZone\"}"
        }
    ]
}

The data and asset owner can use these details to orchestrate an approval workflow using the Lambda function. After it has been validated, the asset owner or steward can then call the AcceptSubscriptionRequest API to grant access. The data consumer will be notified after access is approved. The following screenshot shows the notification that the subscription was approved.

Now that the subscription is approved, users can use the dashboard URL to access the metric.

Cleanup

To make sure no additional charges are incurred after testing, delete the Amazon DataZone domain. Refer to Delete Amazon DataZone domains for the process.

Conclusion

The new metadata enforcement rule for subscription requests in Amazon DataZone strengthens data governance by empowering domain unit owners to establish clear metadata requirements for data consumers, streamlining access requests and enhancing data governance. This feature enables organizations to align with the organization’s metadata standards, implement custom workflows, and provide a consistent, governed data access experience.

The feature is supported in all AWS Regions where Amazon DataZone is available at the time of this writing. To check which Regions are available, refer to AWS Services by Region. Check out the video below to learn more about how to set up metadata rules for subscription workflows. Get started with the technical documentation.


About the Authors

Ramesh H Singh is a Senior Product Manager Technical (External Services) at AWS in Seattle, Washington, currently with the Amazon DataZone team. He is passionate about building high-performance ML/AI and analytics products that enable enterprise customers to achieve their critical goals using cutting-edge technology. Connect with him on LinkedIn.

Pradeep Misra PicPradeep Misra is a Principal Analytics Solutions Architect at AWS. He works across Amazon to architect and design modern distributed analytics and AI/ML platform solutions. He is passionate about solving customer challenges using data, analytics, and AI/ML. Outside of work, Pradeep likes exploring new places, trying new cuisines, and playing board games with his family. He also likes doing science experiments, building LEGOs and watching anime with his daughters.

Lakshmi Nair is a Senior Analytics Specialist Solutions Architect at AWS. She specializes in designing advanced analytics systems across industries. She focuses on crafting cloud-based data platforms, enabling real-time streaming, big data processing, and robust data governance.

Santhosh Padmanabhan is a Software Development Manager at AWS, leading the Amazon DataZone engineering team. His team designs, builds, and operates services specializing in data, machine learning, and AI governance. With deep expertise in building distributed data systems at scale, Santhosh plays a key role in advancing AWS’s data governance capabilities.

Introducing Point in Time queries and SQL/PPL support in Amazon OpenSearch Serverless

Post Syndicated from Jagadish Kumar original https://aws.amazon.com/blogs/big-data/introducing-point-in-time-queries-and-sql-ppl-support-in-amazon-opensearch-serverless/

Today we announced support for three new features for Amazon OpenSearch Serverless: Point in Time (PIT) search, which enables you to maintain stable sorting for deep pagination in the presence of updates, and Piped Processing Language (PPL) and Structured Query Language (SQL), which give you new ways to query your data. Querying with SQL or PPL is useful if you’re already familiar with the language or want to integrate your domain with an application that uses them.

OpenSearch Serverless is a powerful and scalable search and analytics engine that enables you to store, search, and analyze large volumes of data while reducing the burden of manual infrastructure provisioning and scaling as you ingest, analyze, and visualize your time series and search data, simplifying data management and enabling you to derive actionable insights from data. The vector engine for OpenSearch Serverless also makes it easy for you to build modern machine learning (ML) augmented search experiences and generative artificial intelligence (generative AI) applications without needing to manage the underlying vector database infrastructure.

PIT search

Point in Time (PIT) search lets you run different queries against a dataset that’s fixed in time. Typically, when you run the same query on the same index at different points in time, you receive different results because documents are constantly indexed, updated, and deleted. With PIT, you can query against a state of your dataset for a point in time. Although OpenSearch still supports other ways of paginating results, PIT search provides superior capabilities and performance because it isn’t bound to a query and supports consistent pagination. When you create a PIT for a set of indexes, OpenSearch creates contexts to access data at that point in time and when you use a query with a PIT ID, it searches the contexts that are frozen in time to provide consistent results.

Using PIT involves the following high-level steps:

  1. Create a PIT.
  2. Run search queries with a PIT ID and use the search_after parameter for the next page of results.
  3. Close the PIT.

Create a PIT

When you create a PIT, OpenSearch Serverless provides a PIT ID, which you can use to run multiple queries on the frozen dataset. Even though the indexes continue to ingest data and modify or delete documents, the PIT references the data that hasn’t changed since the PIT creation.

Run a search query with the PIT ID

PIT search isn’t bound to a query, so you can run different queries on the same dataset, which is frozen in time.

When you run a query with a PIT ID, you can use the search_after parameter to retrieve the next page of results. This gives you control over the order of documents in the pages of results.

The following response contains the first 100 documents that match the query. To get the next set of documents, you can run the same query with the last document’s sort values as the search_after parameter, keeping the same sort and pit.id. You can use the optional keep_alive parameter to extend the PIT time.

Close the PIT

When your queries on the dataset are complete, you can delete the PIT using the DELETE operation. PITs automatically expire after the keep_alive duration.

Considerations and limitations

Keep in mind the following limitations when using this feature:

SQL and PPL support

OpenSearch Serverless provides a primary query interface called query DSL that you can use to search your data. Query DSL is a flexible language with a JSON interface. In addition to DSL, you can now extract insights out of OpenSearch Serverless using the familiar SQL query syntax.

You can use the SQL and PPL API, the /plugins/_sql and /plugins/_ppl endpoints respectively, to search the data. You can use aggregations, group by, and where clauses to investigate your data and read your data as JSON documents or CSV tables, so you have the flexibility to use the format that works best for you. By default, queries return data in JDBC format. You can specify the response format as JDBC, standard OpenSearch JSON, CSV, or raw.

Use the /plugins/_sql endpoint to send SQL queries to the SQL plugin, as shown in the following example.

Besides basic filtering and aggregation, OpenSearch SQL also supports complex queries, such as querying semi-structured data, set operations, sub-queries and limited JOINs. Beyond the standard functions, OpenSearch functions are provided for better analytics and visualization.

For PPL queries, use the /plugins/_ppl endpoint to send queries to the SQL plugin.

Considerations and limitations

Keep in mind the following:

  • Query Workbench is not supported for SQL and PPL queries
  • The SQL and PPL CLI is supported and can be used to issue SQL and PPL queries
  • DELETE statements are not supported
  • SQL plugin data sources are not supported
  • The SQL query stats API is not supported

Summary

In this post, we discussed new features in OpenSearch Serverless. PIT is a useful feature when you need to maintain a consistent view of your data for pagination during search operations. SQL in OpenSearch Service bridges the gap between traditional relational database concepts and the flexibility of OpenSearch’s document-oriented data storage. You can send SQL and PPL queries to the _sql and _ppl endpoints, respectively, and use aggregations, group by, and where clauses to analyze their data.

For more information, refer to :


About the Authors

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

Frank Dattalo is a Software Engineer with Amazon OpenSearch Service. He focuses on the search and plugin experience in Amazon OpenSearch Serverless. He has an extensive background in search, data ingestion, and AI/ML. In his free time, he likes to explore Seattle’s coffee landscape.

Milav Shah is an Engineering Leader with Amazon OpenSearch Service. He focuses on the search experience for OpenSearch customers. He has extensive experience building highly scalable solutions in databases, real-time streaming, and distributed computing. He also possesses functional domain expertise in verticals like Internet of Things, fraud protection, gaming, and ML/AI. In his free time, he likes to ride his bicycle, hike, and play chess.

Integrate custom applications with AWS Lake Formation – Part 1

Post Syndicated from Stefano Sandona original https://aws.amazon.com/blogs/big-data/integrate-custom-applications-with-aws-lake-formation-part-1/

AWS Lake Formation makes it straightforward to centrally govern, secure, and globally share data for analytics and machine learning (ML).

With Lake Formation, you can centralize data security and governance using the AWS Glue Data Catalog, letting you manage metadata and data permissions in one place with familiar database-style features. It also delivers fine-grained data access control, so you can make sure users have access to the right data down to the row and column level.

Lake Formation also makes it straightforward to share data internally across your organization and externally, which lets you create a data mesh or meet other data sharing needs with no data movement.

Additionally, because Lake Formation tracks data interactions by role and user, it provides comprehensive data access auditing to verify the right data was accessed by the right users at the right time.

In this two-part series, we show how to integrate custom applications or data processing engines with Lake Formation using the third-party services integration feature.

In this post, we dive deep into the required Lake Formation and AWS Glue APIs. We walk through the steps to enforce Lake Formation policies within custom data applications. As an example, we present a sample Lake Formation integrated application implemented using AWS Lambda.

The second part of the series introduces a sample web application built with AWS Amplify. This web application showcases how to use the custom data processing engine implemented in the first post.

By the end of this series, you will have a comprehensive understanding of how to extend the capabilities of Lake Formation by building and integrating your own custom data processing components.

Integrate an external application

The process of integrating a third-party application with Lake Formation is described in detail in How Lake Formation application integration works.

In this section, we dive deeper into the steps required to establish trust between Lake Formation and an external application, the API operations that are involved, and the AWS Identity and Access Management (IAM) permissions that must be set up to enable the integration.

Lake Formation application integration external data filtering

In Lake Formation, it’s possible to control which third-party engines or applications are allowed to read and filter data in Amazon Simple Storage Service (Amazon S3) locations registered with Lake Formation.

To do so, you can navigate to the Application integration settings page on the Lake Formation console and enable Allow external engines to filter data in Amazon S3 locations registered with Lake Formation, specifying the AWS account IDs from where third-party engines are allowed to access locations registered with Lake Formation. In addition, you have to specify the allowed session tag values to identify trusted requests. We discuss in later sections how these tags are used.

LakeFormation Application integration

Lake Formation application integration involved AWS APIs

The following is a list of the main AWS APIs needed to integrate an application with Lake Formation:

  • sts:AssumeRole – Returns a set of temporary security credentials that you can use to access AWS resources.
  • glue:GetUnfilteredTableMetadata – Allows a third-party analytical engine to retrieve unfiltered table metadata from the Data Catalog.
  • glue:GetUnfilteredPartitionsMetadata – Retrieves partition metadata from the Data Catalog that contains unfiltered metadata.
  • lakeformation:GetTemporaryGlueTableCredentials – Allows a caller in a secure environment to assume a role with permission to access Amazon S3. To vend such credentials, Lake Formation assumes the role associated with a registered location, for example an S3 bucket, with a scope down policy that restricts the access to a single prefix.
  • lakeformation:GetTemporaryGluePartitionCredentials – This API is identical to GetTemporaryTableCredentials except that it’s used when the target Data Catalog resource is of type Partition. Lake Formation restricts the permission of the vended credentials with the same scope down policy that restricts access to a single Amazon S3 prefix.

Later in this post, we present a sample architecture illustrating how you can use these APIs.

External application and IAM roles to access data

For an external application to access resources in an Lake Formation environment, it needs to run under an IAM principal (user or role) with the appropriate credentials. Let’s consider a scenario where the external application runs under the IAM role MyApplicationRole that is part of the AWS account 123456789012.

In Lake Formation, you have granted access to various tables and databases to two specific IAM roles:

  • AccessRole1
  • AccessRole2

To enable MyApplicationRole to access the resources that have been granted to AccessRole1 and AccessRole2, you need to configure the trust relationships for these access roles. Specifically, you need to configure the following:

  • Allow MyApplicationRole to assume each of the access roles (AccessRole1 and AccessRole2) using the sts:AssumeRole
  • Allow MyApplicationRole to tag the assumed session with a specific tag, which is required by Lake Formation. The tag key should be LakeFormationAuthorizedCaller, and the value should match one of the session tag values specified in the Application integration settings page on the Lake Formation console (for example, “application1“).

The following code is an example of the trust relationships configuration for an access role (AccessRole1 or AccessRole2):

[
    {
        "Effect": "Allow",
        "Principal": {
            "AWS": "arn:aws:iam::123456789012:role/MyApplicationRole"
        },
        "Action": "sts:AssumeRole"
    },
    {
        "Effect": "Allow",
        "Principal": {
            "AWS": "arn:aws:iam::123456789012:role/MyApplicationRole"
        },
        "Action": "sts:TagSession",
        "Condition": {
            "StringEquals": {
                "aws:RequestTag/LakeFormationAuthorizedCaller": "application1"
            }
        }
    }
]

Additionally, the data access IAM roles (AccessRole1 and AccessRole2) must have the following IAM permissions assigned in order to read Lake Formation protected tables:

{
    "Version": "2012-10-17",
    "Statement": {
        "Sid": "LakeFormationManagedAccess",
        "Effect": "Allow",
        "Action": [
            "lakeformation:GetDataAccess",
            "glue:GetTable",
            "glue:GetTables",
            "glue:GetDatabase",
            "glue:GetDatabases",
            "glue:GetPartition",
            "glue:GetPartitions"
        ],
        "Resource": "*"
    }
}

Solution overview

For our solution, Lambda serves as our external trusted engine and application integrated with Lake Formation. This example is provided in order to understand and see in action the access flow and the Lake Formation API responses. Because it’s based on a single Lambda function, it’s not meant to be used in production settings or with high volumes of data.

Moreover, the Lambda based engine has been configured to support a limited set of data files (CSV, Parquet, and JSON), a limited set of table configurations (no nested data), and a limited set of table operations (SELECT only). Due to these limitations, the application should not be used for arbitrary tests.

In this post, we provide instructions on how to deploy a sample API application integrated with Lake Formation that implements the solution architecture. The core of the API is implemented with a Python Lambda function. We also show how to test the function with Lambda tests. In the second post in this series, we provide instructions on how to deploy a web frontend application that integrates with this Lambda function.

Access flow for unpartitioned tables

The following diagram summarizes the access flow when accessing unpartitioned tables.

Solution Architecture - Unpartitioned tables

The workflow consists of the following steps:

  1. User A (authenticated with Amazon Cognito or other equivalent systems) sends a request to the application API endpoint, requesting access to a specific table inside a specific database.
  2. The API endpoint, created with AWS AppSync, handles the request, invoking a Lambda function.
  3. The function checks which IAM data access role the user is mapped to. For simplicity, the example uses a static hardcoded mapping (mappings={ "user1": "lf-app-access-role-1", "user2": "lf-app-access-role-2"}).
  4. The function invokes the sts:AssumeRole API to assume the user-related IAM data access role (lf-app-access-role-1AccessRole1). The AssumeRole operation is performed with the tag LakeFormationAuthorizedCaller, having as its value one of the session tag values specified when configuring the application integration settings in Lake Formation (for example, {'Key': 'LakeFormationAuthorizedCaller','Value': 'application1'}). The API returns a set of temporary credentials, which we refer to as StsCredentials1.
  5. Using StsCredentials1, the function invokes the glue:GetUnfilteredTableMetadata API, passing the requested database and table name. The API returns information like table location, a list of authorized columns, and data filters, if defined.
  6. Using StsCredentials1, the function invokes the lakeformation:GetTemporaryGlueTableCredentials API, passing the requested database and table name, the type of requested access (SELECT), and CELL_FILTER_PERMISSION as the supported permission types (because the Lambda function implements logic to apply row-level filters). The API returns a set of temporary Amazon S3 credentials, which we refer to as S3Credentials1.
  7. Using S3Credentials1, the function lists the S3 files stored in the table location S3 prefix and downloads them.
  8. The retrieved Amazon S3 data is filtered to remove those columns and rows that the user is not allowed access to (authorized columns and row filters were retrieved in Step 5) and authorized data is returned to the user.

Access flow for partitioned tables

The following diagram summarizes the access flow when accessing partitioned tables.

Solution Architecture - Partitioned tables

The steps involved are almost identical to the ones presented for partitioned tables, with the following changes:

  • After invoking the glue:GetUnfilteredTableMetadata API (Step 5) and identifying the table as partitioned, the Lambda function invokes the glue:GetUnfilteredPartitionsMetadata API using StsCredentials1 (Step 6). The API returns, in addition to other information, the list of partition values and locations.
  • For each partition, the function performs the following actions:
    • Invokes the lakeformation:GetTemporaryGluePartitionCredentials API (Step 7), passing the requested database and table name, the partition value, the type of requested access (SELECT), and CELL_FILTER_PERMISSION as the supported permissions type (because the Lambda function implements logic to apply row-level filters). The API returns a set of temporary Amazon S3 credentials, which we refer to as S3CredentialsPartitionX.
    • Uses S3CredentialsPartitionX to list the partition location S3 files and download them (Step 8).
  • The function appends the retrieved data.
  • Before the Lambda function returns the results to the user (Step 9), the retrieved Amazon S3 data is filtered to remove those columns and rows that the user is not allowed access to (authorized columns and row filters were retrieved in Step 5).

Prerequisites

The following prerequisites are needed to deploy and test the solution:

  • Lake Formation should be enabled in the AWS Region where the sample application will be deployed
  • The steps must be run with an IAM principal with sufficient permissions to create the needed resources, including Lake Formation databases and tables

Deploy solution resources with AWS CloudFormation

We create the solution resources using AWS CloudFormation. The provided CloudFormation template creates the following resources:

  • One S3 bucket to store table data (lf-app-data-<account-id>)
  • Two IAM roles, which will be mapped to client users and their associated Lake Formation permission policies (lf-app-access-role-1 and lf-app-access-role-2)
  • Two IAM roles used for the two created Lambda functions (lf-app-lambda-datalake-population-role and lf-app-lambda-role)
  • One AWS Glue database (lf-app-entities) with two AWS Glue tables, one unpartitioned (users_tbl) and one partitioned (users_partitioned_tbl)
  • One Lambda function used to populate the data lake data (lf-app-lambda-datalake-population)
  • One Lambda function used for the Lake Formation integrated application (lf-app-lambda-engine)
  • One IAM role used by Lake Formation to access the table data and perform credentials vending (lf-app-datalake-location-role)
  • One Lake Formation data lake location (s3://lf-app-data-<account-id>/datasets) associated with the IAM role created for credentials vending (lf-app-datalake-location-role)
  • One Lake Formation data filter (lf-app-filter-1)
  • One Lake Formation tag (key: sensitive, values: true or false)
  • Tag associations to tag the created unpartitioned AWS Glue table (users_tbl) columns with the created tag

To launch the stack and provision your resources, complete the following steps:

  1. Download the code zip bundle for the Lambda function used for the Lake Formation integrated application (lf-integrated-app.zip).
  2. Download the code zip bundle for the Lambda function used to populate the data lake data (datalake-population-function.zip).
  3. Upload the zip bundles to an existing S3 bucket location (for example, s3://mybucket/myfolder1/myfolder2/lf-integrated-app.zip and s3://mybucket/myfolder1/myfolder2/datalake-population-function.zip)
  4. Choose Launch Stack.

This automatically launches AWS CloudFormation in your AWS account with a template. Make sure that you create the stack in your intended Region.

  1. Choose Next to move to the Specify stack details section
  2. For Parameters, provide the following parameters:
    1. For powertoolsLogLevel, specify how verbose the Lambda function logger should be, from the most verbose to the least verbose (no logs). For this post, we choose DEBUG.
    2. For s3DeploymentBucketName, enter the name of the S3 bucket containing the Lambda functions’ code zip bundles. For this post, we use mybucket.
    3. For s3KeyLambdaDataPopulationCode, enter the Amazon S3 location containing the code zip bundle for the Lambda function used to populate the data lake data (datalake-population-function.zip). For example, myfolder1/myfolder2/datalake-population-function.zip.
    4. For s3KeyLambdaEngineCode, enter the Amazon S3 location containing the code zip bundle for the Lambda function used for the Lake Formation integrated application (lf-integrated-app.zip). For example, myfolder1/myfolder2/lf-integrated-app.zip.
  3. Choose Next.

Cloudformation Create Stack with properties

  1. Add additional AWS tags if required.
  2. Choose Next.
  3. Acknowledge the final requirements.
  4. Choose Create stack.

Enable the Lake Formation application integration

Complete the following steps to enable the Lake Formation application integration:

  1. On the Lake Formation console, choose Application integration settings in the navigation pane.
  2. Enable Allow external engines to filter data in Amazon S3 locations registered with Lake Formation.
  3. For Session tag values, choose application1.
  4. For AWS account IDs, enter the current AWS account ID.
  5. Choose Save.

LakeFormation Application integration

Enforce Lake Formation permissions

The CloudFormation stack created one database named lf-app-entities with two tables named users_tbl and users_partitioned_tbl.

To be sure you’re using Lake Formation permissions, you should confirm that you don’t have any grants set up on those tables for the principal IAMAllowedPrincipals. The IAMAllowedPrincipals group includes any IAM users and roles that are allowed access to your Data Catalog resources by your IAM policies, and it’s used to maintain backward compatibility with AWS Glue.

To confirm Lake Formations permissions are enforced, navigate to the Lake Formation console and choose Data lake permissions in the navigation pane. Filter permissions by Database=lf-app-entities and remove all the permissions given to the principal IAMAllowedPrincipals.

For more details on IAMAllowedPrincipals and backward compatibility with AWS Glue, refer to Changing the default security settings for your data lake.

Check the created Lake Formation resources and permissions

The CloudFormation stack created two IAM roles—lf-app-access-role-1 and lf-app-access-role-2—and assigned them different permissions on the users_tbl (unpartitioned) and users_partitioned_tbl (partitioned) tables. The specific Lake Formation grants are summarized in the following table.

IAM Roles
lf-app-entities (Database)
  users _tbl (Table) _tbl _partitioned_tbl (Table)
lf-app-access-role-1 No access Read access on columns uid, state, and city for all the records. Read access to all columns except for address only on rows with value state=united kingdom.
lf-app-access-role-2 Read access on columns with the tag sensitive = false Read access to all columns and rows.

To better understand the full permissions setup, you should review the CloudFormation created Lake Formation resources and permissions. On the Lake Formation console, complete the following steps:

  1. Review the data filters:
    1. Choose Data filters in the navigation pane.
    2. Inspect the lf-app-filter-1
  2. Review the tags:
    1. Choose LF-Tags and permissions in the navigation pane.
    2. Inspect the sensitive
  3. Review the tag associations:
    1. Choose Tables in the navigation pane.
    2. Choose the users_tbl
    3. Inspect the LF-Tags associated to the different columns in the Schema
  4. Review the Lake Formation permissions:
    1. Choose Data lake permissions in the navigation pane.
    2. Filter by Principal = lf-app-access-role-1 and inspect the assigned permissions.
    3. Filter by Principal = lf-app-access-role-2 and inspect the assigned permissions.

Test the Lambda function

The Lambda function created by the CloudFormation template accepts JSON objects as input events. The JSON events have the following structure:

 {
  "identity": {
    "username": "XXX"
  },
  "fieldName": "YYY",
  "arguments": {
    "AA": "BB",
    ...
  }
}

Although the identity field is always needed in order to identify the called identity, depending on the requested operation (fieldName), different arguments should be provided. The following table lists these arguments.

Operation Description Needed Arguments Output
getDbs List databases No arguments needed List of databases the user has access to
getTablesByDb List tables db: <db_name> List of tables inside a database the user has access to
getUnfilteredTableMetadata Return the table metadata

db: <db_name>

table: <table_name>

Returns the output of the glue:GetUnfilteredTableMetadata API
getUnfilteredPartitionsMetadata Return the table partitions metadata

db: <db_name>

table: <table_name>

Returns the output of the glue:GetUnfilteredPartitionsMetadata API
getTableData Get table data

db: <db_name>

table: <table_name>

noOfRecs: N (number of records to pull)

nonNullRowsOnly: true/false (true to filter out records with all null values)

location: Table location

authorizedData: records of the table the user has access to

allColumns: All the columns of the table (returned only for demonstration and comparison purposes)

allData: All the records of the table without any filtering (returned only for demonstration and comparison purposes)

cellFilters: Lake Formation filters (applied to allData to return authorizedData)

authorizedColumns: Columns to which the user has access to (projection applied to allData to return authorizedData)

To test the Lambda function, you can create some sample Lambda test events. Complete the following steps:

  1. On the Lambda console, choose Functions on the navigation pane.
  2. Choose the lf-app-lambda-engine
  3. On the Test tab, select Create new event.
  4. For Event JSON, enter a valid JSON (we provide some sample JSON events).
  5. Choose Test.

Creata Lambda Test

  1. Check the test results (JSON response).

Lambda Test Result

The following are some sample test events you can try to see how different identities can access different sets of information.

user1 user2
{ 
  "identity": {
    "username": "user1"
  },
  "fieldName": "getDbs"
}

{ 
  "identity": {
    "username": "user2"
  },
  "fieldName": "getDbs"
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getTablesByDb",
  "arguments": {
    "db": "lf-app-entities"
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getTablesByDb",
  "arguments": {
    "db": "lf-app-entities"
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getUnfilteredTableMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl" 
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getUnfilteredTableMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl" 
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getUnfilteredTableMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl" 
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getUnfilteredTableMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl" 
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getUnfilteredPartitionsMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl" 
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getUnfilteredPartitionsMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl" 
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getUnfilteredPartitionsMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl" 
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getUnfilteredPartitionsMetadata",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl" 
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getTableData",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl",
    "noOfRecs": 10,
    "nonNullRowsOnly": true
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getTableData",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_tbl",
    "noOfRecs": 10,
    "nonNullRowsOnly": true
  }
}

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getTableData",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl",
    "noOfRecs": 10,
    "nonNullRowsOnly": true
  }
}

{
  "identity": {
    "username": "user2"
  },
  "fieldName": "getTableData",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl",
    "noOfRecs": 10,
    "nonNullRowsOnly": true
  }
}

As an example, in the following test, we request users_partitioned_tbl table data in the context of user1:

{
  "identity": {
    "username": "user1"
  },
  "fieldName": "getTableData",
  "arguments": {
    "db": "lf-app-entities",
    "table": "users_partitioned_tbl",
    "noOfRecs": 10,
    "nonNullRowsOnly": true
  }
}

The following is the related API response:

{
  "database": "lf-app-entities",
  "name": "users_partitioned_tbl",
  "location": "s3://lf-app-data-123456789012/datasets/lf-app-entities/users_partitioned/",
  "authorizedColumns": [
    {
      "Name": "born_year",
      "Type": "string"
    },
    {
      "Name": "city",
      "Type": "string"
    },
    {
      "Name": "name",
      "Type": "string"
    },
    {
      "Name": "state",
      "Type": "string"
    },
    {
      "Name": "surname",
      "Type": "string"
    },
    {
      "Name": "uid",
      "Type": "int"
    }
  ],
  "authorizedData": [
    [
      "1980",
      "bristol",
      "emily",
      "united kingdom",
      "brown",
      4
    ],
    [
      "1980",
      "vancouver",
      "<FILTEREDCELL>",
      "canada",
      "<FILTEREDCELL>",
      5
    ],
    [
      "1980",
      "madrid",
      "<FILTEREDCELL>",
      "spain",
      "<FILTEREDCELL>",
      6
    ],
    [
      "1980",
      "mexico city",
      "<FILTEREDCELL>",
      "mexico",
      "<FILTEREDCELL>",
      10
    ],
    [
      "1980",
      "zurich",
      "<FILTEREDCELL>",
      "switzerland",
      "<FILTEREDCELL>",
      11
    ],
    [
      "1980",
      "buenos aires",
      "<FILTEREDCELL>",
      "argentina",
      "<FILTEREDCELL>",
      12
    ],
    [
      "1990",
      "london",
      "john",
      "united kingdom",
      "pike",
      1
    ],
    [
      "1990",
      "milan",
      "<FILTEREDCELL>",
      "italy",
      "<FILTEREDCELL>",
      2
    ],
    [
      "1990",
      "berlin",
      "<FILTEREDCELL>",
      "germany",
      "<FILTEREDCELL>",
      3
    ],
    [
      "1990",
      "munich",
      "<FILTEREDCELL>",
      "germany",
      "<FILTEREDCELL>",
      7
    ]
  ],
  "allColumns": [
    {
      "Name": "address",
      "Type": "string"
    },
    {
      "Name": "born_year",
      "Type": "string"
    },
    {
      "Name": "city",
      "Type": "string"
    },
    {
      "Name": "name",
      "Type": "string"
    },
    {
      "Name": "state",
      "Type": "string"
    },
    {
      "Name": "surname",
      "Type": "string"
    },
    {
      "Name": "uid",
      "Type": "int"
    }
  ],
  "allData": [
    [
      "beautiful avenue 123",
      "1980",
      "bristol",
      "emily",
      "united kingdom",
      "brown",
      4
    ],
    [
      "lake street 45",
      "1980",
      "vancouver",
      "david",
      "canada",
      "lee",
      5
    ],
    [
      "plaza principal 6",
      "1980",
      "madrid",
      "sophia",
      "spain",
      "luz",
      6
    ],
    [
      "avenida de arboles 40",
      "1980",
      "mexico city",
      "olivia",
      "mexico",
      "garcia",
      10
    ],
    [
      "pflanzenstrasse 34",
      "1980",
      "zurich",
      "lucas",
      "switzerland",
      "fischer",
      11
    ],
    [
      "avenida de luces 456",
      "1980",
      "buenos aires",
      "isabella",
      "argentina",
      "afortunado",
      12
    ],
    [
      "hidden road 78",
      "1990",
      "london",
      "john",
      "united kingdom",
      "pike",
      1
    ],
    [
      "via degli alberi 56A",
      "1990",
      "milan",
      "mario",
      "italy",
      "rossi",
      2
    ],
    [
      "green road 90",
      "1990",
      "berlin",
      "july",
      "germany",
      "finn",
      3
    ],
    [
      "parkstrasse 789",
      "1990",
      "munich",
      "oliver",
      "germany",
      "schmidt",
      7
    ]
  ],
  "filteredCellPh": "<FILTEREDCELL>",
  "cellFilters": [
    {
      "ColumnName": "born_year",
      "RowFilterExpression": "TRUE"
    },
    {
      "ColumnName": "city",
      "RowFilterExpression": "TRUE"
    },
    {
      "ColumnName": "name",
      "RowFilterExpression": "state='united kingdom'"
    },
    {
      "ColumnName": "state",
      "RowFilterExpression": "TRUE"
    },
    {
      "ColumnName": "surname",
      "RowFilterExpression": "state='united kingdom'"
    },
    {
      "ColumnName": "uid",
      "RowFilterExpression": "TRUE"
    }
  ]
}

To troubleshoot the Lambda function, you can navigate to the Monitoring tab, choose View CloudWatch logs, and inspect the latest log stream.

Clean up

If you plan to explore Part 2 of this series, you can skip this part, because you will need the resources created here. You can refer to this section at the end of your testing.

Complete the following steps to remove the resources you created following this post and avoid incurring additional costs:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Choose the stack you created and choose Delete.

Additional considerations

In the proposed architecture, Lake Formation permissions were granted to specific IAM data access roles that requesting users (for example, the identity field) were mapped to. Another possibility is to assign permissions in Lake Formation to SAML users and groups and then work with the AssumeDecoratedRoleWithSAML API.

Conclusion

In the first part of this series, we explored how to integrate custom applications and data processing engines with Lake Formation. We delved into the required configuration, APIs, and steps to enforce Lake Formation policies within custom data applications. As an example, we presented a sample Lake Formation integrated application built on Lambda.

The information provided in this post can serve as a foundation for developing your own custom applications or data processing engines that need to operate on an Lake Formation protected data lake.

Refer to the second part of this series to see how to build a sample web application that uses the Lambda based Lake Formation application.


About the Authors

Stefano Sandona Picture Stefano Sandonà is a Senior Big Data Specialist Solution Architect at AWS. Passionate about data, distributed systems, and security, he helps customers worldwide architect high-performance, efficient, and secure data platforms.

Francesco Marelli PictureFrancesco Marelli is a Principal Solutions Architect at AWS. He specializes in the design, implementation, and optimization of large-scale data platforms. Francesco leads the AWS Solution Architect (SA) analytics team in Italy. He loves sharing his professional knowledge and is a frequent speaker at AWS events. Francesco is also passionate about music.

Integrate custom applications with AWS Lake Formation – Part 2

Post Syndicated from Stefano Sandona original https://aws.amazon.com/blogs/big-data/integrate-custom-applications-with-aws-lake-formation-part-2/

In the first part of this series, we demonstrated how to implement an engine that uses the capabilities of AWS Lake Formation to integrate third-party applications. This engine was built using an AWS Lambda Python function.

In this post, we explore how to deploy a fully functional web client application, built with JavaScript/React through AWS Amplify (Gen 1), that uses the same Lambda function as the backend. The provisioned web application provides a user-friendly and intuitive way to view the Lake Formation policies that have been enforced.

For the purposes of this post, we use a local machine based on MacOS and Visual Studio Code as our integrated development environment (IDE), but you could use your preferred development environment and IDE.

Solution overview

AWS AppSync creates serverless GraphQL and pub/sub APIs that simplify application development through a single endpoint to securely query, update, or publish data.

GraphQL is a data language to enable client apps to fetch, change, and subscribe to data from servers. In a GraphQL query, the client specifies how the data is to be structured when it’s returned by the server. This makes it possible for the client to query only for the data it needs, in the format that it needs it in.

Amplify streamlines full-stack app development. With its libraries, CLI, and services, you can connect your frontend to the cloud for authentication, storage, APIs, and more. Amplify provides libraries for popular web and mobile frameworks, like JavaScript, Flutter, Swift, and React.

Prerequisites

The web application that we deploy depends on the Lambda function that was deployed in the first post of this series. Make sure the function is already deployed and working in your account.

Install and configure the AWS CLI

The AWS Command Line Interface (AWS CLI) is an open source tool that enables you to interact with AWS services using commands in your command line shell. To install and configure the AWS CLI, see Getting started with the AWS CLI.

Install and configure the Amplify CLI

To install and configure the Amplify CLI, see Set up Amplify CLI. Your development machine must have the following installed:

  • Node.js v14.x or later
  • npm v6.14.4 or later
  • git v2.14.1 or later

Create the application

We create a JavaScript application using the React framework.

  1. In the terminal, enter the following command:
npm create vite@latest
  1. Enter a name for your project (we use lfappblog), choose React for the framework, and choose JavaScript for the variant.

You can now run the next steps, ignore any warning messages. Don’t run the npm run dev command yet.

  1. Enter the following command:
cd lfappblog && npm install

You should now see the directory structure shown in the following screenshot.

  1. You can now test the newly created application by running the following command:
npm run dev

By default, the application is available on port 5173 on your local machine.

The base application is shown in the workspace browser.

You can close the browser window and then the test web server by entering the following in the terminal: q + enter

Set up and configure Amplify for the application

To set up Amplify for the application, complete the following steps:

  1. Run the following command in the application directory to initialize Amplify:
amplify init
  1. Refer to the following screenshot for all the options required. Make sure to change the value of Distribution Directory Path to dist. The command creates and runs the required AWS CloudFormation template to create the backend environment in your AWS account.

amplify init command and output - animated

amplify init command and output

  1. Install the node modules required by the application with the following command:
npm install aws-amplify \
@aws-amplify/ui-react \
ace-builds \
file-loader \
@cloudscape-design/components @cloudscape-design/global-styles

npm install for required packages command and output

The output of this command will vary depending on the packages already installed on your development machine.

Add Amplify authentication

Amplify can implement authentication with Amazon Cognito user pools. You run this step before adding the function and the Amplify API capabilities so that the user pool created can be set as the authentication mechanism for the API, otherwise it would default to the API key and further modifications would be required.

Run the following command and accept all the defaults:

amplify add auth

amplify add auth command and output - animated

amplify add auth command and output

Add the Amplify API

The application backend is based on a GraphQL API with resolvers implemented as a Python Lambda function. The API feature of Amplify can create the required resources for GraphQL APIs based on AWS AppSync (default) or REST APIs based on Amazon API Gateway.

  1. Run the following command to add and initialize the GraphQL API:
amplify add api
  1. Make sure to set Blank Schema as the schema template (a full schema is provided as part of this post; further instructions are provided in the next sections).
  2. Make sure to select Authorization modes and then Amazon Cognito User Pool.

amplify add api command and output - animated

amplify add api command and output

Add Amplify hosting

Amplify can host applications using either the Amplify console or Amazon CloudFront and Amazon Simple Storage Service (Amazon S3) with the option to have manual or continuous deployment. For simplicity, we use the Hosting with Amplify Console and Manual Deployment options.

Run the following command:

amplify add hosting

amplify add hosting command and output - animated

amplify add hosting command and output

Copy and configure the GraphQL API schema

You’re now ready to copy and configure the GraphQL schema file and update it with the current Lambda function name.

Run the following commands:

export PROJ_NAME=lfappblog
aws s3 cp s3://aws-blogs-artifacts-public/BDB-3934/schema.graphql \
~/${PROJ_NAME}/amplify/backend/api/${PROJ_NAME}/schema.graphql

In the schema.graphql file, you can see that the lf-app-lambda-engine function is set as the data source for the GraphQL queries.

schema.graphql file content

Copy and configure the AWS AppSync resolver template

AWS AppSync uses templates to preprocess the request payload from the client before it’s sent to the backend and postprocess the response payload from the backend before it’s sent to the client. The application requires a modified template to correctly process custom backend error messages.

Run the following commands:

export PROJ_NAME=lfappblog
aws s3 cp s3://aws-blogs-artifacts-public/BDB-3934/InvokeLfAppLambdaEngineLambdaDataSource.res.vtl \
~/${PROJ_NAME}/amplify/backend/api/${PROJ_NAME}/resolvers/

In the InvokeLfAppLambdaEngineLambdaDataSource.res.vtl file, you can inspect the .vtl resolver definition.

InvokeLfAppLambdaEngineLambdaDataSource.res.vtl file content

Copy the application client code

As last step, copy the application client code:

export PROJ_NAME=lfappblog
aws s3 cp s3://aws-blogs-artifacts-public/BDB-3934/App.jsx \
~/${PROJ_NAME}/src/App.jsx

You can now open App.jsx to inspect it.

Publish the full application

From the project directory, run the following command to verify all resources are ready to be created on AWS:

amplify status

amplify status command and output

Run the following command to publish the full application:

amplify publish

This will take several minutes to complete. Accept all defaults apart from Enter maximum statement depth [increase from default if your schema is deeply nested], which must be set to 5.

amplify publish command and output - animated

amplify publish command and output

All the resources are now deployed on AWS and ready for use.

Use the application

You can start using the application from the Amplify hosted domain.

  1. Run the following command to retrieve the application URL:
amplify status

amplify status command and output

At first access, the application shows the Amazon Cognito login page.

  1. Choose Create Account and create a user with user name user1 (this is mapped in the application to the role lf-app-access-role-1 for which we created Lake Formation permissions in the first post).

  1. Enter the confirmation code that you received through email and choose Sign In.

When you’re logged in, you can start interacting with the application.

Application starting screen

Controls

The application offers several controls:

  • Database – You can select a database registered with Lake Formation with the Describe permission.

Application database control

  • Table – You can choose a table with Select permission.

Application Table and Number of Records controls

  • Number of records – This indicates the number of records (between 5–40) to display on the Data Because this is a sample application, no pagination was implemented in the backend.
  • Row type – Enable this option to display only rows that have at least one cell with authorized data. If all cells in a row are unauthorized and checkbox is selected, the row is not displayed.

Outputs

The application has four outputs, organized in tabs.

Unfiltered Table Metadata

This tab displays the response of the AWS Glue API GetUnfilteredTableMetadata policies for the selected table. The following is an example of the content:

{
  "Table": {
    "Name": "users_tbl",
    "DatabaseName": "lf-app-entities",
    "CreateTime": "2024-07-10T10:00:26+00:00",
    "UpdateTime": "2024-07-10T11:41:36+00:00",
    "Retention": 0,
    "StorageDescriptor": {
      "Columns": [
        {
          "Name": "uid",
          "Type": "int"
        },
        {
          "Name": "name",
          "Type": "string"
        },
        {
          "Name": "surname",
          "Type": "string"
        },
        {
          "Name": "state",
          "Type": "string"
        },
        {
          "Name": "city",
          "Type": "string"
        },
        {
          "Name": "address",
          "Type": "string"
        }
      ],
      "Location": "s3://lf-app-data-123456789012/datasets/lf-app-entities/users/",
      "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
      "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
      "Compressed": false,
      "NumberOfBuckets": 0,
      "SerdeInfo": {
        "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
        "Parameters": {
          "field.delim": ","
        }
      },
      "SortColumns": [],
      "StoredAsSubDirectories": false
    },
    "PartitionKeys": [],
    "TableType": "EXTERNAL_TABLE",
    "Parameters": {
      "classification": "csv"
    },
    "CreatedBy": "arn:aws:sts::123456789012:assumed-role/Admin/fmarelli",
    "IsRegisteredWithLakeFormation": true,
    "CatalogId": "123456789012",
    "VersionId": "1"
  },
  "AuthorizedColumns": [
    "city",
    "state",
    "uid"
  ],
  "IsRegisteredWithLakeFormation": true,
  "CellFilters": [
    {
      "ColumnName": "city",
      "RowFilterExpression": "TRUE"
    },
    {
      "ColumnName": "state",
      "RowFilterExpression": "TRUE"
    },
    {
      "ColumnName": "uid",
      "RowFilterExpression": "TRUE"
    }
  ],
  "ResourceArn": "arn:aws:glue:us-east-1:123456789012:table/lf-app-entities/users"
}

Unfiltered Partitions Metadata

This tab displays the response of the AWS Glue API GetUnfileteredPartitionsMetadata policies for the selected table. The following is an example of the content:

{
  "UnfilteredPartitions": [
    {
      "Partition": {
        "Values": [
          "1991"
        ],
        "DatabaseName": "lf-app-entities",
        "TableName": "users_partitioned_tbl",
        "CreationTime": "2024-07-10T11:34:32+00:00",
        "LastAccessTime": "1970-01-01T00:00:00+00:00",
        "StorageDescriptor": {
          "Columns": [
            {
              "Name": "uid",
              "Type": "int"
            },
            {
              "Name": "name",
              "Type": "string"
            },
            {
              "Name": "surname",
              "Type": "string"
            },
            {
              "Name": "state",
              "Type": "string"
            },
            {
              "Name": "city",
              "Type": "string"
            },
            {
              "Name": "address",
              "Type": "string"
            }
          ],
          "Location": "s3://lf-app-data-123456789012/datasets/lf-app-entities/users_partitioned/born_year=1991",
          "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
          "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
          "Compressed": false,
          "NumberOfBuckets": 0,
          "SerdeInfo": {
            "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
            "Parameters": {
              "field.delim": ","
            }
          },
          "BucketColumns": [],
          "SortColumns": [],
          "Parameters": {},
          "StoredAsSubDirectories": false
        },
        "CatalogId": "123456789012"
      },
      "AuthorizedColumns": [
        "address",
        "city",
        "name",
        "state",
        "surname",
        "uid"
      ],
      "IsRegisteredWithLakeFormation": true
    },
    {
      "Partition": {
        "Values": [
          "1990"
        ],
        "DatabaseName": "lf-app-entities",
        "TableName": "users_partitioned_tbl",
        "CreationTime": "2024-07-10T11:34:32+00:00",
        "LastAccessTime": "1970-01-01T00:00:00+00:00",
        "StorageDescriptor": {
          "Columns": [
            {
              "Name": "uid",
              "Type": "int"
            },
            {
              "Name": "name",
              "Type": "string"
            },
            {
              "Name": "surname",
              "Type": "string"
            },
            {
              "Name": "state",
              "Type": "string"
            },
            {
              "Name": "city",
              "Type": "string"
            },
            {
              "Name": "address",
              "Type": "string"
            }
          ],
          "Location": "s3://lf-app-data-123456789012/datasets/lf-app-entities/users_partitioned/born_year=1990",
          "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
          "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
          "Compressed": false,
          "NumberOfBuckets": 0,
          "SerdeInfo": {
            "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
            "Parameters": {
              "field.delim": ","
            }
          },
          "BucketColumns": [],
          "SortColumns": [],
          "Parameters": {},
          "StoredAsSubDirectories": false
        },
        "CatalogId": "123456789012"
      },
      "AuthorizedColumns": [
        "address",
        "city",
        "name",
        "state",
        "surname",
        "uid"
      ],
      "IsRegisteredWithLakeFormation": true
    }
  ]
}

Authorized Data

This tab displays a table that shows the columns, rows, and cells that the user is authorized to access.

Application Authorized Data tab

A cell is marked as Unauthorized if the user has no permissions to access its contents, according to the cell filter definition. You can choose the unauthorized cell to view the relevant cell filter condition.

Application Authorized Data tab cell pop up example

In this example, the user can’t access the value of column surname in the first row because for the row, state is canada, but the cell can only be accessed when state=’united kingdom’.

If the Only rows with authorized data control is unchecked, rows with all cells set to Unauthorized are also displayed.

All Data

This tab contains a table that contains all the rows and columns in the table (the unfiltered data). This is useful for comparison with authorized data to understand how cell filters are applied to the unfiltered data.

Application All Data tab

Test Lake Formation permissions

Log out of the application and go to the Amazon Cognito login form, choose Create Account, and create a new user with called user2 (this is mapped in the application to the role lf-app-access-role-2 that we created Lake Formation permissions for in the first post). Get table data and metadata for this user to see how Lake Formation permissions are enforced and so the two users can see different data (on the Authorized Data tab).

The following screenshot shows that the Lake Formation permissions we created grant access to the following data (all rows, all columns) of table users_partitioned_tbl to user2 (mapped to lf-app-access-role-2).

Application Authorized Data tab for user2 on table users_partitioned_tbl

The following screenshot shows that the Lake Formation permissions we created grant access to the following data (all rows, but only city, state, and uid columns) of table users_tbl to user2 (mapped to lf-app-access-role-2).

Application Authorized Data tab for user2 on table users_partitioned

Considerations for the GraphQL API

You can use the AWS AppSync GraphQL API deployed in this post for other applications; the responses of the GetUnfilteredTableMetadata and GetUnfileteredPartitionsMetadata AWS Glue APIs were fully mapped in the GraphQL schema. You can use the Queries page on the AWS AppSync console to run the queries; this is based on GraphiQL.

AWS AppSync Queries page

You can use the following object to define the query variables:

{ 
  "db": "lf-app-entities",
  "table": "users_partitioned_tbl",
  "noOfRecs": 30,
  "nonNullRowsOnly": true
} 

The following code shows the queries available with input parameters and all fields defined in the schema as output:

  query GetDbs {
    getDbs {
      catalogId
      name
      description
    }
  }

  query GetTablesByDb($db: String!) {
    getTablesByDb(db: $db) {
      Name
      DatabaseName
      Location
      IsPartitioned
    }
  }
  
  query GetTableData(
    $db: String!
    $table: String!
    $noOfRecs: Int
    $nonNullRowsOnly: Boolean!
  ) {
    getTableData(
      db: $db
      table: $table
      noOfRecs: $noOfRecs
      nonNullRowsOnly: $nonNullRowsOnly
    ) {
      database
      name
      location
      authorizedColumns {
        Name
        Type
      }
      authorizedData
      allColumns {
        Name
        Type
      }
      allData
      filteredCellPh
      cellFilters {
        ColumnName
        RowFilterExpression
      }
    }
  }

  query GetUnfilteredTableMetadata($db: String!, $table: String!) {
    getUnfilteredTableMetadata(db: $db, table: $table) {
      JsonResp
      ApiResp {
        Table {
          Name
          DatabaseName
          Description
          Owner
          CreateTime
          UpdateTime
          LastAccessTime
          LastAnalyzedTime
          Retention
          StorageDescriptor {
            Columns {
              Name
              Type
              Comment
            }
            Location
            AdditionalLocations
            InputFormat
            OutputFormat
            Compressed
            NumberOfBuckets
            SerdeInfo {
              Name
              SerializationLibrary
            }
            BucketColumns
            SortColumns {
              Column
              SortOrder
            }
            Parameters {
              Name
              Value
            }
            SkewedInfo {
              SkewedColumnNames
              SkewedColumnValues
            }
            StoredAsSubDirectories
            SchemaReference {
              SchemaVersionId
              SchemaVersionNumber
            }
          }
          PartitionKeys {
            Name
            Type
            Comment
            Parameters {
              Name
              Value
            }
          }
          ViewOriginalText
          ViewExpandedText
          TableType
          Parameters {
            Name
            Value
          }
          CreatedBy
          IsRegisteredWithLakeFormation
          TargetTable {
            CatalogId
            DatabaseName
            Name
            Region
          }
          CatalogId
          VersionId
          FederatedTable {
            Identifier
            DatabaseIdentifier
            ConnectionName
          }
          ViewDefinition {
            IsProtected
            Definer
            SubObjects
            Representations {
              Dialect
              DialectVersion
              ViewOriginalText
              ViewExpandedText
              ValidationConnection
              IsStale
            }
          }
          IsMultiDialectView
        }
        AuthorizedColumns
        IsRegisteredWithLakeFormation
        CellFilters {
          ColumnName
          RowFilterExpression
        }
        QueryAuthorizationId
        IsMultiDialectView
        ResourceArn
        IsProtected
        Permissions
        RowFilter
      }
    }
  }

  query GetUnfilteredPartitionsMetadata($db: String!, $table: String!) {
    getUnfilteredPartitionsMetadata(db: $db, table: $table) {
      JsonResp
      ApiResp {
        Partition {
          Values
          DatabaseName
          TableName
          CreationTime
          LastAccessTime
          StorageDescriptor {
            Columns {
              Name
              Type
              Comment
            }
            Location
            AdditionalLocations
            InputFormat
            OutputFormat
            Compressed
            NumberOfBuckets
            SerdeInfo {
              Name
              SerializationLibrary
            }
            BucketColumns
            SortColumns {
              Column
              SortOrder
            }
            Parameters {
              Name
              Value
            }
            SkewedInfo {
              SkewedColumnNames
              SkewedColumnValues
            }
            StoredAsSubDirectories
            SchemaReference {
              SchemaVersionId
              SchemaVersionNumber
            }
          }
          Parameters {
            Name
            Value
          }
          LastAnalyzedTime
          CatalogId
        }
        AuthorizedColumns
        IsRegisteredWithLakeFormation
      }
    }
  }

Clean up

To remove the resources created in this post, run the following command:

amplify delete

amplify delete command and output

Refer to Part 1 to clean up the resources created in the first part of this series.

Conclusion

In this post, we showed how to implement a web application that uses a GraphQL API implemented with AWS AppSync and Lambda as the backend for a web application integrated with Lake Formation. You should now have a comprehensive understanding of how to extend the capabilities of Lake Formation by building and integrating your own custom data processing applications.

Try out this solution for yourself, and share your feedback and questions in the comments.


About the Authors

Stefano Sandona Picture Stefano Sandonà is a Senior Big Data Specialist Solution Architect at AWS. Passionate about data, distributed systems, and security, he helps customers worldwide architect high-performance, efficient, and secure data platforms.

Francesco Marelli PictureFrancesco Marelli is a Principal Solutions Architect at AWS. He specializes in the design, implementation, and optimization of large-scale data platforms. Francesco leads the AWS Solution Architect (SA) analytics team in Italy. He loves sharing his professional knowledge and is a frequent speaker at AWS events. Francesco is also passionate about music.

Replicate changes from databases to Apache Iceberg tables using Amazon Data Firehose (in preview)

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/replicate-changes-from-databases-to-apache-iceberg-tables-using-amazon-data-firehose/

Today, we’re announcing the availability, in preview, of a new capability in Amazon Data Firehose that captures changes made in databases such as PostgreSQL and MySQL and replicates the updates to Apache Iceberg tables on Amazon Simple Storage Service (Amazon S3).

Apache Iceberg is a high-performance open-source table format for performing big data analytics. Apache Iceberg brings the reliability and simplicity of SQL tables to S3 data lakes and makes it possible for open source analytics engines such as Apache Spark, Apache Flink, Trino, Apache Hive, and Apache Impala to concurrently work with the same data.

This new capability provides a simple, end-to-end solution to stream database updates without impacting transaction performance of database applications. You can set up a Data Firehose stream in minutes to deliver change data capture (CDC) updates from your database. Now, you can easily replicate data from different databases into Iceberg tables on Amazon S3 and use up-to-date data for large-scale analytics and machine learning (ML) applications.

Typical Amazon Web Services (AWS) enterprise customers use hundreds of databases for transactional applications. To perform large scale analytics and ML on the latest data, they want to capture changes made in databases, such as when records in a table are inserted, modified, or deleted, and deliver the updates to their data warehouse or Amazon S3 data lake in open source table formats such as Apache Iceberg.

To do so, many customers develop extract, transform, and load (ETL) jobs to periodically read from databases. However, ETL readers impact database transaction performance, and batch jobs can add several hours of delay before data is available for analytics. To mitigate impact on database transaction performance, customers want the ability to stream changes made in the database. This stream is referred to as a change data capture (CDC) stream.

I met multiple customers that use open source distributed systems, such as Debezium, with connectors to popular databases, an Apache Kafka Connect cluster, and Kafka Connect Sink to read the events and deliver them to the destination. The initial configuration and test of such systems involves installing and configuring multiple open source components. It might take days or weeks. After setup, engineers have to monitor and manage clusters, and validate and apply open source updates, which adds to the operational overhead.

With this new data streaming capability, Amazon Data Firehose adds the ability to acquire and continually replicate CDC streams from databases to Apache Iceberg tables on Amazon S3. You set up a Data Firehose stream by specifying the source and destination. Data Firehose captures and continually replicates an initial data snapshot and then all subsequent changes made to the selected database tables as a data stream. To acquire CDC streams, Data Firehose uses the database replication log, which reduces impact on database transaction performance. When the volume of database updates increases or decreases, Data Firehose automatically partitions the data, and persists records until they’re delivered to the destination. You don’t have to provision capacity or manage and fine-tune clusters. In addition to the data itself, Data Firehose can automatically create Apache Iceberg tables using the same schema as the database tables as part of the initial Data Firehose stream creation and automatically evolve the target schema, such as new column addition, based on source schema changes.

Since Data Firehose is a fully managed service, you don’t have to rely on open source components, apply software updates, or incur operational overhead.

The continual replication of database changes to Apache Iceberg tables in Amazon S3 using Amazon Data Firehose provides you with a simple, scalable, end-to-end managed solution to deliver CDC streams into your data lake or data warehouse, where you can run large-scale analysis and ML applications.

Let’ see how to configure a new pipeline
To show you how to create a new CDC pipeline, I setup a Data Firehose stream using the AWS Management Console. As usual, I also have the choice to use the AWS Command Line Interface (AWS CLI), AWS SDKs, AWS CloudFormation, or Terraform.

For this demo, I choose a MySQL database on Amazon Relational Database Service (Amazon RDS) as source. Data Firehose also works with self-managed databases on Amazon Elastic Compute Cloud (Amazon EC2). To establish connectivity between my virtual private cloud (VPC)—where the database is deployed—and the RDS API without exposing the traffic to the internet, I create an AWS PrivateLink VPC service endpoint. You can learn how to create a VPC service endpoint for RDS API by following instructions in the Amazon RDS documentation.

I also have an S3 bucket to host the Iceberg table, and I have an AWS Identity and Access Management (IAM) role setup with correct permissions. You can refer to the list of prerequisites in the Data Firehose documentation.

To get started, I open the console and navigate to the Amazon Data Firehose section. I can see the stream already created. To create a new one, I select Create Firehose stream.

Create Firehose Stream

I select a Source and Destination. In this example: a MySQL database and Apache Iceberg Tables. I also enter a Firehose stream name for my stream.

Create Firehose Stream - screen 1

I enter the fully qualified DNS name of my Database endpoint and the Database VPC endpoint service name. I verify that Enable SSL is checked and, under Secret name, I select the name of the secret in AWS Secrets Manager where the database username and password are securely stored.

Create Firehose Stream - screen 2

Next, I configure Data Firehose to capture specific data by specifying databases, tables, and columns using explicit names or regular expressions.

I must create a watermark table. A watermark, in this context, is a marker used by Data Firehose to track the progress of incremental snapshots of database tables. It helps Data Firehose identify which parts of the table have already been captured and which parts still need to be processed. I can create the watermark table manually or let Data Firehose automatically create it for me. In that case, the database credentials passed to Data Firehose must have permissions to create a table in the source database.

Create Firehose Stream - screen 3

Next, I configure the S3 bucket Region and name to use. Data Firehose can automatically create the Iceberg tables when they don’t exist yet. Similarly, it can update the Iceberg table schema when detecting a change in your database schema.

Create Firehose Stream - screen 4

As a final step, it’s important to enable Amazon CloudWatch error logging to get feedback about the stream progress and the eventual errors. You can configure a short retention period on the CloudWatch log group to reduce the cost of log storage.

After having reviewed my configuration, I select Create Firehose stream.

Create Firehose Stream - screen 5

Once the stream is created, it will start to replicate the data. I can monitor the stream’s status and check for eventual errors.

Create Firehose Stream - screen 6

Now, it’s time to test the stream.

I open a connection to the database and insert a new line in a table.

Firehose - MySQL

Then, I navigate to the S3 bucket configured as the destination and I observe that a file has been created to store the data from the table.

View parquet files on S3 bucket

I download the file and inspect its content with the parq command (you can install that command with pip install parquet-cli)

Parquet file content

Of course, downloading and inspecting Parquet files is something I do only for demos. In real life, you’re going to use AWS Glue and Amazon Athena to manage your data catalog and to run SQL queries on your data.

Things to know
Here are a few additional things to know.

This new capability supports self-managed PostgreSQL and MySQL databases on Amazon EC2 and the following databases on Amazon RDS:

The team will continue to add support for additional databases during the preview period and after general availability. They told me they are already working on supporting SQL Server, Oracle, and MongoDB databases.

Data Firehose uses AWS PrivateLink to connect to databases in your Amazon Virtual Private Cloud (Amazon VPC).

When setting up an Amazon Data Firehose delivery stream, you can either specify specific tables and columns or use wildcards to specify a class of tables and columns. When you use wildcards, if new tables and columns are added to the database after the Data Firehose stream is created and if they match the wildcard, Data Firehose will automatically create those tables and columns in the destination.

Pricing and availability
The new data streaming capability is available today in all AWS Regions except China Regions, AWS GovCloud (US) Regions, and Asia Pacific (Malaysia) Regions. We want you to evaluate this new capability and provide us with feedback. There are no charges for your usage at the beginning of the preview. At some point in the future, it will be priced based on your actual usage, for example, based on the quantity of bytes read and delivered. There are no commitments or upfront investments. Make sure to read the pricing page to get the details.

Now, go configure your first continual database replication to Apache Iceberg tables on Amazon S3 and visit http://aws.amazon.com/firehose.

— seb

Enrich your AWS Glue Data Catalog with generative AI metadata using Amazon Bedrock

Post Syndicated from Manos Samatas original https://aws.amazon.com/blogs/big-data/enrich-your-aws-glue-data-catalog-with-generative-ai-metadata-using-amazon-bedrock/

Metadata can play a very important role in using data assets to make data driven decisions. Generating metadata for your data assets is often a time-consuming and manual task. By harnessing the capabilities of generative AI, you can automate the generation of comprehensive metadata descriptions for your data assets based on their documentation, enhancing discoverability, understanding, and the overall data governance within your AWS Cloud environment. This post shows you how to enrich your AWS Glue Data Catalog with dynamic metadata using foundation models (FMs) on Amazon Bedrock and your data documentation.

AWS Glue is a serverless data integration service that makes it straightforward for analytics users to discover, prepare, move, and integrate data from multiple sources. Amazon Bedrock is a fully managed service that offers a choice of high-performing FMs from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API.

Solution overview

In this solution, we automatically generate metadata for table definitions in the Data Catalog by using large language models (LLMs) through Amazon Bedrock. First, we explore the option of in-context learning, where the LLM generates the requested metadata without documentation. Then we improve the metadata generation by adding the data documentation to the LLM prompt using Retrieval Augmented Generation (RAG).

AWS Glue Data Catalog

This post uses the Data Catalog, a centralized metadata repository for your data assets across various data sources. The Data Catalog provides a unified interface to store and query information about data formats, schemas, and sources. It acts as an index to the location, schema, and runtime metrics of your data sources.

The most common method to populate the Data Catalog is to use an AWS Glue crawler, which automatically discovers and catalogs data sources. When you run the crawler, it creates metadata tables that are added to a database you specify or the default database. Each table represents a single data store.

Generative AI models

LLMs are trained on vast volumes of data and use billions of parameters to generate outputs for common tasks like answering questions, translating languages, and completing sentences. To use an LLM for a specific task like metadata generation, you need an approach to guide the model to produce the outputs you expect.

This post shows you how to generate descriptive metadata for your data with two different approaches:

  • In-context learning
  • Retrieval Augmented Generation (RAG)

The solutions uses two generative AI models available in Amazon Bedrock: for text generation and Amazon Titan Embeddings V2 for text retrieval tasks.

The following sections describe the implementation details of each approach using the Python programming language. You can find the accompanying code in the GitHub repository. You can implement it step by step in Amazon SageMaker Studio and JupyterLab or your own environment. If you’re new to SageMaker Studio, check out the Quick setup experience, which allows you to launch it with default settings in minutes. You can also use the code in an AWS Lambda function or your own application.

Approach 1: In-context learning

In this approach, you use an LLM to generate the metadata descriptions. You employ prompt engineering techniques to guide the LLM on the outputs you want it to generate. This approach is ideal for AWS Glue databases with a small number of tables. You can send the table information from the Data Catalog as context in your prompt without exceeding the context window (the number of input tokens that most Amazon Bedrock models accept). The following diagram illustrates this architecture.

Approach 2: RAG architecture

If you have hundreds of tables, adding all of the Data Catalog information as context to the prompt may lead to a prompt that exceeds the LLM’s context window. In some cases, you may also have additional content such as business requirements documents or technical documentation you want the FM to reference before generating the output. Such documents can be several pages that typically exceed the maximum number of input tokens most LLMs will accept. As a result, they can’t be included in the prompt as they are.

The solution is to use a RAG approach. With RAG, you can optimize the output of an LLM so it references an authoritative knowledge base outside of its training data sources before generating a response. RAG extends the already powerful capabilities of LLMs to specific domains or an organization’s internal knowledge base, without the need to fine-tune the model. It is a cost-effective approach to improving LLM output, so it remains relevant, accurate, and useful in various contexts.

With RAG, the LLM can reference technical documents and other information about your data before generating the metadata. As a result, the generated descriptions are expected to be richer and more accurate.

The example in this post ingests data from a public Amazon Simple Storage Service (Amazon S3): s3://awsglue-datasets/examples/us-legislators/all. The dataset contains data in JSON format about US legislators and the seats that they have held in the U.S. House of Representatives and U.S. Senate. The data documentation was retrieved from and the Popolo specification http://www.popoloproject.com/.

The following architecture diagram illustrates the RAG approach.

 

The steps are as follows:

  1. Ingest the information from the data documentation. The documentation can be in a variety of formats. For this post, the documentation is a website.
  2. Chunk the contents of the HTML page of the data documentation. Generate and store vector embeddings for the data documentation.
  3. Fetch information for the database tables from the Data Catalog.
  4. Perform a similarity search in the vector store and retrieve the most relevant information from the vector store.
  5. Build the prompt. Provide instructions on how to create metadata and add the retrieved information and the Data Catalog table information as context. Because this is a rather small database, containing six tables, all of the information about the database is included.
  6. Send the prompt to the LLM, get the response, and update the Data Catalog.

Prerequisites

To follow the steps in this post and deploy the solution in your own AWS account, refer to the GitHub repository.

You need the following prerequisite resources:

 {
   "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:PutObject"
          ],
          "Resource": [
              "arn:aws:s3:::aws-gen-ai-glue-metadata-*/*"
          ]
        }
    ]
}
  • An IAM role for your notebook environment. The IAM role should have the appropriate permissions for AWS Glue, Amazon Bedrock, and Amazon S3. The following is an example policy. You can apply additional conditions to restrict it further for your own environment.
{
      "Version": "2012-10-17",
      "Statement": [
           {
                 "Sid": "GluePermissions",
                 "Effect": "Allow",
                 "Action": [
                      "glue:GetCrawler",
                      "glue:DeleteDatabase",
                      "glue:GetTables",
                      "glue:DeleteCrawler",
                      "glue:StartCrawler",
                      "glue:CreateDatabase",
                      "glue:UpdateTable",
                      "glue:DeleteTable",
                      "glue:UpdateCrawler",
                      "glue:GetTable",
                      "glue:CreateCrawler"
                 ],
                 "Resource": "*"
           },
           {
                 "Sid": "S3Permissions",
                 "Effect": "Allow",
                 "Action": [
                      "s3:PutObject",
                      "s3:GetObject",
                      "s3:CreateBucket",
                      "s3:ListBucket",
                      "s3:DeleteObject",
                      "s3:DeleteBucket"
                 ],
                 "Resource": "arn:aws:s3:::<bucket_name>"
           },
           {
                 "Sid": "IAMPermissions",
                 "Effect": "Allow",
                 "Action": "iam:PassRole",
                 "Resource": "arn:aws:iam::<account_ID>:role/GlueCrawlerRoleBlog"

           },
           {
                 "Sid": "BedrockPermissions",
                 "Effect": "Allow",
                 "Action": "bedrock:InvokeModel",
                 "Resource": [
                      "arn:aws:bedrock:*::foundation-model/anthropic.claude-3-sonnet-20240229-v1:0",
                      "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v2:0"
                 ]
           }
      ]
}
  • Model access for Anthropic’s Claude 3 and Amazon Titan Text Embeddings V2 on Amazon Bedrock.
  • The notebook glue-catalog-genai_claude.ipynb.

Set up the resources and environment

Now that you have completed the prerequisites, you can switch to the notebook environment to run the next steps. First, the notebook will create the required resources:

  • S3 bucket
  • AWS Glue database
  • AWS Glue crawler, which will run and automatically generate the database tables

After you finish the setup steps, you will have an AWS Glue database called legislators.

The crawler creates the following metadata tables:

  • persons
  • memberships
  • organizations
  • events
  • areas
  • countries

This is a semi-normalized collection of tables containing legislators and their histories.

Follow the rest of the steps in the notebook to complete the environment setup. It should only take a few minutes.

Inspect the Data Catalog

Now that you have completed the setup, you can inspect the Data Catalog to familiarize yourself with it and the metadata it captured. On the AWS Glue console, choose Databases in the navigation pane, then open the newly created legislators database. It should contain six tables, as shown in the following screenshot:

You can open any table to inspect the details. The table description and comment for each column is empty because they aren’t completed automatically by the AWS Glue crawlers.

You can use the AWS Glue API to programmatically access the technical metadata for each table. The following code snippet uses the AWS Glue API through the AWS SDK for Python (Boto3) to retrieve tables for a chosen database and then prints them on the screen for validation. The following code, found in the notebook of this post, is used to get the data catalog information programmatically.

def get_alltables(database):
    tables = []
    get_tables_paginator = glue_client.get_paginator('get_tables')
    for page in get_tables_paginator.paginate(DatabaseName=database):
        tables.extend(page['TableList'])
    return tables

def json_serial(obj):
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    raise TypeError ("Type %s not serializable" % type(obj))

database_tables =  get_alltables(database)

for table in database_tables:
    print(f"Table: {table['Name']}")
    print(f"Columns: {[col['Name'] for col in table['StorageDescriptor']['Columns']]}")

Now that you’re familiar with the AWS Glue database and tables, you can move to the next step to generate table metadata descriptions with generative AI.

Generate table metadata descriptions with Anthropic’s Claude 3 using Amazon Bedrock and LangChain

In this step, we generate technical metadata for a selected table that belongs to an AWS Glue database. This post uses the persons table. First, we get all the tables from the Data Catalog and include it as part of the prompt. Even though our code aims to generate metadata for a single table, giving the LLM wider information is useful because you want the LLM to detect foreign keys. In our notebook environment we install LangChain v0.2.1. See the following code:

from langchain_core.output_parsers import StrOutputParser
from langchain_core.prompts import ChatPromptTemplate
from botocore.config import Config
from langchain_aws import ChatBedrock

glue_data_catalog = json.dumps(get_alltables(database),default=json_serial)


model_kwargs ={
    "temperature": 0.5, # You can increase or decrease this value depending on the amount of randomness you want injected into the response. A value closer to 1 increases the amount of randomness.
    "top_p": 0.999
}

model = ChatBedrock(
    client = bedrock_client,
    model_id=model_id,
    model_kwargs=model_kwargs
)

table = "persons"
response_get_table = glue_client.get_table( DatabaseName = database, Name = table )
pprint.pp(response_get_table)

user_msg_template_table="""
I'd like you to create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:
1. Review the data catalog carefully
2. Use all the data catalog information to generate the table description
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
Here is the data catalog json in <glue_data_catalog></glue_data_catalog> tags.
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
<notes>
"""
messages = [
    ("system", "You are a helpful assistant"),
    ("user", user_msg_template_table),
]

prompt = ChatPromptTemplate.from_messages(messages)

chain = prompt | model | StrOutputParser()

# Chain Invoke

TableInputFromLLM = chain.invoke({"data_catalog": {glue_data_catalog}, "table":table})
print(TableInputFromLLM)

In the preceding code, you instructed the LLM to provide a JSON response that fits the TableInput object expected by the Data Catalog update API action. The following is an example response:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other relevant personal data.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family name or surname of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, containing a note and URL."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image of the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, each with a scheme and identifier value."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, including the language, a note, and the name itself."
      },

      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images of the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given name or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person (likely a primary key)."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, including the type (e.g., email, phone) and the value."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3://<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe",
      "Parameters": {
        "paths": "birth_date,contact_details,death_date,family_name,gender,given_name,id,identifiers,image,images,links,name,other_names,sort_name"
      }
    }
  },
  "PartitionKeys": [],
  "TableType": "EXTERNAL_TABLE"
}

You can also validate the JSON generated to make sure it conforms to the format expected by the AWS Glue API:

from jsonschema import validate

schema_table_input = {
    "type": "object",
    "properties" : {
            "Name" : {"type" : "string"},
            "Description" : {"type" : "string"},
            "StorageDescriptor" : {
            "Columns" : {"type" : "array"},
            "Location" : {"type" : "string"} ,
            "InputFormat": {"type" : "string"} ,
            "SerdeInfo": {"type" : "object"}
        }
    }
}
validate(instance=json.loads(TableInputFromLLM), schema=schema_table_input)

Now that you have generated table and column descriptions, you can update the Data Catalog.

Update the Data Catalog with metadata

In this step, use the AWS Glue API to update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput= json.loads(TableInputFromLLM) )
print(f"Table {table} metadata updated!")

The following screenshot shows the persons table metadata with a description.

The following screenshot shows the table metadata with column descriptions.

Now that you have enriched the technical metadata stored in Data Catalog, you can improve the descriptions by adding external documentation.

Improve metadata descriptions by adding external documentation with RAG

In this step, we add external documentation to generate more accurate metadata. The documentation for our dataset can be found online as an HTML. We use the LangChain HTML community loader to load the HTML content:

from langchain_community.document_loaders import AsyncHtmlLoader

# We will use an HTML Community loader to load the external documentation stored on HTLM
urls = ["http://www.popoloproject.com/specs/person.html", "http://docs.everypolitician.org/data_structure.html",'http://www.popoloproject.com/specs/organization.html','http://www.popoloproject.com/specs/membership.html','http://www.popoloproject.com/specs/area.html']
loader = AsyncHtmlLoader(urls)
docs = loader.load()

After you download the documents, split the documents into chunks:

text_splitter = CharacterTextSplitter(
    separator='\n',
    chunk_size=1000,
    chunk_overlap=200,

)
split_docs = text_splitter.split_documents(docs)

embedding_model = BedrockEmbeddings(
    client=bedrock_client,
    model_id=embeddings_model_id
)

Next, vectorize and store the documents locally and perform a similarity search. For production workloads, you can use a managed service for your vector store such as Amazon OpenSearch Service or a fully managed solution for implementing the RAG architecture such as Amazon Bedrock Knowledge Bases.

vs = FAISS.from_documents(split_docs, embedding_model)
search_results = vs.similarity_search(
    'What standards are used in the dataset?', k=2
)
print(search_results[0].page_content)

Next, include the catalog information along with the documentation to generate more accurate metadata:

from operator import itemgetter
from langchain_core.callbacks import BaseCallbackHandler
from typing import Dict, List, Any


class PromptHandler(BaseCallbackHandler):
    def on_llm_start( self, serialized: Dict[str, Any], prompts: List[str], **kwargs: Any) -> Any:
        output = "\n".join(prompts)
        print(output)

system = "You are a helpful assistant. You do not generate any harmful content."
# specify a user message
user_msg_rag = """
Here is the guidance document you should reference when answering the user:

<documentation>{context}</documentation>
I'd like to you create metadata descriptions for the table called {table} in your AWS Glue data catalog. Please follow these steps:

1. Review the data catalog carefully.
2. Use all the data catalog information and the documentation to generate the table description.
3. If a column is a primary key or foreign key to another table mention it in the description.
4. In your response, reply with the entire JSON object for the table {table}
5. Remove the DatabaseName, CreatedBy, IsRegisteredWithLakeFormation, CatalogId,VersionId,IsMultiDialectView,CreateTime, UpdateTime.
6. Write the table description in the Description attribute. Ensure you use any relevant information from the <documentation>
7. List all the table columns under the attribute "StorageDescriptor" and then the attribute Columns. Add Location, InputFormat, and SerdeInfo
8. For each column in the StorageDescriptor, add the attribute "Comment". If a table uses a composite primary key, then the order of a given column in a table’s primary key is listed in parentheses following the column name.
9. Your response must be a valid JSON object.
10. Ensure that the data is accurately represented and properly formatted within the JSON structure. The resulting JSON table should provide a clear, structured overview of the information presented in the original text.
11. If you cannot think of an accurate description of a column, say 'not available'
<glue_data_catalog>
{data_catalog}
</glue_data_catalog>
Here is some additional information about the database in <notes></notes> tags.
<notes>
Typically foreign key columns consist of the name of the table plus the id suffix
<notes>
"""
messages = [
    ("system", system),
    ("user", user_msg_rag),
]
prompt = ChatPromptTemplate.from_messages(messages)

# Retrieve and Generate
retriever = vs.as_retriever(
    search_type="similarity",
    search_kwargs={"k": 3},
)

chain = (  
    {"context": itemgetter("table")| retriever, "data_catalog": itemgetter("data_catalog"), "table": itemgetter("table")}
    | prompt
    | model
    | StrOutputParser()
)

TableInputFromLLM = chain.invoke({"data_catalog":glue_data_catalog, "table":table})
print(TableInputFromLLM)

The following is the response from the LLM:

{
  "Name": "persons",
  "Description": "This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The 'person_id' column relates a person to an organization through the 'memberships' table.",
  "StorageDescriptor": {
    "Columns": [
      {
        "Name": "family_name",
        "Type": "string",
        "Comment": "The family or last name of the person."
      },
      {
        "Name": "name",
        "Type": "string",
        "Comment": "The full name of the person."
      },
      {
        "Name": "links",
        "Type": "array<struct<note:string,url:string>>",
        "Comment": "An array of links related to the person, with a note and URL for each link."
      },
      {
        "Name": "gender",
        "Type": "string",
        "Comment": "The gender of the person."
      },
      {
        "Name": "image",
        "Type": "string",
        "Comment": "A URL or path to an image representing the person."
      },
      {
        "Name": "identifiers",
        "Type": "array<struct<scheme:string,identifier:string>>",
        "Comment": "An array of identifiers for the person, with a scheme and identifier value for each."
      },
      {
        "Name": "other_names",
        "Type": "array<struct<lang:string,note:string,name:string>>",
        "Comment": "An array of other names the person may be known by, with language, note, and name for each."
      },
      {
        "Name": "sort_name",
        "Type": "string",
        "Comment": "The name to be used for sorting or alphabetical ordering of the person."
      },
      {
        "Name": "images",
        "Type": "array<struct<url:string>>",
        "Comment": "An array of URLs or paths to additional images representing the person."
      },
      {
        "Name": "given_name",
        "Type": "string",
        "Comment": "The given or first name of the person."
      },
      {
        "Name": "birth_date",
        "Type": "string",
        "Comment": "The date of birth of the person."
      },
      {
        "Name": "id",
        "Type": "string",
        "Comment": "The unique identifier for the person. This is likely a primary key."
      },
      {
        "Name": "contact_details",
        "Type": "array<struct<type:string,value:string>>",
        "Comment": "An array of contact details for the person, with a type and value for each."
      },
      {
        "Name": "death_date",
        "Type": "string",
        "Comment": "The date of death of the person, if applicable."
      }
    ],
    "Location": "s3:<your-s3-bucket>/persons/",
    "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
    "SerdeInfo": {
      "SerializationLibrary": "org.openx.data.jsonserde.JsonSerDe"
    }
  }
}

Similar to the first approach, you can validate the output to make sure it conforms to the AWS Glue API.

Update the Data Catalog with new metadata

Now that you have generated the metadata, you can update the Data Catalog:

response = glue_client.update_table(DatabaseName=database, TableInput= json.loads(TableInputFromLLM) )
print(f"Table {table} metadata updated!")

Let’s inspect the technical metadata generated. You should now see a newer version in the Data Catalog for the persons table. You can access schema versions on the AWS Glue console.

Note the persons table description this time. It should differ slightly from the descriptions provided earlier:

  • In-context learning table description – “This table contains information about persons, including their names, identifiers, contact details, birth and death dates, and associated images and links. The ‘id’ column is the primary key for this table.”
  • RAG table description – “This table contains information about individual persons, including their names, identifiers, contact details, and other personal information. It follows the Popolo data specification for representing persons involved in government and organizations. The ‘person_id’ column relates a person to an organization through the ‘memberships’ table.”

The LLM demonstrated knowledge around the Popolo specification, which was part of the documentation provided to the LLM.

Clean up

Now that you have completed the steps described in the post, don’t forget to clean up the resources with the code provided in the notebook so you don’t incur unnecessary costs.

Conclusion

In this post, we explored how you can use generative AI, specifically Amazon Bedrock FMs, to enrich the Data Catalog with dynamic metadata to improve the discoverability and understanding of existing data assets. The two approaches we demonstrated, in-context learning and RAG, showcase the flexibility and versatility of this solution. In-context learning works well for AWS Glue databases with a small number of tables, whereas the RAG approach uses external documentation to generate more accurate and detailed metadata, making it suitable for larger and more complex data landscapes. By implementing this solution, you can unlock new levels of data intelligence, empowering your organization to make more informed decisions, drive data-driven innovation, and unlock the full value of your data. We encourage you to explore the resources and recommendations provided in this post to further enhance your data management practices.


About the Authors

Manos Samatas is a Principal Solutions Architect in Data and AI with Amazon Web Services. He works with government, non-profit, education and healthcare customers in the UK on data and AI projects, helping build solutions using AWS. Manos lives and works in London. In his spare time, he enjoys reading, watching sports, playing video games and socialising with friends.

Anastasia Tzeveleka is a Senior GenAI/ML Specialist Solutions Architect at AWS. As part of her work, she helps customers across EMEA build foundation models and create scalable generative AI and machine learning solutions using AWS services.

Expand data access through Apache Iceberg using Delta Lake UniForm on AWS

Post Syndicated from Tomohiro Tanaka original https://aws.amazon.com/blogs/big-data/expand-data-access-through-apache-iceberg-using-delta-lake-uniform-on-aws/

The landscape of big data management has been transformed by the rising popularity of open table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake. These formats, designed to address the limitations of traditional data storage systems, have become essential in modern data architectures. As organizations adopt various open table formats to suit their specific needs, the demand for interoperability between these formats has grown significantly. This interoperability is crucial for enabling seamless data access, reducing data silos, and fostering a more flexible and efficient data ecosystem.

Delta Lake UniForm is an open table format extension designed to provide a universal data representation that can be efficiently read by different processing engines. It aims to bridge the gap between various data formats and processing systems, offering a standardized approach to data storage and retrieval. With UniForm, you can read Delta Lake tables as Apache Iceberg tables. This expands data access to broader options of analytics engines.

This post explores how to start using Delta Lake UniForm on Amazon Web Services (AWS). You can learn how to query Delta Lake native tables through UniForm from different data warehouses or engines such as Amazon Redshift as an example of expanding data access to more engines.

How Delta Lake UniForm works

UniForm allows other table format clients such as Apache Iceberg to access Delta Lake tables. Under the hood, UniForm generates Iceberg metadata files (including metadata and manifest files) that are required for Iceberg clients to access the underlying data files in Delta Lake tables. Both Delta Lake and Iceberg metadata files reference the same data files. UniForm generates multiple table format metadata without duplicating the actual data files. When an Iceberg client reads a UniForm table, it first accesses the Iceberg metadata files for the UniForm table, which then allows the Iceberg client to read the underlying data files.

There are two options to use UniForm:

  • Create a new Delta Lake table with UniForm
  • Enable UniForm on your existing Delta Lake table

First, to create a new Delta Lake table enabling UniForm, you configure table properties for UniForm in a CREATE TABLE DDL query. The table properties are 'delta.universalFormat.enabledFormats'='iceberg' and 'delta.enableIcebergCompatV2'='true'. When these options are set to the CREATE TABLE query, Iceberg metadata files are generated along with Delta Lake metadata files. In addition to these options, Delta Lake table protocol versions that define supported features by the table such as delta.minReaderVersion and delta.minWriterVersion are required to be set to 2 and 7 or more respectively. For more information about the table protocol versions, refer to What is a table protocol specification? in Delta Lake public document. Appendix 1. Create a new Delta Lake table with UniForm shows an example query to create a new Delta Lake UniForm table.

You can also enable UniForm on an existing Delta Lake table. This option is suitable if you have Delta Lake tables in your environment. Enabling UniForm doesn’t affect your current operations on the Delta Lake tables. To enable UniForm on a Delta Lake table, run REORG TABLE db.existing_delta_lake_table APPLY (UPGRADE UNIFORM(ICEBERG_COMPAT_VERSION=2)). After running this query, Delta Lake automatically generates Iceberg metadata files for the Iceberg client. In the example in this post, you run this option and enable UniForm after you create a Delta Lake table.

For the information about enabling UniForm, refer to Enable Delta Lake UniForm in the Delta Lake public document. Note that the extra package (delta-iceberg) is required to create a UniForm table in AWS Glue Data Catalog. The extra package is also required to generate Iceberg metadata along with Delta Lake metadata for the UniForm table.

Example use case

A fictional company built a data lake with Delta Lake on Amazon Simple Storage Service (Amazon S3) that’s mainly used through Amazon Athena. According to its usage expansion, this company wants to expand data access to cloud-based data warehouses such as Amazon Redshift for flexible analytics use cases.

There are a few challenges to achieve this requirement. Delta Lake isn’t natively supported in Amazon Redshift. For those data warehouses, Delta Lake tables need to be converted to manifest tables, which requires additional operational overhead. You need to run the GENERATE command on Spark or use a crawler in AWS Glue to generate manifest tables, and you need to sync those manifest tables every time the Delta tables are updated.

Delta Lake UniForm can be a solution to meet this requirement. With Delta Lake UniForm, you can make the Delta Table compatible with the other open table formats such as Apache Iceberg, which is natively supported in Amazon Redshift. Users can query those Delta Lake tables as Iceberg tables through UniForm.

The following diagram describes the architectural overview to achieve that requirement.

bdb4538_solution-overview

In this tutorial, you create a Delta Lake table with a synthetic review dataset that includes different products and customer reviews and enable UniForm on that Delta Lake table to make it accessible from Amazon Redshift. Each component works as follows in this scenario:

  • Amazon EMR (Amazon EMR on EC2 cluster with Apache Spark): An Apache Spark application on an Amazon EMR cluster creates a Delta Lake table and enables UniForm on it. Only Delta Lake client can write to the Delta Lake UniForm table, making Amazon EMR act as a writer.
  • Amazon Redshift: Amazon Redshift uses Iceberg clients to read records from the Delta Lake UniForm table. It’s limited to reading records from the table and cannot write to it.
  • Amazon S3 and AWS Glue Data Catalog: These are used to manage the underlying files and the catalog of the Delta Lake UniForm table. The data and metadata files for the table are stored in an S3 bucket. The table is registered in AWS Glue Data Catalog.

Set up resources

In this section, you complete the following resource setup:

  1. Launch an AWS CloudFormation template to configure resources such as S3 buckets, an Amazon Virtual Private Cloud (Amazon VPC) and a subnet, a database for Delta Lake in Data Catalog, AWS Identity and Access Management (IAM) policy and role with required permissions for Amazon EMR Studio, and an EC2 instance profile for Amazon EMR on EC2 cluster
  2. Launch an Amazon EMR on EC2 cluster
  3. Create an Amazon EMR Studio Workspace
  4. Upload a Jupyter Notebook on Amazon EMR Studio Workspace
  5. Launch a CloudFormation template to configure Amazon Redshift Serverless and relevant subnets

Launch a CloudFormation template to configure basic resources

You use a provided CloudFormation template to set up resources to build Delta Lake UniForm environments. The template creates the following resources.

  • An S3 bucket to store the Delta Lake table data
  • An S3 bucket to store an Amazon EMR Studio Workspace metadata and configuration files
  • An IAM role for Amazon EMR Studio
  • An EC2 instance profile for Amazon EMR on EC2 cluster
  • VPC and subnet for an Amazon EMR on EC2 cluster
  • A database for a Delta Lake table in Data Catalog

Complete the following steps to deploy the resources.

  1. Choose Launch stack:

  1. For Stack name, enter delta-lake-uniform-on-aws. For the Parameters, DeltaDatabaseName, PublicSubnetForEMRonEC2, and VpcCIDRForEMRonEC2 are set by default. You can also change the default values. Then, choose Next.
  2. Choose Next.
  3. Choose I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Submit.
  5. After the stack creation is complete, check the Outputs. The resource values are used in the following sections and in the Appendices.

Launch an Amazon EMR on EC2 cluster

Complete the following steps to create an Amazon EMR on EC2 cluster.

  1. Open the Amazon EMR on EC2 console.
  2. Choose Create cluster.
  3. Enter delta-lake-uniform-blog-post in Name and confirm choosing emr-7.3.0 as its release label.
  4. For Application bundle, select Spark 3.5.1Hadoop 3.3.6 and JupyterEnterpriseGateway 2.6.0.
  5. For AWS Glue Data Catalog settings, enable Use for Spark table metadata.
  6. For Networking, enter the values from the CloudFormation Outputs tab for VpcForEMR and PublicSubnetForEMR into Virtual private cloud (VPC) andSubnet respectively. For EC2 security groups, keep Create ElasticMapReduce-Primary for Primary node, and Create ElasticMapReduce-Core for Core and task nodes. The security groups for the Amazon EMR primary and core nodes are automatically created.
  7. For Cluster logs, enter s3://<DeltaLakeS3Bucket>/emr-cluster-logs as the Amazon S3 location. Replace <DeltaLakeS3Bucket> with the S3 bucket from the CloudFormation stack Outputs tab.
  8. For Software settings, select Load JSON from Amazon S3 and enter s3://aws-blogs-artifacts-public/artifacts/BDB-4538/config.json as the Amazon S3 location.
  9. For Amazon EMR service role in Identity and Access Management (IAM) roles section, choose Create a service role. Then, set default to Security Group. If there are existing security groups for Amazon EMR Primary and Core or Task nodes, set those security groups to Security Group.
  10. For EC2 instance profile for Amazon EMR, choose Choose an existing instance profile, and set EMRInstanceProfileRole to Instance profile.
  11. After reviewing the configuration, choose Create cluster.
  12. After the cluster status is Waiting on the Amazon EMR console, the cluster setup is complete (it approximately takes 10 minutes).

Create an Amazon EMR Studio Workspace

Complete the following steps to create an Amazon EMR Studio Workspace to use Delta Lake UniForm on Amazon EMR on EC2.

  1. Open Amazon EMR Studio console.
  2. Choose Create Studio.
  3. For Setup options, choose Custom.
  4. For Studio settings, enter delta-lake-uniform-studio as the Studio name.
  5. For S3 location for Workspace storage, choose Select existing location as a Workspace storage. The S3 location (s3://aws-emr-studio-<ACCOUNT_ID>-<REGION>-delta-lake-uniform-on-aws) can be obtained from EMRStudioS3Bucket on the CloudFormation Outputs tab. Then, choose EMRStudioRole as the IAM role (you can find the IAM Role name on the CloudFormation Outputs tab).
  6. For Workspace settings, enter delta-lake-workspace as the Workspace name.
  7. In Networking and security, choose the VPC ID and Subnet ID that you created in Launch an AWS CloudFormation template. You can obtain the VPC ID and Subnet ID from the VpcForEMR and PublicSubnetForEMR keys on the CloudFormation Outputs tab respectively.
  8. After reviewing the settings, choose Create Studio and launch Workspace.
  9. After creating the Studio Workspace is complete, you are redirected to Jupyter Notebook.

Upload Jupyter Notebook

Complete the following steps to configure a Jupyter Notebook to use Delta Lake UniForm with Amazon EMR.

  1. Download delta-lake-uniform-on-aws.ipynb.
  2. Choose the arrow icon at the top of the page and upload the Notebook you just downloaded.

  1. Choose and open the notebook (delta-lake-uniform-on-aws.ipynb) you uploaded in the left pane.
  2. After the notebook is opened, choose EMR Compute in the navigation pane.

  1. Attach the Amazon EMR on EC2 cluster you created in the previous section. Choose EMR on EC2 cluster and set the cluster you created previously to EMR on EC2 cluster, then choose Attach.
  2. After attaching the cluster is successful, Cluster is attached to the Workspace is displayed on the console.

Create a workgroup and a namespace for Amazon Redshift Serverless

For this step, you configure a workgroup and a namespace for Amazon Redshift Serverless to run queries on a Delta Lake UniForm table. You also configure two subnets in the same VPC created by the CloudFormation stack delta-lake-uniform-on-aws. To deploy the resources, complete the following steps:

  1. Choose Launch stack:

  1. For Stack name, enter redshift-serverless-for-delta-lake-uniform.
  2. For Parameters, enter the Availability Zone and an IP range for each subnet. The VPC ID is automatically retrieved from the CloudFormation stack you created in Launch an AWS CloudFormation template to configure basic resources. If you change the default subnet, note that at least one subnet needs to be the same subnet you created for the Amazon EMR on EC2 cluster (by default, the subnet for Amazon EMR on EC2 cluster is automatically retrieved during this CloudFormation stack creation). You can check the subnet for the cluster on the CloudFormation Outputs Then, choose Next.
  3. Choose Next again, and then choose Submit.
  4. After the stack creation is complete, check the CloudFormation Outputs Make a note of the two Subnet IDs on the Outputs tab to use later in Run queries from Amazon Redshift against the UniForm table.

Now you’re ready to use Delta Lake UniForm on Amazon EMR.

Enable Delta Lake UniForm

Start by creating a Delta Lake table that contains the customer review dataset. After creating the table, run REORG query to enable UniForm on the Delta Lake table.

Create a Delta Lake table

Complete the following steps to create a Delta Lake table based on a customer review dataset and review the table metadata.

  1. Return to the Jupyter Notebook connected to the Amazon EMR on EC2 cluster and run the following cell to add delta-iceberg.jar to use UniForm and configure the spark extension.

  1. Initialize the SparkSession. The following configuration is necessary to use Iceberg through UniForm. Before running the code, replace <DeltaLakeS3Bucket> with the name of the S3 bucket for Delta Lake, which you can find on the CloudFormation stack Outputs tab.

bdb4538-2_2-notebook

  1. Create a Spark DataFrame from customer reviews.

bdb4538-2_3-notebook_df

  1. Create a Delta Lake table with the customer reviews dataset. This step takes approximately 5 minutes.

bdb4538-2_4-notebook

  1. Run DESCRIBE EXTENDED {DB_TBL} in the next cell to review the table. The output includes the table schema, location, table properties, and so on.

bdb4538-2_5-notebook

The Delta Lake table creation is complete. Next, enable UniForm on this Delta Lake table.

Run REORG query to enable UniForm

To allow an Iceberg client to access the Delta Lake table you created, enable UniForm on the table. You can also create a new Delta Lake table with UniForm enabled. For more information, see Appendix 1 at the end of this post. To enable UniForm and review the table metadata, complete the following steps.

  1. Run the following query to enable UniForm on the Delta Lake table. To enable UniForm on an existing Delta Lake table, you run REORG query against the table.

bdb4539-3_1-uniform_reorg

  1. Run DESCRIBE EXTENDED {DB_TBL} in the next cell to review the table metadata and compare it from before and after enabling UniForm. The new properties, such as delta.enableIcebergCompatV2=true and delta.universalFormat.enabledFormats=iceberg, are added to the table properties.

bdb4538-3_2-uniform_metadata

  1. Run aws s3 ls s3://<DeltaLakeS3Bucket>/warehouse/ --recursive to confirm if the Iceberg table metadata is created. Replace <DeltaLakeS3Bucket> with the S3 bucket from the CloudFormation Outputs tab. The following screenshot shows the command output of table metadata and data files. You can confirm that Delta Lake UniForm generates both Iceberg metadata and Delta Lake metadata files as indicated by the red rectangles below.

bdb4538-3_3-uniform_metadata

  1. Before querying the Delta Lake UniForm table from an Iceberg client, run the following analytic query for the Delta Lake UniForm table from Amazon EMR on EC2 side, and review the reviews count by each product category.

bdb4538-3_4-uniform_query

  1. The query result shows the output of the reviews count by product_category:

Enabling UniForm on the Delta Lake table is complete, and now you can query the Delta Lake table from an Iceberg client. Next, you query the Delta Lake table as an Iceberg table from Amazon Redshift.

Run queries against the UniForm table from Amazon Redshift

In the previous section, you enabled UniForm on your existing Delta Lake table. This allows you to run queries on a Delta Lake table as if it were an Iceberg table from Amazon Redshift. In this section, you run an analytic query on the UniForm table using Amazon Redshift Serverless and add records with a new product category to the UniForm table through the Jupyter Notebook connected to the Amazon EMR on EC2 cluster. Then, you verify the added records with another analytic query from Amazon Redshift. You can confirm that Delta Lake UniForm enables Amazon Redshift to query the Delta Lake table through this section.

Query the UniForm table from Amazon Redshift Serverless

  1. Open Amazon Redshift Serverless console
  2. In Namespaces/Workgroups, select the delta-lake-uniform-namespace that you created using the CloudFormation stack.
  3. Choose Query data on the right top corner to open the Amazon Redshift query editor.
  4. After opening the editor, select the delta-lake-uniform-workgroup workgroup in the left pane.
  5. Choose Create connection.
  6. After you successfully create a connection, you can see the delta_uniform_db database and customer_review table you created in the left pane of the editor.
  7. Copy and paste the following analytic query to the editor and choose Run.
SELECT product_category, count(*) as count_by_product_category 
FROM "awsdatacatalog"."delta_uniform_db"."customer_reviews"
GROUP BY product_category ORDER BY count_by_product_category DESC
  1. The editor shows the same result of the review count by product_category as you obtained from Jupyter Notebook in Run REORG query to enable UniForm.

bdb4538-4_1-rs_result

Add new product category records into Delta Lake UniForm table from Amazon EMR

Go back to the Jupyter Notebook on Amazon EMR Workspace to add new records with a new product category (Books) into the Delta Lake UniForm table. After adding the records, query the UniForm table again from Amazon Redshift Serverless.

  1. On the Jupyter Notebook, go to Add new product category records into the UniForm table and run the following cell to load new records.

bdb4538-4_2-append_books

  1. Run the following cell and review the five records with Books as the product category. The following screenshot shows the output of this code.

bdb4538-4_3-append_listbooks

  1. Add the new reviews with Books product category. This takes around 2 minutes.

bdb4538-4_4-append_append

In the next section, you run a query on the UniForm table from Amazon Redshift Serverless to check if the new records with the Books product category have been added.

Review the added records in Delta Lake UniForm table from Amazon Redshift Serverless

To check if the result output includes the records of Books product category:

  1. On the query editor of Amazon Redshift, run the following query and check if the result output includes the records of Books product category.
SELECT product_category, count(*) as count_by_product_category 
FROM "awsdatacatalog"."delta_uniform_db"."customer_reviews"
GROUP BY product_category ORDER BY count_by_product_category DESC
  1. The following screenshot shows the output of the query you ran in the previous step. You can confirm the new product category Books has been added to the table from Amazon Redshift side.

bdb4538-4_5-rs-result
Now you can query from Amazon Redshift against the Delta Lake table by enabling Delta Lake UniForm.

Clean up resources

To clean up your resources, complete the following steps:

  1. In the Amazon EMR Workspaces console, choose Actions and then Delete to delete the workspace.
  2. Choose Delete to delete the Studio.
  3. In the Amazon EMR on EC2 console, choose Terminate to delete the Amazon EMR on EC2 cluster.
  4. In the Amazon S3 console, choose Empty to delete all objects in the following S3 buckets.
    1. The S3 bucket for Amazon EMR Studio such as aws-emr-studio-<ACCOUNT_ID>-<REGION>-delta-lake-uniform-on-aws. Replace <ACCOUNT_ID> and <REGION> with your account ID and the bucket’s region.
    2. The S3 bucket for Delta Lake tables such as delta-lake-uniform-on-aws-deltalakes3bucket-abcdefghijk.
  5. After you confirm the two buckets are empty, delete the CloudFormation stack redshift-serverless-for-delta-lake-uniform.
  6. After the first CloudFormation stack has been deleted, delete the CloudFormation stack delta-lake-uniform-on-aws.

Conclusion

Delta Lake UniForm on AWS represents an advancement in addressing the challenges of data interoperability and accessibility in modern big data architectures. By enabling Delta Lake tables to be read as Apache Iceberg tables, UniForm expands data access capabilities, allowing organizations to use a broader range of analytics engines and data warehouses such as Amazon Redshift.

The practical implications of this technology are substantial, offering new possibilities for data analysis and insights across diverse platforms. As organizations continue to navigate the complexities of big data, solutions like Delta Lake UniForm that promote interoperability and reduce data silos will become increasingly valuable.

By adopting these advanced open table formats and using cloud platforms such as AWS, organizations can build more robust and efficient data ecosystems. This approach not only enhances the value of existing data assets but also fosters a more agile and adaptable data strategy, ultimately driving innovation and improving decision-making processes in our data-driven world.

Appendix 1: Create a new Delta Lake table with UniForm

You can create a Delta Lake table with UniForm enabled using the following DDL.

CREATE TABLE IF NOT EXISTS delta_uniform_db.customer_reviews_create (
   marketplace string,
   customer_id string,
   review_id string,
   product_id string,
   product_title string,
   star_rating bigint,
   helpful_votes bigint,
   total_votes bigint,
   insight string,
   review_headline string,
   review_body string,
   review_date timestamp,
   review_year bigint,
   product_category string)
USING delta
TBLPROPERTIES ( 
   'delta.universalFormat.enabledFormats'='iceberg',
   'delta.enableIcebergCompatV2'='true',
   'delta.minReaderVersion'='2',
   'delta.minWriterVersion'='7')

Appendix 2: Run queries from Snowflake against the UniForm table

Delta Lake UniForm also allows you to run queries on a Delta Lake table from Snowflake. In this section, you run the same analytic query on the UniForm table using Snowflake as you previously did using Amazon Redshift Serverless in Run queries from Amazon Redshift against the UniForm table. Then you confirm that the query results from Snowflake match the results obtained from the Amazon Redshift Serverless query.

Configure IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3

To query the Delta Lake UniForm table in Data Catalog from Snowflake, the following configurations are required.

  1. IAM roles: Create IAM roles for Snowflake to access Data Catalog and Amazon S3.
  2. Data Catalog integration with Snowflake: Snowflake provides two catalog options for Iceberg tables such as Using Snowflake as the Iceberg catalog and Using an external catalog such as Data Catalog. In this post, you choose AWS Glue Data Catalog as an external catalog. For information about the catalog options, refer to Iceberg catalog options in the Snowflake public documentation.
  3. An external volume creation for Amazon S3: To access the UniForm table from Snowflake, an external volume for Amazon S3 needs to be configured. With this configuration, Snowflake can connect the S3 bucket that you created for Iceberg tables. For information about the external volume, refer to Configure an external volume for Iceberg tables.

Create IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3

Create the following two IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3.

  • SnowflakeIcebergGlueCatalogRole: This IAM role is used for Snowflake to access the Delta Lake UniForm table in AWS Glue Data Catalog.
  • SnowflakeIcebergS3Role: This IAM role is used for Snowflake to access the table’s underlying data in the S3 bucket.

To configure the IAM roles, complete the following steps:

  1. Choose Launch stack:

  1. Enter snowflake-iceberg as the stack name and choose Next.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Submit.
  4. After the stack creation is complete, check the CloudFormation Outputs tab. Make a note of the names and ARNs of the two IAM roles, which are used in the following section.

Create an AWS Glue Data Catalog Integration

Create a catalog integration for AWS Glue Data Catalog. For more information about the catalog integration for AWS Glue Data Catalog, refer to Configure a catalog integration for AWS Glue in the Snowflake public documentation. To configure the catalog integration, complete the following steps:

  1. Access your Snowflake account and open an empty worksheet (query editor).
  2. Run the following query and create a catalog integration with AWS Glue Data Catalog. Replace <YOUR_ACCOUNT_ID> with the IAM role ARN from the snowflake-iceberg CloudFormation Ouputs tab, and replace <REGION> with the region of AWS Glue Data Catalog.
CREATE CATALOG INTEGRATION glue_catalog_integration
  CATALOG_SOURCE=GLUE
  CATALOG_NAMESPACE='delta_uniform_db'
  TABLE_FORMAT=ICEBERG
  GLUE_AWS_ROLE_ARN='arn:aws:iam::<YOUR_ACCOUNT_ID>:role/SnowflakeIcebergGlueCatalogRole'
  GLUE_CATALOG_ID='<YOUR_ACCOUNT_ID>'
  GLUE_REGION='<REGION>'
  ENABLED=TRUE;
  1. Retrieve GLUE_AWS_IAM_USER_ARN and GLUE_AWS_EXTERNAL_ID by using DESCRIBE CATALOG INTEGRATION glue_catalog_integration in the editor. The output is similar to the following:
+------------------------------------------------------------------------------------------------------------------------------+
| property                 | property_type | property_value                                                 | property_default |
|--------------------------+---------------+----------------------------------------------------------------+------------------|
| ENABLED                  | Boolean       | true                                                           | false            |
| CATALOG_SOURCE           | String        | GLUE                                                           |                  |
| CATALOG_NAMESPACE        | String        | delta_uniform_db                                               |                  |
| TABLE_FORMAT             | String        | ICEBERG                                                        |                  |
| REFRESH_INTERVAL_SECONDS | Integer       | 30                                                             | 30               |
| GLUE_AWS_ROLE_ARN        | String        | arn:aws:iam::123456789012:role/SnowflakeIcebergGlueCatalogRole |                  |
| GLUE_CATALOG_ID          | String        | 123456789012                                                   |                  |
| GLUE_REGION              | String        | us-east-1                                                      |                  |
| GLUE_AWS_IAM_USER_ARN    | String        | arn:aws:iam::123456789012:user/<ID>                            |                  |
| GLUE_AWS_EXTERNAL_ID     | String        | An external ID specified on the IAM Role trust relationships   |                  |
| COMMENT                  | String        |                                                                |                  |
+------------------------------------------------------------------------------------------------------------------------------+
  1. Update the IAM role you created using the CloudFormation stack to enable Snowflake to access AWS Glue Data Catalog using that IAM role. Open Trust Relationships of SnowflakeIcebergGlueCatalogRole on the IAM console, choose Edit and update the trust relationship using the following policy. Replace <GLUE_AWS_IAM_USER_ARN> and <GLUE_AWS_EXTERNAL_ID> with the names you obtained in the previous step.
{
   "Version": "2012-10-17",
   "Statement": [
	  {
		 "Sid": "",
		 "Effect": "Allow",
		 "Principal": {
			"AWS": "<GLUE_AWS_IAM_USER_ARN>"
		 },
		 "Action": "sts:AssumeRole",
		 "Condition": {
			"StringEquals": {
			   "sts:ExternalId": "<GLUE_AWS_EXTERNAL_ID>"
			}
		 }
	  }
   ]
}

You completed setting up the IAM role for Snowflake to access your Data Catalog resources. Next, configure the IAM role for Amazon S3 access.

Register Amazon S3 as an external volume

In this section, you configure an external volume for Amazon S3. Snowflake accesses the UniForm table data files in S3 through the external volume. For the configuration of an external volume for S3, refer to Configure an external volume for Amazon S3 in Snowflake public documentation. To configure the external volume, complete the following steps:

  1. In the query editor, run the following query to create an external volume for the Delta Lake S3 bucket. Replace <DeltaLakeS3Bucket> with the name of the S3 bucket that you created in Launch a CloudFormation template to configure basic resources from the CloudFormation Outputs tab. Replace <ACCOUNT_ID> with your AWS account ID.
CREATE OR REPLACE EXTERNAL VOLUME delta_lake_uniform_s3
   STORAGE_LOCATIONS =
	  (
		 (
			NAME = 'delta-lake-uniform-on-aws'
			STORAGE_PROVIDER = 'S3'
			STORAGE_BASE_URL = 's3://<DeltaLakeS3Bucket>'
			STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<ACCOUNT_ID>:role/SnowflakeIcebergS3Role'
		 )
	  );
  1. Retrieve STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID by running DESCRIBE EXTERNAL VOLUME delta_lake_uniform_s3 in the editor. The output is similar to the following:
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| parent_property   | property           | property_type | property_value                                                                                                                       | property_default |
|-------------------+--------------------+---------------+--------------------------------------------------------------------------------------------------------------------------------------+------------------|
|                   | ALLOW_WRITES       | Boolean       | true                                                                                                                                 | true             |
| STORAGE_LOCATIONS | STORAGE_LOCATION_1 | String        | {"NAME":"uniform_s3_location","STORAGE_PROVIDER":"S3","STORAGE_BASE_URL":"s3://<DeltaLakeS3Bucket>","STORAGE_ALLOWED_LOCATIONS":["s3 |                  |
|                   |                    |               | ://<DeltaLakeS3Bucket>/*"],"STORAGE_REGION":"us-east-1","PRIVILEGES_VERIFIED":true,"STORAGE_AWS_ROLE_ARN":"arn:aws:iam::123456789012 |                  |
|                   |                    |               | :role/SnowflakeIcebergS3Role","STORAGE_AWS_IAM_USER_ARN":"arn:aws:iam::123456789012:user/<ID>","STORAGE_AWS_EXTERNAL_ID":"<External  |                  |
|                   |                    |               | ID>",... }                                                                                                                           |                  |
| STORAGE_LOCATIONS | ACTIVE             | String        | uniform_s3_location                                                                                                                  |                  |
+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
  1. Update the IAM role you created using the CloudFormation template (in Create IAM roles for Snowflake to access AWS Glue Data Catalog and Amazon S3) to enable Snowflake to use this IAM role. Open Trust Relationships of SnowflakeIcebergS3Role on the IAM console, choose Edit, and update the trust relationship with the following policy. Replace <STORAGE_AWS_IAM_USER_ARN> and <STORAGE_AWS_EXTERNAL_ID> with the values from the previous step.
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN&>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
        }
      }
    }
  ]
}

The next step is to create an Iceberg table to run queries from Snowflake.

Create an Iceberg table in Snowflake

In this section, you create an Iceberg table in Snowflake. The table is an entry point for Snowflake to access the Delta Lake UniForm table in AWS Glue Data Catalog. To create the table, complete the following steps:

  1. (Optional) If you don’t have a database in Snowflake, run CREATE DATABASE <DATABASE_NAME>, replacing <DATABASE_NAME> with a unique database name for the Iceberg table.
  2. Run the following query in the Snowflake query editor. In this case, the database delta_uniform_snow_db is chosen for the table. Configure the following parameters:
    1. EXTERNAL_VOLUME: created by CREATE OR REPLACE EXTERNAL VOLUME query in the previous section, such as delta_lake_uniform_s3.
    2. CATALOG: created by the CREATE CATALOG INTEGRATION query in the previous section, such as glue_catalog_integration.
    3. CATALOG_TABLE_NAME: the name of Delta Lake UniForm table you created in Data Catalog such as customer_reviews.

The complete query is below:

CREATE OR REPLACE ICEBERG TABLE customer_reviews_snow
  EXTERNAL_VOLUME='delta_lake_uniform_s3'
  CATALOG='glue_catalog_integration'
  CATALOG_TABLE_NAME='customer_reviews';

After the table creation is complete, you’re ready to query the UniForm table in AWS Glue Data Catalog from Snowflake.

Query the UniForm table from Snowflake

In this step, you query the UniForm table from Snowflake. Paste and run the following analytic query in the Snowflake query editor.

SELECT product_category, count(*) as count_by_product_category 
FROM customer_reviews_snow 
GROUP BY product_category ORDER BY count_by_product_category DESC

The query result shows the same output as you saw in Review the added records in Delta Lake UniForm table from Amazon Redshift Serverless section.

+--------------------------------------------------+
| PRODUCT_CATEGORY     | COUNT_BY_PRODUCT_CATEGORY |
|----------------------+---------------------------|
| Office_Products      | 9673711                   |
| Books                | 9672664                   |
| Apparel              | 6448747                   |
| Computers            | 3224215                   |
| Beauty_Personal_Care | 3223599                   |
+--------------------------------------------------+

Now you can query from Snowflake against the Delta Lake table by enabling Delta Lake UniForm.


About the Authors

Tomohiro Tanaka is a Senior Cloud Support Engineer at Amazon Web Services. He’s passionate about helping customers use Apache Iceberg for their data lakes on AWS. In his free time, he enjoys a coffee break with his colleagues and making coffee at home.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

What’s new in Cloudflare: Account Owned Tokens and Zaraz Automated Actions

Post Syndicated from Joseph So original https://blog.cloudflare.com/account-owned-tokens-automated-actions-zaraz

In October 2024, we started publishing roundup blog posts to share the latest features and updates from our teams. Today, we are announcing general availability for Account Owned Tokens, which allow organizations to improve access control for their Cloudflare services. Additionally, we are launching Zaraz Automated Actions, which is a new feature designed to streamline event tracking and tool integration when setting up third-party tools. By automating common actions like pageviews, custom events, and e-commerce tracking, it removes the need for manual configurations.

Improving access control for Cloudflare services with Account Owned Tokens

Cloudflare is critical infrastructure for the Internet, and we understand that many of the organizations that build on Cloudflare rely on apps and integrations outside the platform to make their lives easier. In order to allow access to Cloudflare resources, these apps and integrations interact with Cloudflare via our API, enabled by access tokens and API keys. Today, the API Access Tokens and API keys on the Cloudflare platform are owned by individual users, which can lead to some difficulty representing services, and adds an additional dependency on managing users alongside token permissions.

What’s new about Account Owned Tokens

First, a little explanation because the terms can be a little confusing. On Cloudflare, we have both Users and Accounts, and they mean different things, but sometimes look similar. Users are people, and they sign in with an email address. Accounts are not people, they’re the top-level bucket we use to organize all the resources you use on Cloudflare. Accounts can have many users, and that’s how we enable collaboration. If you use Cloudflare for your personal projects, both your User and Account might have your email address as the name, but if you use Cloudflare as a company, the difference is more apparent because your user is “[email protected]” and the account might be “Example Company”. 


Account Owned Tokens are not confined by the permissions of the creating user (e.g. a user can never make a token that can edit a field that they otherwise couldn’t edit themselves) and are scoped to the account they are owned by. This means that instead of creating a token belonging to the user “[email protected]”, you can now create a token belonging to the account “Example Company”.


The ability to make these tokens, owned by the account instead of the user, allows for more flexibility to represent what the access should be used for.

Prior to Account Owned Tokens, customers would have to have a user ([email protected]) create a token to pull a list of Cloudflare zones for their account and ensure their security settings are set correctly as part of a compliance workflow, for example. All of the actions this compliance workflow does are now attributed to joe.smith, and if joe.smith leaves the company and his permissions are revoked, the compliance workflow fails.

With this new release, an Account Owned Token could be created, named “compliance workflow”, with permissions to do this operation independently of [email protected]. All actions this token does are attributable to “compliance workflow”. This token is visible and manageable by all the superadmins on your Cloudflare account. If joe.smith leaves the company, the access remains independent of that user, and all super administrators on the account moving forward can still see, edit, roll, and delete the token as needed.

Any long-running services or programs can be represented by these types of tokens, be made visible (and manageable) by all super administrators in your Cloudflare account, and truly represent the service, instead of the users managing the service. Audit logs moving forward will log that a given token was used, and user access can be kept to a minimum.

Getting started

Account Owned Tokens can be found on the new “API Tokens” tab under the “Manage Account” section of your Cloudflare dashboard, and any Super Administrators on your account have the capability to create, edit, roll, and delete these tokens. The API is the same, but at a new /account/<accountId>/tokens endpoint.



Why/where should I use Account Owned Tokens?

There are a few places we would recommend replacing your User Owned Tokens with Account Owned Tokens:

1. Long-running services that are managed by multiple people: When multiple users all need to manage the same service, Account Owned Tokens can remove the bottleneck of requiring a single person to be responsible for all the edits, rotations, and deletions of the tokens. In addition, this guards against any user lifecycle events affecting the service. If the employee that owns the token for your service leaves the company, the service’s token will no longer be based on their permissions.

2. Cloudflare accounts running any services that need attestable access records beyond user membership: By restricting all of your users from being able to access the API, and consolidating all usable tokens to a single list at the account level, you can ensure that a complete list of all API access can be found in a single place on the dashboard, under “Account API Tokens”.


3. Anywhere you’ve created “Service Users”: “Service Users”, or any identity that is meant to allow multiple people to access Cloudflare, are an active threat surface. They are generally highly privileged, and require additional controls (vaulting, password rotation, monitoring) to ensure non-repudiation and appropriate use. If these operations solely require API access, consolidating that access into an Account Owned Token is safe.

Why/where should I use User Owned Tokens?

There are a few scenarios/situations where you should continue to use User Owned Tokens:

  1. Where programmatic access is done by a single person at an external interface: If a single user has an external interface using their own access privileges at Cloudflare, it still makes sense to use a personal token, so that information access can be traced back to them. (e.g. using a personal token in a data visualization tool that pulls logs from Cloudflare)

  2. User level operations: Any operations that operate on your own user (e.g. email changes, password changes, user preferences) still require a user level token.

  3. Where you want to control resources over multiple accounts with the same credential: As of November 2024, Account Owned Tokens are scoped to a single account. In 2025, we want to ensure that we can create cross-account credentials, anywhere that multiple accounts have to be called in the same set of operations should still rely on API Tokens owned by a user.

  4. Where we currently do not support a given endpoint: We are currently in the process of working through a list of our services to ensure that they all support Account Owned Tokens. When interacting with any of these services that are not supported programmatically, please continue to use User Owned Tokens.

  5. Where you need to do token management programmatically: If you are in an organization that needs to create and delete large numbers of tokens programmatically, please continue to use User Owned Tokens. In late 2024, watch for the “Create Additional Tokens” template on the Account Owned Tokens Page. This template and associated created token will allow for the management of additional tokens.


What does this mean for my existing tokens and programmatic access moving forward?

We do not plan to decommission User Owned Tokens, as they still have a place in our overall access model and are handy for ensuring user-centric workflows can be implemented.

As of November 2024, we’re still working to ensure that ALL of our endpoints work with Account Owned Tokens, and we expect to deliver additional token management improvements continuously, with an expected end date of Q3 2025 to cover all endpoints.

A list of services that support, and do not support, Account Owned Tokens can be found in our documentation.

What’s next?

If Account Owned Tokens could provide value to your or your organization, documentation is available here, and you can give them a try today from the “API Tokens” menu in your dashboard.

Zaraz Automated Actions makes adding tools to your website a breeze


Cloudflare Zaraz is a tool designed to manage and optimize third-party tools like analytics, marketing tags, or social media scripts on websites. By loading these third-party services through Cloudflare’s network, Zaraz improves website performance, security, and privacy. It ensures that these external scripts don’t slow down page loading times or expose sensitive user data, as it handles them efficiently through Cloudflare’s global network, reducing latency and improving the user experience.

Automated Actions are a new product feature that allow users to easily setup page views, custom events, and e-commerce tracking without going through the tedious process of manually setting up triggers and actions.

Why we built Automated Actions

An action in Zaraz is a way to tell a third party tool that a user interaction or event has occurred when certain conditions, defined by triggers, are met. You create actions from within the tools page, associating them with specific tools and triggers.


Setting up a tool in Zaraz has always involved a few steps: configuring a trigger, linking it to a tool action and finally calling zaraz.track(). This process allowed advanced configurations with complex rules, and while it was powerful, it occasionally left users confused — why isn’t calling zaraz.track() enough? We heard your feedback, and we’re excited to introduce Zaraz Automated Actions, a feature designed to make Zaraz easier to use by reducing the amount of work needed to configure a tool.

With Zaraz Automated Actions, you can now automate sending data to your third-party tools without the need to create a manual configuration. Inspired by the simplicity of zaraz.ecommerce(), we’ve extended this ease to all Zaraz events, removing the need for manual trigger and action setup. For example, calling zaraz.track(‘myEvent’) will send your event to the tool without the need to configure any triggers or actions.

Getting started with Automated Actions

When adding a new tool in Zaraz, you’ll now see an additional step where you can choose one of three Automated Actions: pageviews, all other events, or ecommerce. These options allow you to specify what kind of events you want to automate for that tool.


  • Pageviews: Automatically sends data to the tool whenever someone visits a page on your site, without any manual configuration.

  • All other events: Sends all custom events triggered using zaraz.track() to the selected tool, making it easy to automate tracking of user interactions.

  • Ecommerce: Automatically sends all e-commerce events triggered via zaraz.ecommerce() to the tool, streamlining your sales and transaction tracking.

These Automated Actions are also available for all your existing tools, which can be toggled on or off from the tool detail page in your Zaraz dashboard. This flexibility allows you to fine-tune which actions are automated based on your needs.


Custom actions for tools without Automated Action support

Some tools do not support automated actions because the tool itself does not support page view, custom, or e-commerce events. For such tools you can still create your own custom actions, just like before. Custom actions allow you to configure specific events to send data to your tools based on unique triggers. The process remains the same, and you can follow the detailed steps outlined in our Create Actions guide. Remember to set up your trigger first, or choose an existing one, before configuring the action.

Automatically enrich your payload

When creating a custom action, it is now easier to send Event Properties using the Include Event Properties field. When this is toggled on, you can automatically send client-specific data with each action, such as user behavior or interaction details. For example, to send an userID property when sending a click event you can do something like this: zaraz.track(‘click’, { userID: “foo” }).

Additionally, you can enable the Include System Properties option to send system-level information, such as browser, operating system, and more. In your action settings click on “Add Field”, pick the “Include System Properties”, click on confirm and then toggle the field on. 

For a full list of system properties, visit our System Properties reference guide. These options give you greater flexibility and control over the data you send with custom actions.

These two fields replace the now deprecated “Enrich Payload” dropdown field.


Zaraz Automated Actions marks a significant step forward in simplifying how you manage events across your tools. By automating common actions like page views, e-commerce events, and custom tracking, you can save time and reduce the complexity of manual configurations. Whether you’re leveraging Automated Actions for speed or creating custom actions for more tailored use cases, Zaraz offers the flexibility to fit your workflow. 

We’re excited to see how you use this feature. Please don’t hesitate to reach out to us on Cloudflare Zaraz’s Discord Channel — we’re always there fixing issues, listening to feedback, and announcing exciting product updates.

Never miss an update

We’ll continue to share roundup blog posts as we continue to build and innovate. Be sure to follow along on the Cloudflare Blog for the latest news and updates.

Use Amazon Kinesis Data Streams to deliver real-time data to Amazon OpenSearch Service domains with Amazon OpenSearch Ingestion

Post Syndicated from M Mehrtens original https://aws.amazon.com/blogs/big-data/use-amazon-kinesis-data-streams-to-deliver-real-time-data-to-amazon-opensearch-service-domains-with-amazon-opensearch-ingestion/

In this post, we show how to use Amazon Kinesis Data Streams to buffer and aggregate real-time streaming data for delivery into Amazon OpenSearch Service domains and collections using Amazon OpenSearch Ingestion. You can use this approach for a variety of use cases, from real-time log analytics to integrating application messaging data for real-time search. In this post, we focus on the use case for centralizing log aggregation for an organization that has a compliance need to archive and retain its log data.

Kinesis Data Streams is a fully managed, serverless data streaming service that stores and ingests various streaming data in real time at any scale. For log analytics use cases, Kinesis Data Streams enhances log aggregation by decoupling producer and consumer applications, and providing a resilient, scalable buffer to capture and serve log data. This decoupling provides advantages over traditional architectures. As log producers scale up and down, Kinesis Data Streams can be scaled dynamically to persistently buffer log data. This prevents load changes from impacting an OpenSearch Service domain, and provides a resilient store of log data for consumption. It also allows for multiple consumers to process log data in real time, providing a persistent store of real-time data for applications to consume. This allows the log analytics pipeline to meet Well-Architected best practices for resilience (REL04-BP02) and cost (COST09-BP02).

OpenSearch Ingestion is a serverless pipeline that provides powerful tools for extracting, transforming, and loading data into an OpenSearch Service domain. OpenSearch Ingestion integrates with many AWS services, and provides ready-made blueprints to accelerate ingesting data for a variety of analytics use cases into OpenSearch Service domains. When paired with Kinesis Data Streams, OpenSearch Ingestion allows for sophisticated real-time analytics of data, and helps reduce the undifferentiated heavy lifting of creating a real-time search and analytics architecture.

Solution overview

In this solution, we consider a common use case for centralized log aggregation for an organization. Organizations might consider a centralized log aggregation approach for a variety of reasons. Many organizations have compliance and governance requirements that have stipulations for what data needs to be logged, and how long log data must be retained and remain searchable for investigations. Other organizations seek to consolidate application and security operations, and provide common observability toolsets and capabilities across their teams.

To meet such requirements, you need to collect data from log sources (producers) in a scalable, resilient, and cost-effective manner. Log sources may vary between application and infrastructure use cases and configurations, as illustrated in the following table.

Log Producer Example Example Producer Log Configuration
Application Logs AWS Lambda Amazon CloudWatch Logs
Application Agents FluentBit Amazon OpenSearch Ingestion
AWS Service Logs Amazon Web Application Firewall Amazon S3

The following diagram illustrates an example architecture.

You can use Kinesis Data Streams for a variety of these use cases. You can configure Amazon CloudWatch logs to send data to Kinesis Data Streams using a subscription filter (see Real-time processing of log data with subscriptions). If you send data with Kinesis Data Streams for analytics use cases, you can use OpenSearch Ingestion to create a scalable, extensible pipeline to consume your streaming data and write it to OpenSearch Service indexes. Kinesis Data Streams provides a buffer that can support multiple consumers, configurable retention, and built-in integration with a variety of AWS services. For other use cases where data is stored in Amazon Simple Storage Service (Amazon S3), or where an agent writes data such as FluentBit, an agent can write data directly to OpenSearch Ingestion without an intermediate buffer thanks to OpenSearch Ingestion’s built-in persistent buffers and automatic scaling.

Standardizing logging approaches reduces development and operational overhead for organizations. For example, you might standardize on all applications logging to CloudWatch logs when feasible, and also handle Amazon S3 logs where CloudWatch logs are unsupported. This reduces the number of use cases that a centralized team needs to handle in their log aggregation approach, and reduces the complexity of the log aggregation solution. For more sophisticated development teams, you might standardize on using FluentBit agents to write data directly to OpenSearch Ingestion to lower cost when log data doesn’t need to be stored in CloudWatch.

This solution focuses on using CloudWatch logs as a data source for log aggregation. For the Amazon S3 log use case, see Using an OpenSearch Ingestion pipeline with Amazon S3. For agent-based solutions, see the agent-specific documentation for integration with OpenSearch Ingestion, such as Using an OpenSearch Ingestion pipeline with Fluent Bit.

Prerequisites

Several key pieces of infrastructure used in this solution are required to ingest data into OpenSearch Service with OpenSearch Ingestion:

  • A Kinesis data stream to aggregate the log data from CloudWatch
  • An OpenSearch domain to store the log data

When creating the Kinesis data stream, we recommend starting with On-Demand mode. This will allow Kinesis Data Streams to automatically scale the number of shards needed for your log throughput. After you identify the steady state workload for your log aggregation use case, we recommend moving to Provisioned mode, using the number of shards identified in On-Demand mode. This can help you optimize long-term cost for high-throughput use cases.

In general, we recommend using one Kinesis data stream for your log aggregation workload. OpenSearch Ingestion supports up to 96 OCUs per pipeline, and 24,000 characters per pipeline definition file (see OpenSearch Ingestion quotas). This means that each pipeline can support a Kinesis data stream with up to 96 shards, because each OCU processes one shard. Using one Kinesis data stream simplifies the overall process to aggregate log data into OpenSearch Service, and simplifies the process for creating and managing subscription filters for log groups.

Depending on the scale of your log workloads, and the complexity of your OpenSearch Ingestion pipeline logic, you may consider more Kinesis data streams for your use case. For example, you may consider one stream for each major log type in your production workload. Having log data for different use cases separated into different streams can help reduce the operational complexity of managing OpenSearch Ingestion pipelines, and allows you to scale and deploy changes to each log use case separately when required.

To create a Kinesis Data Stream, see Create a data stream.

To create an OpenSearch domain, see Creating and managing Amazon OpenSearch domains.

Configure log subscription filters

You can implement CloudWatch log group subscription filters at the account level or log group level. In both cases, we recommend creating a subscription filter with a random distribution method to make sure log data is evenly distributed across Kinesis data stream shards.

Account-level subscription filters are applied to all log groups in an account, and can be used to subscribe all log data to a single destination. This works well if you want to store all your log data in OpenSearch Service using Kinesis Data Streams. There is a limit of one account-level subscription filter per account. Using Kinesis Data Streams as the destination also allows you to have multiple log consumers to process the account log data when relevant. To create an account-level subscription filter, see Account-level subscription filters.

Log group-level subscription filters are applied on each log group. This approach works well if you want to store a subset of your log data in OpenSearch Service using Kinesis Data Streams, and if you want to use multiple different data streams to store and process multiple log types. There is a limit of two log group-level subscription filters per log group. To create a log group-level subscription filter, see Log group-level subscription filters.

After you create your subscription filter, verify that log data is being sent to your Kinesis data stream. On the Kinesis Data Streams console, choose the link for your stream name.

Choose a shard with Starting position set as Trim horizon, and choose Get records.

You should see records with a unique Partition key column value and binary Data column. This is because CloudWatch sends data in .gzip format to compress log data.

Configure an OpenSearch Ingestion pipeline

Now that you have a Kinesis data stream and CloudWatch subscription filters to send data to the data stream, you can configure your OpenSearch Ingestion pipeline to process your log data. To begin, you create an AWS Identity and Access Management (IAM) role that allows read access to the Kinesis data stream and read/write access to the OpenSearch domain. To create your pipeline, your manager role that is used to create the pipeline will require iam:PassRole permissions to the pipeline role created in this step.

  1. Create an IAM role with the following permissions to read from your Kinesis data stream and access your OpenSearch domain:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "allowReadFromStream",
                "Effect": "Allow",
                "Action": [
                    "kinesis:DescribeStream",
                    "kinesis:DescribeStreamConsumer",
                    "kinesis:DescribeStreamSummary",
                    "kinesis:GetRecords",
                    "kinesis:GetShardIterator",
                    "kinesis:ListShards",
                    "kinesis:ListStreams",
                    "kinesis:ListStreamConsumers",
                    "kinesis:RegisterStreamConsumer",
                    "kinesis:SubscribeToShard"
                ],
                "Resource": [
                    "arn:aws:kinesis:{{region}}:{{account-id}}:stream/{{stream-name}}"
                ]
            },
            {
                "Sid": "allowAccessToOS",
                "Effect": "Allow",
                "Action": [
                    "es:DescribeDomain",
                    "es:ESHttp*"
                ],
                "Resource": [
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}",
                    "arn:aws:es:{region}:{account-id}:domain/{domain-name}/*"
                ]
            }
        ]
    }

  2. Give your role a trust policy that allows access from osis-pipelines.amazonaws.com:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {
                    "Service": [
                        "osis-pipelines.amazonaws.com"
                    ]
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "aws:SourceAccount": "{account-id}"
                    },
                    "ArnLike": {
                        "aws:SourceArn": "arn:aws:osis:{region}:{account-id}:pipeline/*"
                    }
                }
            }
        ]
    }

For a pipeline to write data to a domain, the domain must have a domain-level access policy that allows the pipeline role to access it, and if your domain uses fine-grained access control, then the IAM role needs to be mapped to a backend role in the OpenSearch Service security plugin that allows access to create and write to indexes.

  1. After you create your pipeline role, on the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.
  3. Search for Kinesis in the blueprints, select the Kinesis Data Streams blueprint, and choose Select blueprint.
  4. Under Pipeline settings, enter a name for your pipeline, and set Max capacity for the pipeline to be equal to the number of shards in your Kinesis data stream.

If you’re using On-Demand mode for the data stream, choose a capacity equal to the current number of shards in the stream. This use case doesn’t require a persistent buffer, because Kinesis Data Streams provides a persistent buffer for the log data, and OpenSearch Ingestion tracks its position in the Kinesis data stream over time, preventing data loss on restarts.

  1. Under Pipeline configuration, update the pipeline source settings to use your Kinesis data stream name and pipeline IAM role Amazon Resource Name (ARN).

For full configuration information, see . For most configurations, you can use the default values. By default, the pipeline will write batches of 100 documents every 1 second, and will subscribe to the Kinesis data stream from the latest position in the stream using enhanced fan-out, checkpointing its position in the stream every 2 minutes. You can adjust this behavior as desired to tune how frequently the consumer checkpoints, where it begins in the stream, and use polling to reduce costs from enhanced fan-out.

  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"
  1. Update the pipeline sink settings to include your OpenSearch domain endpoint URL and pipeline IAM role ARN.

The IAM role ARN must be the same for both the OpenSearch Servicer sink definition and the Kinesis Data Streams source definition. You can control what data gets indexed in different indexes using the index definition in the sink. For example, you can use metadata about the Kinesis data stream name to index by data stream (${getMetadata("kinesis_stream_name")), or you can use document fields to index data depending on the CloudWatch log group or other document data (${path/to/field/in/document}). In this example, we use three document-level fields (data_stream.type, data_stream.dataset, and data_stream.namespace) to index our documents, and create these fields in our pipeline processor logic in the next section:

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false

Finally, you can update the pipeline configuration to include processor definitions to transform your log data before writing documents to the OpenSearch domain. For example, this use case adopts Simple Schema for Observability (SS4O) and uses the OpenSearch Ingestion pipeline to create the desired schema for SS4O. This includes adding common fields to associate metadata with the indexed documents, as well as parsing the log data to make data more searchable. This use case also uses the log group name to identify different log types as datasets, and uses this information to write documents to different indexes depending on their use cases.

  1. Rename the CloudWatch event timestamp to mark the observed timestamp when the log was generated using the rename_keys processor, and add the current timestamp as the processed timestamp when OpenSearch Ingestion handled the record using the date processor:
      #  Processor logic is used to change how log data is parsed for OpenSearch.
      processor:
        - rename_keys:
            entries:
            # Include CloudWatch timestamp as the observation timestamp - the time the log
            # was generated and sent to CloudWatch:
            - from_key: "timestamp"
              to_key: "observed_timestamp"
        - date:
            # Include the current timestamp that OSI processed the log event:
            from_time_received: true
            destination: "processed_timestamp"

  2. Use the add_entries processor to include metadata about the processed document, including the log group, log stream, account ID, AWS Region, Kinesis data stream information, and dataset metadata:
        - add_entries:
            entries:
            # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
            - key: "cloud/provider"
              value: "aws"
            - key: "cloud/account/id"
              format: "${owner}"
            - key: "cloud/region"
              value: "us-west-2"
            - key: "aws/cloudwatch/log_group"
              format: "${logGroup}"
            - key: "aws/cloudwatch/log_stream"
              format: "${logStream}"
            # Include default values for the data_stream:
            - key: "data_stream/namespace"
              value: "default"
            - key: "data_stream/type"
              value: "logs"
            - key: "data_stream/dataset"
              value: "general"
            # Include metadata about the source Kinesis message that contained this log event:
            - key: "aws/kinesis/stream_name"
              value_expression: "getMetadata(\"stream_name\")"
            - key: "aws/kinesis/partition_key"
              value_expression: "getMetadata(\"partition_key\")"
            - key: "aws/kinesis/sequence_number"
              value_expression: "getMetadata(\"sequence_number\")"
            - key: "aws/kinesis/sub_sequence_number"
              value_expression: "getMetadata(\"sub_sequence_number\")"

  3. Use conditional expression syntax to update the data_stream.dataset fields depending on the log source, to control what index the document is written to, and use the delete_entries processor to delete the original CloudWatch document fields that were renamed:
        - add_entries:
            entries:
            # Update the data_stream fields based on the log event context - in this case
            # classifying the log events by their source (CloudTrail or Lambda).
            # Additional logic could be added to classify the logs by business or application context:
            - key: "data_stream/dataset"
              value: "cloudtrail"
              add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "lambda"
              add_when: "contains(/logGroup, \"/aws/lambda/\")"
              overwrite_if_key_exists: true
            - key: "data_stream/dataset"
              value: "apache"
              add_when: "contains(/logGroup, \"/apache/\")"
              overwrite_if_key_exists: true
        # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
        - delete_entries:
            with_keys:
              - "logGroup"
              - "logStream"
              - "owner"

  4. Parse the log message fields to allow structured and JSON data to be more searchable in the OpenSearch indexes using the grok and parse_json

Grok processors use pattern matching to parse data from structured text fields. For examples of built-in Grok patterns, see java-grok patterns and dataprepper grok patterns.

    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == \"apache\""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == \"cloudtrail\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == \"lambda\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == \"general\""
        tags_on_failure: ["json_parse_fail"]

When it’s all put together, your pipeline configuration will look like the following code:

version: "2"
kinesis-pipeline:
  source:
    kinesis-data-streams:
      acknowledgments: true
      codec:
        # JSON codec supports parsing nested CloudWatch events into
        # individual log entries that will be written as documents to
        # OpenSearch
        json:
          key_name: "logEvents"
          # These keys contain the metadata sent by CloudWatch Subscription Filters
          # in addition to the individual log events:
          # https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#DestinationKinesisExample
          include_keys: ['owner', 'logGroup', 'logStream' ]
      streams:
        # Update to use your Kinesis Stream name used in your Subscription Filters:
        - stream_name: "KINESIS_STREAM_NAME"
          # Can customize initial position if you don't want OSI to consume the entire stream:
          initial_position: "EARLIEST"
          # Compression will always be gzip for CloudWatch, but will vary for other sources:
          compression: "gzip"
      aws:
        # Provide the Role ARN with access to KDS. This role should have a trust relationship with osis-pipelines.amazonaws.com
        # This must be the same role used below in the Sink configuration.
        sts_role_arn: "PIPELINE_ROLE_ARN"
        # Provide the region of the Data Stream.
        region: "REGION"
        
  #  Processor logic is used to change how log data is parsed for OpenSearch.
  processor:
    - rename_keys:
        entries:
        # Include CloudWatch timestamp as the observation timestamp - the time the log
        # was generated and sent to CloudWatch:
        - from_key: "timestamp"
          to_key: "observed_timestamp"
    - date:
        # Include the current timestamp that OSI processed the log event:
        from_time_received: true
        destination: "processed_timestamp"
    - add_entries:
        entries:
        # Support SS4O common log fields (https://opensearch.org/docs/latest/observing-your-data/ss4o/)
        - key: "cloud/provider"
          value: "aws"
        - key: "cloud/account/id"
          format: "${owner}"
        - key: "cloud/region"
          value: "us-west-2"
        - key: "aws/cloudwatch/log_group"
          format: "${logGroup}"
        - key: "aws/cloudwatch/log_stream"
          format: "${logStream}"
        # Include default values for the data_stream:
        - key: "data_stream/namespace"
          value: "default"
        - key: "data_stream/type"
          value: "logs"
        - key: "data_stream/dataset"
          value: "general"
        # Include metadata about the source Kinesis message that contained this log event:
        - key: "aws/kinesis/stream_name"
          value_expression: "getMetadata(\"stream_name\")"
        - key: "aws/kinesis/partition_key"
          value_expression: "getMetadata(\"partition_key\")"
        - key: "aws/kinesis/sequence_number"
          value_expression: "getMetadata(\"sequence_number\")"
        - key: "aws/kinesis/sub_sequence_number"
          value_expression: "getMetadata(\"sub_sequence_number\")"
    - add_entries:
        entries:
        # Update the data_stream fields based on the log event context - in this case
        # classifying the log events by their source (CloudTrail or Lambda).
        # Additional logic could be added to classify the logs by business or application context:
        - key: "data_stream/dataset"
          value: "cloudtrail"
          add_when: "contains(/logGroup, \"cloudtrail\") or contains(/logGroup, \"CloudTrail\")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "lambda"
          add_when: "contains(/logGroup, \"/aws/lambda/\")"
          overwrite_if_key_exists: true
        - key: "data_stream/dataset"
          value: "apache"
          add_when: "contains(/logGroup, \"/apache/\")"
          overwrite_if_key_exists: true
    # Remove the default CloudWatch fields, as we re-mapped them to SS4O fields:
    - delete_entries:
        with_keys:
          - "logGroup"
          - "logStream"
          - "owner"
    # Use Grok parser to parse non-JSON apache logs
    - grok:
        grok_when: "/data_stream/dataset == \"apache\""
        match:
          message: ['%{COMMONAPACHELOG_DATATYPED}']
        target_key: "http"
    # Attempt to parse the log data as JSON to support field-level searches in the OpenSearch index:
    - parse_json:
        # Parse root message object into aws.cloudtrail to match SS4O standard for SS4O logs
        source: "message"
        destination: "aws/cloudtrail"
        parse_when: "/data_stream/dataset == \"cloudtrail\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for Lambda function logs - can also set up Grok support
        # for Lambda function logs to capture non-JSON logging function data as searchable fields
        source: "message"
        destination: "aws/lambda"
        parse_when: "/data_stream/dataset == \"lambda\""
        tags_on_failure: ["json_parse_fail"]
    - parse_json:
        # Parse root message object as JSON when possible for general logs
        source: "message"
        destination: "body"
        parse_when: "/data_stream/dataset == \"general\""
        tags_on_failure: ["json_parse_fail"]

  sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "OPENSEARCH_ENDPOINT" ]
        # Route log data to different target indexes depending on the log context:
        index: "ss4o_${data_stream/type}-${data_stream/dataset}-${data_stream/namespace}"
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          # This role must be the same as the role used above for Kinesis.
          sts_role_arn: "PIPELINE_ROLE_ARN"
          # Provide the region of the domain.
          region: "REGION"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: false
  1. When your configuration is complete, choose Validate pipeline to check your pipeline syntax for errors.
  2. In the Pipeline role section, optionally enter a suffix to create a unique service role that will be used to start your pipeline run.
  3. In the Network section, select VPC access.

For a Kinesis Data Streams source, you don’t need to select a virtual private cloud (VPC), subnets, or security groups. OpenSearch Ingestion only requires these attributes for HTTP data sources that are located within a VPC. For Kinesis Data Streams, OpenSearch Ingestion uses AWS PrivateLink to read from Kinesis Data Streams and write to OpenSearch domains or serverless collections.

  1. Optionally, enable CloudWatch logging for your pipeline.
  2. Choose Next to review and create your pipeline.

If you’re using account-level subscription filters for CloudWatch logs in the account where OpenSearch Ingestion is running, this log group should be excluded from the account-level subscription. This is because OpenSearch Ingestion pipeline logs could cause a recursive loop with the subscription filter that could lead to high volumes of log data ingestion and cost.

  1. In the Review and create section, choose Create pipeline.

When your pipeline enters the Active state, you’ll see logs begin to populate in your OpenSearch domain or serverless collection.

Monitor the solution

To maintain the health of the log ingestion pipeline, there are several key areas to monitor:

  • Kinesis Data Streams metrics – You should monitor the following metrics:
    • FailedRecords – Indicates an issue in CloudWatch subscription filters writing to the Kinesis data stream. Reach out to AWS Support if this metric stays at a non-zero level for a sustained period.
    • ThrottledRecords – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch.
    • ReadProvisionedThroughputExceeded – Indicates your Kinesis data stream has more consumers consuming read throughput than supplied by the shard limits, and you may need to move to an enhanced fan-out consumer strategy.
    • WriteProvisionedThroughputExceeded – Indicates your Kinesis data stream needs more shards to accommodate the log volume from CloudWatch, or that your log volume is being unevenly distributed to your shards. Make sure the subscription filter distribution strategy is set to random, and consider enabling enhanced shard-level monitoring on the data stream to identify hot shards.
    • RateExceeded – Indicates that a consumer is incorrectly configured for the stream, and there may be an issue in your OpenSearch Ingestion pipeline causing it to subscribe too often. Investigate your consumer strategy for the Kinesis data stream.
    • MillisBehindLatest – Indicates the enhanced fan-out consumer isn’t keeping up with the load in the data stream. Investigate the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards.
    • IteratorAgeMilliseconds – Indicates the polling consumer isn’t keeping up with the load in the data stream. Investigate the OpenSearch Ingestion pipeline OCU configuration and make sure there are sufficient OCUs to accommodate the Kinesis data stream shards, and investigate the polling strategy for the consumer.
  • CloudWatch subscription filter metrics – You should monitor the following metrics:
    • DeliveryErrors – Indicates an issue in CloudWatch subscription filter delivering data to the Kinesis data stream. Investigate data stream metrics.
    • DeliveryThrottling – Indicates insufficient capacity in the Kinesis data stream. Investigate data stream metrics.
  • OpenSearch Ingestion metrics – For recommended monitoring for OpenSearch Ingestion, see Recommended CloudWatch alarms.
  • OpenSearch Service metrics – For recommended monitoring for OpenSearch Service, see Recommended CloudWatch alarms for Amazon OpenSearch Service.

Clean up

Make sure you clean up unwanted AWS resources created while following this post in order to prevent additional billing for these resources. Follow these steps to clean up your AWS account:

  1. Delete your Kinesis data stream.
  2. Delete your OpenSearch Service domain.
  3. Use the DeleteAccountPolicy API to remove your account-level CloudWatch subscription filter.
  4. Delete your log group-level CloudWatch subscription filter:
    1. On the CloudWatch console, select the desired log group.
    2. On the Actions menu, choose Subscription Filters and Delete all subscription filter(s).
  5. Delete the OpenSearch Ingestion pipeline.

Conclusion

In this post, you learned how to create a serverless ingestion pipeline to deliver CloudWatch logs in real time to an OpenSearch domain or serverless collection using OpenSearch Ingestion. You can use this approach for a variety of real-time data ingestion use cases, and add it to existing workloads that use Kinesis Data Streams for real-time data analytics.

For other use cases for OpenSearch Ingestion and Kinesis Data Streams, consider the following:

To continue improving your log analytics use cases in OpenSearch, consider using some of the pre-built dashboards available in Integrations in OpenSearch Dashboards.


About the authors

M Mehrtens has been working in distributed systems engineering throughout their career, working as a Software Engineer, Architect, and Data Engineer. In the past, M has supported and built systems to process terrabytes of streaming data at low latency, run enterprise Machine Learning pipelines, and created systems to share data across teams seamlessly with varying data toolsets and software stacks. At AWS, they are a Sr. Solutions Architect supporting US Federal Financial customers.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-centered technologies, and is based out of Seattle, Washington.

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Incremental refresh for Amazon Redshift materialized views on data lake tables

Post Syndicated from Raks Khare original https://aws.amazon.com/blogs/big-data/incremental-refresh-for-amazon-redshift-materialized-views-on-data-lake-tables/

Amazon Redshift is a fast, fully managed cloud data warehouse that makes it cost-effective to analyze your data using standard SQL and business intelligence tools. You can use Amazon Redshift to analyze structured and semi-structured data and seamlessly query data lakes and operational databases, using AWS designed hardware and automated machine learning (ML)-based tuning to deliver top-tier price performance at scale.

Amazon Redshift delivers price performance right out of the box. However, it also offers additional optimizations that you can use to further improve this performance and achieve even faster query response times from your data warehouse.

One such optimization for reducing query runtime is to precompute query results in the form of a materialized view. Materialized views in Redshift speed up running queries on large tables. This is useful for queries that involve aggregations and multi-table joins. Materialized views store a precomputed result set of these queries and also support incremental refresh capability for local tables.

Customers use data lake tables to achieve cost effective storage and interoperability with other tools. With open table formats (OTFs) such as Apache Iceberg, data is continuously being added and updated.

Amazon Redshift now provides the ability to incrementally refresh your materialized views on data lake tables including open file and table formats such as Apache Iceberg.

In this post, we will show you step-by-step what operations are supported on both open file formats and transactional data lake tables to enable incremental refresh of the materialized view.

Prerequisites

To walk through the examples in this post, you need the following prerequisites:

  1. You can test the incremental refresh of materialized views on standard data lake tables in your account using an existing Redshift data warehouse and data lake. However, if you want to test the examples using sample data, download the sample data. The sample files are ‘|’ delimited text files.
  2. An AWS Identity and Access Management (IAM) role attached to Amazon Redshift to grant the minimum permissions required to use Redshift Spectrum with Amazon Simple Storage Service (Amazon S3) and AWS Glue.
  3. Set the IAM Role as the default role in Amazon Redshift.

Incremental materialized view refresh on standard data lake tables

In this section, you learn how to can build and incrementally refresh materialized views in Amazon Redshift on standard text files in Amazon S3, maintaining data freshness with a cost-effective approach.

  1. Upload the first file, customer.tbl.1, downloaded from the Prerequisites section in your desired S3 bucket with the prefix customer.
  2. Connect to your Amazon Redshift Serverless workgroup or Redshift provisioned cluster using Query editor v2.
  3. Create an external schema.
    create external schema datalake_mv_demo
    from data catalog   
    database 'datalake-mv-demo'
    iam_role default;

  4. Create an external table named customer in the external schema datalake_mv_demo created in the preceding step.
    create external table datalake_mv_demo.customer(
            c_custkey int8,
            c_name varchar(25),
            c_address varchar(40),
            c_nationkey int4,
            c_phone char(15),
            c_acctbal numeric(12, 2),
            c_mktsegment char(10),
            c_comment varchar(117)
        ) row format delimited fields terminated by '|' stored as textfile location 's3://<your-s3-bucket-name>/customer/';

  5. Validate the sample data in the external customer.
    select * from datalake_mv_demo.customer;

  6. Create a materialized view on the external table.
    CREATE MATERIALIZED VIEW customer_mv 
    AS
    select * from datalake_mv_demo.customer;

  7. Validate the data in the materialized view.
    select * from customer_mv limit 5;

  8. Upload a new file customer.tbl.2 in the same S3 bucket and customer prefix location. This file contains one additional record.
  9. Using Query editor v2 , refresh the materialized view customer_mv.
    REFRESH MATERIALIZED VIEW customer_mv;

  10. Validate the incremental refresh of the materialized view when the new file is added.
    select mv_name, status, start_time, end_time
    from SYS_MV_REFRESH_HISTORY
    where mv_name='customer_mv'
    order by start_time DESC;

  11. Retrieve the current number of rows present in the materialized view customer_mv.
    select count(*) from customer_mv;

  12. Delete the existing file customer.tbl.1 from the same S3 bucket and prefix customer. You should only have customer.tbl.2 in the customer prefix of your S3 bucket.
  13. Using Query editor v2, refresh the materialized view customer_mv again.
    REFRESH MATERIALIZED VIEW customer_mv;

  14. Verify that the materialized view is refreshed incrementally when the existing file is deleted.
    select mv_name, status, start_time, end_time
    from SYS_MV_REFRESH_HISTORY
    where mv_name='customer_mv'
    order by start_time DESC;

  15. Retrieve the current row count in the materialized view customer_mv. It should now have one record as present in the customer.tbl.2 file.
    select count(*) from customer_mv;

  16. Modify the contents of the previously downloaded customer.tbl.2 file by altering the customer key from 999999999 to 111111111.
  17. Save the modified file and upload it again to the same S3 bucket, overwriting the existing file within the customer prefix.
  18. Using Query editor v2, refresh the materialized view customer_mv
    REFRESH MATERIALIZED VIEW customer_mv;

  19. Validate that the materialized view was incrementally refreshed after the data was modified in the file.
    select mv_name, status, start_time, end_time
    from SYS_MV_REFRESH_HISTORY
    where mv_name='customer_mv'
    order by start_time DESC;

  20. Validate that the data in the materialized view reflects your prior data changes from 999999999 to 111111111.
    select * from customer_mv;

Incremental materialized view refresh on Apache Iceberg data lake tables

Apache Iceberg is a data lake open table format that’s rapidly becoming an industry standard for managing data in data lakes. Iceberg introduces new capabilities that enable multiple applications to work together on the same data in a transactionally consistent manner.

In this section, we will explore how Amazon Redshift can seamlessly integrate with Apache Iceberg. You can use this integration to build materialized views and incrementally refresh them using a cost-effective approach, maintaining the freshness of the stored data.

  1. Sign in to the AWS Management Console, go to Amazon Athena, and execute the following SQL to create a database in an AWS Glue catalog.
    create database iceberg_mv_demo;

  2. Create a new Iceberg table
    create table iceberg_mv_demo.category (
      catid int ,
      catgroup string ,
      catname string ,
      catdesc string)
      PARTITIONED BY (catid, bucket(16,catid))
      LOCATION 's3://<your-s3-bucket-name>/iceberg/'
      TBLPROPERTIES (
      'table_type'='iceberg',
      'write_compression'='snappy',
      'format'='parquet');

  3. Add some sample data to iceberg_mv_demo.category.
    insert into iceberg_mv_demo.category values
    (1, 'Sports', 'MLB', 'Major League Basebal'),
    (2, 'Sports', 'NHL', 'National Hockey League'),
    (3, 'Sports', 'NFL', 'National Football League'),
    (4, 'Sports', 'NBA', 'National Basketball Association'),
    (5, 'Sports', 'MLS', 'Major League Soccer');

  4. Validate the sample data in iceberg_mv_demo.category.
    select * from iceberg_mv_demo.category;

  5. Connect to your Amazon Redshift Serverless workgroup or Redshift provisioned cluster using Query editor v2.
  6. Create an external schema
    CREATE external schema iceberg_schema
    from data catalog
    database 'iceberg_mv_demo'
    region 'us-east-1'
    iam_role default;

  7. Query the Iceberg table data from Amazon Redshift.
    SELECT *  FROM "dev"."iceberg_schema"."category";

  8. Create a materialized view using the external schema.
    create MATERIALIZED view mv_category as
    select  * from
    "dev"."iceberg_schema"."category";

  9. Validate the data in the materialized view.
    select  * from
    "dev"."iceberg_schema"."category";

  10. Using Amazon Athena, modify the Iceberg table iceberg_mv_demo.category and insert sample data.
    insert into category values
    (12, 'Concerts', 'Comedy', 'All stand-up comedy performances'),
    (13, 'Concerts', 'Other', 'General');

  11. Using Query editor v2, refresh the materialized view mv_category.
    Refresh  MATERIALIZED view mv_category;

  12. Validate the incremental refresh of the materialized view after the additional data was populated in the Iceberg table.
    select mv_name, status, start_time, end_time
    from SYS_MV_REFRESH_HISTORY
    where mv_name='mv_category'
    order by start_time DESC;

  13. Using Amazon Athena, modify the Iceberg table iceberg_mv_demo.category by deleting and updating records.
    delete from iceberg_mv_demo.category
    where catid = 3;
     
    update iceberg_mv_demo.category
    set catdesc= 'American National Basketball Association'
    where catid=4;

  14. Validate the sample data in iceberg_mv_demo.category to confirm that catid=4 has been updated and catid=3 has been deleted from the table.
    select * from iceberg_mv_demo.category;

  15. Using Query editor v2, Refresh the materialized view mv_category.
    Refresh  MATERIALIZED view mv_category;

  16. Validate the incremental refresh of the materialized view after one row was updated and another was deleted.
    select mv_name, status, start_time, end_time
    from SYS_MV_REFRESH_HISTORY
    where mv_name='mv_category'
    order by start_time DESC;

Performance Improvements

To understand the performance improvements of incremental refresh over full recompute, we used the industry-standard TPC-DS benchmark using 3 TB data sets for Iceberg tables configured in copy-on-write. In our benchmark, fact tables are stored on Amazon S3, while dimension tables are in Redshift. We created 34 materialized views representing different customer use cases on a Redshift provisioned cluster of size ra3.4xl with 4 nodes. We applied 1% inserts and deletes on fact tables, i.e., tables store_sales, catalog_sales and web_sales. We ran the inserts and deletes with Spark SQL on EMR serverless. We refreshed all 34 materialized views using incremental refresh and measured refresh latencies. We repeated the experiment using full recompute.

Our experiments show that incremental refresh provides substantial performance gains over full recompute. After insertions, incremental refresh was 13.5X faster on average than full recompute (maximum 43.8X, minimum 1.8X). After deletions, incremental refresh was 15X faster on average (maximum 47X, minimum 1.2X). The following graphs illustrate the latency of refresh.

Inserts

Deletes

Clean up

When you’re done, remove any resources that you no longer need to avoid ongoing charges.

  1. Run the following script to clean up the Amazon Redshift objects.
    DROP  MATERIALIZED view mv_category;
    
    DROP  MATERIALIZED view customer_mv;

  2. Run the following script to clean up the Apache Iceberg tables using Amazon Athena.
    DROP  TABLE iceberg_mv_demo.category;

Conclusion

Materialized views on Amazon Redshift can be a powerful optimization tool. With incremental refresh of materialized views on data lake tables, you can store pre-computed results of your queries over one or more base tables, providing a cost-effective approach to maintaining fresh data. We encourage you to update your data lake workloads and use the incremental materialized view feature. If you’re new to Amazon Redshift, try the Getting Started tutorial and use the free trial to create and provision your first cluster and experiment with the feature.

See Materialized views on external data lake tables in Amazon Redshift Spectrum for considerations and best practices.


About the authors

Raks KhareRaks Khare is a Senior Analytics Specialist Solutions Architect at AWS based out of Pennsylvania. He helps customers across varying industries and regions architect data analytics solutions at scale on the AWS platform. Outside of work, he likes exploring new travel and food destinations and spending quality time with his family.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 15+ years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Raza Hafeez is a Senior Product Manager at Amazon Redshift. He has over 13 years of professional experience building and optimizing enterprise data warehouses and is passionate about enabling customers to realize the power of their data. He specializes in migrating enterprise data warehouses to AWS Modern Data Architecture.

Enrico Siragusa is a Senior Software Development Engineer at Amazon Redshift. He contributed to query processing and materialized views. Enrico holds a M.Sc. in Computer Science from the University of Paris-Est and a Ph.D. in Bioinformatics from the International Max Planck Research School in Computational Biology and Scientific Computing in Berlin.

Amazon OpenSearch Service announces Standard and Extended Support dates for Elasticsearch and OpenSearch versions

Post Syndicated from Arvind Mahesh original https://aws.amazon.com/blogs/big-data/amazon-opensearch-service-announces-standard-and-extended-support-dates-for-elasticsearch-and-opensearch-versions/

Amazon OpenSearch Service supports 19 versions of Elasticsearch opensource, and 11 versions of OpenSearch. Over the years, we have added several stability, resiliency, and security features to recent engine versions, helping customers derive better value from OpenSearch Service. As software versions grow older, we need to make sure that these versions continue to meet high security and compliance standards. Many of the legacy versions supported on OpenSearch Service, such as Elasticsearch versions 1.5 and 2.3, depend on third-party dependencies that are no longer actively supported. By moving to the latest engine versions, customers can derive maximum benefit from the new features, improved price-performance, and security improvements we make to OpenSearch.

Today, we’re announcing timelines for end of Standard Support and Extended Support for legacy Elasticsearch versions up to 6.7, Elasticsearch versions 7.1 through 7.8, OpenSearch versions from 1.0 through 1.2, and OpenSearch versions 2.3 through 2.9 available on Amazon OpenSearch Service. Versions that are under Standard Support receive regular bug fixes and security fixes, and versions in Extended Support receive critical security fixes and operating system patches for an additional flat fee per normalized instance hour. With Extended Support, we want to make sure that our customers continue to receive critical security fixes for an adequate time, while they plan to upgrade to more recent engine versions. For more details on Extended Support please see the FAQs.

End of Standard Support and Extended Support for Elasticsearch versions

See Table 1 that follows for end of Standard Support and Extended Support dates for legacy Elasticsearch versions available on OpenSearch Service. We recommend that customers running Elasticsearch versions upgrade to the latest OpenSearch versions. All Elasticsearch versions will receive at least 12 months of Extended Support, and version 5.6 will receive 36 months of Extended Support. After Extended Support ends for a version, domains running the specific version will not receive bug fixes or security updates.

Software version End of Standard Support End of Extended Support
Elasticsearch versions 1.5 and 2.3 11/7/2025 11/7/2026
Elasticsearch versions 5.1 to 5.5 11/7/2025 11/7/2026
Elasticsearch version 5.6 11/7/2025 11/7/2028
Elasticsearch versions 6.0 to 6.7 11/7/2025 11/7/2026
Elasticsearch version 6.8 Not announced Not announced
Elasticsearch versions 7.1 to 7.8 11/7/2025 11/7/2026
Elasticsearch version 7.9 Not announced Not announced
Elasticsearch version 7.10 Not announced Not announced

End of Standard Support and Extended Support for OpenSearch versions

For OpenSearch versions running on Amazon OpenSearch Service, we will provide at least 12 months of Standard Support after the end of support date for the corresponding upstream open source OpenSearch version, or 12 months of Standard Support after the release of the next minor version on OpenSearch Service, whichever is longer. All OpenSearch versions will receive at least 12 months of Extended Support after the end of Standard Support date. For more details, check the open source OpenSearch maintenance policy.

See Table 2 that follows for end of Standard Support and Extended Support dates for various OpenSearch versions available on OpenSearch Service. For future updates on versions in Standard Support and Extended Support, follow supported versions.

Software Version End of Standard Support End of Extended Support
OpenSearch versions 1.0 to 1.2 11/7/2025 11/7/2026
OpenSearch version 1.3 Not announced Not announced
OpenSearch versions 2.3 to 2.9 11/7/2025 11/7/2026
OpenSearch versions 2.11 and higher versions Not announced Not announced

Upgrading OpenSearch Service domains: We recommend that you update your domains to the latest available OpenSearch version to derive maximum value out of OpenSearch Service. Minor version upgrades on OpenSearch tend to be seamless because they don’t contain breaking changes, and we recommend moving to the latest minor version, or a version for which end of support has not yet been announced. For example, if you are on OpenSearch version 1.2, you can move to OpenSearch version 1.3, because it’s the last minor version of the 1.x series and because presently it continues to be supported by the open source community and AWS. If you want to choose an Elasticsearch version, and you are running an older 6.x or 7.x version, you can move to version 6.8, or 7.10.

There are various ways to upgrade your cluster to a newer version, and the steps vary depending on the version your domain is running and the version you want to upgrade to. See Upgrading OpenSearch Service domains for detailed instructions on upgrading your domain to a new version. You can also use the Migration Assistant for Amazon OpenSearch Service for upgrading to newer versions

Calculating Extended Support charges: Domains running versions under Extended Support will be charged a flat additional fee per normalized instance hour (NIH). For example, $0.0065 per NIH in the US East (North Virginia) AWS Region. See the pricing page for exact pricing by Region.

NIH is computed as a factor of the instance size (for example, medium or large), and the number of instance hours. For example, if you’re running an m7g.medium.search instance for 24 hours in the US EAST (North Virginia) Region, which is priced at $0.068 per instance hour (on-demand), you will typically pay $1.632 ($0.068×24). If you’re running a version that is in Extended Support, you will pay an additional $0.0065 per NIH, which is computed as $0.0065 x 24 (number of instance hours) x 2 (size normalization factor, which is 2 for medium-sized instances), which comes to $0.312 for Extended Support for 24 hours. The total amount that you will pay for 24 hours will be a sum of the standard instance usage cost and the Extended Support cost, which is $1.944 ($1.632+$0.312, excluding storage cost). The following table shows the normalization factor for various instance sizes in OpenSearch Service.

Instance size Normalization Factor
nano 0.25
micro 0.5
small 1
medium 2
large 4
xlarge 8
2xlarge 16
4xlarge 32
8xlarge 64
9xlarge 72
10xlarge 80
12xlarge 96
16xlarge 128
18xlarge 144
24xlarge 192
32xlarge 256

Summary

We add new capabilities across various vectors to the latest OpenSearch versions, which include new features, performance and resiliency improvements, and security improvements. We recommend that you update to recent OpenSearch versions to get the most benefit out of OpenSearch Service. For any questions on Standard and Extended Support options, see the FAQs. For further questions, contact AWS Support.


About the authors

Arvind Mahesh is a Senior Manager-Product at Amazon Web Services for Amazon OpenSearch Service. He has close to two decades of technology experience across a variety of domains such as Analytics, Search, Cloud, Network Security, and Telecom.

Kuldeep Yadav is a Senior Technical Program Manager at Amazon Web Services who is passionate about driving innovation and complex problem solving. He works closely with teams and customers in ensuring operational excellence and achieving more with less. Outside of work he enjoys trekking and all sports

Jon Handler is a Senior Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with OpenSearch and Amazon OpenSearch Service, providing help and guidance to a broad range of customers who have search and log analytics workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included 4 years of coding a large-scale, ecommerce search engine. Jon holds a Bachelor of the Arts from the University of Pennsylvania, and a Master of Science and a PhD in Computer Science and Artificial Intelligence from Northwestern University.

Introducing Express brokers for Amazon MSK to deliver high throughput and faster scaling for your Kafka clusters

Post Syndicated from Channy Yun (윤석찬) original https://aws.amazon.com/blogs/aws/introducing-express-brokers-for-amazon-msk-to-deliver-high-throughput-and-faster-scaling-for-your-kafka-clusters/

Today, we’re announcing the general availability of Express brokers, a new broker type for Amazon Managed Streaming for Apache Kafka (Amazon MSK). It’s designed to deliver up to three times more throughput per-broker, scale up to 20 times faster, and reduce recovery time by 90 percent as compared to Standard brokers running Apache Kafka. Express brokers come preconfigured with Kafka best practices by default, support Kafka APIs, and provide the same low latency performance that Amazon MSK customers expect, so they can continue using existing client applications without any changes.

Express brokers provide improved compute and storage elasticity for Kafka applications when using Amazon MSK provisioned clusters. Amazon MSK is a fully-managed AWS service that makes it easier for you to build and run highly available and scalable applications based on Apache Kafka.

Let’s dive deeper into some of the key features that Express brokers have and the benefits they provide:

  • Easier operations with hands-free storage management – Express brokers offer unlimited storage without preprovisioning, eliminating disk-related bottlenecks. Cluster sizing is simpler, requiring only ingress and egress throughput divided by recommended per-broker throughput. This removes the need for proactive disk capacity monitoring and scaling, simplifying cluster management and improving resilience by eliminating a potential failure source.
  • Fewer brokers with up to three times throughput per broker – Higher throughput per broker allows for smaller clusters for the same workload. Standard brokers’ throughput must account for client traffic and background operations, with m7g.16xl Standard brokers safely handling 154 MBps ingress. Express brokers use opinionated settings and resource isolation, enabling m7g.16xl size instances to safely manage up to 500 MBps ingress without compromising performance or availability during cluster events.
  • Higher utilization with 20 times faster scaling – Express brokers reduce data movement during scaling, making them up to 20 times faster than Standard brokers. This allows for more quicker and reliable cluster resizing. You can monitor each broker’s ingress throughput capacity and add brokers within minutes, eliminating the need for over-provisioning in anticipation of traffic spikes.
  • Higher resilience with 90 percent faster recovery – Express brokers are designed for mission-critical applications requiring high resilience. They come preconfigured with best-practice defaults, including 3-way replication (RF=3), which reduce failures due to misconfiguration. Express brokers also recover 90 percent faster from transient failures compared to standard Apache Kafka brokers. Express brokers’ rebalancing and recovery use minimal cluster resources, simplifying capacity planning. This eliminates the risk of increased resource utilization and the need for continuous monitoring when right-sizing clusters.

You have choice options in Amazon MSK depending on your workload and preference:

MSK provisioned MSK Serverless
Standard brokers Express brokers
Configuration range Most flexible Flexible Least flexible
Cluster rebalancing Customer managed Customer managed
but up to 20x faster
MSK managed
Capacity management Yes Yes (compute only) No
Storage management Yes No No

Express brokers lower costs, provide higher resiliency, and lower operational overhead, making them the best choice for all Kafka workloads. If you prefer to use Kafka without managing any aspect of its capacity, its configuration, or how it scales, then you can choose Amazon MSK Serverless. This provides a fully abstracted Apache Kafka experience that eliminates the need for any infrastructure management, scales automatically, and charges you on a pay-per-use consumption model that doesn’t require you to optimize resource utilization.

Getting started with Express brokers in Amazon MSK
To get started with Express brokers, you can use the Sizing and Pricing worksheet that Amazon MSK provides. This worksheet helps you estimate the cluster size you’ll need to accommodate your workload and also gives you a rough estimate of the total monthly cost you’ll incur.

The throughput requirements of your workload are the primary factor in the size of your cluster. You should also consider other factors, such as partition and connection count to arrive at the size and number of brokers you’ll need for your cluster. For example, if your streaming application needs 30 MBps of data ingress (write) and 80 MBps data egress (read) capacity, you can use three express.m7g.large brokers to meet your throughput needs (assuming the partition count for your workload is within the maximum number of partitions that Amazon MSK recommends for an m7g.large instance).

The following table shows the recommended maximum ingress, egress, and partition counts per instance size for sustainable and safe operations. You can learn more about these recommendations in the Best practices section of Amazon MSK Developer Guide.

Instance size Ingress (MBps) Egress (MBps)
express.m7g.large 15.6 31.2
express.m7g.4xlarge 124.9 249.8
express.m7g.16xlarge 500.0 1000.0

Once you have decided the number and size of Express brokers you’ll need for your workload, go to the AWS Management Console or use the CreateCluster API to create an Amazon MSK provisioned cluster.

When you create a new cluster on the Amazon MSK console, in the Broker type option, choose Express brokers and then select the mount of compute capacity that you want to provision for the broker. As you can see in the screen shot, you can use Apache Kafka 3.6.0 version and Graviton-based instances for Express brokers. You don’t need to preprovision storage for Express brokers.

You can also customize some of these configurations to further fine-tune the performance of your clusters according to your own preferences. To learn more, visit Express broker configurations in the Amazon MSK developer guide.

To create an MSK cluster in the AWS Command Line Interface (AWS CLI), use the create-cluster command.

aws kafka create-cluster \
    --cluster-name "channy-express-cluster" \
    --kafka-version "3.6.0" \
    --number-of-broker-nodes 3 \
    --broker-node-group-info file://brokernodegroupinfo.json

A JSON file named brokernodegroupinfo.json specifies the three subnets over which you want Amazon MSK to distribute the broker nodes.

{
    "InstanceType": "express.m7g.large",
    "BrokerAZDistribution": "DEFAULT",
    "ClientSubnets": [
        "subnet-0123456789111abcd",
        "subnet-0123456789222abcd",
        "subnet-0123456789333abcd"
    ]
}

Once the cluster is created, you can use the bootstrap connection string to connect your clients to the cluster endpoints.

With Express brokers, you can scale vertically (changing instance size) or horizontally (adding brokers). Vertical scaling doubles throughput without requiring partition reassignment. Horizontal scaling adds brokers in sets of three and and allows you to create more partitions, but it requires partition reassignment for new brokers to serve traffic.

A major benefit of Express brokers is that you can add brokers and rebalance partitions within minutes. On the other hand, rebalancing partitions after adding Standard brokers can take several hours. The graph below shows the time it took to rebalance partitions after adding 3 Express brokers to a cluster and reassigning 2000 partitions to each of the new brokers.

As you can see, it took approximately 10 minutes to reassign these partitions and utilize the additional capacity of the new brokers. When we ran the same experiment on an equivalent cluster comprising of Standard brokers, partition reassignment took over 24hours.

To learn more about the partition reassignment, visit Expanding your cluster in the Apache Kafka documentation.

Things to know
Here are some things you should know about Express brokers:

  • Data migration – You can migrate the data in your existing Kafka or MSK cluster to a cluster composed of Express brokers using Amazon MSK Replicator, which copies both the data and the metadata of your cluster to a new cluster.
  • Monitoring – You can monitor your cluster composed of Express brokers in the cluster and at the broker level with Amazon CloudWatch metrics and enable open monitoring with Prometheus to expose metrics using the JMX Exporter and the Node Exporter.
  • Security – Just like with other broker types, Amazon MSK integrates with AWS Key Management Service (AWS KMS) to offer transparent server-side encryption for the storage in Express brokers. When you create an MSK cluster with Express brokers, you can specify the AWS KMS key that you want Amazon MSK to use to encrypt your data at rest. If you don’t specify a KMS key, Amazon MSK creates an AWS managed key for you and uses it on your behalf.

Now available
The Express broker type is available today in the US East (Ohio), US East (N. Virginia), US West (Oregon), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), and Europe (Ireland), and Europe (Stockholm) Regions.

You pay an hourly rate for Apache Kafka broker instance usage (billed at one-second resolution) for Express brokers, with varying fees depending on the size of the broker instance and active brokers in your MSK clusters. You also pay a per-GB rate for data written to an Express broker (billed at per-byte resolution). To learn more, visit the Amazon MSK pricing page.

Give Express brokers for Amazon MSK a try in the Amazon MSK console. For more information, visit the Amazon MSK Developer Guide and send feedback to AWS re:Post for Amazon MSK or through your usual AWS support contacts.

Channy