Tag Archives: Analytics

Enhance data security with fine-grained access controls in Amazon DataZone

Post Syndicated from Deepmala Agarwal original https://aws.amazon.com/blogs/big-data/enhance-data-security-with-fine-grained-access-controls-in-amazon-datazone/

Fine-grained access control is a crucial aspect of data security for modern data lakes and data warehouses. As organizations handle vast amounts of data across multiple data sources, the need to manage sensitive information has become increasingly important. Making sure the right people have access to the right data, without exposing sensitive information to unauthorized individuals, is essential for maintaining data privacy, compliance, and security.

Today, Amazon DataZone has introduced fine-grained access control, providing you granular control over your data assets in the Amazon DataZone business data catalog across data lakes and data warehouses. With the new capability, data owners can now restrict access to specific records of data at row and column levels, instead of granting access to the entire data asset. For example, if your data contains columns with sensitive information such as personally identifiable information (PII), you can restrict access to only the necessary columns, making sure sensitive information is protected while still allowing access to non-sensitive data. Similarly, you can control access at the row level, allowing users to see only the records that are relevant to their role or task.

In this post, we discuss how to implement fine-grained access control with row and column asset filters using this new feature in Amazon DataZone.

Row and column filters

Row filters enable you to restrict access to specific rows based on criteria you define. For instance, if your table contains data for two regions (America and Europe) and you want to make sure that employees in Europe only access data relevant to their region, you can create a row filter that excludes rows where the region is not Europe (for example, region != 'Europe'). This way, employees in America won’t have access to Europe’s data.

Column filters allow you to limit access to specific columns within your data assets. For example, if your table includes sensitive information such as PII, you can create a column filter to exclude PII columns. This makes sure subscribers can only access non-sensitive data.

The row and column asset filters in Amazon DataZone enable you to control who can access what using a consistent, business user-friendly mechanism for all of your data across AWS data lakes and data warehouses. To use fine-grained access control in Amazon DataZone, you can create row and column filters on top of your data assets in the Amazon DataZone business data catalog. When a user requests a subscription to your data asset, you can approve the subscription by applying the appropriate row and column filters. Amazon DataZone enforces these filters using AWS Lake Formation and Amazon Redshift, making sure the subscriber can only access the rows and columns that they are authorized to use.

Solution overview

To demonstrate the new capability, we consider a sample customer use case where an electronics ecommerce platform is looking to implement fine-grained access controls using Amazon DataZone. The customer has multiple product categories, each operated by different divisions of the company. The platform governance team wants to make sure each division has visibility only to data belonging to their own categories. Additionally, the platform governance team needs to adhere to the finance team requirements that pricing information should be visible only to the finance team.

The sales team, acting as the data producer, has published an AWS Glue table called Product sales that contains data for both Laptops and Servers categories to the Amazon DataZone business data catalog using the project Product-Sales. The analytic teams in both the laptop and server divisions need to access this data for their respective analytics projects. The data owner’s objective is to grant data access to consumers based on the division they belong to. This means giving access to only rows of data with laptop sales to the laptops sales analytics team, and rows with servers sales to the server sales analytics team. Additionally, the data owner wants to restrict both teams from accessing the pricing data. This post demonstrates the implementation steps to achieve this use case in Amazon DataZone.

The steps to configure this solution are as follows:

  1. The publisher creates asset filters for limiting access:
    1. We create two row filters: a Laptop Only row filter that limits access to only the rows of data with laptop sales, and a Server Only row filter that limits access to the rows of data with server sales.
    2. We also create a column filter called exclude-price-columns that excludes the price-related columns from the Product Sales
  2. Consumers discover and request subscriptions:
    1. The analyst from the laptops division requests a subscription to the Product Sales data asset.
    2. The analyst from the servers division also request a subscription to the Product Sales data asset.
    3. Both subscription requests are sent to the publisher for approval.
  3. The publisher approves the subscriptions and applies the appropriate filters:
    1. The publisher approves the request from the analysts in the laptops division, applying the Laptop Only row filter and the exclude-price-columns columns filter.
    2. The publisher approves the request from the consumer in the servers division, applying the Server Only row filter and the exclude-price-columns columns filter.
  4. Consumers access the authorized data in Amazon Athena:
    1. After the subscription is approved, we query the data in Athena to make sure that the analyst from the laptops division can now access only the product sales data for the Laptop
    2. Similarly, the analyst from the servers division can access only the product sales data for the Server
    3. Both consumers can see all columns except the price-related columns, as per the applied column filter.

The following diagram illustrates the solution architecture and process flow.

Prerequisites

To follow along with this post, the publisher of the product sales data asset must have published a sales dataset in Amazon DataZone.

Publisher creates asset filters for limiting access

In this section, we detail the steps the publisher takes to create asset filers.

Create row filters

This dataset contains the product categories Laptops and Servers. We want to restrict access to the dataset that is authorized based on the product category. We use the row filter feature in Amazon DataZone to achieve this.

Amazon DataZone allows you to create row filters that can be used when approving subscriptions to make sure that the subscriber can only access rows of data as defined in the row filters. To create a row filter, complete the following steps:

  1. On the Amazon DataZone console, navigate to the product-sales project (the project to which the asset belongs).
  2. Navigate to the Data tab for the project.
  3. Choose Inventory data in the navigation pane, then the asset Product Sales, where you want to create the row filter.

You can add row filters for assets of type AWS Glue tables or Redshift tables.

  1. On the asset detail page, on the Asset filters tab, choose Add asset filter.

We create two row filters, one each for the Laptops and Servers categories.

  1. Complete the following steps to create a laptop only asset row filter:
    1. Enter a name for this filter (Laptop Only).
    2. Enter a description of the filter (Allow rows with product category as Laptop Only).
    3. For the filter type, select Row filter.
    4. For the row filter expression, enter one or more expressions:
      1. Choose the column Product Category from the column dropdown menu.
      2. Choose the operator = from the operator dropdown menu.
      3. Enter the value Laptops in the Value field.
    5. If you need to add another condition to the filter expression, choose Add condition. For this post, we create a filter with one condition.
    6. When using multiple conditions in the row filter expression, choose And or Or to link the conditions.
    7. You can also define the subscriber visibility. For this post, we kept the default value (No, show values to subscriber).
    8. Choose Create asset filter.
  2. Repeat the same steps to create a row filter called Server Only, except this time enter the value Servers in the Value field.

Create column filters

Next, we create column filters to restrict access to columns with price-related data. Complete the following steps:

  1. In the same asset, add another asset filter of type column filter.
  2. On the Asset filters tab, choose Add asset filter.
  3. For Name, enter a name for the filter (for this post, exclude-price-columns).
  4. For Description, enter a description of the filters (for this post, exclude price data columns).
  5. For the filter type, select Column to create the column filter. This will display all the available columns in the data asset’s schema.
  6. Select all columns except the price-related ones.
  7. Choose Create asset filter.

Consumers discover and request subscriptions

In this section, we switch to the role of an analyst from the laptop division who is working within the project Sales Analytics - Laptop. As the data consumer, we search the catalog to find the Product Sales data asset and request access by subscribing to it.

  1. Log in to your project as a consumer and search for the Product Sales data asset.
  2. On the Product Sales data asset details page, choose Subscribe.
  3. For Project, choose Sales Analytics – Laptops.
  4. For Reason for request, enter the reason for the subscription request.
  5. Choose Subscribe to submit the subscription request.

Publisher approves subscriptions with filters

After the subscription request is submitted, the publisher will receive the request, and they can approve it by following these steps:

  1. As the publisher, open the project Product-Sales.
  2. On the Data tab, choose Incoming requests in the left navigation pane.
  3. Locate the request and choose View request. You can filter by Pending to see only requests that are still open.

This opens the details of the request, where you can see details like who requested the access, for what project, and the reason for the request.

  1. To approve the request, there are two options:
    1. Full access – If you choose to approve the subscription with full access option, the subscriber will get access to all the rows and columns in our data asset.
    2. Approve with row and column filters – To limit access to specific rows and columns of data, you can choose the option to approve with row and column filters. For this post, we use both filters that we created earlier.
  2. Select Choose filter, then on the dropdown menu, choose the Laptops Only and pii-col-filter
  3. Choose Approve to approve the request.

After access is granted and fulfilled, the subscription looks as shown in the following screenshot.

  1. Now let’s log in as a consumer from the server division.
  2. Repeat the same steps, but this time, while approving the subscription, the publisher of sales data approves with the Server only The other steps remain the same.

Consumers access authorized data in Athena

Now that we have successfully published an asset to the Amazon DataZone catalog and subscribed to it, we can analyze it. Let’s log in as a consumer from the laptop division.

  1. In the Amazon DataZone data portal, choose the consumer project Sales Analytics - Laptops.
  2. On the Schema tab, we can view the subscribed assets.
  3. Choose the project Sales Analytics - Laptops and choose the Overview
  4. In the right pane, open the Athena environment.

We can now run queries on the subscribed table.

  1. Choose the table under Tables and views, then choose Preview to view the SELECT statement in the query editor.
  2. Run a query as the consumer of Sales Analytics - Laptops, in which we can view data only with product category Laptops.

Under Tables and views, you can expand the table product_sales. The price-related columns are not visible in the Athena environment for querying.

  1. Next, you can switch to the role of analyst from the server division and analyze the dataset in similar way.
  2. We run the same query and see that under product_category, the analyst can see Servers only.

Conclusion

Amazon DataZone offers a straightforward way to implement fine-grained access controls on top of your data assets. This feature allows you to define column-level and row-level filters to enforce data privacy before the data is available to data consumers. Amazon DataZone fine-grained access control is generally available in all AWS Regions that support Amazon DataZone.

Try out the fine-grained access control feature in your own use case, and let us know your feedback in the comments section.


About the Authors

Deepmala Agarwal works as an AWS Data Specialist Solutions Architect. She is passionate about helping customers build out scalable, distributed, and data-driven solutions on AWS. When not at work, Deepmala likes spending time with family, walking, listening to music, watching movies, and cooking!

Leonardo Gomez is a Principal Analytics Specialist Solutions Architect at AWS. He has over a decade of experience in data management, helping customers around the globe address their business and technical needs. Connect with him on LinkedIn.

Utkarsh Mittal is a Senior Technical Product Manager for Amazon DataZone at AWS. He is passionate about building innovative products that simplify customers’ end-to-end analytics journeys. Outside of the tech world, Utkarsh loves to play music, with drums being his latest endeavor.

Automate data loading from your database into Amazon Redshift using AWS Database Migration Service (DMS), AWS Step Functions, and the Redshift Data API

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/automate-data-loading-from-your-database-into-amazon-redshift-using-aws-database-migration-service-dms-aws-step-functions-and-the-redshift-data-api/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

As more and more data is being generated, collected, processed, and stored in many different systems, making the data available for end-users at the right place and right time is a very important aspect for data warehouse implementation. A fully automated and highly scalable ETL process helps minimize the operational effort that you must invest in managing the regular ETL pipelines. It also provides timely refreshes of data in your data warehouse.

You can approach the data integration process in two ways:

  • Full load – This method involves completely reloading all the data within a specific data warehouse table or dataset
  • Incremental load – This method focuses on updating or adding only the changed or new data to the existing dataset in a data warehouse

This post discusses how to automate ingestion of source data that changes completely and has no way to track the changes. This is useful for customers who want to use this data in Amazon Redshift; some examples of such data are products and bills of materials without tracking details at the source.

We show how to build an automatic extract and load process from various relational database systems into a data warehouse for full load only. A full load is performed from SQL Server to Amazon Redshift using AWS Database Migration Service (AWS DMS). When Amazon EventBridge receives a full load completion notification from AWS DMS, ETL processes are run on Amazon Redshift to process data. AWS Step Functions is used to orchestrate this ETL pipeline. Alternatively, you could use Amazon Managed Workflows for Apache Airflow (Amazon MWAA), a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Solution overview

The workflow consists of the following steps:

  1. The solution uses an AWS DMS migration task that replicates the full load dataset from the configured SQL Server source to a target Redshift cluster in a staging area.
  2. AWS DMS publishes the replicationtaskstopped event to EventBridge when the replication task is complete, which invokes an EventBridge rule.
  3. EventBridge routes the event to a Step Functions state machine.
  4. The state machine calls a Redshift stored procedure through the Redshift Data API, which loads the dataset from the staging area to the target production tables. With this API, you can also access Redshift data with web-based service applications, including AWS Lambda.

The following architecture diagram highlights the end-to-end solution using AWS services.

In the following sections, we demonstrate how to create the full load AWS DMS task, configure the ETL orchestration on Amazon Redshift, create the EventBridge rule, and test the solution.

Prerequisites

To complete this walkthrough, you must have the following prerequisites:

  • An AWS account
  • A SQL Server database configured as a replication source for AWS DMS
  • A Redshift cluster to serve as the target database
  • An AWS DMS replication instance to migrate data from source to target
  • A source endpoint pointing to the SQL Server database
  • A target endpoint pointing to the Redshift cluster

Create the full load AWS DMS task

Complete the following steps to set up your migration task:

  1. On the AWS DMS console, choose Database migration tasks in the navigation pane.
  2. Choose Create task.
  3. For Task identifier, enter a name for your task, such as dms-full-dump-task.
  4. Choose your replication instance.
  5. Choose your source endpoint.
  6. Choose your target endpoint.
  7. For Migration type, choose Migrate existing data.

  1. In the Table mapping section, under Selection rules, choose Add new selection rule
  2. For Schema, choose Enter a schema.
  3. For Schema name, enter a name (for example, dms_sample).
  4. Keep the remaining settings as default and choose Create task.

The following screenshot shows your completed task on the AWS DMS console.

Create Redshift tables

Create the following tables on the Redshift cluster using the Redshift query editor:

  • dbo.dim_cust – Stores customer attributes:
CREATE TABLE dbo.dim_cust (
cust_key integer ENCODE az64,
cust_id character varying(10) ENCODE lzo,
cust_name character varying(100) ENCODE lzo,
cust_city character varying(50) ENCODE lzo,
cust_rev_flg character varying(1) ENCODE lzo
)

DISTSTYLE AUTO;
  • dbo.fact_sales – Stores customer sales transactions:
CREATE TABLE dbo.fact_sales (
order_number character varying(20) ENCODE lzo,
cust_key integer ENCODE az64,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;
  • dbo.fact_sales_stg – Stores daily customer incremental sales transactions:
CREATE TABLE dbo.fact_sales_stg (
order_number character varying(20) ENCODE lzo,
cust_id character varying(10) ENCODE lzo,
order_amt numeric(18,2) ENCODE az64
)

DISTSTYLE AUTO;

Use the following INSERT statements to load sample data into the sales staging table:

insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (100,1,200);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (101,1,300);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (102,2,25);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (103,2,35);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (104,3,80);
insert into dbo.fact_sales_stg(order_number,cust_id,order_amt) values (105,3,45);

Create the stored procedures

In the Redshift query editor, create the following stored procedures to process customer and sales transaction data:

  • Sp_load_cust_dim() – This procedure compares the customer dimension with incremental customer data in staging and populates the customer dimension:
CREATE OR REPLACE PROCEDURE dbo.sp_load_cust_dim()
LANGUAGE plpgsql
AS $$
BEGIN
truncate table dbo.dim_cust;
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (1,100,'abc','chicago');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (2,101,'xyz','dallas');
insert into dbo.dim_cust(cust_key,cust_id,cust_name,cust_city) values (3,102,'yrt','new york');
update dbo.dim_cust
set cust_rev_flg=case when cust_city='new york' then 'Y' else 'N' end
where cust_rev_flg is null;
END;
$$
  • sp_load_fact_sales() – This procedure does the transformation for incremental order data by joining with the date dimension and customer dimension and populates the primary keys from the respective dimension tables in the final sales fact table:
CREATE OR REPLACE PROCEDURE dbo.sp_load_fact_sales()
LANGUAGE plpgsql
AS $$
BEGIN
--Process Fact Sales
insert into dbo.fact_sales
select
sales_fct.order_number,
cust.cust_key as cust_key,
sales_fct.order_amt
from dbo.fact_sales_stg sales_fct
--join to customer dim
inner join (select * from dbo.dim_cust) cust on sales_fct.cust_id=cust.cust_id;
END;
$$

Create the Step Functions state machine

Complete the following steps to create the state machine redshift-elt-load-customer-sales. This state machine is invoked as soon as the AWS DMS full load task for the customer table is complete.

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Choose Create state machine.
  3. For Template, choose Blank.
  4. On the Actions dropdown menu, choose Import definition to import the workflow definition of the state machine.

  1. Open your preferred text editor and save the following code as an ASL file extension (for example, redshift-elt-load-customer-sales.ASL). Provide your Redshift cluster ID and the secret ARN for your Redshift cluster.
{
"Comment": "State Machine to process ETL for Customer Sales Transactions",
"StartAt": "Load_Customer_Dim",
"States": {
"Load_Customer_Dim": {
"Type": "Task",
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcd",
"Database": "dev",
"Sql": "call dbo.sp_load_cust_dim()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},
"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
"Next": "Wait on Load_Customer_Dim"
},
"Wait on Load_Customer_Dim": {
"Type": "Wait",
"Seconds": 30,
"Next": "Check_Status_Load_Customer_Dim"
},

"Check_Status_Load_Customer_Dim": {
"Type": "Task",
"Next": "Choice",
"Parameters": {
"Id.$": "$.Id"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement"
},

"Choice": {
"Type": "Choice",
"Choices": [
{
"Not": {
"Variable": "$.Status",
"StringEquals": "FINISHED"
},
"Next": "Wait on Load_Customer_Dim"
}
],
"Default": "Load_Sales_Fact"
},
"Load_Sales_Fact": {
"Type": "Task",
"End": true,
"Parameters": {
"ClusterIdentifier": "redshiftcluster-abcdef”,
"Database": "dev",
"Sql": "call dbo.sp_load_fact_sales()",
"SecretArn": "arn:aws:secretsmanager:us-west-2:xxx:secret:rs-cluster-secret-abcd"
},

"Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement"
}
}
}
  1. Choose Choose file and upload the ASL file to create a new state machine.

  1. For State machine name, enter a name for the state machine (for example, redshift-elt-load-customer-sales).
  2. Choose Create.

After the successful creation of the state machine, you can verify the details as shown in the following screenshot.

The following diagram illustrates the state machine workflow.

The state machine includes the following steps:

  • Load_Customer_Dim – Performs the following actions:
    • Passes the stored procedure sp_load_cust_dim to the execute-statement API to run in the Redshift cluster to load the incremental data for the customer dimension
    • Sends data back the identifier of the SQL statement to the state machine
  • Wait_on_Load_Customer_Dim – Waits for at least 15 seconds
  • Check_Status_Load_Customer_Dim – Invokes the Data API’s describeStatement to get the status of the API call
  • is_run_Load_Customer_Dim_complete – Routes the next step of the ETL workflow depending on its status:
    • FINISHED – Passes the stored procedure Load_Sales_Fact to the execute-statement API to run in the Redshift cluster, which loads the incremental data for fact sales and populates the corresponding keys from the customer and date dimensions
    • All other statuses – Goes back to the wait_on_load_customer_dim step to wait for the SQL statements to finish

The state machine redshift-elt-load-customer-sales loads the dim_cust, fact_sales_stg, and fact_sales tables when invoked by the EventBridge rule.

As an optional step, you can set up event-based notifications on completion of the state machine to invoke any downstream actions, such as Amazon Simple Notification Service (Amazon SNS) or further ETL processes.

Create an EventBridge rule

EventBridge sends event notifications to the Step Functions state machine when the full load is complete. You can also turn event notifications on or off in EventBridge.

Complete the following steps to create the EventBridge rule:

  1. On the EventBridge console, in the navigation pane, choose Rules.
  2. Choose Create rule.
  3. For Name, enter a name (for example, dms-test).
  4. Optionally, enter a description for the rule.
  5. For Event bus, choose the event bus to associate with this rule. If you want this rule to match events that come from your account, select AWS default event bus. When an AWS service in your account emits an event, it always goes to your account’s default event bus.
  6. For Rule type, choose Rule with an event pattern.
  7. Choose Next.
  8. For Event source, choose AWS events or EventBridge partner events.
  9. For Method, select Use pattern form.
  10. For Event source, choose AWS services.
  11. For AWS service, choose Database Migration Service.
  12. For Event type, choose All Events.
  13. For Event pattern, enter the following JSON expression, which looks for the REPLICATON_TASK_STOPPED status for the AWS DMS task:
{
"source": ["aws.dms"],
"detail": {
"eventId": ["DMS-EVENT-0079"],
"eventType": ["REPLICATION_TASK_STOPPED"],
"detailMessage": ["Stop Reason FULL_LOAD_ONLY_FINISHED"],
"type": ["REPLICATION_TASK"],
"category": ["StateChange"]
}
}

  1. For Target type, choose AWS service.
  2. For AWS service, choose Step Functions state machine.
  3. For State machine name, enter redshift-elt-load-customer-sales.
  4. Choose Create rule.

The following screenshot shows the details of the rule created for this post.

Test the solution

Run the task and wait for the workload to complete. This workflow moves the full volume data from the source database to the Redshift cluster.

The following screenshot shows the load statistics for the customer table full load.

AWS DMS provides notifications when an AWS DMS event occurs, for example the completion of a full load or if a replication task has stopped.

After the full load is complete, AWS DMS sends events to the default event bus for your account. The following screenshot shows an example of invoking the target Step Functions state machine using the rule you created.

We configured the Step Functions state machine as a target in EventBridge. This enables EventBridge to invoke the Step Functions workflow in response to the completion of an AWS DMS full load task.

Validate the state machine orchestration

When the entire customer sales data pipeline is complete, you may go through the entire event history for the Step Functions state machine, as shown in the following screenshots.

Limitations

The Data API and Step Functions AWS SDK integration offers a robust mechanism to build highly distributed ETL applications within minimal developer overhead. Consider the following limitations when using the Data API and Step Functions:

Clean up

To avoid incurring future charges, delete the Redshift cluster, AWS DMS full load task, AWS DMS replication instance, and Step Functions state machine that you created as part of this post.

Conclusion

In this post, we demonstrated how to build an ETL orchestration for full loads from operational data stores using the Redshift Data API, EventBridge, Step Functions with AWS SDK integration, and Redshift stored procedures.

To learn more about the Data API, see Using the Amazon Redshift Data API to interact with Amazon Redshift clusters and Using the Amazon Redshift Data API.


About the authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Praveen Kadipikonda is a Senior Analytics Specialist Solutions Architect at AWS based out of Dallas. He helps customers build efficient, performant, and scalable analytic solutions. He has worked with building databases and data warehouse solutions for over 15 years.

Jagadish Kumar (Jag) is a Senior Specialist Solutions Architect at AWS focused on Amazon OpenSearch Service. He is deeply passionate about Data Architecture and helps customers build analytics solutions at scale on AWS.

Introducing self-managed data sources for Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/introducing-self-managed-data-sources-for-amazon-opensearch-ingestion/

Enterprise customers increasingly adopt Amazon OpenSearch Ingestion (OSI) to bring data into Amazon OpenSearch Service for various use cases. These include petabyte-scale log analytics, real-time streaming, security analytics, and searching semi-structured key-value or document data. OSI makes it simple, with straightforward integrations, to ingest data from many AWS services, including Amazon DynamoDB, Amazon Simple Storage Service (Amazon S3), Amazon Managed Streaming for Apache Kafka (Amazon MSK), and Amazon DocumentDB (with MongoDB compatibility).

Today we are announcing support for ingesting data from self-managed OpenSearch/Elasticsearch and Apache Kafka clusters. These sources can either be on Amazon Elastic Compute Cloud (Amazon EC2) or on-premises environments.

In this post, we outline the steps to get started with these sources.

Solution overview

OSI supports the AWS Cloud Development Kit (AWS CDK), AWS CloudFormation, the AWS Command Line Interface (AWS CLI), Terraform, AWS APIs, and the AWS Management Console to deploy pipelines. In this post, we use the console to demonstrate how to create a self-managed Kafka pipeline.

Prerequisites

To make sure OSI can connect and read data successfully, the following conditions should be met:

  • Network connectivity to data sources – OSI is generally deployed in a public network, such as the internet, or in a virtual private cloud (VPC). OSI deployed in a customer VPC is able to access data sources in the same or different VPC and on the internet with an attached internet gateway. If your data sources are in another VPC, common methods for network connectivity include direct VPC peering, using a transit gateway, or using customer managed VPC endpoints powered by AWS PrivateLink. If your data sources are on your corporate data center or other on-premises environment, common methods for network connectivity include AWS Direct Connect and using a network hub like a transit gateway. The following diagram shows a sample configuration of OSI running in a VPC and using Amazon OpenSearch Service as a sink. OSI runs in a service VPC and creates an Elastic Network interface (ENI) in the customer VPC. For self-managed data source these ENIs are used for reading data from on-premises environment. OSI creates an VPC endpoint in the service VPC to send data to the sink.
  • Name resolution for data sources – OSI uses an Amazon Route 53 resolver. This resolver automatically answers queries to names local to a VPC, public domain names on the internet, and records hosted in private hosted zones. If you’re are using a private hosted zone, make sure you have a DHCP option set enabled, attached to the VPC using AmazonProvidedDNS as domain name server. For more information, see Work with DHCP option sets. Additionally, you can use resolver inbound and outbound endpoints if you need a complex resolution schemes with conditions that are beyond a simple private hosted zone.
  • Certificate verification for data source names – OSI supports only SASL_SSL for transport for Apache Kafka source. Within SASL, Amazon OpenSearch Service supports most authentication mechanisms like PLAIN, SCRAM, IAM, GSAPI and others. When using SASL_SSL, make sure you have access to certificates needed for OSI to authenticate. For self-managed OpenSearch data sources, make sure verifiable certificates are installed on the clusters. Amazon OpenSearch Service doesn’t support insecure communication between OSI and OpenSearch. Certificate verification cannot be turned off. In particular, the “insecure” configuration option is not supported.
  • Access to AWS Secrets Manager – OSI uses AWS Secrets Manager to retrieve credentials and certificates needed to communicate with self-managed data sources. For more information, see Create and manage secrets with AWS Secrets Manager.
  • IAM role for pipelines – You need an AWS Identity and Access Management (IAM) pipeline role to write to data sinks. For more information, see Identity and Access Management for Amazon OpenSearch Ingestion.

Create a pipeline with self-managed Kafka as a source

After you complete the prerequisites, you’re ready to create a pipeline for your data source. Complete the following steps:

  1. On the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create pipeline.
  3. Choose Streaming under Use case in the navigation pane.
  4. Select Self managed Apache Kafka under Ingestion pipeline blueprints and choose Select blueprint.

This will populate a sample configuration for this pipeline.

  1. Provide a name for this pipeline and choose the appropriate pipeline capacity.
  2. Under Pipeline configuration, provide your pipeline configuration in YAML format. The following code snippet shows sample configuration in YAML for SASL_SSL authentication:
    version: '2'
    kafka-pipeline:
      source:
        kafka:
          acknowledgments: true
          bootstrap_servers:
            - 'node-0.example.com:9092'
          encryption:
            type: "ssl"
            certificate: '${{aws_secrets:kafka-cert}}'
            
          authentication:
            sasl:
              plain:
                username: '${{aws_secrets:secrets:username}}'
                password: '${{aws_secrets:secrets:password}}'
          topics:
            - name: on-prem-topic
              group_id: osi-group-1
      processor:
        - grok:
            match:
              message:
                - '%{COMMONAPACHELOG}'
        - date:
            destination: '@timestamp'
            from_time_received: true
      sink:
        - opensearch:
            hosts: ["https://search-domain-12345567890.us-east-1.es.amazonaws.com"]
            aws:
              region: us-east-1
              sts_role_arn: 'arn:aws:iam::123456789012:role/pipeline-role'
            index: "on-prem-kakfa-index"
    extension:
      aws:
        secrets:
          kafka-cert:
            secret_id: kafka-cert
            region: us-east-1
            sts_role_arn: 'arn:aws:iam::123456789012:role/pipeline-role'
          secrets:
            secret_id: secrets
            region: us-east-1
            sts_role_arn: 'arn:aws:iam::123456789012:role/pipeline-role'

  1. Choose Validate pipeline and confirm there are no errors.
  2. Under Network configuration, choose Public access or VPC access. (For this post, we choose VPC access).
  3. If you chose VPC access, specify your VPC, subnets, and an appropriate security group so OSI can reach the outgoing ports for the data source.
  4. Under VPC attachment options, select Attach to VPC and choose an appropriate CIDR range.

OSI resources are created in a service VPC managed by AWS that is separate from the VPC you chose in the last step. This selection allows you to configure what CIDR ranges OSI should use inside this service VPC. The choice exists so you can make sure there is no address collision between CIDR ranges in your VPC that is attached to your on-premises network and this service VPC. Many pipelines in your account can share same CIDR ranges for this service VPC.

  1. Specify any optional tags and log publishing options, then choose Next.
  2. Review the configuration and choose Create pipeline.

You can monitor the pipeline creation and any log messages in the Amazon CloudWatch Logs log group you specified. Your pipeline should now be successfully created. For more information about how to provision capacity for the performance of this pipeline, see the section Recommended Compute Units (OCUs) for the MSK pipeline in Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion.

Create a pipeline with self-managed OpenSearch as a source

The steps for creating a pipeline for self-managed OpenSearch are similar to the steps for creating one for Kafka. During the blueprint selection, choose Data Migration under Use case and select Self managed OpenSearch/Elasticsearch. OpenSearch Ingestion can source data from all versions of OpenSearch and Elasticsearch from version 7.0  to  version 7.10.

The following blueprint shows a sample configuration YAML for this data source:

version: "2"
opensearch-migration-pipeline:
  source:
    opensearch:
      acknowledgments: true
      hosts: [ "https://node-0.example.com:9200" ]
      username: "${{aws_secrets:secret:username}}"
      password: "${{aws_secrets:secret:password}}"
      indices:
        include:
        - index_name_regex: "opensearch_dashboards_sample_data*"
        exclude:
          - index_name_regex: '\..*'
  sink:
    - opensearch:
        hosts: [ "https://search-domain-12345567890.us-east-1.es.amazonaws.com" ]
        aws:
          sts_role_arn: "arn:aws:iam::123456789012:role/pipeline-role"
          region: "us-east-1"
        index: "on-prem-os"
extension:
  aws:
    secrets:
      secret:
        secret_id: "self-managed-os-credentials"
        region: "us-east-1"
        sts_role_arn: "arn:aws:iam::123456789012:role/pipeline-role"
        refresh_interval: PT1H

Considerations for self-managed OpenSearch data source

Certificates installed on the OpenSearch cluster need to be verifiable for OSI to connect to this data source before reading data. Insecure connections are currently not supported.

After you’re connected, make sure the cluster has sufficient read bandwidth to allow for OSI to read data. Use the Min and Max OCU setting to limit OSI read bandwidth consumption. Your read bandwidth will vary depending upon data volume, number of indexes, and provisioned OCU capacity. Start small and increase the number of OCUs to balance between available bandwidth and acceptable migration time.

This source is typically meant for one-time migration of data and not as continuous ingestion to keep data in sync between data sources and sinks.

OpenSearch Service domains support remote reindexing, but that consumes resources in your domains. Using OSI will move this compute out of the domain, and OSI can achieve significantly higher bandwidth than remote reindexing, thereby resulting in faster migration times.

OSI doesn’t support deferred replay or traffic recording today; refer to Migration Assistant for Amazon OpenSearch Service if your migration needs those capabilities.

Conclusion

In this post, we introduced self-managed sources for OpenSearch Ingestion that enable you to ingest data from corporate data centers or other on-premises environments. OSI also supports various other data sources and integrations. Refer to Working with Amazon OpenSearch Ingestion pipeline integrations to learn about these other data sources.


About the Authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-centered technologies, and is based out of Seattle, Washington.

Amazon DataZone enhances data discovery with advanced search filtering

Post Syndicated from Chaitanya Vejendla original https://aws.amazon.com/blogs/big-data/amazon-datazone-enhances-data-discovery-with-advanced-search-filtering/

Amazon DataZone, a fully managed data management service, helps organizations catalog, discover, analyze, share, and govern data between data producers and consumers. We are excited to announce the introduction of advanced search filtering capabilities in the Amazon DataZone business data catalog.

With the improved rendering of glossary terms, you can now navigate large sets of terms with ease in an expandable and collapsible hierarchy, reducing the time and effort required to locate specific data assets. The introduction of logical operators (AND and OR) for filtering allows for more precise searches, enabling you to combine multiple criteria in a way that best suits your needs. The descriptive summary of search criteria helps users keep track of their applied filters, making it simple to adjust search parameters on the fly.

In this post, we discuss how these new search filtering capabilities enhance the user experience and boost the accuracy of search results, facilitating the ability to find data quickly.

Challenges

Many of our customers manage vast numbers of data assets within the Amazon DataZone catalog for discoverability. Data producers tag these assets with business glossary terms to classify and enhance discovery. For example, data assets owned by a particular department can be tagged with the glossary term for that department, like “Marketing.”

Data consumers searching for the right data assets use faceted search with various criteria, including business glossary terms, and apply filters to refine their search results. However, finding the right data assets can be challenging, especially when it involves combining multiple filters. Customers wanted more flexibility and precision in their search capabilities, such as:

  • A more intuitive way to navigate through extensive lists of glossary terms
  • The ability to apply more nuanced search logic to refine search results with greater precision
  • A summary of applied filters to effortlessly review and adjust search criteria

New features in Amazon DataZone

With the latest release, Amazon DataZone now supports features that enhance search flexibility and accuracy:

  • Improved rendering of glossary terms – Glossary terms are now displayed in a hierarchical view, providing a more organized structure. You can navigate and select from long lists of glossary terms presented in an expandable and collapsible hierarchy within the search facets. For instance, a data scientist can quickly find specific customer demographic data without sifting through an overwhelming flat list.
  • Logical operators for refined search – You can now choose logical operators to refine your search results, offering greater control and precision. For example, a financial analyst preparing a report on investment performance can use AND logic to combine criteria like investment type and region to pinpoint the exact data needed, or use OR logic to broaden the search to include any investments that meet either criterion.
  • Summary of search criteria – A descriptive summary of applied search filters is now provided, allowing you to review and manage your search criteria with ease. For example, a project manager can quickly adjust filters to find project-related assets matching specific phases or statuses.

These enhancements enable you to better understand the relationships between different search facets, enhancing the overall search experience and making it effortless to find the right data assets.

Use case overview

To demonstrate these search enhancements, we set up a new Amazon DataZone domain with two projects:

  • Marketing project – Publishes campaign-related data assets from the Marketing department. These data assets have been tagged with relevant business glossary terms corresponding to marketing.
  • Sales project – Publishes sales-related datasets from the Sales department. These data assets have been tagged with relevant business glossary terms corresponding to sales.

The following screenshots show examples of the different tagged assets.

In the following sections, we demonstrate the improvements in the user search experience for this use case.

Improved rendering of glossary terms

As a data consumer, you want to discover data assets using the faceted search capability within Amazon DataZone.

The search result panel has been enhanced to display glossaries and glossary terms in a hierarchical fashion. This allows you to expand and collapse sections for a more intuitive search experience.

For example, if you want to find product sales data assets from the Corporate Sales department, you can select the appropriate term within the glossary. The selection criteria and the corresponding result list show a total of 18 data assets, as shown in the following screenshot.

Next, if you want to further refine your search to focus only on the product category of Smartphones, you can do so.

Because OR is the default logical operator for your search within the glossary terms, it lists all the assets that are either part of Corporate Sales or tagged with Smartphones.

Logical operators for refined search

You now have the flexibility to change the default operator to AND to list only those data assets that are part of Corporate Sales and tagged with Smartphones, narrowing down the result set.

Additionally, you can further filter based on the asset type by selecting the available options. When you select Glue Table as your asset type, it defaults to the AND condition across the glossary terms and the asset type filter, thereby showing the data assets that satisfy all the filter conditions.

You also have the flexibility to change the operator to OR across these filters, yielding a more exhaustive list of data assets.

Summary of search criteria

As we showed in the preceding screenshots, the results also display a summary of the filters you applied for the search. This enables you to review and better manage your search criteria.

Conclusion

This post demonstrated new Amazon DataZone search enhancement features that streamline data discovery for a more intuitive user experience. These enhancements are designed to empower data consumers within organizations to make more informed decisions, faster. By streamlining the search process and making it more intuitive, Amazon DataZone continues to support the growing needs of data-driven businesses, helping you unlock the full potential of your data assets.

For more information about Amazon DataZone and to get started, refer to the Amazon DataZone User Guide.


About the authors

Chaitanya Vejendla is a Senior Solutions Architect specialized in DataLake & Analytics primarily working for Healthcare and Life Sciences industry division at AWS. Chaitanya is responsible for helping life sciences organizations and healthcare companies in developing modern data strategies, deploy data governance and analytical applications, electronic medical records, devices, and AI/ML-based applications, while educating customers about how to build secure, scalable, and cost-effective AWS solutions. His expertise spans across data analytics, data governance, AI, ML, big data, and healthcare-related technologies.

Ramesh H Singh is a Senior Product Manager Technical (External Services) at AWS in Seattle, Washington, currently with the Amazon DataZone team. He is passionate about building high-performance ML/AI and analytics products that enable enterprise customers to achieve their critical goals using cutting-edge technology.

Rishabh Asthana is a Front-end Engineer at AWS, working with the Amazon DataZone team based in New York City, USA.

Somdeb Bhattacharjee is an Enterprise Solutions Architect based out of New York, USA focused on helping customers on their cloud journey. He has interest in Databases, Big Data and Analytics.

Introducing end-to-end data lineage (preview) visualization in Amazon DataZone

Post Syndicated from Esra Kayabali original https://aws.amazon.com/blogs/aws/introducing-end-to-end-data-lineage-preview-visualization-in-amazon-datazone/

Amazon DataZone is a data management service to catalog, discover, analyze, share, and govern data between data producers and consumers in your organization. Engineers, data scientists, product managers, analysts, and business users can easily access data throughout your organization using a unified data portal so that they can discover, use, and collaborate to derive data-driven insights.

Now, I am excited to announce in preview a new API-driven and OpenLineage compatible data lineage capability in Amazon DataZone, which provides an end-to-end view of data movement over time. Data lineage is a new feature within Amazon DataZone that helps users visualize and understand data provenance, trace change management, conduct root cause analysis when a data error is reported, and be prepared for questions on data movement from source to target. This feature provides a comprehensive view of lineage events, captured automatically from Amazon DataZone’s catalog along with other events captured programmatically outside of Amazon DataZone by stitching them together for an asset.

When you need to validate how the data of interest originated in the organization, you may rely on manual documentation or human connections. This manual process is time-consuming and can result in inconsistency, which directly reduces your trust in the data. Data lineage in Amazon DataZone can raise trust by helping you understand where the data originated, how it has changed, and its consumption in time. For example, data lineage can be programmatically setup to show the data from the time it was captured as raw files in Amazon Simple Storage Service (Amazon S3), through its ETL transformations using AWS Glue, to the time it was consumed in tools such as Amazon QuickSight.

With Amazon DataZone’s data lineage, you can reduce the time spent mapping a data asset and its relationships, troubleshooting and developing pipelines, and asserting data governance practices. Data lineage helps you gather all lineage information in one place using API, and then provide a graphical view with which data users can be more productive, make better data-driven decisions, and also identify the root cause of data issues.

Let me tell you how to get started with data lineage in Amazon DataZone. Then, I will show you how data lineage enhances the Amazon DataZone data catalog experience by visually displaying connections about how a data asset came to be so you can make informed decisions when searching or using the data asset.

Getting started with data lineage in Amazon DataZone
In preview, I can get started by hydrating lineage information into Amazon DataZone programmatically by either directly creating lineage nodes using Amazon DataZone APIs or by sending OpenLineage compatible events from existing pipeline components to capture data movement or transformations that happens outside of Amazon DataZone. For information about assets in the catalog, Amazon DataZone automatically captures lineage of its states (i.e., inventory or published states), and its subscriptions for producers, such as data engineers, to trace who is consuming the data they produced or for data consumers, such as data analyst or data engineers, to understand if they are using the right data for their analysis.

With the information being sent, Amazon DataZone will start populating the lineage model and will be able to map the identifier sent through the APIs with the assets already cataloged. As new lineage information is being sent, the model starts creating versions to start the visualization of the asset at a given time, but it also allows me to navigate to previous versions.

I use a preconfigured Amazon DataZone domain for this use case. I use Amazon DataZone domains to organize my data assets, users, and projects. I go to the Amazon DataZone console and choose View domains. I choose my domain Sales_Domain and choose Open data portal.

I have five projects under my domain: one for a data producer (SalesProject) and four for data consumers (MarketingTestProject, AdCampaignProject, SocialCampaignProject, and WebCampaignProject). You can visit Amazon DataZone Now Generally Available – Collaborate on Data Projects across Organizational Boundaries to create your own domain and all the core components.

I enter “Market Sales Table” in the Search Assets bar and then go to the detail page for the Market Sales Table asset. I choose the LINEAGE tab to visualize lineage with upstream and downstream nodes.

I can now dive into asset details, processes, or jobs that lead to or from those assets and drill into column-level lineage.

Interactive visualization with data lineage
I will show you the graphical interface using various personas who regularly interact with Amazon DataZone and will benefit from the data lineage feature.

First, let’s say I am a marketing analyst, who needs to confirm the origin of a data asset to confidently use in my analysis. I go to the MarketingTestProject page and choose the LINEAGE tab. I notice the lineage includes information about the asset as it occurs inside and out of Amazon DataZone. The labels Cataloged, Published, and Access requested represent actions inside the catalog. I expand the market_sales dataset item to see where the data came from.

I now feel assured of the origin of the data asset and trust that it aligns with my business purpose ahead of starting my analysis.

Second, let’s say I am a data engineer. I need to understand the impact of my work on dependent objects to avoid unintended changes. As a data engineer, any changes made to the system should not break any downstream processes. By browsing lineage, I can clearly see who has subscribed and has access to the asset. With this information, I can inform the project teams about an impending change that can affect their pipeline. When a data issue is reported, I can investigate each node and traverse between its versions to dive into what has changed over time to identify the root cause of the issue and fix it in a timely manner.

Finally, as an administrator or steward, I am responsible for securing data, standardizing business taxonomies, enacting data management processes, and for general catalog management. I need to collect details about the source of data and understand the transformations that have happened along the way.

For example, as an administrator looking to respond to questions from an auditor, I traverse the graph upstream to see where the data is coming from and notice that the data is from two different sources: online sale and in-store sale. These sources have their own pipelines until the flow reaches a point where the pipelines merge.

While navigating through the lineage graph, I can expand the columns to ensure sensitive columns are dropped during the transformation processes and respond to the auditors with details in a timely manner.

Join the preview
Data lineage capability is available in preview in all Regions where Amazon DataZone is generally available. For a list of Regions where Amazon DataZone domains can be provisioned, visit AWS Services by Region.

Data lineage costs are dependent on storage usage and API requests, which are already included in Amazon DataZone’s pricing model. For more details, visit Amazon DataZone pricing.

To learn more about data lineage in Amazon DataZone, visit the Amazon DataZone User Guide.

— Esra

Implement disaster recovery with Amazon Redshift

Post Syndicated from Nita Shah original https://aws.amazon.com/blogs/big-data/implement-disaster-recovery-with-amazon-redshift/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can start with just a few hundred gigabytes of data and scale to a petabyte or more. This enables you to use your data to acquire new insights for your business and customers.

The objective of a disaster recovery plan is to reduce disruption by enabling quick recovery in the event of a disaster that leads to system failure. Disaster recovery plans also allow organizations to make sure they meet all compliance requirements for regulatory purposes, providing a clear roadmap to recovery.

This post outlines proactive steps you can take to mitigate the risks associated with unexpected disruptions and make sure your organization is better prepared to respond and recover Amazon Redshift in the event of a disaster. With built-in features such as automated snapshots and cross-Region replication, you can enhance your disaster resilience with Amazon Redshift.

Disaster recovery planning

Any kind of disaster recovery planning has two key components:

  • Recovery Point Objective (RPO) – RPO is the maximum acceptable amount of time since the last data recovery point. This determines what is considered an acceptable loss of data between the last recovery point and the interruption of service.
  • Recovery Time Objective (RTO) – RTO is the maximum acceptable delay between the interruption of service and restoration of service. This determines what is considered an acceptable time window when service is unavailable.

To develop your disaster recovery plan, you should complete the following tasks:

  • Define your recovery objectives for downtime and data loss (RTO and RPO) for data and metadata. Make sure your business stakeholders are engaged in deciding appropriate goals.
  • Identify recovery strategies to meet the recovery objectives.
  • Define a fallback plan to return production to the original setup.
  • Test out the disaster recovery plan by simulating a failover event in a non-production environment.
  • Develop a communication plan to notify stakeholders of downtime and its impact to the business.
  • Develop a communication plan for progress updates, and recovery and availability.
  • Document the entire disaster recovery process.

Disaster recovery strategies

Amazon Redshift is a cloud-based data warehouse that supports many recovery capabilities out of the box to address unforeseen outages and minimize downtime.

Amazon Redshift RA3 instance types and Redshift serverless store their data in Redshift Managed Storage (RMS), which is backed by Amazon Simple Storage Service (Amazon S3), which is highly available and durable by default.

In the following sections, we discuss the various failure modes and associated recovery strategies.

Using backups

Backing up data is an important part of data management. Backups protect against human error, hardware failure, virus attacks, power outages, and natural disasters.

Amazon Redshift supports two kinds of snapshots: automatic and manual, which can be used to recover data. Snapshots are point-in-time backups of the Redshift data warehouse. Amazon Redshift stores these snapshots internally with RMS by using an encrypted Secure Sockets Layer (SSL) connection.

Redshift provisioned clusters offer automated snapshots that are taken automatically with a default retention of 1 day, which can be extended for up to 35 days. These snapshots are taken every 5 GB data change per node or every 8 hours, and the minimum time interval between two snapshots is 15 minutes. The data change must be greater than the total data ingested by the cluster (5 GB times the number of nodes). You can also set a custom snapshot schedule with frequencies between 1–24 hours. You can use the AWS Management Console or ModifyCluster API to manage the period of time your automated backups are retained by modifying the RetentionPeriod parameter. If you want to turn off automated backups altogether, you can set up the retention period to 0 (not recommended). For additional details, refer to Automated snapshots.

Amazon Redshift Serverless automatically creates recovery points approximately every 30 minutes. These recovery points have a default retention of 24 hours, after which they get automatically deleted. You do have the option to convert a recovery point into a snapshot if you want to retain it longer than 24 hours.

Both Amazon Redshift provisioned and serverless clusters offer manual snapshots that can be taken on-demand and be retained indefinitely. Manual snapshots allow you to retain your snapshots longer than automated snapshots to meet your compliance needs. Manual snapshots accrue storage charges, so it’s important that you delete them when you no longer need them. For additional details, refer to Manual snapshots.

Amazon Redshift integrates with AWS Backup to help you centralize and automate data protection across all your AWS services, in the cloud, and on premises. With AWS Backup for Amazon Redshift, you can configure data protection policies and monitor activity for different Redshift provisioned clusters in one place. You can create and store manual snapshots for Redshift provisioned clusters. This lets you automate and consolidate backup tasks that you had to do separately before, without any manual processes. To learn more about setting up AWS Backup for Amazon Redshift, refer to Amazon Redshift backups. As of this writing, AWS Backup does not integrate with Redshift Serverless.

Node failure

A Redshift data warehouse is a collection of computing resources called nodes.
Amazon Redshift will automatically detect and replace a failed node in your data warehouse cluster. Amazon Redshift makes your replacement node available immediately and loads your most frequently accessed data from Amazon S3 first to allow you to resume querying your data as quickly as possible.

If this is a single-node cluster (which is not recommended for customer production use), there is only one copy of the data in the cluster. When it’s down, AWS needs to restore the cluster from the most recent snapshot on Amazon S3, and that becomes your RPO.

We recommend using at least two nodes for production.

Cluster failure

Each cluster has a leader node and one or more compute nodes. In the event of a cluster failure, you must restore the cluster from a snapshot. Snapshots are point-in-time backups of a cluster. A snapshot contains data from all databases that are running on your cluster. It also contains information about your cluster, including the number of nodes, node type, and admin user name. If you restore your cluster from a snapshot, Amazon Redshift uses the cluster information to create a new cluster. Then it restores all the databases from the snapshot data. Note that the new cluster is available before all of the data is loaded, so you can begin querying the new cluster in minutes. The cluster is restored in the same AWS Region and a random, system-chosen Availability Zone, unless you specify another Availability Zone in your request.

Availability Zone failure

A Region is a physical location around the world where data centers are located. An Availability Zone is one or more discrete data centers with redundant power, networking, and connectivity in a Region. Availability Zones enable you to operate production applications and databases that are more highly available, fault tolerant, and scalable than would be possible from a single data center. All Availability Zones in a Region are interconnected with high-bandwidth, low-latency networking, over fully redundant, dedicated metro fiber providing high-throughput, low-latency networking between Availability Zones.

To recover from Availability Zone failures, you can use one of the following approaches:

  • Relocation capabilities (active-passive) – If your Redshift data warehouse is a single-AZ deployment and the cluster’s Availability Zone becomes unavailable, then Amazon Redshift will automatically move your cluster to another Availability Zone without any data loss or application changes. To activate this, you must enable cluster relocation for your provisioned cluster through configuration settings, which is automatically enabled for Redshift Serverless. Cluster relocation is free of cost, but it is a best-effort approach subject to resource availability in the Availability Zone being recovered in, and RTO can be impacted by other issues related to starting up a new cluster. This can result in recovery times between 10–60 minutes. To learn more about configuring Amazon Redshift relocation capabilities, refer to Build a resilient Amazon Redshift architecture with automatic recovery enabled.
  • Amazon Redshift Multi-AZ (active-active) – A Multi-AZ deployment allows you to run your data warehouse in multiple Availability Zones simultaneously and continue operating in unforeseen failure scenarios. No application changes are required to maintain business continuity because the Multi-AZ deployment is managed as a single data warehouse with one endpoint. Multi-AZ deployments reduce recovery time by guaranteeing capacity to automatically recover and are intended for customers with mission-critical analytics applications that require the highest levels of availability and resiliency to Availability Zone failures. This also allows you to implement a solution that is more compliant with the recommendations of the Reliability Pillar of the AWS Well-Architected Framework. Our pre-launch tests found that the RTO with Amazon Redshift Multi-AZ deployments is under 60 seconds or less in the unlikely case of an Availability Zone failure. To learn more about configuring Multi-AZ, refer to Enable Multi-AZ deployments for your Amazon Redshift data warehouse. As of writing, Redshift Serverless currently does not support Multi-AZ.

Region failure

Amazon Redshift currently supports single-Region deployments for clusters. However, you have several options to help with disaster recovery or accessing data across multi-Region scenarios.

Use a cross-Region snapshot

You can configure Amazon Redshift to copy snapshots for a cluster to another Region. To configure cross-Region snapshot copy, you need to enable this copy feature for each data warehouse (serverless and provisioned) and configure where to copy snapshots and how long to keep copied automated or manual snapshots in the destination Region. When cross-Region copy is enabled for a data warehouse, all new manual and automated snapshots are copied to the specified Region. In the event of a Region failure, you can restore your Redshift data warehouse in a new Region using the latest cross-Region snapshot.

The following diagram illustrates this architecture.

For more information about how to enable cross-Region snapshots, refer to the following:

Use a custom domain name

A custom domain name is easier to remember and use than the default endpoint URL provided by Amazon Redshift. With CNAME, you can quickly route traffic to a new cluster or workgroup created from snapshot in a failover situation. When a disaster happens, connections can be rerouted centrally with minimal disruption, without clients having to change their configuration.

For high availability, you should have a warm-standby cluster or workgroup available that regularly receives restored data from the primary cluster. This backup data warehouse could be in another Availability Zone or in a separate Region. You can redirect clients to the secondary Redshift cluster by setting up a custom domain name in the unlikely scenario of an entire Region failure.

In the following sections, we discuss how to use a custom domain name to handle Region failure in Amazon Redshift. Make sure the following prerequisites are met:

  • You need a registered domain name. You can use Amazon Route 53 or a third-party domain registrar to register a domain.
  • You need to configure cross-Region snapshots for your Redshift cluster or workgroup.
  • Turn on cluster relocation for your Redshift cluster. Use the AWS Command Line Interface (AWS CLI) to turn on relocation for a Redshift provisioned cluster. For Redshift Serverless, this is automatically enabled. For more information, see Relocating your cluster.
  • Take note of your Redshift endpoint. You can locate the endpoint by navigating to your Redshift workgroup or provisioned cluster name on the Amazon Redshift console.

Set up a custom domain with Amazon Redshift in the primary Region

In the hosted zone that Route 53 created when you registered the domain, create records to tell Route 53 how you want to route traffic to Redshift endpoint by completing the following steps:

  1. On the Route 53 console, choose Hosted zones in the navigation pane.
  2. Choose your hosted zone.
  3. On the Records tab, choose Create record.
  4. For Record name, enter your preferred subdomain name.
  5. For Record type, choose CNAME.
  6. For Value, enter the Redshift endpoint name. Make sure to provide the value by removing the colon (:), port, and database. For example, redshift-provisioned.eabc123.us-east-2.redshift.amazonaws.com.
  7. Choose Create records.

  1. Use the CNAME record name to create a custom domain in Amazon Redshift. For instructions, see Use custom domain names with Amazon Redshift.

You can now connect to your cluster using the custom domain name. The JDBC URL will be similar to jdbc:redshift://prefix.rootdomain.com:5439/dev?sslmode=verify-full, where prefix.rootdomain.com is your custom domain name and dev is the default database. Use your preferred editor to connect to this URL using your user name and password.

Steps to handle a Regional failure

In the unlikely situation of a Regional failure, complete the following steps:

  1. Use a cross-Region snapshot to restore a Redshift cluster or workgroup in your secondary Region.
  2. Turn on cluster relocation for your Redshift cluster in the secondary Region. Use the AWS CLI to turn on relocation for a Redshift provisioned cluster.
  3. Use the CNAME record name from the Route 53 hosted zone setup to create a custom domain in the newly created Redshift cluster or workgroup.
  4. Take note of the Redshift endpoint’s newly created Redshift cluster or workgroup.

Next, you need to update the Redshift endpoint in Route 53 for achieve seamless connectivity.

  1. On the Route 53 console, choose Hosted zones in the navigation pane.
  2. Choose your hosted zone.
  3. On the Record tab, select the CNAME record you created.
  4. Under Record details, choose Edit record.
  5. Change the value to the newly created Redshift endpoint. Make sure to provide the value by removing the colon (:), port, and database. For example, redshift-provisioned.eabc567.us-west-2.redshift.amazonaws.com.
  6. Choose Save.

Now when you connect to your custom domain name using the same JDBC URL from your application, you should be connected to your new cluster in your secondary Region.

Use active-active configuration

For business-critical applications that require high availability, you can set up an active-active configuration at the Region level. There are many ways to make sure all writes occur to all clusters; one way is to keep the data in sync between the two clusters by ingesting data concurrently into the primary and secondary cluster. You can also use Amazon Kinesis to sync the data between two clusters. For more details, see Building Multi-AZ or Multi-Region Amazon Redshift Clusters.

Additional considerations

In this section, we discuss additional considerations for your disaster recovery strategy.

Amazon Redshift Spectrum

Amazon Redshift Spectrum is a feature of Amazon Redshift that allows you to run SQL queries against exabytes of data stored in Amazon S3. With Redshift Spectrum, you don’t have to load or extract the data from Amazon S3 into Amazon Redshift before querying.

If you’re using external tables using Redshift Spectrum, you need to make sure it is configured and accessible on your secondary failover cluster.

You can set this up with the following steps:

  1. Replicate existing S3 objects between the primary and secondary Region.
  2. Replicate data catalog objects between the primary and secondary Region.
  3. Set up AWS Identity and Access Management (IAM) policies for accessing the S3 bucket residing in the secondary Region.

Cross-Region data sharing

With Amazon Redshift data sharing, you can securely share read access to live data across Redshift clusters, workgroups, AWS accounts, and Regions without manually moving or copying the data.

If you’re using cross-Region data sharing and one of the Regions has an outage, you need to have a business continuity plan to fail over your producer and consumer clusters to minimize the disruption.

In the event of an outage affecting the Region where the producer cluster is deployed, you can take the following steps to create a new producer cluster in another Region using a cross-Region snapshot and by reconfiguring data sharing, allowing your system to continue operating:

  1. Create a new Redshift cluster using the cross-Region snapshot. Make sure you have correct node type, node count, and security settings.
  2. Identify the Redshift data shares that were previously configured for the original producer cluster.
  3. Recreate these data shares on the new producer cluster in the target Region.
  4. Update the data share configurations in the consumer cluster to point to the newly created producer cluster.
  5. Confirm that the necessary permissions and access controls are in place for the data shares in the consumer cluster.
  6. Verify that the new producer cluster is operational and the consumer cluster is able to access the shared data.

In the event of an outage in the Region where the consumer cluster is deployed, you will need to create a new consumer cluster in a different Region. This makes sure all applications that are connecting to the consumer cluster continue to function as expected, with proper access.

The steps to accomplish this are as follows:

  1. Identify an alternate Region that is not affected by the outage.
  2. Provision a new consumer cluster in the alternate Region.
  3. Provide necessary access to data sharing objects.
  4. Update the application configurations to point to the new consumer cluster.
  5. Validate that all the applications are able to connect to the new consumer cluster and are functioning as expected.

For additional information on how to configure data sharing, refer to Sharing datashares.

Federated queries

With federated queries in Amazon Redshift, you can query and analyze data across operational databases, data warehouses, and data lakes. If you’re using federated queries, you need to set up federated queries from the failover cluster as well to prevent any application failure.

Summary

In this post, we discussed various failure scenarios and recovery strategies associated with Amazon Redshift. Disaster recovery solutions make restoring your data and workloads seamless so you can get business operations back online quickly after a catastrophic event.

As an administrator, you can now work on defining your Amazon Redshift disaster recovery strategy and implement it to minimize business disruptions. You should develop a comprehensive plan that includes:

  • Identifying critical Redshift resources and data
  • Establishing backup and recovery procedures
  • Defining failover and failback processes
  • Enforcing data integrity and consistency
  • Implementing disaster recovery testing and drills

Try out these strategies for yourself, and leave any questions and feedback in the comments section.


About the authors

Nita Shah is a Senior Analytics Specialist Solutions Architect at AWS based out of New York. She has been building data warehouse solutions for over 20 years and specializes in Amazon Redshift. She is focused on helping customers design and build enterprise-scale well-architected analytics and decision support platforms.

Poulomi Dasgupta is a Senior Analytics Solutions Architect with AWS. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems. Outside of work, she likes travelling and spending time with her family.

Ranjan Burman is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 16 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with cloud solutions.

Jason Pedreza is a Senior Redshift Specialist Solutions Architect at AWS with data warehousing experience handling petabytes of data. Prior to AWS, he built data warehouse solutions at Amazon.com and Amazon Devices. He specializes in Amazon Redshift and helps customers build scalable analytic solutions.

Agasthi Kothurkar is an AWS Solutions Architect, and is based in Boston. Agasthi works with enterprise customers as they transform their business by adopting the Cloud. Prior to joining AWS, he worked with leading IT consulting organizations on customers engagements spanning Cloud Architecture, Enterprise Architecture, IT Strategy, and Transformation. He is passionate about applying Cloud technologies to resolve complex real world business problems.

Build a real-time streaming generative AI application using Amazon Bedrock, Amazon Managed Service for Apache Flink, and Amazon Kinesis Data Streams

Post Syndicated from Felix John original https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-generative-ai-application-using-amazon-bedrock-amazon-managed-service-for-apache-flink-and-amazon-kinesis-data-streams/

Generative artificial intelligence (AI) has gained a lot of traction in 2024, especially around large language models (LLMs) that enable intelligent chatbot solutions. Amazon Bedrock is a fully managed service that offers a choice of high-performing foundation models (FMs) from leading AI companies such as AI21 Labs, Anthropic, Cohere, Meta, Mistral AI, Stability AI, and Amazon through a single API, along with a broad set of capabilities to help you build generative AI applications with security, privacy, and responsible AI. Use cases around generative AI are vast and go well beyond chatbot applications; for instance, generative AI can be used for analysis of input data such as sentiment analysis of reviews.

Most businesses generate data continuously in real-time. Internet of Things (IoT) sensor data, application log data from your applications, or clickstream data generated by users of your website are only some examples of continuously generated data. In many situations, the ability to process this data quickly (in real-time or near real-time) helps businesses increase the value of insights they get from their data.

One option to process data in real-time is using stream processing frameworks such as Apache Flink. Flink is a framework and distributed processing engine for processing data streams. AWS provides a fully managed service for Apache Flink through Amazon Managed Service for Apache Flink, which enables you to build and deploy sophisticated streaming applications without setting up infrastructure and managing resources.

Data streaming enables generative AI to take advantage of real-time data and provide businesses with rapid insights. This post looks at how to integrate generative AI capabilities when implementing a streaming architecture on AWS using managed services such as Managed Service for Apache Flink and Amazon Kinesis Data Streams for processing streaming data and Amazon Bedrock to utilize generative AI capabilities. We focus on the use case of deriving review sentiment in real-time from customer reviews in online shops. We include a reference architecture and a step-by-step guide on infrastructure setup and sample code for implementing the solution with the AWS Cloud Development Kit (AWS CDK). You can find the code to try it out yourself on the GitHub repo.

Solution overview

The following diagram illustrates the solution architecture. The architecture diagram depicts the real-time streaming pipeline in the upper half and the details on how you gain access to the Amazon OpenSearch Service dashboard in the lower half.

Architecture Overview

The real-time streaming pipeline consists of a producer that is simulated by running a Python script locally that is sending reviews to a Kinesis Data Stream. The reviews are from the Large Movie Review Dataset and contain positive or negative sentiment. The next step is the ingestion to the Managed Service for Apache Flink application. From within Flink, we are asynchronously calling Amazon Bedrock (using Anthropic Claude 3 Haiku) to process the review data. The results are then ingested into an OpenSearch Service cluster for visualization with OpenSearch Dashboards. We directly call the PutRecords API of Kinesis Data Streams within the Python script for the sake of simplicity and to cost-effectively run this example. You should consider using an Amazon API Gateway REST API as a proxy in front of Kinesis Data Streams when using a similar architecture in production, as described in Streaming Data Solution for Amazon Kinesis.

To gain access to the OpenSearch dashboard, we need to use a bastion host that is deployed in the same private subnet within your virtual private cloud (VPC) as your OpenSearch Service cluster. To connect with the bastion host, we use Session Manager, a capability of Amazon Systems Manager, which allows us to connect to our bastion host securely without having to open inbound ports. To access it, we use Session Manager to port forward the OpenSearch dashboard to our localhost.

The walkthrough consists of the following high-level steps:

  1. Create the Flink application by building the JAR file.
  2. Deploy the AWS CDK stack.
  3. Set up and connect to OpenSearch Dashboards.
  4. Set up the streaming producer.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Implementation details

This section focuses on the Flink application code of this solution. You can find the code on GitHub. The StreamingJob.java file inside the flink-async-bedrock directory file serves as entry point to the application. The application uses the FlinkKinesisConsumer, which is a connector for reading streaming data from a Kinesis Data Stream. It applies a map transformation to convert each input string into an instance of Review class object, resulting in DataStream<Review> to ease processing.

The Flink application uses the helper class AsyncDataStream defined in the StreamingJob.java file to incorporate an asynchronous, external operation into Flink. More specifically, the following code creates an asynchronous data stream by applying the AsyncBedrockRequest function to each element in the inputReviewStream. The application uses unorderedWait to increase throughput and reduce idle time because event ordering is not required. The timeout is set to 25,000 milliseconds to give the Amazon Bedrock API enough time to process long reviews. The maximum concurrency or capacity is limited to 1,000 requests at a time. See the following code:

DataStream<ProcessedReview> processedReviewStream = AsyncDataStream.unorderedWait(inputReviewStream, new AsyncBedrockRequest(applicationProperties), 25000, TimeUnit.MILLISECONDS, 1000).uid("processedReviewStream");

The Flink application initiates asynchronous calls to the Amazon Bedrock API, invoking the Anthropic Claude 3 Haiku foundation model for each incoming event. We use Anthropic Claude 3 Haiku on Amazon Bedrock because it is Anthropic’s fastest and most compact model for near-instant responsiveness. The following code snippet is part of the AsyncBedrockRequest.java file and illustrates how we set up the required configuration to call the Anthropic’s Claude Messages API to invoke the model:

@Override
public void asyncInvoke(Review review, final ResultFuture<ProcessedReview> resultFuture) throws Exception {

    // [..]

    JSONObject user_message = new JSONObject()
        .put("role", "user")
        .put("content", "<review>" + reviewText + "</review>");

    JSONObject assistant_message = new JSONObject()
        .put("role", "assistant")
        .put("content", "{");

    JSONArray messages = new JSONArray()
            .put(user_message)
            .put(assistant_message);

    String payload = new JSONObject()
            .put("system", systemPrompt)
            .put("anthropic_version", "bedrock-2023-05-31")
            .put("temperature", 0.0)
            .put("max_tokens", 4096)
            .put("messages", messages)
            .toString();

    InvokeModelRequest request = InvokeModelRequest.builder()
            .body(SdkBytes.fromUtf8String(payload))
            .modelId("anthropic.claude-3-haiku-20240307-v1:0")
            .build();

    CompletableFuture<InvokeModelResponse> completableFuture = client.invokeModel(request)
            .whenComplete((response, exception) -> {
                if (exception != null) {
                    LOG.error("Model invocation failed: " + exception);
                }
            })
            .orTimeout(250000, TimeUnit.MILLISECONDS);

Prompt engineering

The application uses advanced prompt engineering techniques to guide the generative AI model’s responses and provide consistent responses. The following prompt is designed to extract a summary as well as a sentiment from a single review:

String systemPrompt = 
     "Summarize the review within the <review> tags 
     into a single and concise sentence alongside the sentiment 
     that is either positive or negative. Return a valid JSON object with 
     following keys: summary, sentiment. 
     <example> {\\\"summary\\\": \\\"The reviewer strongly dislikes the movie, 
     finding it unrealistic, preachy, and extremely boring to watch.\\\", 
     \\\"sentiment\\\": \\\"negative\\\"} 
     </example>";

The prompt instructs the Anthropic Claude model to return the extracted sentiment and summary in JSON format. To maintain consistent and well-structured output by the generative AI model, the prompt uses various prompt engineering techniques to improve the output. For example, the prompt uses XML tags to provide a clearer structure for Anthropic Claude. Moreover, the prompt contains an example to enhance Anthropic Claude’s performance and guide it to produce the desired output. In addition, the prompt pre-fills Anthropic Claude’s response by pre-filling the Assistant message. This technique helps provide a consistent output format. See the following code:

JSONObject assistant_message = new JSONObject()
    .put("role", "assistant")
    .put("content", "{");

Build the Flink application

The first step is to download the repository and build the JAR file of the Flink application. Complete the following steps:

  1. Clone the repository to your desired workspace:
    git clone https://github.com/aws-samples/aws-streaming-generative-ai-application.git

  2. Move to the correct directory inside the downloaded repository and build the Flink application:
    cd flink-async-bedrock && mvn clean package

Building Jar File

Maven will compile the Java source code and package it in a distributable JAR format in the directory flink-async-bedrock/target/ named flink-async-bedrock-0.1.jar. After you deploy your AWS CDK stack, the JAR file will be uploaded to Amazon Simple Storage Service (Amazon S3) to create your Managed Service for Apache Flink application.

Deploy the AWS CDK stack

After you build the Flink application, you can deploy your AWS CDK stack and create the required resources:

  1. Move to the correct directory cdk and deploy the stack:
    cd cdk && npm install & cdk deploy

This will create the required resources in your AWS account, including the Managed Service for Apache Flink application, Kinesis Data Stream, OpenSearch Service cluster, and bastion host to quickly connect to OpenSearch Dashboards, deployed in a private subnet within your VPC.

  1. Take note of the output values. The output will look similar to the following:
 ✅  StreamingGenerativeAIStack

✨  Deployment time: 1414.26s

Outputs:
StreamingGenerativeAIStack.BastionHostBastionHostIdC743CBD6 = i-0970816fa778f9821
StreamingGenerativeAIStack.accessOpenSearchClusterOutput = aws ssm start-session --target i-0970816fa778f9821 --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com"]}'
StreamingGenerativeAIStack.bastionHostIdOutput = i-0970816fa778f9821
StreamingGenerativeAIStack.domainEndpoint = vpc-generative-ai-opensearch-qfssmne2lwpzpzheoue7rkylmi.us-east-1.es.amazonaws.com
StreamingGenerativeAIStack.regionOutput = us-east-1
Stack ARN:
arn:aws:cloudformation:us-east-1:<AWS Account ID>:stack/StreamingGenerativeAIStack/3dec75f0-cc9e-11ee-9b16-12348a4fbf87

✨  Total time: 1418.61s

Set up and connect to OpenSearch Dashboards

Next, you can set up and connect to OpenSearch Dashboards. This is where the Flink application will write the extracted sentiment as well as the summary from the processed review stream. Complete the following steps:

  1. Run the following command to establish connection to OpenSearch from your local workspace in a separate terminal window. The command can be found as output named accessOpenSearchClusterOutput.
    • For Mac/Linux, use the following command:
aws ssm start-session --target <BastionHostId> --document-name AWS-StartPortForwardingSessionToRemoteHost --parameters '{"portNumber":["443"],"localPortNumber":["8157"], "host":["<OpenSearchDomainHost>"]}'
    • For Windows, use the following command:
aws ssm start-session ^
    —target <BastionHostId> ^
    —document-name AWS-StartPortForwardingSessionToRemoteHost ^    
    —parameters host="<OpenSearchDomainHost>",portNumber="443",localPortNumber="8157"

It should look similar to the following output:

Session Manager CLI

  1. Create the required index in OpenSearch by issuing the following command:
    • For Mac/Linux, use the following command:
curl --location -k --request PUT https://localhost:8157/processed_reviews \
--header 'Content-Type: application/json' \
--data-raw '{
  "mappings": {
    "properties": {
        "reviewId": {"type": "integer"},
        "userId": {"type": "keyword"},
        "summary": {"type": "keyword"},
        "sentiment": {"type": "keyword"},
        "dateTime": {"type": "date"}}}}}'
    • For Windows, use the following command:
$url = https://localhost:8157/processed_reviews
$headers = @{
    "Content-Type" = "application/json"
}
$body = @{
    "mappings" = @{
        "properties" = @{
            "reviewId" = @{ "type" = "integer" }
            "userId" = @{ "type" = "keyword" }
            "summary" = @{ "type" = "keyword" }
            "sentiment" = @{ "type" = "keyword" }
            "dateTime" = @{ "type" = "date" }
        }
    }
} | ConvertTo-Json -Depth 3
Invoke-RestMethod -Method Put -Uri $url -Headers $headers -Body $body -SkipCertificateCheck
  1. After the session is established, you can open your browser and navigate to https://localhost:8157/_dashboards. Your browser might consider the URL not secure. You can ignore this warning.
  2. Choose Dashboards Management under Management in the navigation pane.
  3. Choose Saved objects in the sidebar.
  4. Import export.ndjson, which can be found in the resources folder within the downloaded repository.

OpenSearch Dashboards Upload

  1. After you import the saved objects, you can navigate to Dashboards under My Dashboard in the navigation pane.

At the moment, the dashboard appears blank because you haven’t uploaded any review data to OpenSearch yet.

Set up the streaming producer

Finally, you can set up the producer that will be streaming review data to the Kinesis Data Stream and ultimately to the OpenSearch Dashboards. The Large Movie Review Dataset was originally published in 2011 in the paper “Learning Word Vectors for Sentiment Analysis” by Andrew L. Maas, Raymond E. Daly, Peter T. Pham, Dan Huang, Andrew Y. Ng, and Christopher Potts. Complete the following steps:

  1. Download the Large Movie Review Dataset here.
  2. After the download is complete, extract the .tar.gz file to retrieve the folder named aclImdb 3 or similar that contains the review data. Rename the review data folder to aclImdb.
  3. Move the extracted dataset to data/ inside the repository that you previously downloaded.

Your repository should look like the following screenshot.

Folder Overview

  1. Modify the DATA_DIR path in producer/producer.py if the review data is named differently.
  2. Move to the producer directory using the following command:
    cd producer

  3. Install the required dependencies and start generating the data:
    pip install -r requirements.txt && python produce.py

The OpenSearch dashboard should be populated after you start generating streaming data and writing it to the Kinesis Data Stream. Refresh the dashboard to view the latest data. The dashboard shows the total number of processed reviews, the sentiment distribution of the processed reviews in a pie chart, and the summary and sentiment for the latest reviews that have been processed.

When you have a closer look at the Flink application, you will notice that the application marks the sentiment field with the value error whenever there is an error with the asynchronous call made by Flink to the Amazon Bedrock API. The Flink application simply filters the correctly processed reviews and writes them to the OpenSearch dashboard.

For robust error handling, you should write any incorrectly processed reviews to a separate output stream and not discard them completely. This separation allows you to handle failed reviews differently than successful ones for simpler reprocessing, analysis, and troubleshooting.

Clean up

When you’re done with the resources you created, complete the following steps:

  1. Delete the Python producer using Ctrl/Command + C.
  2. Destroy your AWS CDK stack by returning to the root folder and running the following command in your terminal:
    cd cdk && cdk destroy

  3. When asked to confirm the deletion of the stack, enter yes.

Conclusion

In this post, you learned how to incorporate generative AI capabilities in your streaming architecture using Amazon Bedrock and Managed Service for Apache Flink using asynchronous requests. We also gave guidance on prompt engineering to derive the sentiment from text data using generative AI. You can build this architecture by deploying the sample code from the GitHub repository.

For more information on how to get started with Managed Service for Apache Flink, refer to Getting started with Amazon Managed Service for Apache Flink (DataStream API). For details on how to set up Amazon Bedrock, refer to Set up Amazon Bedrock. For other posts on Managed Service for Apache Flink, browse through the AWS Big Data Blog.


About the Authors

Felix John is a Solutions Architect and data streaming expert at AWS, based in Germany. He focuses on supporting small and medium businesses on their cloud journey. Outside of his professional life, Felix enjoys playing Floorball and hiking in the mountains.

Michelle Mei-Li Pfister is a Solutions Architect at AWS. She is supporting customers in retail and consumer packaged goods (CPG) industry on their cloud journey. She is passionate about topics around data and machine learning.

Amazon DataZone announces custom blueprints for AWS services

Post Syndicated from Anish Anturkar original https://aws.amazon.com/blogs/big-data/amazon-datazone-announces-custom-blueprints-for-aws-services/

Last week, we announced the general availability of custom AWS service blueprints, a new feature in Amazon DataZone allowing you to customize your Amazon DataZone project environments to use existing AWS Identity and Access Management (IAM) roles and AWS services to embed the service into your existing processes. In this post, we share how this new feature can help you in federating to your existing AWS resources using your own IAM role. We also delve into details on how to configure data sources and subscription targets for a project using a custom AWS service blueprint.

New feature: Custom AWS service blueprints

Previously, Amazon DataZone provided default blueprints that created AWS resources required for data lake, data warehouse, and machine learning use cases. However, you may have existing AWS resources such as Amazon Redshift databases, Amazon Simple Storage Service (Amazon S3) buckets, AWS Glue Data Catalog tables, AWS Glue ETL jobs, Amazon EMR clusters, and many more for your data lake, data warehouse, and other use cases. With Amazon DataZone default blueprints, you were limited to only using preconfigured AWS resources that Amazon DataZone created. Customers needed a way to integrate these existing AWS service resources with Amazon DataZone, using a customized IAM role so that Amazon DataZone users can get federated access to those AWS service resources and use the publication and subscription features of Amazon DataZone to share and govern them.

Now, with custom AWS service blueprints, you can use your existing resources using your preconfigured IAM role. Administrators can customize Amazon DataZone to use existing AWS resources, enabling Amazon DataZone portal users to have federated access to those AWS services to catalog, share, and subscribe to data, thereby establishing data governance across the platform.

Benefits of custom AWS service blueprints

Custom AWS service blueprints don’t provision any resources for you, unlike other blueprints. Instead, you can configure your IAM role (bring your own role) to integrate your existing AWS resources with Amazon DataZone. Additionally, you can configure action links, which provide federated access to any AWS resources like S3 buckets, AWS Glue ETL jobs, and so on, using your IAM role.

You can also configure custom AWS service blueprints to bring your own resources, namely AWS databases, as data sources and subscription targets to enhance governance across those assets. With this release, administrators can configure data sources and subscription targets on the Amazon DataZone console and not be restricted to do those actions in the data portal.

Custom blueprints and environments can only be set up by administrators to manage access to configured AWS resources. As custom environments are created in specific projects, the right to grant access to custom resources is delegated to the project owners who can manage project membership by adding or removing members. This restricts the ability of portal users to create custom environments without the right permissions in AWS Console for Amazon DataZone or access custom AWS resources configured in a project that they are not a member of.

Solution overview

To get started, administrators need to enable the custom AWS service blueprints feature on the Amazon DataZone console. Then administrators can customize configurations by defining which project and IAM role to use when federating to the AWS services that are set up as action links for end-users. After the customized set up is complete, when a data producer or consumer logs in to the Amazon DataZone portal and if they’re part of those customized projects, they can federate to any of the configured AWS services such as Amazon S3 to upload or download files or seamlessly go to existing AWS Glue ETL jobs using their own IAM roles and continue their work with data with the customized tool of choice. With this feature, you can how include Amazon DataZone in your existing data pipeline processes to catalog, share, and govern data.

The following diagram shows an administrator’s workflow to set up a custom blueprint.

In the following sections, we discuss common use cases for custom blueprints, and walk through the setup step by step. If you’re new to Amazon DataZone, refer to Getting started.

Use case 1: Bring your own role and resources

Customers manage data platforms that consist of AWS managed services such as AWS Lake Formation, Amazon S3 for data lakes, AWS Glue for ETL, and so on. With those processes already set up, you may want to bring your own roles and resources to Amazon DataZone to continue with an existing process without any disruption. In such cases, you may not want Amazon DataZone to create new resources because it disrupts existing processes in data pipelines and to also curtail AWS resource usage and costs.

In the current setup, you can create an Amazon DataZone domain associated with different accounts. There could be a dedicated account that acts like a producer to share data, and a few other consumer accounts to subscribe to published assets in the catalog. The consumer account has IAM permissions set up for the AWS Glue ETL job to use for the subscription environment of a project. By doing so, the role has access to the newly subscribed data as well as permissions from previous setups to access data from other AWS resources. After you configure the AWS Glue job IAM role in the environment using the custom AWS service blueprint, the authorized users of that role can use the subscribed assets in the AWS Glue ETL job and extend that data for downstream activities to store them in Amazon S3 and other databases to be queried and analyzed using the Amazon Athena SQL editor or Amazon QuickSight.

Use case 2: Amazon S3 multi-file downloads

Customers and users of the Amazon DataZone portal often need the ability to download files after searching and filtering through the catalog in an Amazon DataZone project. This requirement arises because the data and analytics associated with a particular use case can sometimes involve hundreds of files. Downloading these files individually would be a tedious and time-consuming process for Amazon DataZone users. To address this need, the Amazon DataZone portal can take advantage of the capabilities provided by custom AWS service blueprints. These custom blueprints allow you to configure action links to S3 bucket folders associated with specified Amazon DataZone projects.

You can build projects and subscribe to both unstructured and structured data assets within the Amazon DataZone portal. For structured datasets, you can use Amazon DataZone blueprint-based environments like data lakes (Athena) and data warehouses (Amazon Redshift). For unstructured data assets, you can use the custom blueprint-based Amazon S3 environment, which provides a familiar Amazon S3 browser interface with access to specific buckets and folders, using an IAM role owned and provided by the customer. This functionality streamlines the process of finding and accessing unstructured data and allows you to download multiple files at once, enabling you to build and enhance your analytics more efficiently.

Use case 3: Amazon S3 file uploads

In addition to the download functionality, users often need to retain and attach metadata to new versions of files. For example, when you download a file, you can perform data changes, enrichment, or analysis on the file, and then upload the updated version back to the Amazon DataZone portal. For uploading files, Amazon DataZone users can use the same custom blueprint-based Amazon S3 environment action links to upload files.

Use case 4: Extend existing environments to custom blueprint environments

You may have existing Amazon DataZone project environments created using default data lake and data warehouse blueprints. With other AWS services set up in the data platform, you may want to extend the configured project environments to include those additional services to provide a seamless experience for your data producers or consumers while switching between tools.

Now that you understand the capabilities of the new feature, let’s look at how administrators can set up a custom role and resources on the Amazon DataZone console.

Create a domain

First, you need an Amazon DataZone domain. If you already have one, you can skip to enabling your custom blueprints. Otherwise, refer to Create domains for instructions to set up a domain. Optionally, you can associate accounts if you want to set up Amazon DataZone across multiple accounts.

Associate accounts for cross-account scenarios

You can optionally associate accounts. For instructions, refer to Request association with other AWS accounts. Make sure to use the latest AWS Resource Access Manager (AWS RAM) DataZonePortalReadWrite policy when requesting account association. If your account is already associated, request access again with the new policy.

Accept the account association request

To accept the account associated request, refer to Accept an account association request from an Amazon DataZone domain and enable an environment blueprint. After you accept the account association, you should see the following screenshot.

Add associated account users in the Amazon DataZon domain account

With this launch, you can set up associated account owners to access the Amazon DataZone data portal from their account. To enable this, they need to be registered as users in the domain account. As a domain admin, you can create Amazon DataZone user profiles to allow Amazon DataZone access to users and roles from the associated account. Complete the following steps:

  1. On the Amazon DataZone console, navigate to your domain.
  2. On the User management tab, choose Add IAM Users from the Add dropdown menu.
  3. Enter the ARNs of your associated account IAM users or roles. For this post, we add arn:aws:iam::123456789101:role/serviceBlueprintRole and arn:aws:iam::123456789101:user/Jacob.
  4. Choose Add users(s).

Back on the User management tab, you should see the new user state with Assigned status. This means that the domain owner has assigned associated account users to access Amazon DataZone. This status will change to Active when the identity starts using Amazon DataZone from the associated account.

As of writing this post, there is a maximum limit of adding six identities (users or roles) per associated account.

Enable the custom AWS service blueprint feature

You can enable custom AWS service blueprints in the domain account or the associated account, according to your requirements. Complete the following steps:

  1. On the Account associations tab, choose the associated domain.
  2. Choose the AWS service blueprint.
  3. Choose Enable.

Create an environment using the custom blueprint

If an associated account is being used to create this environment, use the same associated account IAM identity assigned by the domain owner in the previous step. Your identity needs to be explicitly assigned a user profile in order for you to create this environment. Complete the following steps:

  1. Choose the custom blueprint.
  2. In the Created environments section, choose Create environment.
  3. Select Create and use a new project or use an existing project if you already have one.
  4. For Environment role, choose a role. For this post, we curated a cross-account role called AmazonDataZoneAdmin and gave it AdministratorAccess This is the bring your own role feature. You should curate your role according to your requirements. Here are some guidelines on how to set up custom role as we have used a more permissible policy for this blog:
    1. You can use AWS Policy Generator to build a policy that fits your requirements and attach it to the custom IAM role you want to use.
    2. Make sure the role begins with AmazonDataZone* to follow conventions. This is not mandatory, but recommended. If the IAM admin is using an AmazonDataZoneFullAccess policy, you need to follow this convention because there is a pass role check validation.
    3. When you create the CustomRole (AWSDataZone*) make sure it trusts amazonaws.com in its trust policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "datazone.amazonaws.com"
                ]
            },
            "Action": [
                "sts:AssumeRole",
                "sts:TagSession"
            ]
        }
    ]
}
  1. For Region, choose an AWS Region.
  2. Choose Create environment.

Although you could use the same IAM role for multiple environments in a project, the recommendation is to not use a same IAM role for multiple environments across projects. Subscription grants are fulfilled at the project construct and therefore we don’t allow the same environment role to be used across different projects.

Configure custom action links

After you create the AWS service environment, you can configure any AWS Management Console links to your environment. Amazon DataZone will assume the custom role to help federate environment users to the configured action links. Complete the following steps:

  1. In your environment, choose Customize AWS links.
  2. Configure any S3 buckets, Athena workgroups, AWS Glue jobs, or other custom resources.
  3. Select Custom AWS links and enter any AWS service console custom resources. For this post, we link to the Amazon Relational Database Service (Amazon RDS) console.

You should now see the console links set up for your environment.

Access resources using a custom role through the Amazon DataZone portal from an associated account

Associate account users who have been added to Amazon DataZone can access the data portal from their associated account directly. Complete the following steps:

  1. In your environment, in the Summary section, choose the My Environment link.

You should see all your configured resources (role and action links) for your environment.

  1. Choose any action link to navigate to the appropriate console resources.
  2. Choose any action link for a custom resource (for this post, Amazon RDS).

You’re directed to the appropriate service console.

With this setup, you have now configured a custom AWS service blueprint to use your own role for the environment to use for data access as well. You have also set up action links for configured AWS resources to be shown to data producers and consumers in the Amazon DataZone data portal. With these links, you can federate to those services in a single click and take the project context along while working with the data.

Configure data sources and subscription targets

Additionally, administrators can now configure data sources and subscription targets on the Amazon DataZone console using custom AWS service blueprint environments. This needs to be configured to set up the database role ManagedAccessRole to the data source and subscription target, which you can’t do through the Amazon DataZone portal.

Configure data sources in the custom AWS service blueprint environment for publishing

Complete the following steps to configure your data source:

  1. On the Amazon DataZone console, navigate to the custom AWS service blueprint environment you just created.
  2. On the Data sources tab, choose Add
  3. Select AWS Glue or Amazon Redshift.
  4. For AWS Glue, complete the following steps:
    1. Enter your AWS Glue database. If you don’t already have an existing AWS Glue database setup, refer to Create a database.
    2. Enter the manageAccessRole role that is added as a Lake Formation admin. Make sure the role provided has aws.internal in its trust policy. The role starts with AmazonDataZone*.
    3. Choose Add.
  1. For Amazon Redshift, complete the following steps:
    1. Select Cluster or Serverless. If you don’t already have a Redshift cluster, refer to Create a sample Amazon Redshift cluster. If you don’t already have an Amazon Redshift Serverless workgroup, refer Amazon Redshift Serverless to create a sample database.
    2. Choose Create new AWS Secret or use a preexisting one.
    3. If you’re creating a new secret, enter a secret name, user name, and password.
  2. Choose the cluster or workgroup you want to connect to.
  3. Enter the database and schema names.
  4. Enter the role ARN for manageAccessRole.
  5. Choose Add.

Configure a subscription target in the AWS service environment for subscribing

Complete the following steps to add your subscription target

  1. On the Amazon DataZone console, navigate the custom AWS service blueprint environment you just created.
  2. On the Subscription targets tab, choose Add.
  3. Follow the same steps as you did to set up a data source.
  4. For Redshift subscription targets, you also need to add a database role that will be granted access to the given schema. You can enter a specific Redshift user role or, if you’re a Redshift admin, enter sys:superuser.
  5. Create a new tag on the environment role (BYOR) with RedshiftDbRoles as key and the database name used for configuring the Redshift subscription target as value.

Extend existing data lake and data warehouse blueprints

Finally, if you want to extend existing data lake or data warehouse project environments to create to use existing AWS services in the platform, complete the following steps:

  1. Create a copy of the environment role of an existing Amazon DataZone project environment.
  2. Extend this role by adding additional required policies to allow this custom role to access additional resources.
  3. Create a custom AWS service environment in the same Amazon DataZone project using this new custom role.
  4. Configure the subscription target and data source using the database name of the existing Amazon DataZone environment (<env_name>_pub_db, <env_name>_sub_db).
  5. Use the same managedAccessRole role from the existing Amazon DataZone environment.
  6. Request subscription to the required data assets or add subscribed assets from the project to this new AWS service environment.

Clean up

To clean up your resources, complete the following steps:

  1. If you used sample code for AWS Glue and Redshift databases, make sure to clean up all those resources to avoid incurring additional charges. Delete any S3 buckets you created as well.
  2. On the Amazon DataZone console, delete the projects used in this post. This will delete most project-related objects like data assets and environments.
  3. On the Lake Formation console, delete the Lake Formation admins registered by Amazon DataZone.
  4. On the Lake Formation console, delete any tables and databases created by Amazon DataZone.

Conclusion

In this post, we discussed how the custom AWS service blueprint simplifies the process to start using existing IAM roles and AWS services in Amazon DataZone for end-to-end governance of your data in AWS. This integration helps you circumvent the prescriptive default data lake and data warehouse blueprints.

To learn more about Amazon DataZone and how to get started, refer to the Getting started guide. Check out the YouTube playlist for some of the latest demos of Amazon DataZone and more information about the capabilities available.


About the Authors

Anish Anturkar is a Software Engineer and Designer and part of Amazon DataZone with an expertise in distributed software solutions. He is passionate about building robust, scalable, and sustainable software solutions for his customers.

Navneet Srivastava is a Principal Specialist and Analytics Strategy Leader, and develops strategic plans for building an end-to-end analytical strategy for large biopharma, healthcare, and life sciences organizations. Navneet is responsible for helping life sciences organizations and healthcare companies deploy data governance and analytical applications, electronic medical records, devices, and AI/ML-based applications, while educating customers about how to build secure, scalable, and cost-effective AWS solutions. His expertise spans across data analytics, data governance, AI, ML, big data, and healthcare-related technologies.

Priya Tiruthani is a Senior Technical Product Manager with Amazon DataZone at AWS. She focuses on improving data discovery and curation required for data analytics. She is passionate about building innovative products to simplify customers’ end-to-end data journey, especially around data governance and analytics. Outside of work, she enjoys being outdoors to hike, capture nature’s beauty, and recently play pickleball.

Subrat Das is a Senior Solutions Architect and part of the Global Healthcare and Life Sciences industry division at AWS. He is passionate about modernizing and architecting complex customer workloads. When he’s not working on technology solutions, he enjoys long hikes and traveling around the world.

Access Amazon Redshift data from Salesforce Data Cloud with Zero Copy Data Federation

Post Syndicated from Vijay Gopalakrishnan original https://aws.amazon.com/blogs/big-data/access-amazon-redshift-data-from-salesforce-data-cloud-with-zero-copy-data-federation/

This post is co-authored by Vijay Gopalakrishnan, Director of Product, Salesforce Data Cloud.

In today’s data-driven business landscape, organizations collect a wealth of data across various touch points and unify it in a central data warehouse or a data lake to deliver business insights. This data is primarily used for analytical and machine learning purposes, but not easily accessible by the business users across Sales, Service, and Marketing teams to make data driven decisions. Salesforce and Amazon collaborated to address this challenge, by making the data accessible to the users in the flow of their work, with Zero Copy Data Federation between Salesforce Data Cloud and Amazon Redshift. This solution empowers businesses to access Redshift data within the Salesforce Data Cloud, breaking down data silos, gaining deeper insights, and creating unified customer profiles to deliver highly personalized experiences across various touchpoints. By eliminating the need for data replication, this integration improves efficiency and reduces costs while enabling real-time access to valuable business data.

In this post, we explore the benefits of the new Zero Copy Data Federation and provide a step-by-step guidance to configure it in Salesforce Data Cloud.

What is Salesforce Data Cloud?

Salesforce Data Cloud is a data platform that unifies all of your company’s data into Salesforce’s Einstein 1 Platform, giving every team a 360-degree view of the customer to drive automation, create analytics, personalize engagement, and power trusted artificial intelligence (AI). Data Cloud creates a holistic customer view by turning volumes of disconnected data into a unified customer profile that’s straightforward to access and understand. This includes diverse datasets like telemetry data, web engagement data, and more across your organization or your external data lakes and warehouses. This unified view helps your Sales, Service, and Marketing teams build personalized customer experiences, invoke data-driven actions and workflows, and safely drive AI across all your Salesforce apps.

What is Amazon Redshift?

Amazon Redshift is a fast, fully managed, petabyte-scale data warehouse service that makes it simple and cost-effective to efficiently analyze all your data using your existing business intelligence (BI) tools. It’s optimized for datasets ranging from a few hundred gigabytes to a petabyte or more and delivers better price-performance compared to most traditional data warehousing solutions. With a fully managed AI powered massively parallel processing (MPP) architecture, Amazon Redshift makes business decision-making quick and cost-effective.

What is Zero Copy Data Federation?

Zero Copy Data Federation, a Salesforce Data Cloud capability, unifies Salesforce and Amazon Redshift data through a point-and-click interface. It provides secure, real-time access to Redshift data without copying, keeping enterprise data in place. This eliminates replication overhead and ensures access to current information, enhancing data integration while maintaining data integrity and efficiency.

Data federated from Amazon Redshift is represented as a native data cloud object which power various Data Cloud features, including marketing segmentation, activations, and process automation. With these capabilities at your fingertips, you can enrich unified customer profile in Salesforce Data Cloud with transaction data from Amazon Redshift to create a rich customer 360, gain insights, harness predictive and generative AI on the unified data, and ultimately deliver highly personalized experiences across multiple touchpoints.

The following diagram depicts Zero Copy Data Federation flow, key features enabled and few potential actions and activations.

solution architecture

Connection to Amazon Redshift is established by deploying a data stream in Salesforce Data Cloud. When you deploy a data stream from Amazon Redshift to Data Cloud, an external data lake object (DLO) is created within the Data Cloud environment. This external DLO acts as a storage container, housing metadata for your federated Redshift data. Importantly, the DLO serves as a reference, pointing to the data physically stored in your Redshift data warehouse, keeping your data in its original location. Similar to native DLOs, the Amazon Redshift backed external DLOs can power several key features, including batch transform, calculated insights, identity resolution, query, segmentation, and activation, among others. Customer unified profiles enriched with Redshift data could be actioned by Amazon SageMaker to drive predictive outcomes and activated across several platforms, including Amazon Ads and Salesforce Marketing Cloud, for creating audience journeys and running targeted campaigns.

To increase performance, you can opt for acceleration, which is designed to enhance query runtimes. For more information on this feature, refer to Acceleration in Data Federation.

To summarize, Zero Copy Data Federation provides the following benefits:

  • Unified data view: Integrates external data seamlessly with Salesforce data for a comprehensive customer view.
  • Real-time access: Provides near real-time access to data stored in external sources like Amazon Redshift.
  • Data efficiency: Eliminates the need to copy or move large datasets, reducing storage costs and data duplication.
  • Cost-effective: Reduces data transfer pipeline and storage costs associated with traditional data integration methods.
  • Enhanced security: Data remains in its original secure environment, reducing exposure risks.
  • Streamlined compliance: Simplifies data governance by maintaining data in its original, regulated environment.

Prerequisites

Before configuring data federation, you must have access to Salesforce Data Cloud and the information to connect to your Redshift provisioned or serverless warehouse. The Redshift warehouse must be publicly accessible and it is recommended to restrict access by allow listing only the Data Cloud IP addresses.

For information on setting up an Amazon Redshift Serverless or Amazon Redshift provisioned cluster, refer to Amazon Redshift Serverless or Amazon Redshift provisioned clusters, respectively.

Configure Zero Copy Data Federation

To federate Redshift data to Salesforce Data Cloud, start by configuring a Redshift connection.

  1. Log in to Salesforce Data Cloud and navigate to Data Cloud Setup.
    Step 1 - Navigate to Data Cloud Setup
  2. In the navigation pane, choose Connectors under Configuration.
    Step 2 choose Connectors under Configuration.
  3. Choose New, choose Amazon Redshift, and choose Next.
    Step 3 choose New, choose Amazon Redshift, and choose Next.
  4. Retrieve the Redshift endpoint by navigating to the Redshift Serverless or provisioned cluster in the AWS console. Following image shows how to obtain the endpoint URL for Redshift serverless.
    Step 4 Retrieve the Redshift endpoint
  5. Back in Salesforce Data Cloud, configure the connector with a unique name and enter the endpoint from your Redshift server.
  6. Enter the user name and password configured for your Redshift serverless namespace.
  7. Enter the name of the database configured in your Redshift serverless namespace.
    Configure the coonector
  8. Choose Test Connection to confirm you’re able to successfully connect to the Redshift instance and choose Save.
    Confirm connection and Save

Create a Redshift Zero Copy Data Federation data stream

Complete the following steps to create a data stream using the connection you created:

  1. Navigate to Data Cloud and choose Data Streams in the navigation bar.
  2. Choose New to set up a new data stream.
    set up a new data stream
  3. Choose Amazon Redshift and choose Next.
    Select Amazon Redshift
  4. Choose your connector, database, and objects, then choose Next.
    Choose your connector, database, and objects, then choose Next.
  5. Configure the object, category, primary key, and fields:
    1. Set the object name and object API name. For more information, see Data Lake Object Naming Standards.
    2. Set the category to specify the type of data to ingest. For more information, see Category.
    3. Set the primary key to identify the incoming records uniquely. For more information, see Primary Key.
    4. Select the source fields you want to ingest.
  6. Choose Next.
    Configure the object, category, primary key, and fields. And choose Next
  7. Select the relevant data space. Choose default if you don’t have any other data space provisioned in your organization. For more information, see Manage Data Spaces.
  8. If you want to query the data in your Redshift instance with reduced latency, select Enable acceleration and choose your acceleration schedule. For more information, see Acceleration in Data Federation.
  9. Choose Deploy.
    deploy

On successful deployment, a data stream is created.

On successful deployment, a data stream is created.

Use cases for Zero Copy Data Federation

The following are key use cases enabled by Zero Copy Data Federation between Redshift and Salesforce Data Cloud:

  • Marketing insurance campaign journey – Combine customer profile, insurance policy, and plan data in Amazon Redshift with customer data in Salesforce Cloud for targeted outreach campaigns in Marketing Cloud. This facilitates cross-selling of other financial products.
  • Targeted promotions and customer outreach – Merge customer purchase and profile data from Amazon Redshift with customer feedback and service data in Salesforce for targeted customer outreach in Marketing Cloud, including promotional deals.
  • Customer satisfaction using service cloud data – Combine customer and case data in Salesforce with customer feedback data in Amazon Redshift to determine customer satisfaction ratings, enhancing service quality.
  • Prioritized offers and data-driven next-best actions – Utilize customer billing accounts and service data from Salesforce along with prospect, order, and billing data in Amazon Redshift to generate prioritized offers and next-best actions. The transition from ETL pipelines to Zero Copy BYOL integration has streamlined operations.
  • Customer segmentation and activation – Federate purchase data and billing history from Amazon Redshift to enrich unified profiles in Salesforce Data Cloud and generate actionable insights based on the recency, frequency, and monetary value to create customer segments and activate to your desired source.
  • Customer 360 with rich insights – Enrich customer profiles in Salesforce Data Cloud with purchase, billing, and product data from Amazon Redshift to empower Marketing, Sales, and Service teams to improve customer engagement with rich customer insights.

Conclusion

Zero Copy Data Federation between Salesforce Data Cloud and Amazon Redshift empowers businesses to break down data silos, enhance customer experiences, and drive operational efficiencies. By federating Redshift data to Salesforce Data Cloud, organizations can make informed decisions faster, personalize customer interactions at scale, and optimize resources across marketing, sales, service, and operations. This integration sets a new standard for data-driven business success in the digital age. Check out the Salesforce Zero Copy Data Federation announcement and the following resources to learn more and get started:


About the Authors

Vijay Gopalakrishnan is a Director of Product Management with Salesforce with several years of experience in the data space. He currently is a part of the Salesforce Data Cloud team.

Ravi Bhattiprolu is a Sr. Partner Solutions Architect at AWS. Ravi works with strategic ISV partners, Salesforce and Tableau, to deliver innovative and well-architected products and solutions that help joint customers achieve their business and technical objectives.

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike, watch sports, and listen to music.

Ife Stewart is a Principal Solutions Architect in the Strategic ISV segment at AWS. She has been engaged with Salesforce Data Cloud over the last 2 years to help build integrated customer experiences across Salesforce and AWS. Ife has over 10 years of experience in technology. She is an advocate for diversity and inclusion in the technology field.

Mike Patterson is a Senior Customer Solutions Manager in the Strategic ISV segment at AWS. He has partnered with Salesforce Data Cloud to align business objectives with innovative AWS solutions to achieve impactful customer experiences. In Mike’s spare time, he enjoys spending time with his family, sports, and outdoor activities.

Perform reindexing in Amazon OpenSearch Serverless using Amazon OpenSearch Ingestion

Post Syndicated from Utkarsh Agarwal original https://aws.amazon.com/blogs/big-data/perform-reindexing-in-amazon-opensearch-serverless-using-amazon-opensearch-ingestion/

Amazon OpenSearch Serverless is a serverless deployment option for Amazon OpenSearch Service that makes it straightforward to run search and analytics workloads without managing infrastructure. Customers using OpenSearch Serverless often need to copy documents between two indexes within the same collection or across different collections. This primarily arises from two scenarios:

  • Reindexing – You frequently need to update or modify index mapping due to evolving data needs or schema changes
  • Disaster recovery – Although OpenSearch Serverless data is inherently durable, you may want to copy data across AWS Regions for added redundancy and resiliency

Amazon OpenSearch Ingestion had recently introduced a feature supporting OpenSearch as a source. OpenSearch Ingestion, a fully managed, serverless data collector, facilitates real-time ingestion of log, metric, and trace data into OpenSearch Service domains and OpenSearch Serverless collections. We can leverage this feature to address these two scenarios, by reading the data from an OpenSearch Serverless Collection. This capability allows you to effortlessly copy data between indexes, making data management tasks more streamlined and eliminating the need for custom code.

In this post, we outline the steps to copy data between two indexes in the same OpenSearch Serverless collection using the new OpenSearch source feature of OpenSearch Ingestion. This is particularly useful for reindexing operations where you want to change your data schema. OpenSearch Serverless and OpenSearch Ingestion are both serverless services that enable you to seamlessly handle your data workflows, providing optimal performance and scalability.

Solution overview

The following diagram shows the flow of copying documents from the source index to the destination index using an OpenSearch Ingestion pipeline.

Implementing the solution consists of the following steps:

  1. Create an AWS Identity and Access Management (IAM) role to use as an OpenSearch Ingestion pipeline role.
  2. Update the data access policy attached to the OpenSearch Serverless collection.
  3. Create an OpenSearch Ingestion pipeline that simply copies data from one index to another, or you can even create an index template using the OpenSearch Ingestion pipeline to define explicit mapping, and then copy the data from the source index to the destination index with the defined mapping applied.

Prerequisites

To get started, you must have an active OpenSearch Serverless collection with an index that you want to reindex (copy). Refer to Creating collections to learn more about creating a collection.

When the collection is ready, note the following details:

  • The endpoint of the OpenSearch Serverless collection
  • The name of the index from which the documents need to be copied
  • If the collection is defined as a VPC collection, note down the name of the network policy attached to the collection

You use these details in the ingestion pipeline configuration.

Create an IAM role to use as a pipeline role

An OpenSearch Ingestion pipeline needs certain permissions to pull data from the source and write to its sink. For this walkthrough, both the source and sink are the same, but if the source and sink collections are different, modify the policy accordingly.

Complete the following steps:

  1. Create an IAM policy (opensearch-ingestion-pipeline-policy) that provides permission to read and send data to the OpenSearch Serverless collection. The following is a sample policy with least privileges (modify {account-id}, {region}, {collection-id} and {collection-name} accordingly):
    {
        "Version": "2012-10-17",
        "Statement": [{
                "Action": [
                    "aoss:BatchGetCollection",
                    "aoss:APIAccessAll"
                ],
                "Effect": "Allow",
                "Resource": "arn:aws:aoss:{region}:{account-id}:collection/{collection-id}"
            },
            {
                "Action": [
                    "aoss:CreateSecurityPolicy",
                    "aoss:GetSecurityPolicy",
                    "aoss:UpdateSecurityPolicy"
                ],
                "Effect": "Allow",
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "aoss:collection": "{collection-name}"
                    }
                }
            }
        ]
    }

  2. Create an IAM role (opensearch-ingestion-pipeline-role) that the OpenSearch Ingestion pipeline will assume. While creating the role, use the policy you created (opensearch-ingestion-pipeline-policy). The role should have the following trust relationship (modify {account-id} and {region} accordingly):
    {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {
                "Service": "osis-pipelines.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": "{account-id}"
                },
                "ArnLike": {
                    "aws:SourceArn": "arn:aws:osis:{region}:{account-id}:pipeline/*"
                }
            }
        }]
    }

  3. Record the ARN of the newly created IAM role (arn:aws:iam::111122223333:role/opensearch-ingestion-pipeline-role).

Update the data access policy attached to the OpenSearch Serverless collection

After you create the IAM role, you need to update the data access policy attached to the OpenSearch Serverless collection. Data access policies control access to the OpenSearch operations that OpenSearch Serverless supports, such as PUT <index> or GET _cat/indices. To perform the update, complete the following steps:

  1. On the OpenSearch Service console, under Serverless in the navigation pane, choose Collections.
  2. From the list of the collections, choose your OpenSearch Serverless collection.
  3. On the Overview tab, in the Data access section, choose the associated policy.
  4. Choose Edit.
  5. Edit the policy in the JSON editor to add the following JSON rule block in the existing JSON (modify {account-id} and {collection-name} accordingly):
    {
        "Rules": [{
            "Resource": [
                "index/{collection-name}/*"
            ],
            "Permission": [
                "aoss:CreateIndex",
                "aoss:UpdateIndex",
                "aoss:DescribeIndex",
                "aoss:ReadDocument",
                "aoss:WriteDocument"
            ],
            "ResourceType": "index"
        }],
        "Principal": [
            "arn:aws:iam::{account-id}:role/opensearch-ingestion-pipeline-role"
        ],
        "Description": "Provide access to OpenSearch Ingestion Pipeline Role"
    }

You can also use the Visual Editor method to choose Add another rule and add the preceding permissions for arn:aws:iam::{account-id}:role/opensearch-ingestion-pipeline-role.

  1. Choose Save.

Now you have successfully allowed the OpenSearch Ingestion role to perform OpenSearch operations against the OpenSearch Serverless collection.

Create and configure the OpenSearch Ingestion pipeline to copy the data from one index to another

Complete the following steps:

  1. On the OpenSearch Service console, choose Pipelines under Ingestion in the navigation pane.
  2. Choose Create a pipeline.
  3. In Choose Blueprint, select OpenSearchDataMigrationPipeline.
  4. For Pipeline name, enter a name (for example, sample-ingestion-pipeline).
  5. For Pipeline capacity, you can define the minimum and maximum capacity to scale up the resources. For this walkthrough, you can use the default value of 2 Ingestion OCUs for Min capacity and 4 Ingestion OCUs for Max capacity. However, you can even choose different values as OpenSearch Ingestion automatically scales your pipeline capacity according to your estimated workload, based on the minimum and maximum Ingestion OpenSearch Compute Units (Ingestion OCUs) that you specify.
  6. Update the following information for the source:
    1. Uncomment hosts and specify the endpoint of the existing OpenSearch Serverless collection that was copied as part of prerequisites.
    2. Uncomment include and index_name_regex, and specify the name of the index that will act as the source (in this demo, we’re using logs-2024.03.01).
    3. Uncomment region under aws and specify the AWS Region where your OpenSearch Serverless collection is (for example, us-east-1).
    4. Uncomment sts_role_arn under aws and specify the role that has permission to read data from the OpenSearch Serverless collection (for example, arn:aws:iam::111122223333:role/opensearch-ingestion-pipeline-role). This is the same role that was added in the data access policy of the collection.
    5. Update the serverless flag to true.
    6. If the OpenSearch Serverless collection has VPC access, uncomment serverless_options and network_policy_name and specify the name of the network policy used for the collection.
    7. Uncomment scheduling, interval, index_read_count, and start_time and modify these parameters accordingly.
      Using these parameters makes sure the OpenSearch Ingestion pipeline processes the indexes multiple times (to pick up new documents).
      Note – If the collection specified in the sink is of the Time series or Vector search type, you can keep the scheduling, interval, index_read_count, and start_time parameters commented.
  1. Update the following information for the sink:
    1. Uncomment hosts and specify the endpoint of the existing OpenSearch Serverless collection.
    2. Uncomment sts_role_arn under aws and specify the role that has permission to write data into the OpenSearch Serverless collection (for example, arn:aws:iam::111122223333:role/opensearch-ingestion-pipeline-role). This is the same role that was added in the data access policy of the collection.
    3. Update the serverless flag to true.
    4. If the OpenSearch Serverless collection has VPC access, uncomment serverless_options and network_policy_name and specify the name of the network policy used for the collection.
    5. Update the value for index and provide the index name to which you want to transfer the documents (for example, new-logs-2024.03.01).
    6. For document_id, you can get the ID from the document metadata in the source and use the same in the target.
      However, it is important to note that custom document IDs are only supported for the Search type of collection. If your collection is of the Time Series or Vector Search type, you should comment out the document_id line.
    7. (Optional) The values for bucket, region and sts_role_arn keys within the dlq section can be modified to capture any failed requests in an S3 bucket.
      Note – Additional permission to opensearch-ingestion-pipeline-role needs to be given, if configuring DLQ. Please refer Writing to a dead-letter queue, for the changes required.
      For this walkthrough, you will not set up a DLQ. You can remove the entire dlq block.
  1. Now click on Validate pipeline to validate the pipeline configuration.
  2. For Network settings, choose your preferred setting:
    1. Choose VPC access and select your VPC, subnet, and security group to set up the access privately. Choose this option if the OpenSearch Serverless collection has VPC access. AWS recommends using a VPC endpoint for all production workloads.
    2. Choose Public to use public access. For this walkthrough, we select Public because the collection is also accessible from public network.
  3. For Log Publishing Option, you can either create a new Amazon CloudWatch group or use an existing CloudWatch group to write the ingestion logs. This provides access to information about errors and warnings raised during the operation, which can help during troubleshooting. For this walkthrough, choose Create new group.
  4. Choose Next, and verify the details you specified for your pipeline settings.
  5. Choose Create pipeline.

It will take a couple of minutes to create the ingestion pipeline. After the pipeline is created, you will see the documents in the destination index, specified in the sink (for example, new-logs-2024.03.01). After all the documents are copied, you can validate the number of documents by using the count API.

When the process is complete, you have the option to stop or delete the pipeline. If you choose to keep the pipeline running, it will continue to copy new documents from the source index according to the defined schedule, if specified.

In this walkthrough, the endpoint defined in the hosts parameter under source and sink of the pipeline configuration belonged to the same collection which was of the Search type. If the collections are different, you need to modify the permissions for the IAM role (opensearch-ingestion-pipeline-role) to allow access to both collections. Additionally, make sure you update the data access policy for both the collections to grant access to the OpenSearch Ingestion pipeline.

Create an index template using the OpenSearch Ingestion pipeline to define mapping

In OpenSearch, you can define how documents and their fields are stored and indexed by creating a mapping. The mapping specifies the list of fields for a document. Every field in the document has a field type, which defines the type of data the field contains. OpenSearch Service dynamically maps data types in each incoming document if an explicit mapping is not defined. However, you can use the template_type parameter with the index-template value and template_content with JSON of the content of the index-template in the pipeline configuration to define explicit mapping rules. You also need to define the index_type parameter with the value as custom.

The following code shows an example of the sink portion of the pipeline and the usage of index_type, template_type, and template_content:

sink:
    - opensearch:
        # Provide an AWS OpenSearch Service domain endpoint
        hosts: [ "<<OpenSearch-Serverless-Collection-Endpoint>>" ]
        aws:
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com
          sts_role_arn: "arn:aws:iam::111122223333:role/opensearch-ingestion-pipeline-role"
          # Provide the region of the domain.
          region: "us-east-1"
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection
          serverless: true
          # serverless_options:
            # Specify a name here to create or update network policy for the serverless collection
            # network_policy_name: "network-policy-name"
        # This will make it so each document in the source cluster will be written to the same index in the destination cluster
        index: "new-logs-2024.03.01"
        index_type: custom
        template_type: index-template
        template_content: >
          {
            "template" : {
              "mappings" : {
                "properties" : {
                  "Data" : {
                    "type" : "text"
                  },
                  "EncodedColors" : {
                    "type" : "binary"
                  },
                  "Type" : {
                    "type" : "keyword"
                  },
                  "LargeDouble" : {
                    "type" : "double"
                  }          
                }
              }
            }
          }
        # This will make it so each document in the source cluster will be written with the same document_id in the destination cluster
        document_id: "${getMetadata(\"opensearch-document_id\")}"
        # Enable the 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x
        # distribution_version: "es6"
        # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html
        # enable_request_compression: true/false
        # Enable the S3 DLQ to capture any failed requests in an S3 bucket
        # dlq:
          # s3:
            # Provide an S3 bucket
            # bucket: "<<your-dlq-bucket-name>>"
            # Provide a key path prefix for the failed requests
            # key_path_prefix: "<<logs/dlq>>"
            # Provide the region of the bucket.
            # region: "<<us-east-1>>"
            # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com
            # sts_role_arn: "<<arn:aws:iam::111122223333:role/opensearch-ingestion-pipeline-role>>"

Or you can create the index first, with the mapping in the collection before you start the pipeline.

If you want to create a template using an OpenSearch Ingestion pipeline, you need to provide aoss:UpdateCollectionItems and aoss:DescribeCollectionItems permission for the collection in the data access policy for the pipeline role (opensearch-ingestion-pipeline-role). The updated JSON block for the rule would look like the following:

{
    "Rules": [
      {
        "Resource": [
          "collection/{collection-name}"
        ],
        "Permission": [
          "aoss:UpdateCollectionItems",
          "aoss:DescribeCollectionItems"
        ],
        "ResourceType": "collection"
      },
      {
        "Resource": [
          "index/{collection-name}/*"
        ],
        "Permission": [
          "aoss:CreateIndex",
          "aoss:UpdateIndex",
          "aoss:DescribeIndex",
          "aoss:ReadDocument",
          "aoss:WriteDocument"
        ],
        "ResourceType": "index"
      }
    ],
    "Principal": [
      "arn:aws:iam::{account-id}:role/opensearch-ingestion-pipeline-role"
    ],
    "Description": "Provide access to OpenSearch Ingestion Pipeline Role"
  }

Conclusion

In this post, we showed how to use an OpenSearch Ingestion pipeline to copy data from one index to another in an OpenSearch Serverless collection. OpenSearch Ingestion also allows you to perform transformation of data using various processors. AWS offers various resources for you to quickly start building pipelines using OpenSearch Ingestion. You can use various built-in pipeline integrations to quickly ingest data from Amazon DynamoDB, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Security Lake, Fluent Bit, and many more. You can use the following OpenSearch Ingestion blueprints to build data pipelines with minimal configuration changes.


About the Authors

Utkarsh Agarwal is a Cloud Support Engineer in the Support Engineering team at Amazon Web Services. He specializes in Amazon OpenSearch Service. He provides guidance and technical assistance to customers thus enabling them to build scalable, highly available, and secure solutions in the AWS Cloud. In his free time, he enjoys watching movies, TV series, and of course, cricket. Lately, he has also been attempting to master the art of cooking in his free time – the taste buds are excited, but the kitchen might disagree.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Uncover social media insights in real time using Amazon Managed Service for Apache Flink and Amazon Bedrock

Post Syndicated from Francisco Morillo original https://aws.amazon.com/blogs/big-data/uncover-social-media-insights-in-real-time-using-amazon-managed-service-for-apache-flink-and-amazon-bedrock/

With over 550 million active users, X (formerly known as Twitter) has become a useful tool for understanding public opinion, identifying sentiment, and spotting emerging trends. In an environment where over 500 million tweets are sent each day, it’s crucial for brands to effectively analyze and interpret the data to maximize their return on investment (ROI), which is where real-time insights play an essential role.

Amazon Managed Service for Apache Flink helps you to transform and analyze streaming data in real time with Apache Flink. Apache Flink supports stateful computation over a large volume of data in real time with exactly-once consistency guarantees. Moreover, Apache Flink’s support for fine-grained control of time with highly customizable window logic enables the implementation of the advanced business logic required for building a streaming data platform. Stream processing and generative artificial intelligence (AI) have emerged as powerful tools to harness the potential of real time data. Amazon Bedrock, along with foundation models (FMs) such as Anthropic Claude on Amazon Bedrock, empowers a new wave of AI adoption by enabling natural language conversational experiences.

In this post, we explore how to combine real-time analytics with the capabilities of generative AI and use state-of-the-art natural language processing (NLP) models to analyze tweets through queries related to your brand, product, or topic of choice. It goes beyond basic sentiment analysis and allows companies to provide actionable insights that can be used immediately to enhance customer experience. These include:

  • Identifying rising trends and discussion topics related to your brand
  • Conducting granular sentiment analysis to truly understand customers’ opinions
  • Detecting nuances such as emojis, acronyms, sarcasm, and irony
  • Spotting and addressing concerns proactively before they spread
  • Guiding product development based on feature requests and feedback
  • Creating targeted customer segments for information campaigns

This post takes a step-by-step approach to showcase how you can use Retrieval Augmented Generation (RAG) to reference real-time tweets as a context for large language models (LLMs). RAG is the process of optimizing the output of an LLM so it references an authoritative knowledge base outside of its training data sources before generating a response. LLMs are trained on vast volumes of data and use billions of parameters to generate original output for tasks such as answering questions, translating languages, and completing sentences. RAG extends the already powerful capabilities of LLMs to specific domains or an organization’s internal knowledge base, all without the need to retrain the model. It’s a cost-effective approach to improving LLM output so it remains relevant, accurate, and useful in various contexts.

Solution overview

In this section, we explain the flow and architecture of the application. We divide the flow of the application into two parts:

  • Data ingestion – Ingest data from streaming sources, convert it to vector embeddings, and then store them in a vector database
  • Insights retrieval – Invoke an LLM with the user queries to retrieve insights on tweets using the RAG technique

Data ingestion

The following diagram describes the data ingestion flow:

  1. Process feeds from streaming sources, such as social media feeds, Amazon Kinesis Data Streams, or Amazon Managed Service for Apache Kafka (Amazon MSK).
  2. Convert streaming data to vector embeddings in real time.
  3. Store them in a vector database.

Data is ingested from a streaming source (for example, X) and processed using an Apache Flink application. Apache Flink is an open source stream processing framework. It provides powerful streaming capabilities, enabling real-time processing, stateful computations, fault tolerance, high throughput, and low latency. Apache Flink is used to process the streaming data, perform deduplication, and invoke an embedding model to create vector embeddings.

Vector embeddings are numerical representations that capture the relationships and meaning of words, sentences, and other data types. These vector embeddings will be used for semantic search or neural search to retrieve relevant information that will be used as context for the LLM to evaluate the response. After the text data is converted into vectors, the vectors are persisted in an Amazon OpenSearch Service domain, which will be used as a vector database. Unlike traditional relational databases with rows and columns, data points in a vector database are represented by vectors with a fixed number of dimensions, which are clustered based on similarity.

OpenSearch Service offers scalable and efficient similarity search capabilities tailored for handling large volumes of dense vector data. OpenSearch Service seamlessly integrates with other AWS services, enabling you to build robust data pipelines within AWS. As a fully managed service, OpenSearch Service alleviates the operational overhead of managing the underlying infrastructure, while providing essential features like approximate k-Nearest Neighbor (k-NN) search algorithms, dense vector support, and robust monitoring and logging tools through Amazon CloudWatch. These capabilities make OpenSearch Service a suitable solution for applications that require fast and accurate similarity-based retrieval tasks using vector embeddings.

This design enables real-time vector embedding, making it ideal for AI-driven applications.

Insights retrieval

The following diagram shows the flow from the user side, where the user places a query through the frontend and gets a response from the LLM model using the retrieved vector database documents as the context provided in the prompt.

As shown in the preceding figure, to retrieve insights from the LLM, first you need to receive a query from the user. The text query is then converted into vector embeddings using the same model that was used before for the tweets. It’s important to make sure the same embedding model is used for both ingestion and search. The vector embeddings are then used to perform a semantic search in the vector database to obtain the related vectors and associated text. This serves as the context for the prompt. Next, the previous conversation history (if any) is added to the prompt. This serves as the conversation history for the model. Finally, the user’s question is also included in the prompt and the LLM is invoked to get the response.

For the purpose of this post, we don’t take into consideration the conversation history or store it for later use.

Solution architecture

Now that you understand the overall process flow, let’s analyze the following architecture using AWS services step by step.

The first part of the preceding figure shows the data ingestion process:

  1. A user authenticates with Amazon Cognito.
  2. The user connects to the Streamlit frontend and configures the following parameters: query terms, API bearer token, and frequency to retrieve tweets.
  3. Managed Service for Apache Flink is used to consume and process the tweets in real time and stores in Apache Flink’s state the parameters for making the API requests received from the frontend application.
  4. The streaming application uses Apache Flink’s async I/O to invoke the Amazon Titan Embeddings model through the Amazon Bedrock API.
  5. Amazon Bedrock returns a vector embedding for each tweet.
  6. The Apache Flink application then writes the vector embedding with the original text of the tweet into an OpenSearch Service k-NN index.

The remainder of the architecture diagram shows the insights retrieval process:

  1. A user sends a query through the Streamlit frontend application.
  2. An AWS Lambda function is invoked by Amazon API Gateway, passing the user query as input.
  3. The Lambda function uses LangChain to orchestrate the RAG process. As a first step, the function invokes the Amazon Titan Embeddings model on Amazon Bedrock to create a vector embedding for the question.
  4. Amazon Bedrock returns the vector embedding for the question.
  5. As a second step in the RAG orchestration process, the Lambda function performs a semantic search in OpenSearch Service and retrieves the relevant documents related to the question.
  6. OpenSearch Service returns the relevant documents containing the tweet text to the Lambda function.
  7. As a last step in the LangChain orchestration process, the Lambda function augments the prompt, adding the context and using few-shot prompting. The augmented prompt, including instructions, examples, context, and query, is sent to the Anthropic Claude model through the Amazon Bedrock API.
  8. Amazon Bedrock returns the answer to the question in natural language to the Lambda function.
  9. The response is sent back to the user through API Gateway.
  10. API Gateway provides the response to the user question in the Streamlit application.

The solution is available in the GitHub repo. Follow the README file to deploy the solution.

Now that you understand the overall flow and architecture, let’s dive deeper into some of the key steps to understand how it works.

Amazon Bedrock chatbot UI

The Amazon Bedrock chatbot Streamlit application is designed to provide insights from tweets, whether they are real tweets ingested from the X API or simulated tweets or messages from the My Social Media application.

In the Streamlit application, we can provide the parameters that will be used to make the API requests to the X Developer API and pull the data from X. We developed an Apache Flink application that adjusts the API requests based on the provided parameters.

As parameters, you need to provide the following:

  • Bearer token for API authorization – This is obtained from the X Developer platform when you sign up to use the APIs.
  • Query terms to be used to filter the tweets consumed – You can use the search operators available in the X documentation.
  • Frequency of the request – The X basic API only allows you to make a request every 15 seconds. If a lower interval is set, the application won’t pull data.

The parameters are sent to Kinesis Data Streams through API Gateway and are consumed by the Apache Flink application.

My Social Media UI

The My Social Media application is a Streamlit application that serves as an additional UI. Through this application, users can compose and send messages, simulating the experience of posting on a social media site. These messages are then ingested into an AWS data pipeline consisting of API Gateway, Kinesis Data Streams, and an Apache Flink application. The Apache Flink application processes the incoming messages, invokes an Amazon Bedrock embedding model, and stores the data in an OpenSearch Service cluster.

To accommodate both real X data and simulated data from the My Social Media application, we’ve set up separate indexes within the OpenSearch Service cluster. This separation allows users to choose which data source they want to analyze or query. The Streamlit application features a sidebar option called Use X Index that acts as a toggle. When this option is enabled, the application queries and analyzes data from the index containing real tweets ingested from the X API. If the option is disabled, the application queries and displays data from the index containing messages sent through the My Social Media application.

Apache Flink is used because of its ability to scale with the increasing volume of tweets. The Apache Flink application is responsible for performing data ingestion as explained previously. Let’s dive into the details of the flow.

Consume data from X

We use Apache Flink to process the API parameters sent from the Streamlit UI. We store the parameters in Apache Flink’s state, which allows us to modify and update the parameters without having to restart the application. We use the ProcessFunction to use Apache Flink’s internal timers to schedule the frequency of requests to fetch tweets. In this post, we use X’s Recent search API, which allows us to access filtered public tweets posted over the last 7 days. The API response is paginated and returns a maximum of 100 tweets on each request in reverse chronological order. If there are more tweets to be consumed, the response of the previous request will return a token, which needs to be used in the next API call. After we receive the tweets from the API, we apply the following transformations:

  • Filter out the empty tweets (tweets without any text).
  • Partition the set of tweets by author ID. This helps distribute the processing to multiple subtasks in Apache Flink.
  • Apply a deduplication logic to only process tweets that haven’t been processed. For this, we store the already processed tweet ID in Apache Flink’s state and match and filter out the tweets that have already been processed. We store the tweets’ ID grouped by author ID, which can cause the state size of the application to increase. Because the API only provides tweets from the last 7 days when invoked, we have introduced a time-to-live (TTL) of 7 days so we don’t grow the application’s state indefinitely. You can modify this based on your requirements.
  • Convert tweets into JSON objects for a later Amazon Bedrock API invocation.

Create vector embeddings

The vector embeddings are created by invoking the Amazon Titan Embeddings model through the Amazon Bedrock API. Asynchronous invocations of external APIs are important performance considerations when building a stream processing architecture. Synchronous calls increase latency, reduce throughput, and can be a bottleneck for overall processing.

To invoke the Amazon Bedrock API, you will use the Amazon Bedrock Runtime dependency in Java, which provides an asynchronous client that allows us invoke Amazon Bedrock models asynchronously through the BedrockRuntimeAsyncClient. This is invoked to create the embeddings. For this we use Apache Flink’s async I/O to make asynchronous requests to external APIs. Apache Flink’s async I/O is a library within Apache Flink that allows you to write asynchronous, non-blocking operators for stream processing applications, enabling better utilization of resources and higher throughput. We provide the asynchronous function to be called, the timeout duration that determines how long an asynchronous operation can take before it’s considered failed, and the maximum number of requests that should be in progress at any point in time. Limiting the number of concurrent requests makes sure that the operator won’t accumulate an ever-growing backlog of pending requests. However, this can cause backpressure after the capacity is exhausted. Because we use the timestamp of creation when we ingest into OpenSearch Service and so order won’t affect our results, we can use Apache Flink’s async I/O unordered function, allowing us to have better throughput and performance. See the following code:

DataStream<JSONObject> resultStream = AsyncDataStream

.unorderedWait(inputJSON, new BedRockEmbeddingModelAsyncTweetFunction(), 15000, TimeUnit.MILLISECONDS, 1000)
.uid("tweet-async-function");

Let’s have a closer look into the Apache Flink async I/O function. The following is within the CompletableFuture Java class:

  1. First, we create the Amazon Bedrock Runtime async client:
BedrockRuntimeAsyncClient runtime = BedrockRuntimeAsyncClient.builder()
.region(Region.of(region))  // Use the specified AWS region 
.build();
  1. We then extract the tweet for the event and build the payload that we will send to Amazon Bedrock:
String stringBody = jsonObject.getString("tweet");

ArrayList<String> stringList = new ArrayList<>();


stringList.add(stringBody);


JSONObject jsonBody = new JSONObject()
.put("inputText", stringBody);


SdkBytes body = SdkBytes.fromUtf8String(jsonBody.toString());
  1. After we have the payload, we can call the InvokeModel API and invoke Amazon Titan to create the vector embeddings for the tweets:
InvokeModelRequest request = InvokeModelRequest.builder()
        
.modelId("amazon.titan-embed-text-v1")
        
.contentType("application/json")
        
.accept("*/*")
        
.body(body)
        
.build();

CompletableFuture<InvokeModelResponse> futureResponse = runtime.invokeModel(request);
  1. After receiving the vector, we append the following fields to the output JSONObject:
    1. Cleaned tweet
    2. Tweet creation timestamp
    3. Number of likes of the tweet
    4. Number of retweets
    5. Number of views from the tweet (impressions)
    6. Tweet ID
// Extract and process the response when it is available
JSONObject response = new JSONObject(
        futureResponse.join().body().asString(StandardCharsets.UTF_8)
);

// Add additional fields related to tweet data to the response
response.put("tweet", jsonObject.get("tweet"));
response.put("@timestamp", jsonObject.get("created_at"));
response.put("likes", jsonObject.get("likes"));
response.put("retweet_count", jsonObject.get("retweet_count"));
response.put("impression_count", jsonObject.get("impression_count"));
response.put("_id", jsonObject.get("_id"));

return response;

This will return the embeddings, original text, additional fields, and the number of tokens used for the embedding. In our connector, we are only consuming messages in English, as well as ignoring messages that are retweets from other tweets.

The same processing steps are replicated for messages coming from the My Social Media application (manually ingested).

Store vector embeddings in OpenSearch Service

We use OpenSearch Service as a vector database for semantic search. Before we can write the data into OpenSearch Service, we need to create an index that supports semantic search. We are using the k-NN plugin. The vector database index mapping should have the following properties for storing vectors for similarity search:

"embeddings": {
        "type": "knn_vector",
        "dimension": 1536,
        "method": {
          "name": "hnsw",
          "space_type": "l2",
          "engine": "nmslib",
          "parameters": {
            "ef_construction": 128,
            "m": 24
          }
        }
      }

The key parameters are as follows:

  • type – This specifies that the field will hold vector data for a k-NN similarity search. The value should be knn_vector.
  • dimension – The number of dimensions for each vector. This must match the model dimension. In this case we use 1,536 dimensions, the same as the Amazon Titan Text Embeddings v1 model.
  • method – Defines the algorithm and parameters for indexing and searching the vectors:
    • name – The identifier for the nearest neighbor method. We use hierarchical navigable small worlds (HNSW)—a hierarchical proximity graph approach—to run a approximate k-NN (A-NN) search because standard k-NN is not a scalable approach.
    • space_type – The vector space used to calculate the distance between vectors. It supports multiple space type. The default value is 12.
    • engine – The approximate k-NN library to use for indexing and search. The available libraries are faiss, nmslib, and Lucene.
    • ef_construction – The size of the dynamic list used during k-NN graph creation. Higher values result in a more accurate graph but slower indexing speed.
    • m – The number of bidirectional links that the plugin creates for each new element. Increasing and decreasing this value can have a large impact on memory consumption. Keep this value between 2–100.

Standard k-NN search methods compute similarity using a brute-force approach that measures the nearest distance between a query and a number of points, which produces exact results. This works well for most applications. However, in the case of extremely large datasets with high dimensionality, this creates a scaling problem that reduces the efficiency of the search. The approximate k-NN search methods used by OpenSearch Service use approximate nearest neighbor (ANN) algorithms from the nmslib, faiss, and Lucene libraries to power k-NN search. These search methods employ ANN to improve search latency for large datasets. Of the three search methods the k-NN plugin provides, this method offers the best search scalability for large datasets. This approach is the preferred method when a dataset reaches hundreds of thousands of vectors. For more information about the different methods and their trade-offs, refer to Comprehensive Guide To Approximate Nearest Neighbors Algorithms.

To use the k-NN plugin’s approximate search functionality, we must first create a k-NN index with index.knn set to true:

    "settings" : {
      "index" : {
        "knn": true,
        "number_of_shards" : "5",
        "number_of_replicas" : "1"
      }
    }

After we have our indexes created, we can sink the data from our Apache Flink application into OpenSearch.

RetrievalQA using Lambda and LangChain implementation

For this part, we take an input question from the user and invoke a Lambda function. The Lambda function retrieves relevant tweets from OpenSearch Service as context and generates an answer using the LangChain RAG chain RetrievalQA. LangChain is a framework for developing applications powered by language models.

First, some setup. We instantiate the bedrock-runtime client that will allow the Lambda function to invoke the models:

bedrock_runtime = boto3.client("bedrock-runtime", "us-east-1")

embeddings = BedrockEmbeddings(model_id="amazon.titan-embed-text-v1", client=bedrock_runtime)

The BedrockEmbeddings class uses the Amazon Bedrock API to generate embeddings for the user’s input question. It strips new line characters from the text. Notice that we need to pass as an argument the instantiation of the bedrock_runtime client and the model ID for the Amazon Titan Text Embeddings v1 model.

Next, we instantiate the client for the OpenSearchVectorSeach LangChain class that will allow the Lambda function to connect to the OpenSearch Service domain and perform the semantic search against the previously indexed X embeddings. For the embedding function, we’re passing the embeddings model that we defined previously. This will be used during the LangChain orchestration process:

os_client = OpenSearchVectorSearch(
        index_name=aos_index,
        embedding_function=embeddings,
        http_auth=(os.environ['aosUser'], os.environ['aosPassword']),
        opensearch_url=os.environ['aosDomain'],
        timeout=300,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        )

We need to define the LLM model from Amazon Bedrock to use for text generation. The temperature is set to 0 to reduce hallucinations:

model_kwargs={"temperature": 0, "max_tokens": 4096}

llm = BedrockChat(
    model_id="anthropic.claude-3-haiku-20240307-v1:0",
    client=bedrock_runtime,
    model_kwargs=model_kwargs
)

Next, in our Lambda function, we create the prompt to instruct the model on the specific task of analyzing hundreds of tweets in the context. To normalize the output, we use a prompt engineering technique called few-shot prompting. Few-shot prompting allows language models to learn and generate responses based on a small number of examples or demonstrations provided in the prompt itself. In this approach, instead of training the model on a large dataset, we provide a few examples of the desired task or output within the prompt. These examples serve as a guide or conditioning for the model, enabling it to understand the context and the desired format or pattern of the response. When presented with a new input after the examples, the model can then generate an appropriate response by following the patterns and context established by the few-shot demonstrations in the prompt.

As part of the prompt, we then provide examples of questions and answers, so the chatbot can follow the same pattern when used (see the Lambda function to view the complete prompt):

template = """As a helpful agent that is an expert analysing tweets, please answer the question using only the provided tweets from the context in <context></context> tags. If you don't see valuable information on the tweets provided in the context in <context></context> tags, say you don't have enough tweets related to the question. Cite the relevant context you used to build your answer. Print in a bullet point list the top most influential tweets from the context at the end of the response.
    
    Find below some examples:
    <example1>
    question: 
    What are the main challenges or concerns mentioned in tweets about using Bedrock as a generative AI service on AWS, and how can they be addressed?
    
    answer:
    Based on the tweets provided in the context, the main challenges or concerns mentioned about using Bedrock as a generative AI service on AWS are:

1.	...
2.	...
3.	...
4.	...
...
    
    To address these concerns:

1.	...
2.	...
3.	...
4.	...
...

    Top tweets from context:

    [1] ...
    [2] ...
    [3] ...
    [4] ...

    </example1>
    
    <example2>
    ...
    </example2>
    
    Human: 
    
    question: {question}
    
    <context>
    {context}
    </context>
    
    Assistant:"""

    prompt = PromptTemplate(input_variables=["context","question"], template=template)

We then create the RetrievalQA LangChain chain using the prompt template, Anthropic Claude on Amazon Bedrock, and the OpenSearch Service retriever configured previously. The RetrievalQA LangChain chain will orchestrate the following RAG steps:

  • Invoke the text embedding model to create a vector for the user’s question
  • Perform a semantic search on OpenSearch Service using the vector to retrieve the relevant tweets to the user’s question (k=200)
  • Invoke the LLM model using the augmented prompt containing the prompt template, context (stuffed retrieved tweets) and question
chain = RetrievalQA.from_chain_type(
    llm=llm,
    verbose=True,
    chain_type="stuff",
    retriever=os_client.as_retriever(
        search_type="similarity",
        search_kwargs={
            "k": 200, 
            "space_type": "l2", 
            "vector_field": "embeddings", 
            "text_field": text_field
        }
    ),
    chain_type_kwargs = {"prompt": prompt}
)

Finally, we run the chain:

answer = chain.invoke({"query": message})

The response from the LLM is sent back to the user application. As shown in the following screenshot:

Considerations

You can extend the solution provided in this post. When you do, consider the following suggestions:

  • Configure index retention and rollover in OpenSearch Service to manage index lifecycle and data retention effectively
  • Incorporate chat history into the chatbot to provide richer context and improve the relevance of LLM responses
  • Add filters and hybrid search with the possibility to modify the weight given to the keyword and semantic search to enhance search on RAG
  • Modify the TTL for Apache Flink’s state to match your requirements (the solution in this post uses 7 days)
  • Enable logging to API Gateway and in the Streamlit application.

Summary

This post demonstrates how to combine real-time analytics with generative AI capabilities to analyze tweets related to a brand, product, or topic of interest. It uses Amazon Managed Service for Apache Flink to process tweets from the X API, create vector embeddings using the Amazon Titan Embeddings model on Amazon Bedrock, and store the embeddings in an OpenSearch Service index configured for vector similarity search—all these steps happen in real time.

The post also explains how users can input queries through a Streamlit frontend application, which invokes a Lambda function. This Lambda function retrieves relevant tweets from OpenSearch Service by performing semantic search on the stored embeddings using the LangChain RetrievalQA chain. As a result, it generates insightful answers using the Anthropic Claude LLM on Amazon Bedrock.

The solution enables identifying trends, conducting sentiment analysis, detecting nuances, addressing concerns, guiding product development, and creating targeted customer segments based on real-time X data.

To get started with generative AI, visit Generative AI on AWS for information about industry use cases, tools to build and scale generative AI applications, as well as the post Exploring real-time streaming for generative AI Applications for other use cases for streaming with generative AI.


About the Authors

Francisco Morillo is a Streaming Solutions Architect at AWS, specializing in real-time analytics architectures. With over five years in the streaming data space, Francisco has worked as a data analyst for startups and as a big data engineer for consultancies, building streaming data pipelines. He has deep expertise in Amazon Managed Streaming for Apache Kafka (Amazon MSK) and Amazon Managed Service for Apache Flink. Francisco collaborates closely with AWS customers to build scalable streaming data solutions and advanced streaming data lakes, ensuring seamless data processing and real-time insights.

Sergio Garcés Vitale is a Senior Solutions Architect at AWS, passionate about generative AI. With over 10 years of experience in the telecommunications industry, where he helped build data and observability platforms, Sergio now focuses on guiding Retail and CPG customers in their cloud adoption, as well as customers across all industries and sizes in implementing Artificial Intelligence use cases.

Subham Rakshit is a Streaming Specialist Solutions Architect for Analytics at AWS based in the UK. He works with customers to design and build search and streaming data platforms that help them achieve their business objective. Outside of work, he enjoys spending time solving jigsaw puzzles with his daughter.

Run Apache Spark 3.5.1 workloads 4.5 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Ashok Chintalapati original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-5-1-workloads-4-5-times-faster-with-amazon-emr-runtime-for-apache-spark/

The Amazon EMR runtime for Apache Spark is a performance-optimized runtime that is 100% API compatible with open source Apache Spark. It offers faster out-of-the-box performance than Apache Spark through improved query plans, faster queries, and tuned defaults. Amazon EMR on EC2, Amazon EMR Serverless, Amazon EMR on Amazon EKS, and Amazon EMR on AWS Outposts all use this optimized runtime, which is 4.5 times faster than Apache Spark 3.5.1 and has 2.8 times better price-performance based on an industry standard benchmark derived from TPC-DS at 3 TB scale (note that our TPC-DS derived benchmark results are not directly comparable with official TPC-DS benchmark results).

We added 35 optimizations since the EOY 2022 release, EMR 6.9, that are included in both EMR 7.0 and EMR 7.1. These improvements are turned on by default and are 100% API compatible with Apache Spark. Some of the improvements since our previous post, Amazon EMR on EKS widens the performance gap, include:

  • Spark physical plan operator improvements – We continue to improve Spark runtime performance by changing the operator algorithms:
    • Optimized data structures used in hash joins for performance and memory requirements, allowing the use of more performant join algorithm for more cases
    • Optimized sorting for partial window
    • Optimized rollup operations
    • Improved sort algorithm for shuffle partitioning
    • Optimized hash aggregate operator
    • More efficient decimal arithmetic operations
    • Aggregates based on Parquet statistics
  • Spark query planning improvements – We introduced new rules in the Spark’s Catalyst optimizer to improve efficiency:
    • Adaptively minimize redundant joins
    • Adaptively identify and disable unhelpful optimizations at runtime
    • Infer more advanced Bloom filters and dynamic partition pruning filters from complex query plans to reduce amount of data shuffled and read from Amazon Simple Storage Service (Amazon S3)
  • Fewer requests to Amazon S3 – We reduced requests sent to Amazon S3 when reading Parquet files by minimizing unnecessary requests and introducing a cache for Parquet footers.
  • Java 17 as default Java runtime used in Amazon EMR 7.0 – Java 17 was extensively tested and tuned for optimal performance, allowing us to make it the default Java runtime for Amazon EMR 7.0.

For more details on EMR Spark performance optimizations, refer to Optimize Spark performance.

In this post, we share the testing methodology and benchmark results comparing the latest Amazon EMR versions (7.0 and 7.1) with the EOY 2022 release (version 6.9) and Apache Spark 3.5.1 to demonstrate the latest cost improvements Amazon EMR has achieved.

Benchmark results for Amazon EMR 7.1 vs. Apache Spark 3.5.1

To evaluate the Spark engine performance, we ran benchmark tests with the 3 TB TPC-DS dataset. We used EMR Spark clusters for benchmark tests on Amazon EMR and installed Apache Spark 3.5.1 on Amazon Elastic Compute Cloud (Amazon EC2) clusters designated for open source Spark (OSS) benchmark runs. We ran tests on separate EC2 clusters comprised of nine r5d.4xlarge instances for each of Apache Spark 3.5.1, Amazon EMR 6.9.0, and Amazon EMR 7.1. The primary node has 16 vCPU and 128 GB memory and eight worker nodes have a total of 128 vCPU and 1024 GB memory. We tested with Amazon EMR defaults to highlight the out-of-the-box experience and tuned Apache Spark with the minimal settings needed to provide a fair comparison.

For the source data, we chose the 3 TB scale factor, which contains 17.7 billion records, approximately 924 GB of compressed data in Parquet file format. The setup instructions and technical details can be found in the GitHub repository. We used Spark’s in-memory data catalog to store metadata for TPC-DS databases and tables. spark.sql.catalogImplementation is set to the default value in-memory. The fact tables are partitioned by the date column, which consists of partitions ranging from 200–2,100. No statistics were pre-calculated for these tables.

A total of 104 SparkSQL queries were run in three iterations sequentially and an average of each query’s runtime in these three iterations was used for comparison. The average of the three iterations’ runtime on Amazon EMR 7.1 was 0.51 hours, which is 1.9 times faster than Amazon EMR 6.9 and 4.5 times faster than Apache Spark 3.5.1. The following figure illustrates the total runtimes in seconds.

The per-query speedup on Amazon EMR 7.1 when compared to Apache Spark 3.5.1 is illustrated in the following chart. Although Amazon EMR is faster than Apache Spark on all TPC-DS queries, the speedup is much greater on some queries than on others. The horizontal axis represents queries in the TPC-DS 3 TB benchmark ordered by the Amazon EMR speedup descending and the vertical axis shows the speedup of queries due to the Amazon EMR runtime.

Cost comparison

Our benchmark outputs the total runtime and geometric mean figures to measure the Spark runtime performance by simulating a real-world complex decision support use case. The cost metric can provide us with additional insights. Cost estimates are computed using the following formulas. They factor in Amazon EC2, Amazon Elastic Block Store (Amazon EBS), and Amazon EMR costs, but don’t include Amazon S3 GET and PUT costs.

  • Amazon EC2 cost (include SSD cost) = number of instances * r5d.4xlarge hourly rate * job runtime in hours
    • 4xlarge hourly rate = $1.152 per hour
  • Root Amazon EBS cost = number of instances * Amazon EBS per GB-hourly rate * root EBS volume size * job runtime in hours
  • Amazon EMR cost = number of instances * r5d.4xlarge Amazon EMR cost * job runtime in hours
    • 4xlarge Amazon EMR cost = $0.27 per hour
  • Total cost = Amazon EC2 cost + root Amazon EBS cost + Amazon EMR cost

Based on the calculation, the Amazon EMR 7.1 benchmark result demonstrates a 2.8 times improvement in job cost compared to Apache Spark 3.5.1 and a 1.7 times improvement when compared to Amazon EMR 6.9.

Metric Amazon EMR 7.1 Amazon EMR 6.9 Apache Spark 3.5.1
Runtime in hours 0.51 0.87 1.76
Number of EC2 instances 9 9 9
Amazon EBS Size 20gb 20gb 20gb
Amazon EC2 cost $5.29 $9.02 $18.25
Amazon EBS cost $0.01 $0.02 $0.04
Amazon EMR cost $1.24 $2.11 $0.00
Total cost $6.54 $11.15 $18.29
Cost Savings Baseline Amazon EMR 7.1 is 1.7 times better Amazon EMR 7.1 is 2.8 times better

Run OSS Spark benchmarking

For running Apache Spark 3.5.1, we used the following configurations to set up an EC2 cluster. We used one primary node and eight worker nodes of type r5d.4xlarge.

EC2 Instance vCPU Memory (GiB) Instance Storage (GB) EBS Root Volume (GB)
r5d.4xlarge 16 128 2 x 300 NVMe SSD 20GB

Prerequisites

The following prerequisites are required to run the benchmarking:

  1. Using the instructions in the emr-spark-benchmark GitHub repo, set up the TPC-DS source data in your S3 bucket and your local computer.
  2. Build the benchmark application following the steps provided in Steps to build spark-benchmark-assembly application and copy the benchmark application to your S3 bucket. Alternatively, copy spark-benchmark-assembly-3.5.1.jar to your S3 bucket.

This benchmark application is built from branch tpcds-v2.13. If you’re building a new benchmark application, switch to the correct branch after downloading the source code from the GitHub repo.

Create and configure a YARN cluster on Amazon EC2

Follow the instructions in the emr-spark-benchmark GitHub repo to create an OSS Spark cluster on Amazon EC2 using Flintrock.

Based on the cluster selection for this test, the following are the configurations used:

Run the TPC-DS benchmark for Apache Spark 3.5.1

Complete the following steps to run the TPC-DS benchmark for Apache Spark 3.5.1:

  1. Log in to the OSS cluster primary using flintrock login $CLUSTER_NAME.
  2. Submit your Spark job:
    1. The TPC-DS source data is at s3a://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned. Check the prerequisites on how to set up the source data.
    2. The results are created in s3a://<YOUR_S3_BUCKET>/benchmark_run.
    3. You can track progress in /media/ephemeral0/spark_run.log.
spark-submit \
--master yarn \
--deploy-mode client \
--class com.amazonaws.eks.tpcds.BenchmarkSQL \
--conf spark.driver.cores=4 \
--conf spark.driver.memory=10g \
--conf spark.executor.cores=16 \
--conf spark.executor.memory=100g \
--conf spark.executor.instances=8 \
--conf spark.network.timeout=2000 \
--conf spark.executor.heartbeatInterval=300s \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.shuffle.service.enabled=false \
--conf spark.hadoop.fs.s3a.aws.credentials.provider=com.amazonaws.auth.InstanceProfileCredentialsProvider \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--conf spark.jars.packages=org.apache.hadoop:hadoop-aws:3.3.4 \
spark-benchmark-assembly-3.5.1.jar \
s3a://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned \
s3a://<YOUR_S3_BUCKET>/benchmark_run \
/opt/tpcds-kit/tools parquet 3000 3 false \
q1-v2.13,q10-v2.13,q11-v2.13,q12-v2.13,q13-v2.13,q14a-v2.13,q14b-v2.13,q15-v2.13,q16-v2.13,\
q17-v2.13,q18-v2.13,q19-v2.13,q2-v2.13,q20-v2.13,q21-v2.13,q22-v2.13,q23a-v2.13,q23b-v2.13,\
q24a-v2.13,q24b-v2.13,q25-v2.13,q26-v2.13,q27-v2.13,q28-v2.13,q29-v2.13,q3-v2.13,q30-v2.13,\
q31-v2.13,q32-v2.13,q33-v2.13,q34-v2.13,q35-v2.13,q36-v2.13,q37-v2.13,q38-v2.13,q39a-v2.13,\
q39b-v2.13,q4-v2.13,q40-v2.13,q41-v2.13,q42-v2.13,q43-v2.13,q44-v2.13,q45-v2.13,q46-v2.13,\
q47-v2.13,q48-v2.13,q49-v2.13,q5-v2.13,q50-v2.13,q51-v2.13,q52-v2.13,q53-v2.13,q54-v2.13,\
q55-v2.13,q56-v2.13,q57-v2.13,q58-v2.13,q59-v2.13,q6-v2.13,q60-v2.13,q61-v2.13,q62-v2.13,\
q63-v2.13,q64-v2.13,q65-v2.13,q66-v2.13,q67-v2.13,q68-v2.13,q69-v2.13,q7-v2.13,q70-v2.13,\
q71-v2.13,q72-v2.13,q73-v2.13,q74-v2.13,q75-v2.13,q76-v2.13,q77-v2.13,q78-v2.13,q79-v2.13,\
q8-v2.13,q80-v2.13,q81-v2.13,q82-v2.13,q83-v2.13,q84-v2.13,q85-v2.13,q86-v2.13,q87-v2.13,\
q88-v2.13,q89-v2.13,q9-v2.13,q90-v2.13,q91-v2.13,q92-v2.13,q93-v2.13,q94-v2.13,q95-v2.13,\
q96-v2.13,q97-v2.13,q98-v2.13,q99-v2.13,ss_max-v2.13 \
true > /media/ephemeral0/spark_run.log 2>&1 &!

Summarize the results

When the Spark job is complete, download the test result file from the output S3 bucket s3a://<YOUR_S3_BUCKET>/benchmark_run/timestamp=xxxx/summary.csv/xxx.csv. You can use the Amazon S3 console and navigate to the output bucket location or use the Amazon Command Line Interface (AWS CLI).

The Spark benchmark application creates a timestamp folder and writes a summary file inside a summary.csv prefix. Your timestamp and file name will be different from the one shown in the preceding example.

The output CSV files have four columns without header names:

  • Query name
  • Median time
  • Minimum time
  • Maximum time

Because we have three runs, we can then compute the average and geometric mean of the runtimes.

Run the TPC-DS benchmark using Amazon EMR Spark

For detailed instructions, see Steps to run Spark Benchmarking.

Prerequisites

Complete the following prerequisite steps:

  1. Run aws configure to configure your AWS CLI shell to point to the benchmarking account. Refer to Configure the AWS CLI for instructions.
  2. Upload the benchmark application to Amazon S3.

Deploy the EMR cluster and run the benchmark job

Complete the following steps to run the benchmark job:

  1. Use the AWS CLI command as shown in Deploy EMR Cluster and run benchmark job to spin up an EMR on EC2 cluster. Update the provided script with the correct Amazon EMR version and root volume size, and provide the values required. Refer to create-cluster for a detailed description of the AWS CLI options.
  2. Store the cluster ID from the response. You need this in the next step.
  3. Submit the benchmark job in Amazon EMR using add-steps in the AWS CLI:
    1. Replace <cluster ID> with the cluster ID from the create cluster response.
    2. The benchmark application is at s3://<YOUR_S3_BUCKET>/spark-benchmark-assembly-3.5.1.jar.
    3. The TPC-DS source data is at s3://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned.
    4. The results are created in s3://<YOUR_S3_BUCKET>/benchmark_run.
aws emr add-steps \
    --cluster-id <cluster ID>  \
    --steps Type=Spark,Name="TPCDS Benchmark Job",Args=[--class,com.amazonaws.eks.tpcds.BenchmarkSQL,s3://<YOUR_S3_BUCKET>/spark-benchmark-assembly-3.5.1.jar,s3://<YOUR_S3_BUCKET>/BLOG_TPCDS-TEST-3T-partitioned,s3://<YOUR_S3_BUCKET>/benchmark_run,/home/hadoop/tpcds-kit/tools,parquet,3000,3,false,'q1-v2.13\,q10-v2.13\,q11-v2.13\,q12-v2.13\,q13-v2.13\,q14a-v2.13\,q14b-v2.13\,q15-v2.13\,q16-v2.13\,q17-v2.13\,q18-v2.13\,q19-v2.13\,q2-v2.13\,q20-v2.13\,q21-v2.13\,q22-v2.13\,q23a-v2.13\,q23b-v2.13\,q24a-v2.13\,q24b-v2.13\,q25-v2.13\,q26-v2.13\,q27-v2.13\,q28-v2.13\,q29-v2.13\,q3-v2.13\,q30-v2.13\,q31-v2.13\,q32-v2.13\,q33-v2.13\,q34-v2.13\,q35-v2.13\,q36-v2.13\,q37-v2.13\,q38-v2.13\,q39a-v2.13\,q39b-v2.13\,q4-v2.13\,q40-v2.13\,q41-v2.13\,q42-v2.13\,q43-v2.13\,q44-v2.13\,q45-v2.13\,q46-v2.13\,q47-v2.13\,q48-v2.13\,q49-v2.13\,q5-v2.13\,q50-v2.13\,q51-v2.13\,q52-v2.13\,q53-v2.13\,q54-v2.13\,q55-v2.13\,q56-v2.13\,q57-v2.13\,q58-v2.13\,q59-v2.13\,q6-v2.13\,q60-v2.13\,q61-v2.13\,q62-v2.13\,q63-v2.13\,q64-v2.13\,q65-v2.13\,q66-v2.13\,q67-v2.13\,q68-v2.13\,q69-v2.13\,q7-v2.13\,q70-v2.13\,q71-v2.13\,q72-v2.13\,q73-v2.13\,q74-v2.13\,q75-v2.13\,q76-v2.13\,q77-v2.13\,q78-v2.13\,q79-v2.13\,q8-v2.13\,q80-v2.13\,q81-v2.13\,q82-v2.13\,q83-v2.13\,q84-v2.13\,q85-v2.13\,q86-v2.13\,q87-v2.13\,q88-v2.13\,q89-v2.13\,q9-v2.13\,q90-v2.13\,q91-v2.13\,q92-v2.13\,q93-v2.13\,q94-v2.13\,q95-v2.13\,q96-v2.13\,q97-v2.13\,q98-v2.13\,q99-v2.13\,ss_max-v2.13',true],ActionOnFailure=CONTINUE

Summarize the results

After the job is complete, retrieve the summary results from s3://<YOUR_S3_BUCKET>/benchmark_run in the same way as the OSS benchmark runs and compute the average and geomean for Amazon EMR runs.

Clean up

To avoid incurring future charges, delete the resources you created using the instructions in the Cleanup section of the GitHub repo.

Summary

Amazon EMR continues to improve the EMR runtime for Apache Spark, leading to a performance improvement of 1.9x year-over-year and 4.5x faster performance than OSS Spark 3.5.1. We recommend that you stay up to date with the latest Amazon EMR release to take advantage of the latest performance benefits.

To keep up to date, subscribe to the Big Data Blog’s RSS feed to learn more about the EMR runtime for Apache Spark, configuration best practices, and tuning advice.


About the author

Ashok Chintalapati is a software development engineer for Amazon EMR at Amazon Web Services.

Steve Koonce is an Engineering Manager for EMR at Amazon Web Services.

Apply fine-grained access and transformation on the SUPER data type in Amazon Redshift

Post Syndicated from Ritesh Sinha original https://aws.amazon.com/blogs/big-data/apply-fine-grained-access-and-transformation-on-the-super-data-type-in-amazon-redshift/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing ETL (extract, transform, and load), business intelligence (BI), and reporting tools. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as BI, predictive analytics, and real-time streaming analytics.

Amazon Redshift, a cloud data warehouse service, supports attaching dynamic data masking (DDM) policies to paths of SUPER data type columns, and uses the OBJECT_TRANSFORM function with the SUPER data type. SUPER data type columns in Amazon Redshift contain semi-structured data like JSON documents. Previously, data masking in Amazon Redshift only worked with regular table columns, but now you can apply masking policies specifically to elements within SUPER columns. For example, you could apply a masking policy to mask sensitive fields like credit card numbers within JSON documents stored in a SUPER column. This allows for more granular control over data masking in Amazon Redshift. Amazon Redshift gives you more flexibility in how you apply data masking to protect sensitive information stored in SUPER columns containing semi-structured data.

With DDM support in Amazon Redshift, you can do the following:

  • Define masking policies that apply custom obfuscation policies, such as masking policies to handle credit card, personally identifiable information (PII) entries, HIPAA or GDPR needs, and more
  • Transform the data at query time to apply masking policies
  • Attach masking policies to roles or users
  • Attach multiple masking policies with varying levels of obfuscation to the same column in a table and assign them to different roles with priorities to avoid conflicts
  • Implement cell-level masking by using conditional columns when creating your masking policy
  • Use masking policies to partially or completely redact data, or hash it by using user-defined functions (UDFs)

In this post, we demonstrate how a retail company can control the access of PII data stored in the SUPER data type to users based on their access privilege without duplicating the data.

Solution overview

For our use case, we have the following data access requirements:

  • Users from the Customer Service team should be able to view the order data but not PII information
  • Users from the Sales team should be able to view customer IDs and all order information
  • Users from the Executive team should be able to view all the data
  • Staff should not be able to view any data

The following diagram illustrates how DDM support in Amazon Redshift policies works with roles and users for our retail use case.

The solution encompasses creating masking policies with varying masking rules and attaching one or more to the same role and table with an assigned priority to remove potential conflicts. These policies may pseudonymize results or selectively nullify results to comply with retailers’ security requirements. We refer to multiple masking policies being attached to a table as a multi-modal masking policy. A multi-modal masking policy consists of three parts:

  • A data masking policy that defines the data obfuscation rules
  • Roles with different access levels depending on the business case
  • The ability to attach multiple masking policies on a user or role and table combination with priority for conflict resolution

Prerequisites

To implement this solution, you need the following prerequisites:

Prepare the data

To set up our use case, complete the following steps:

  1. On the Amazon Redshift console, choose Query editor v2 under Explorer in the navigation pane.

If you’re familiar with SQL Notebooks, you can download the SQL notebook for the demonstration and import it to quickly get started.

  1. Create the table and populate contents:
    -- 1- Create the orders table
    drop table if exists public.order_transaction;
    create table public.order_transaction (
     data_json super
    );
    
    -- 2- Populate the table with sample values
    INSERT INTO public.order_transaction
    VALUES
        (
            json_parse('
            {
            "c_custkey": 328558,
            "c_name": "Customer#000328558",
            "c_phone": "586-436-7415",
            "c_creditcard": "4596209611290987",
            "orders":{
              "o_orderkey": 8014018,
              "o_orderstatus": "F",
              "o_totalprice": 120857.71,
              "o_orderdate": "2024-01-01"
              }
            }'
            )
        ),
        (
            json_parse('
            {
            "c_custkey": 328559,
            "c_name": "Customer#000328559",
            "c_phone": "789-232-7421",
            "c_creditcard": "8709000219329924",
            "orders":{
              "o_orderkey": 8014019,
              "o_orderstatus": "S",
              "o_totalprice": 9015.98,
              "o_orderdate": "2024-01-01"
              }
            }'
            )
        ),
        (
            json_parse('
            {
            "c_custkey": 328560,
            "c_name": "Customer#000328560",
            "c_phone": "276-564-9023",
            "c_creditcard": "8765994378650090",
            "orders":{
              "o_orderkey": 8014020,
              "o_orderstatus": "C",
              "o_totalprice": 18765.56,
              "o_orderdate": "2024-01-01"
              }
            }
            ')
        );

Implement the solution

To satisfy the security requirements, we need to make sure that each user sees the same data in different ways based on their granted privileges. To do that, we use user roles combined with masking policies as follows:

  1. Create users and roles, and add users to their respective roles:
    --create four users
    set session authorization admin;
    CREATE USER Kate_cust WITH PASSWORD disable;
    CREATE USER Ken_sales WITH PASSWORD disable;
    CREATE USER Bob_exec WITH PASSWORD disable;
    CREATE USER Jane_staff WITH PASSWORD disable;
    
    -- 1. Create User Roles
    CREATE ROLE cust_srvc_role;
    CREATE ROLE sales_srvc_role;
    CREATE ROLE executives_role;
    CREATE ROLE staff_role;
    
    -- note that public role exists by default.
    -- Grant Roles to Users
    GRANT ROLE cust_srvc_role to Kate_cust;
    GRANT ROLE sales_srvc_role to Ken_sales;
    GRANT ROLE executives_role to Bob_exec;
    GRANT ROLE staff_role to Jane_staff;
    
    -- note that regualr_user is attached to public role by default.
    GRANT ALL ON ALL TABLES IN SCHEMA "public" TO ROLE cust_srvc_role;
    GRANT ALL ON ALL TABLES IN SCHEMA "public" TO ROLE sales_srvc_role;
    GRANT ALL ON ALL TABLES IN SCHEMA "public" TO ROLE executives_role;
    GRANT ALL ON ALL TABLES IN SCHEMA "public" TO ROLE staff_role;

  2. Create masking policies:
    -- Mask Full Data
    CREATE MASKING POLICY mask_full
    WITH(pii_data VARCHAR(256))
    USING ('000000XXXX0000'::TEXT);
    
    -- This policy rounds down the given price to the nearest 10.
    CREATE MASKING POLICY mask_price
    WITH(price INT)
    USING ( (FLOOR(price::FLOAT / 10) * 10)::INT );
    
    -- This policy converts the first 12 digits of the given credit card to 'XXXXXXXXXXXX'.
    CREATE MASKING POLICY mask_credit_card
    WITH(credit_card TEXT)
    USING ( 'XXXXXXXXXXXX'::TEXT || SUBSTRING(credit_card::TEXT FROM 13 FOR 4) );
    
    -- This policy mask the given date
    CREATE MASKING POLICY mask_date
    WITH(order_date TEXT)
    USING ( 'XXXX-XX-XX'::TEXT);
    
    -- This policy mask the given phone number
    CREATE MASKING POLICY mask_phone
    WITH(phone_number TEXT)
    USING ( 'XXX-XXX-'::TEXT || SUBSTRING(phone_number::TEXT FROM 9 FOR 4) );

  3. Attach the masking policies:
    • Attach the masking policy for the customer service use case:
      --customer_support (cannot see customer PHI/PII data but can see the order id , order details and status etc.)
      
      set session authorization admin;
      
      ATTACH MASKING POLICY mask_full
      ON public.order_transaction(data_json.c_custkey)
      TO ROLE cust_srvc_role;
      
      ATTACH MASKING POLICY mask_phone
      ON public.order_transaction(data_json.c_phone)
      TO ROLE cust_srvc_role;
      
      ATTACH MASKING POLICY mask_credit_card
      ON public.order_transaction(data_json.c_creditcard)
      TO ROLE cust_srvc_role;
      
      ATTACH MASKING POLICY mask_price
      ON public.order_transaction(data_json.orders.o_totalprice)
      TO ROLE cust_srvc_role;
      
      ATTACH MASKING POLICY mask_date
      ON public.order_transaction(data_json.orders.o_orderdate)
      TO ROLE cust_srvc_role;

    • Attach the masking policy for the sales use case:
      --sales —> can see the customer ID (non phi data) and all order info
      
      set session authorization admin;
      
      ATTACH MASKING POLICY mask_phone
      ON public.order_transaction(data_json.customer.c_phone)
      TO ROLE sales_srvc_role;

    • Attach the masking policy for the staff use case:
      --Staff — > cannot see any data about the order. all columns masked for them ( we can hand pick some columns) to show the functionality
      
      set session authorization admin;
      
      ATTACH MASKING POLICY mask_full
      ON public.order_transaction(data_json.orders.o_orderkey)
      TO ROLE staff_role;
      
      ATTACH MASKING POLICY mask_pii_full
      ON public.order_transaction(data_json.orders.o_orderstatus)
      TO ROLE staff_role;
      
      ATTACH MASKING POLICY mask_pii_price
      ON public.order_transaction(data_json.orders.o_totalprice)
      TO ROLE staff_role;
      
      ATTACH MASKING POLICY mask_date
      ON public.order_transaction(data_json.orders.o_orderdate)
      TO ROLE staff_role;

Test the solution

Let’s confirm that the masking policies are created and attached.

  1. Check that the masking policies are created with the following code:
    -- 1.1- Confirm the masking policies are created
    SELECT * FROM svv_masking_policy;

  2. Check that the masking policies are attached:
    -- 1.2- Verify attached masking policy on table/column to user/role.
    SELECT * FROM svv_attached_masking_policy;

Now you can test that different users can see the same data masked differently based on their roles.

  1. Test that the customer support can’t see customer PHI/PII data but can see the order ID, order details, and status:
    set session authorization Kate_cust;
    select * from order_transaction;

  2. Test that the sales team can see the customer ID (non PII data) and all order information:
    set session authorization Ken_sales;
    select * from order_transaction;

  3. Test that the executives can see all data:
    set session authorization Bob_exec;
    select * from order_transaction;

  4. Test that the staff can’t see any data about the order. All columns should masked for them.
    set session authorization Jane_staff;
    select * from order_transaction;

Object_Transform function

In this section, we dive into the capabilities and benefits of the OBJECT_TRANSFORM function and explore how it empowers you to efficiently reshape your data for analysis. The OBJECT_TRANSFORM function in Amazon Redshift is designed to facilitate data transformations by allowing you to manipulate JSON data directly within the database. With this function, you can apply transformations to semi-structured or SUPER data types, making it less complicated to work with complex data structures in a relational database environment.

Let’s look at some usage examples.

First, create a table and populate contents:

--1- Create the customer table 

DROP TABLE if exists customer_json;

CREATE TABLE customer_json (
    col_super super,
    col_text character varying(100) ENCODE lzo
) DISTSTYLE AUTO;

--2- Populate the table with sample data 

INSERT INTO customer_json
VALUES
    (
        
        json_parse('
            {
                "person": {
                    "name": "GREGORY HOUSE",
                    "salary": 120000,
                    "age": 17,
                    "state": "MA",
                    "ssn": ""
                }
            }
        ')
        ,'GREGORY HOUSE'
    ),
    (
        json_parse('
              {
                "person": {
                    "name": "LISA CUDDY",
                    "salary": 180000,
                    "age": 30,
                    "state": "CA",
                    "ssn": ""
                }
            }
        ')
        ,'LISA CUDDY'
    ),
     (
        json_parse('
              {
                "person": {
                    "name": "JAMES WILSON",
                    "salary": 150000,
                    "age": 35,
                    "state": "WA",
                    "ssn": ""
                }
            }
        ')
        ,'JAMES WILSON'
    )
;
-- 3 select the data 

SELECT * FROM customer_json;

Apply the transformations with the OBJECT_TRANSFORM function:

SELECT
    OBJECT_TRANSFORM(
        col_super
        KEEP
            '"person"."name"',
            '"person"."age"',
            '"person"."state"'
           
        SET
            '"person"."name"', LOWER(col_super.person.name::TEXT),
            '"person"."salary"',col_super.person.salary + col_super.person.salary*0.1
    ) AS col_super_transformed
FROM customer_json;

As you can see in the example, by applying the transformation with OBJECT_TRANSFORM, the person name is formatted in lowercase and the salary is increased by 10%. This demonstrates how the transformation makes is less complicated to work with semi-structured or nested data types.

Clean up

When you’re done with the solution, clean up your resources:

  1. Detach the masking policies from the table:
    -- Cleanup
    --reset session authorization to the default
    RESET SESSION AUTHORIZATION;

  2. Drop the masking policies:
    DROP MASKING POLICY mask_pii_data CASCADE;

  3. Revoke or drop the roles and users:
    REVOKE ROLE cust_srvc_role from Kate_cust;
    REVOKE ROLE sales_srvc_role from Ken_sales;
    REVOKE ROLE executives_role from Bob_exec;
    REVOKE ROLE staff_role from Jane_staff;
    DROP ROLE cust_srvc_role;
    DROP ROLE sales_srvc_role;
    DROP ROLE executives_role;
    DROP ROLE staff_role;
    DROP USER Kate_cust;
    DROP USER Ken_sales;
    DROP USER Bob_exec;
    DROP USER Jane_staff;

  4. Drop the table:
    DROP TABLE order_transaction CASCADE;
    DROP TABLE if exists customer_json;

Considerations and best practices

Consider the following when implementing this solution:

  • When attaching a masking policy to a path on a column, that column must be defined as the SUPER data type. You can only apply masking policies to scalar values on the SUPER path. You can’t apply masking policies to complex structures or arrays.
  • You can apply different masking policies to multiple scalar values on a single SUPER column as long as the SUPER paths don’t conflict. For example, the SUPER paths a.b and a.b.c conflict because they’re on the same path, with a.b being the parent of a.b.c. The SUPER paths a.b.c and a.b.d don’t conflict.

Refer to Using dynamic data masking with SUPER data type paths for more details on considerations.

Conclusion

In this post, we discussed how to use DDM support for the SUPER data type in Amazon Redshift to define configuration-driven, consistent, format-preserving, and irreversible masked data values. With DDM support in Amazon Redshift, you can control your data masking approach using familiar SQL language. You can take advantage of the Amazon Redshift role-based access control capability to implement different levels of data masking. You can create a masking policy to identify which column needs to be masked, and you have the flexibility of choosing how to show the masked data. For example, you can completely hide all the information of the data, replace partial real values with wildcard characters, or define your own way to mask the data using SQL expressions, Python, or Lambda UDFs. Additionally, you can apply conditional masking based on other columns, which selectively protects the column data in a table based on the values in one or more columns.

We encourage you to create your own user-defined functions for various use cases and achieve your desired security posture using dynamic data masking support in Amazon Redshift.


About the Authors

Ritesh Kumar Sinha is an Analytics Specialist Solutions Architect based out of San Francisco. He has helped customers build scalable data warehousing and big data solutions for over 16 years. He loves to design and build efficient end-to-end solutions on AWS. In his spare time, he loves reading, walking, and doing yoga.

Tahir Aziz is an Analytics Solution Architect at AWS. He has worked with building data warehouses and big data solutions for over 15+ years. He loves to help customers design end-to-end analytics solutions on AWS. Outside of work, he enjoys traveling and cooking.

Omama Khurshid is an Acceleration Lab Solutions Architect at Amazon Web Services. She focuses on helping customers across various industries build reliable, scalable, and efficient solutions. Outside of work, she enjoys spending time with her family, watching movies, listening to music, and learning new technologies.

Introducing AWS Glue usage profiles for flexible cost control

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-usage-profiles-for-flexible-cost-control/

AWS Glue is a serverless data integration service that enables you to run extract, transform, and load (ETL) workloads on your data in a scalable and serverless manner. One of the main advantages of using a cloud platform is its flexibility; you can provision compute resources when you actually need them. However, with this ease of creating resources comes a risk of spiraling cloud costs when those resources are left unmanaged or without guardrails. As a result, admins need to balance avoiding high infrastructure costs with allowing users to work without unnecessary friction.

To address that, today we are excited to announce the general availability of AWS Glue usage profiles. With AWS Glue usage profiles, admins can create different profiles for various classes of users within the account, such as developers, testers, and product teams. Each profile is a unique set of parameters that can be assigned to different types of users. For example, developers may need more workers and can have a higher number of maximum workers, whereas product teams may need fewer workers and a lower timeout or idle timeout value.

How AWS Glue usage profiles works

An AWS Glue usage profile is a resource identified by an Amazon Resource Name (ARN) for better governance of resources. Admins have the ability to create AWS Glue usage profiles and define default values to be used when a parameter value is not provided. For example, you can create an AWS Glue usage profile with the default number of workers set to 2. When you sign in to the AWS Glue console using the AWS Identity and Access Management (IAM) user associated with the usage profile and create a new job, the initial value configured for the number of workers shows as 2 instead of the service default of 10.

Additionally, you can specify a set of allowed values for validation when a user associated with this profile creates a resource. If the parameter is numeric, admins can define a range of allowed values by specifying minimum and maximum values, instead of a specific set. For example, you can create an AWS Glue usage profile that allows only G.1X worker types. When you sign in to the AWS Glue console using an IAM user associated with this usage profile and create a job with a G.2X worker type, saving it will result in a failure.

Because an AWS Glue profile is a resource identified by an ARN, all the default IAM controls apply, including action-based, resource-based, and tag-based authorization. Admins update the IAM policy of users who create AWS Glue resources, granting them read permission on the profiles. This enables users to view the profiles. In order to use them when making API calls to create AWS Glue resources, admins will tag the user or role with glue:UsageProfile as the key and the profile name as the value. AWS Glue validates the API requests such as CreateJob, UpdateJob, StartJobRun, and CreateSession based on the values specified in the AWS Glue profile and raise appropriate exceptions.

In the following sections, we demonstrate how to create AWS Glue usage profiles, assign profiles to users, and demonstrate the usage profiles in action.

Create an AWS Glue usage profiles

To get started and create AWS Glue usage profiles, complete the following steps:

  1. On the AWS Glue console, choose Cost management in the navigation pane.

Let’s create your first usage profile for your developers.

  1. Choose Create usage profile.
  2. For Usage profile name, enter developer.
  3. Under Customize configurations for jobs, for Number of workers, for Default, enter 20.
  4. For Default worker type, choose G.1X.
  5. For Allowed worker types, choose G.1X, G.2X, G.4X, and G.8X.
  6. For Customize configurations for sessions, configure the same values.
  7. Choose Create usage profile.

Next, create another usage profile for your business analysts, who need fewer workers and a lower timeout or idle timeout value.

  1. Choose Create usage profile.
  2. For Usage profile name, enter analyst.
  3. Under Customize configurations for jobs, for Number of workers, for Default, enter 2. For Maximum, enter 5.
  4. For Default worker type, choose G.1X.
  5. For Allowed worker types, choose only G.1X.
  6. For Timeout, for Default, enter 60. For Maximum, enter 120.
  7. For Customize configurations for sessions, configure the same values.
  8. For Idle timeout, for Default, enter 10. For Maximum, enter 60.
  9. Choose Create usage profile.

You have successfully created two usage profiles.

Assign usage profiles

Restrictions can only be applied to AWS Glue API calls made by IAM users or roles if the profile is assigned to them. There are two steps that the admin needs to take in order to assign a profile:

  • In IAM, create a tag named glue:UsageProfile on the user or role, with the name of the profile used as the tag value
  • The IAM policy assigned to the user or role needs to be updated to include the glue:GetUsageProfile IAM action permission to read the assigned profile

Follow these steps to create two new users, each assigned a different profile:

  1. On the IAM console, choose Users in the navigation pane.
  2. Choose Create user.
  3. For User name, enter blogDeveloper.
  4. Select Provide user access to the AWS Management Console and I want to create an IAM user.
  5. You can enter a custom password or let one be generated (in the latter case, select Show password so you can use it later to sign in).
  6. Choose Next.
  7. Attach the managed policies AWSGlueConsoleFullAccess and IAMReadOnlyAccess.
  8. Choose Next.
  9. Review the summary and complete the creation.
  10. Remember the password for later and choose Return to users list and choose the user just created.
  11. On the Permissions tab, for Add permissions, choose Create inline policy.
  12. In the policy editor, switch to JSON and enter the following policy, replacing the AWS Region, account ID, and usage profile name placeholders. For the usage profile name, use the value developer for the user blogDeveloper and analyst for the role blogAnalyst.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "glue:GetUsageProfile"
          ],
          "Resource": [
            "arn:aws:glue:<aws region>:<account id>:usageProfile/<usage profile name>"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "iam:PassRole"
          ],
          "Resource": [
            "*"
          ],
          "Condition": {
            "StringLike": {
              "iam:PassedToService": [
                "glue.amazonaws.com"
              ]
            }
          }
        }
      ]
    }

  13. Name the policy GlueUsageProfilePermission and complete the creation.
  14. On the Tags tab, add a new tag with the name glue:UsageProfile and the value developer.

Repeat the steps to create a user named blogAnalyst, and replace the ARN in the policy with arn:aws:glue:<aws region>:<account id>:usageProfile/analyst. Make sure the Region and account ID are populated before updating the policy. For the tag value, specify analyst instead of developer.

On the AWS Glue console, navigate to the developer usage profile. You can see that the status has been changed from Not assigned to Assigned.

Lastly, complete the following steps to create two IAM roles for AWS Glue jobs and sessions with the profile.

  1. Create two IAM roles for AWS Glue. Name them GlueServiceRole-developer and GlueServiceRole-analyst.
  2. Configure the following inline policies by replacing the Region, account ID, and usage profile name placeholders. For the usage profile name placeholder, use the value developer for the role GlueServiceRole-developer and analyst for the role GlueServiceRole-analyst.
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "glue:GetUsageProfile"
          ],
          "Resource": [
            "arn:aws:glue:<aws region>:<account id>:usageProfile/<usage profile name>"
          ]
        },
        {
          "Effect": "Allow",
          "Action": [
            "iam:PassRole"
          ],
          "Resource": [
            "*"
          ],
          "Condition": {
            "StringLike": {
              "iam:PassedToService": [
                "glue.amazonaws.com"
              ]
            }
          }
        }
      ]
    }

  3. On the Tags tab for the IAM role, add a new tag with the name glue:UsageProfile and the value developer for GlueServiceRole-developer and analyst for GlueServiceRole-analyst.

Usage profiles in action: Jobs

Now you have two users with different AWS Glue profiles assigned. Let’s test them and see the differences. First, let’s try the user blogDeveloper to see how the profile developer works.

  1. Open the AWS Glue console with the blogDeveloper user.
  2. Choose ETL jobs in the navigation pane and choose Script editor.
  3. Choose Create script.
  4. Choose the Job details tab.

The default number of Requested number of workers is 20, which corresponds to the default setting of the profile developer.

Next, let’s try the user blogAnalyst to see how the profile analyst works.

  1. Open AWS Glue console with the blogAnalyst user.
  2. Choose ETL jobs in the navigation pane and choose Script editor.
  3. Choose Create script.
  4. Choose the Job details tab.

The default number of Requested number of workers is 2, which corresponds to the default setting of the profile analyst.

Additionally, the default number of Job timeout is 60, which corresponds to the default setting of the profile analyst.

  1. For Worker type, choose the dropdown menu.

Only G.1X is available and G.2X, G.4X, and G.8X are disabled. This is because we allowed the profile analyst to choose G.1X.

  1. For Requested number of workers, enter 20 to simulate invalid input.

You will see the waring message The maximum number of workers cannot exceed 5 for usage profile "analyst".

Now, the user blogAnalyst is attempting to run a job in the account where the number of workers set for the job is 20. However, the maximum number of workers in the profile assigned to this user is 5. When the user tries to run the job, it fails with an error, as shown in the following screenshot.

In this example, we’ve demonstrated how usage profiles manage AWS Glue jobs based on the preconfigured values in the profiles.

Usage profiles in action: Sessions

Next, continue using the user blogAnalyst and try the AWS Glue Studio notebook interface to see how interactive sessions work with usage profiles:

  1. Open the AWS Glue console with the blogAnalyst user.
  2. Choose ETL jobs in the navigation pane and choose Notebook.
  3. For IAM role, choose GlueServiceRole-analyst.
  4. Choose Create notebook.
  5. Wait for the notebook to be ready.

In the second notebook cell, %number_of_workers is set to 2, which corresponds to the default value of the profile analyst.

  1. Update %number_of_workers from 2 to 10 to simulate an invalid access pattern:
    %number_of_workers 10

  2. Run the cell.

You get an error message saying “Provided number of workers is not within the range [1, 5] in the analyst profile.”

This is because the given value of 10 exceeds the maximum number of workers set in the profile assigned to this user.

  1. Update %number_of_workers from 10 to 5 to simulate a valid access pattern:
    %number_of_workers 5

  2. Run the cell.

This time, the session has been successfully created.

Now you have observed how usage profiles manage AWS Glue interactive sessions based on the preconfigured values in the profiles.

Conclusion

This post demonstrated how AWS Glue usage profiles allow you to manage your AWS Glue resources with ease and flexibility.

With AWS Glue usage profiles, you can manage and control resources of different users in order to set your organization’s best practices and save costs. AWS Glue usage profiles serve as a guardrail to prevent unauthorized resource usage from occurring.

Try out the feature for yourself, and leave any feedback or questions in the comments.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team, with a background in machine learning and AI.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue. She is passionate about designing and building end-to-end solutions to address customer data integration and analytic needs.

Gal HeyneGal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers’ business needs and collaborating with engineers to design easy-to-use data products.

Optimize storage costs in Amazon OpenSearch Service using Zstandard compression

Post Syndicated from Sarthak Aggarwal original https://aws.amazon.com/blogs/big-data/optimize-storage-costs-in-amazon-opensearch-service-using-zstandard-compression/

This post is co-written with Praveen Nischal and Mulugeta Mammo from Intel.

Amazon OpenSearch Service is a managed service that makes it straightforward to secure, deploy, and operate OpenSearch clusters at scale in the AWS Cloud. In an OpenSearch Service domain, the data is managed in the form of indexes. Based on the usage pattern, an OpenSearch cluster may have one or more indexes, and their shards are spread across the data nodes in the cluster. Each data node has a fixed disk size and the disk usage is dependent on the number of index shards stored on the node. Each index shard may occupy different sizes based on its number of documents. In addition to the number of documents, one of the important factors that determine the size of the index shard is the compression strategy used for an index.

As part of an indexing operation, the ingested documents are stored as immutable segments. Each segment is a collection of various data structures, such as inverted index, block K dimensional tree (BKD), term dictionary, or stored fields, and these data structures are responsible for retrieving the document faster during the search operation. Out of these data structures, stored fields, which are largest fields in the segment, are compressed when stored on the disk and based on the compression strategy used, the compression speed and the index storage size will vary.

In this post, we discuss the performance of the Zstandard algorithm, which was introduced in OpenSearch v2.9, amongst other available compression algorithms in OpenSearch.

Importance of compression in OpenSearch

Compression plays a crucial role in OpenSearch, because it significantly impacts the performance, storage efficiency and overall usability of the platform. The following are some key reasons highlighting the importance of compression in OpenSearch:

  1. Storage efficiency and cost savings OpenSearch often deals with vast volumes of data, including log files, documents, and analytics datasets. Compression techniques reduce the size of data on disk, leading to substantial cost savings, especially in cloud-based and/or distributed environments.
  2. Reduced I/O operations Compression reduces the number of I/O operations required to read or write data. Fewer I/O operations translate into reduced disk I/O, which is vital for improving overall system performance and resource utilization.
  3. Environmental impact By minimizing the storage requirements and reduced I/O operations, compression contributes to a reduction in energy consumption and a smaller carbon footprint, which aligns with sustainability and environmental goals.

When configuring OpenSearch, it’s essential to consider compression settings carefully to strike the right balance between storage efficiency and query performance, depending on your specific use case and resource constraints.

Core concepts

Before diving into various compression algorithms that OpenSearch offers, let’s look into three standard metrics that are often used while comparing compression algorithms:

  1. Compression ratio The original size of the input compared with the compressed data, expressed as a ratio of 1.0 or greater
  2. Compression speed The speed at which data is made smaller (compressed), expressed in MBps of input data consumed
  3. Decompression speed The speed at which the original data is reconstructed from the compressed data, expressed in MBps

Index codecs

OpenSearch provides support for codecs that can be used for compressing the stored fields. Until OpenSearch 2.7, OpenSearch provided two codecs or compression strategies: LZ4 and Zlib. LZ4 is analogous to best_speed because it provides faster compression but a lesser compression ratio (consumes more disk space) when compared to Zlib. LZ4 is used as the default compression algorithm if no explicit codec is specified during index creation and is preferred by most because it provides faster indexing and search speeds though it consumes relatively more space than Zlib. Zlib is analogous to best_compression because it provides a better compression ratio (consumes less disk space) when compared to LZ4, but it takes more time to compress and decompress, and therefore has higher latencies for indexing and search operations. Both LZ4 and Zlib codecs are part of the Lucene core codecs.

Zstandard codec

The Zstandard codec was introduced in OpenSearch as an experimental feature in version 2.7, and it provides Zstandard-based compression and decompression APIs. The Zstandard codec is based on JNI binding to the Zstd native library.

Zstandard is a fast, lossless compression algorithm aimed at providing a compression ratio comparable to Zlib but with faster compression and decompression speed comparable to LZ4. The Zstandard compression algorithm is available in two different modes in OpenSearch: zstd and zstd_no_dict. For more details, see Index codecs.

Both codec modes aim to balance compression ratio, index, and search throughput. The zstd_no_dict option excludes a dictionary for compression at the expense of slightly larger index sizes.

With the recent OpenSearch 2.9 release, the Zstandard codec has been promoted from experimental to mainline, making it suitable for production use cases.

Create an index with the Zstd codec

You can use the index.codec during index creation to create an index with the Zstd codec. The following is an example using the curl command (this command requires the user to have necessary privileges to create an index):

# Creating an index
curl -XPUT "http://localhost:9200/your_index" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "index.codec": "zstd"
  }
}'

Zstandard compression levels

With Zstandard codecs, you can optionally specify a compression level using the index.codec.compression_level setting, as shown in the following code. This setting takes integers in the [1, 6] range. A higher compression level results in a higher compression ratio (smaller storage size) with a trade-off in speed (slower compression and decompression speeds lead to higher indexing and search latencies). For more details, see Choosing a codec.

# Creating an index
curl -XPUT "http://localhost:9200/your_index" -H 'Content-Type: application/json' -d'
{
  "settings": {
    "index.codec": "zstd",
    "index.codec.compression_level": 2
  }
}
'

Update an index codec setting

You can update the index.codec and index.codec.compression_level settings any time after the index is created. For the new configuration to take effect, the index needs to be closed and reopened.

You can update the setting of an index using a PUT request. The following is an example using curl commands.

Close the index:

# Close the index 
curl -XPOST "http://localhost:9200/your_index/_close"

Update the index settings:

# Update the index.codec and codec.compression_level setting
curl -XPUT "http://localhost:9200/your_index/_settings" -H 'Content-Type: application/json' -d' 
{ 
  "index": {
    "codec": "zstd_no_dict", 
    "codec.compression_level": 3 
  } 
}'

Reopen the index:

# Reopen the index
curl -XPOST "http://localhost:9200/your_index/_open"

Changing the index codec settings doesn’t immediately affect the size of existing segments. Only new segments created after the update will reflect the new codec setting. To have consistent segment sizes and compression ratios, it may be necessary to perform a reindexing or other indexing processes like merges.

Benchmarking compression performance of compression in OpenSearch

To understand the performance benefits of Zstandard codecs, we carried out a benchmark exercise.

Setup

The server setup was as follows:

  1. Benchmarking was performed on an OpenSearch cluster with a single data node which acts as both data and coordinator node and with a dedicated cluster_manager node.
  2. The instance type for the data node was r5.2xlarge and the cluster_manager node was r5.xlarge, both backed by an Amazon Elastic Block Store (Amazon EBS) volume of type GP3 and size 100GB.

Benchmarking was set up as follows:

  1. The benchmark was run on a single node of type c5.4xlarge (sufficiently large to avoid hitting client-side resource constraints) backed by an EBS volume of type GP3 and size 500GB.
  2. The number of clients was 16 and bulk size was 1024
  3. The workload was nyc_taxis

The index setup was as follows:

  1. Number of shards: 1
  2. Number of replicas: 0

Results

From the experiments, zstd provides a better compression ratio compared to Zlib (best_compression) with a slight gain in write throughput and with similar read latency as LZ4 (best_speed). zstd_no_dict provides 14% better write throughput than LZ4 (best_speed) and a slightly lower compression ratio than Zlib (best_compression).

The following table summarizes the benchmark results.

Limitations

Although Zstd provides the best of both worlds (compression ratio and compression speed), it has the following limitations:

  1. Certain queries that fetch the entire stored fields for all the matching documents may observe an increase in latency. For more information, see Changing an index codec.
  2. You can’t use the zstd and zstd_no_dict compression codecs for k-NN or Security Analytics indexes.

Conclusion

Zstandard compression provides a good balance between storage size and compression speed, and is able to tune the level of compression based on the use case. Intel and the OpenSearch Service team collaborated on adding Zstandard as one of the compression algorithms in OpenSearch. Intel contributed by designing and implementing the initial version of compression plugin in open-source which was released in OpenSearch v2.7 as experimental feature. OpenSearch Service team worked on further improvements, validated the performance results and integrated it into the OpenSearch server codebase where it was released in OpenSearch v2.9 as a generally available feature.

If you would want to contribute to OpenSearch, create a GitHub issue and share your ideas with us. We would also be interested in learning about your experience with Zstandard in OpenSearch Service. Please feel free to ask more questions in the comments section.


About the Authors

Praveen Nischal is a Cloud Software Engineer, and leads the cloud workload performance framework at Intel.

Mulugeta Mammo is a Senior Software Engineer, and currently leads the OpenSearch Optimization team at Intel.

Akash Shankaran is a Software Architect and Tech Lead in the Xeon software team at Intel. He works on pathfinding opportunities, and enabling optimizations for data services such as OpenSearch.

Sarthak Aggarwal is a Software Engineer at Amazon OpenSearch Service. He has been contributing towards open-source development with indexing and storage performance as a primary area of interest.

Prabhakar Sithanandam is a Principal Engineer with Amazon OpenSearch Service. He primarily works on the scalability and performance aspects of OpenSearch.

How Cloudinary transformed their petabyte scale streaming data lake with Apache Iceberg and AWS Analytics

Post Syndicated from Yonatan Dolan original https://aws.amazon.com/blogs/big-data/how-cloudinary-transformed-their-petabyte-scale-streaming-data-lake-with-apache-iceberg-and-aws-analytics/

This post is co-written with Amit Gilad, Alex Dickman and Itay Takersman from Cloudinary. 

Enterprises and organizations across the globe want to harness the power of data to make better decisions by putting data at the center of every decision-making process. Data-driven decisions lead to more effective responses to unexpected events, increase innovation and allow organizations to create better experiences for their customers. However, throughout history, data services have held dominion over their customers’ data. Despite the potential separation of storage and compute in terms of architecture, they are often effectively fused together. This amalgamation empowers vendors with authority over a diverse range of workloads by virtue of owning the data. This authority extends across realms such as business intelligence, data engineering, and machine learning thus limiting the tools and capabilities that can be used.

The landscape of data technology is swiftly advancing, driven frequently by projects led by the open source community in general and the Apache foundation specifically. This evolving open source landscape allows customers complete control over data storage, processing engines and permissions expanding the array of available options significantly. This approach also encourages vendors to compete based on the value they provide to businesses, rather than relying on potential fusing of storage and compute. This fosters a competitive environment that prioritizes customer acquisition and prompts vendors to differentiate themselves through unique features and offerings that cater directly to the specific needs and preferences of their clientele.

A modern data strategy redefines and enables sharing data across the enterprise and allows for both reading and writing of a singular instance of the data using an open table format. The open table format accelerates companies’ adoption of a modern data strategy because it allows them to use various tools on top of a single copy of the data.

Cloudinary is a cloud-based media management platform that provides a comprehensive set of tools and services for managing, optimizing, and delivering images, videos, and other media assets on websites and mobile applications. It’s widely used by developers, content creators, and businesses to streamline their media workflows, enhance user experiences, and optimize content delivery.

In this blog post, we dive into different data aspects and how Cloudinary breaks the two concerns of vendor locking and cost efficient data analytics by using Apache Iceberg, Amazon Simple Storage Service (Amazon S3), Amazon Athena, Amazon EMR, and AWS Glue.

Short overview of Cloudinary’s infrastructure

Cloudinary infrastructure handles over 20 billion requests daily with every request generating event logs. Various data pipelines process these logs, storing petabytes (PBs) of data per month, which after processing data stored on Amazon S3, are then stored in Snowflake Data Cloud. These datasets serve as a critical resource for Cloudinary internal teams and data science groups to allow detailed analytics and advanced use cases.

Until recently, this data was mostly prepared by automated processes and aggregated into results tables, used by only a few internal teams. Cloudinary struggled to use this data for additional teams who had more online, real time, lower-granularity, dynamic usage requirements. Making petabytes of data accessible for ad-hoc reports became a challenge as query time increased and costs skyrocketed along with growing compute resource requirements. Cloudinary data retention for the specific analytical data discussed in this post was defined as 30 days. However, new use cases drove the need for increased retention, which would have led to significantly higher cost.

The data is flowing from Cloudinary log providers into files written into Amazon S3 and notified through events pushed to Amazon Simple Queue Service (Amazon SQS). Those SQS events are ingested by a Spark application running in Amazon EMR Spark, which parses and enriches the data. The processed logs are written in Apache Parquet format back to Amazon S3 and then automatically loaded to a Snowflake table using Snowpipe.

Why Cloudinary chose Apache Iceberg

Apache Iceberg is a high-performance table format for huge analytic workloads. Apache Iceberg brings the reliability and simplicity of SQL tables to big data, while making it possible for processing engines such as Apache Spark, Trino, Apache Flink, Presto, Apache Hive, and Impala to safely work with the same tables at the same time.

A solution based on Apache Iceberg encompasses complete data management, featuring simple built-in table optimization capabilities within an existing storage solution. These capabilities, along with the ability to use multiple engines on top of a singular instance of data, helps avoid the need for data movement between various solutions.

While exploring the various controls and options in configuring Apache Iceberg, Cloudinary had to adapt its data to use AWS Glue Data Catalog, as well as move a significant volume of data to Apache Iceberg on Amazon S3. At this point it became clear that costs would be significantly reduced, and while it had been a key factor since the planning phase, it was now possible to get concrete numbers. One example is that Cloudinary was now able to store 6 months of data for the same storage price that was previously paid for storing 1 month of data. This cost saving was achieved by using Amazon S3 storage tiers as well as improved compression (Zstandard), further enhanced by the fact that Parquet files were sorted.

Since Apache Iceberg is well supported by AWS data services and Cloudinary was already using Spark on Amazon EMR, they could integrate writing to Data Catalog and start an additional Spark cluster to handle data maintenance and compaction. As exploration continued with Apache Iceberg, some interesting performance metrics were found. For example, for certain queries, Athena runtime was 2x–4x faster than Snowflake.

Integration of Apache Iceberg

The integration of Apache Iceberg was done before loading data to Snowflake. The data is written to an Iceberg table using Apache Parquet data format and AWS Glue as the data catalog. In addition, a Spark application on Amazon EMR runs in the background handling compaction of the Parquet files to optimal size for querying through various tools such as Athena, Trino running on top of EMR, and Snowflake.

Challenges faced

Cloudinary faced several challenges while building its petabyte-scale data lake, including:

  • Determining optimal table partitioning
  • Optimizing ingestion
  • Solving the small files problem to improve query performance
  • Cost effectively maintaining Apache Iceberg tables
  • Choosing the right query engine

In this section, we describe each of these challenges and the solutions implemented to address them. Many of the tests to check performance and volumes of data scanned have used Athena because it provides a simple to use, fully serverless, cost effective, interface without the need to setup infrastructure.

Determining optimal table partitioning

Apache Iceberg makes partitioning easier for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg tables can be configured to map regular columns to the partition keys. Users don’t need to maintain partition columns or even understand the physical table layout to get fast and accurate query results.

Iceberg has several partitioning options. One example is when partitioning timestamps, which can be done by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Iceberg can also partition categorical column values by identity, hash buckets, or truncation. In addition, Iceberg partitioning is user-friendly because it also allows partition layouts to evolve over time without breaking pre-written queries. For example, when using daily partitions and the query pattern changes over time to be based on hours, it’s possible to evolve the partitions to hourly ones, thus making queries more efficient. When evolving such a partition definition, the data in the table prior to the change is unaffected, as is its metadata. Only data that is written to the table after the evolution is partitioned with the new definition, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that need to be accessed; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical storage. This concept makes Iceberg extremely versatile.

Determining the correct partitioning is key when working with large data sets because it affects query performance and the amount of data being scanned. Because this migration was from existing tables from Snowflake native storage to Iceberg, it was crucial to test and provide a solution with the same or better performance for the existing workload and types of queries.

These tests were possible due to Apache Iceberg’s:

  1. Hidden partitions
  2. Partition transformations
  3. Partition evolution

These allowed altering table partitions and testing which strategy works best without data rewrite.

Here are a few partitioning strategies that were tested:

  1. PARTITIONED BY (days(day), customer_id)
  2. PARTITIONED BY (days(day), hour(timestamp))
  3. PARTITIONED BY (days(day), bucket(N, customer_id))
  4. PARTITIONED BY (days(day))

Each partitioning strategy that was reviewed generated significantly different results both during writing as well as during query time. After careful results analysis, Cloudinary decided to partition the data by day and combine it with sorting, which allows them to sort data within partitions as would be elaborated in the compaction section.

Optimizing ingestion

Cloudinary receives billions of events in files from its providers in various formats and sizes and stores those on Amazon S3, resulting in terabytes of data processed and stored every day.

Because the data doesn’t come in a consistent manner and it’s not possible to predict the incoming rate and file size of the data, it was necessary to find a way of keeping cost down while maintaining high throughput.

This was achieved by using EventBridge to push each file received into Amazon SQS, where it was processed using Spark running on Amazon EMR in batches. This allowed processing the incoming data at high throughput and scale clusters according to queue size while keeping costs down.

Example of fetching 100 messages (files) from Amazon SQS with Spark:

var client = AmazonSQSClientBuilder.standard().withRegion("us-east-1").build()
var getMessageBatch: Iterable[Message] = DistributedSQSReceiver.client.receiveMessage(new ReceiveMessageRequest().withQueueUrl(queueUrl).withMaxNumberOfMessages(10)).getMessages.asScala
sparkSession.sparkContext.parallelize(10) .map(_ => getMessageBatch) .collect().flatMap(_.toList) .toList

When dealing with a high data ingestion rate for a specific partition prefix, Amazon S3 might potentially throttle requests and return a 503 status code (service unavailable). To address this scenario, Cloudinary used an Iceberg table property called write.object-storage.enabled, which incorporates a hash prefix into the stored Amazon S3 object path. This approach was deemed efficient and effectively mitigated Amazon S3 throttling problems.

Solving the small file problem and improving query performance

In modern data architectures, stream processing engines such as Amazon EMR are often used to ingest continuous streams of data into data lakes using Apache Iceberg. Streaming ingestion to Iceberg tables can suffer from two problems:

  • It generates many small files that lead to longer query planning, which in turn can impact read performance.
  • Poor data clustering, which can make file pruning less effective. This typically occurs in the streaming process when there is insufficient new data to generate optimal file sizes for reading, such as 512 MB.

Because partition is a key factor in the number of files produced and Cloudinary’s data is time based and most queries use a time filter, it was decided to address the optimization of our data lake in multiple ways.

First, Cloudinary set all the necessary configurations that helped reduce the number of files while appending data in the table by setting write.target-file-size-bytes, which allows defining the default target file size. Setting spark.sql.shuffle.partitions in Spark can reduce the number of output files by controlling the number of partitions used during shuffle operations, which affects how data is distributed across tasks, consequently minimizing the number of output files generated after transformations or aggregations.

Because the above approach only addressed the small file problem but didn’t eliminate it entirely, Cloudinary used another capability of Apache Iceberg that can compact data files in parallel using Spark with the rewriteDataFiles action. This action combines small files into larger files to reduce metadata overhead and minimize the amount of Amazon S3 GetObject API operation usage.

Here is where it can get complicated. When running compaction, Cloudinary needed to choose which strategy to apply out of the three that Apache Iceberg offers; each one having its own advantages and disadvantages:

  1. Binpack – simply rewrites smaller files to a target size
  2. Sort – data sorting based on different columns
  3. Z-order – a technique to colocate related data in the same set of files

At first, the Binpack compaction strategy was evaluated. This strategy works fastest and combines small files together to reach the target file size defined and after running it a significant improvement in query performance was observed.

As mentioned previously, data was partitioned by day and most queries ran on a specific time range. Because data comes from external vendors and sometimes arrives late, it was noticed that when running queries on compacted days, a lot of data was being scanned, because the specific time range could reside across many files. The query engine (Athena, Snowflake, and Trino with Amazon EMR) needed to scan the entire partition to fetch only the relevant rows.

To increase query performance even further, Cloudinary decided to change the compaction process to use sort, so now data is partitioned by day and sorted by requested_at (timestamp when the action occurred) and customer ID.

This strategy is costlier for compaction because it needs to shuffle the data in order to sort it. However, after adopting this sort strategy, two things were noticeable: the same queries that ran before now scanned around 50 percent less data, and query run time was improved by 30 percent to 50 percent.

Cost effectively maintaining Apache Iceberg tables

Maintaining Apache Iceberg tables is crucial for optimizing performance, reducing storage costs, and ensuring data integrity. Iceberg provides several maintenance operations to keep your tables in good shape. By incorporating these operations Cloudinary were able to cost-effectively manage their Iceberg tables.

Expire snapshots

Each write to an Iceberg table creates a new snapshot, or version, of a table. Snapshots can be used for time-travel queries, or the table can be rolled back to any valid snapshot.

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Cloudinary decided to retain snapshots for up to 7 days to allow easier troubleshooting and handling of corrupted data which sometimes arrives from external sources and aren’t identified upon arrival. SparkActions.get().expireSnapshots(iceTable).expireOlderThan(TimeUnit.DAYS.toMillis(7)).execute()

Remove old metadata files

Iceberg keeps track of table metadata using JSON files. Each change to a table produces a new metadata file to provide atomicity.

Old metadata files are kept for history by default. Tables with frequent commits, like those written by streaming jobs, might need to regularly clean metadata files.

Configuring the following properties will make sure that only the latest ten metadata files are kept and anything older is deleted.

write.metadata.delete-after-commit.enabled=true 
write.metadata.previous-versions-max=10

Delete orphan files

In Spark and other distributed processing engines, when tasks or jobs fail, they might leave behind files that aren’t accounted for in the table metadata. Moreover, in certain instances, the standard snapshot expiration process might fail to identify files that are no longer necessary and not delete them.

Apache Iceberg offers a deleteOrphanFiles action that will take care of unreferenced files. This action might take a long time to complete if there are a large number of files in the data and metadata directories. A metadata or data file is considered orphan if it isn’t reachable by any valid snapshot. The set of actual files is built by listing the underlying storage using the Amazon S3 ListObjects operation, which makes this operation expensive. It’s recommended to run this operation periodically to avoid increased storage usage; however, too frequent runs can potentially offset this cost benefit.

A good example of how critical it is to run this procedure is to look at the following diagram, which shows how this procedure removed 112 TB of storage.

Rewriting manifest files

Apache Iceberg uses metadata in its manifest list and manifest files to speed up query planning and to prune unnecessary data files. Manifests in the metadata tree are automatically compacted in the order that they’re added, which makes queries faster when the write pattern aligns with read filters.

If a table’s write pattern doesn’t align with the query read filter pattern, metadata can be rewritten to re-group data files into manifests using rewriteManifests.

While Cloudinary already had a compaction process that optimized data files, they noticed that manifest files also required optimization. It turned out that in certain cases, Cloudinary reached over 300 manifest files—which were small, often under 8Mb in size—and due to late arriving data, manifest files were pointing to data in different partitions. This caused query planning to run for 12 seconds for each query.

Cloudinary initiated a separate scheduled process of rewriteManifests, and after it ran, the number of manifest files was reduced to approximately 170 files and as a result of more alignment between manifests and query filters (based on partitions), query planning was improved by three times to approximately 4 seconds.

Choosing the right query engine

As part of Cloudinary exploration aimed at testing various query engines, they initially outlined several key performance indicators (KPIs) to guide their search, including support for Apache Iceberg alongside integration with existing data sources such as MySQL and Snowflake, the availability of a web interface for effortless one-time queries, and cost optimization. In line with these criteria, they opted to evaluate various solutions including Trino on Amazon EMR, Athena, and Snowflake with Apache Iceberg support (at that time it was available as a Private Preview). This approach allowed for the assessment of each solution against defined KPIs, facilitating a comprehensive understanding of their capabilities and suitability for Cloudinary’s requirements.

Two of the more quantifiable KPIs that Cloudinary was planning to evaluate were cost and performance. Cloudinary realized early in the process that different queries and usage types can potentially benefit from different runtime engines. They decided to focus on four runtime engines.

Engine Details
Snowflake native XL data warehouse on top of data stored within Snowflake
Snowflake with Apache Iceberg support XL data warehouse on top of data stored in S3 in Apache Iceberg tables
Athena On-demand mode
Amazon EMR Trino Opensource Trino on top of eight nodes (m6g.12xl) cluster

The test included four types of queries that represent different production workloads that Cloudinary is running. They’re ordered by size and complexity from the simplest one to the most heavy and complex.

Query Description Data scanned Returned results set
Q1 Multi-day aggregation on a single tenant Single digit GBs <10 rows
Q2 Single-day aggregation by tenant across multiple tenant Dozens of GBs 100 thousand rows
Q3 Multi-day aggregation across multiple tenants Hundreds of GBs <10 rows
Q4 Heavy series of aggregations and transformations on a multi-tenant dataset to derive access metrics Single digit TBs >1 billion rows

The following graphs show the cost and performance of the four engines across the different queries. To avoid chart scaling issues, all costs and query durations were normalized based on Trino running on Amazon EMR. Cloudinary considered Query 4 to be less suitable for Athena because it involved processing and transforming extremely large volumes of complex data.

Some important aspects to consider are:

  • Cost for EMR running Trino was derived based on query duration only, without considering cluster set up, which on average launches in just under 5 minutes.
  • Cost for Snowflake (both options) was derived based on query duration only, without considering cold start (more than 10 seconds on average) and a Snowflake warehouse minimum charge of 1 minute.
  • Cost for Athena was based on the amount of data scanned; Athena doesn’t require cluster set up and the query queue time is less than 1 second.
  • All costs are based on list on-demand (OD) prices.
  • Snowflake prices are based on Standard edition.

The above chart shows that, from a cost perspective, Amazon EMR running Trino on top of Apache Iceberg tables was superior to other engines, in certain cases up to ten times less expensive. However, Amazon EMR setup requires additional expertise and skills compared to the no-code, no infrastructure management offered by Snowflake and Athena.

In terms of query duration, it’s noticeable that there’s no clear engine of choice for all types of queries. In fact, Amazon EMR, which was the most cost-effective option, was only fastest in two out of the four query types. Another interesting point is that Snowflake’s performance on top of Apache Iceberg is almost on-par with data stored within Snowflake, which adds another great option for querying their Apache Iceberg data-lake. The following table shows the cost and time for each query and product.

. Amazon EMR Trino Snowflake (XL) Snowflake (XL) Iceberg Athena
Query1 $0.01
5 seconds
$0.08
8 seconds
$0.07
8 seconds
$0.02
11 seconds
Query2 $0.12
107 seconds
$0.25
28 seconds
$0.35
39 seconds
$0.18
94 seconds
Query3 $0.17
147 seconds
$1.07
120 seconds
$1.88
211 seconds
$1.22
26 seconds
Query4 $6.43
1,237 seconds
$11.73
1,324 seconds
$12.71
1,430 seconds
N/A

Benchmarking conclusions

While every solution presents its own set of advantages and drawbacks—whether in terms of pricing, scalability, optimizing for Apache Iceberg, or the contrast between open source versus closed source—the beauty lies in not being constrained to a single choice. Embracing Apache Iceberg frees you from relying solely on a single solution. In certain scenarios where queries must be run frequently while scanning up to hundreds of gigabytes of data with an aim to evade warm-up periods and keep costs down, Athena emerged as the best choice. Conversely, when tackling hefty aggregations that demanded significant memory allocation while being mindful of cost, the preference leaned towards using Trino on Amazon EMR. Amazon EMR was significantly more cost efficient when running longer queries, because boot time cost could be discarded. Snowflake stood out as a great option when queries could be joined with other tables already residing within Snowflake. This flexibility allowed harnessing the strengths of each service, strategically applying them to suit the specific needs of various tasks without being confined to a singular solution.

In essence, the true power lies in the ability to tailor solutions to diverse requirements, using the strengths of different environments to optimize performance, cost, and efficiency.

Conclusion

Data lakes built on Amazon S3 and analytics services such as Amazon EMR and Amazon Athena, along with the open source Apache Iceberg framework, provide a scalable, cost-effective foundation for modern data architectures. It enables organizations to quickly construct robust, high-performance data lakes that support ACID transactions and analytics workloads. This combination is the most refined way to have an enterprise-grade open data environment. The availability of managed services and open source software helps companies to implement data lakes that meet their needs.

Since building a data lake solution on top of Apache Iceberg, Cloudinary has seen major enhancements. The data lake infrastructure enables Cloudinary to extend their data retention by six times while lowering the cost of storage by over 25 percent. Furthermore, query costs dropped by more than 25–40 percent thanks to the efficient querying capabilities of Apache Iceberg and the query optimizations provided in the Athena version 3, which is now based on Trino as its engine. The ability to retain data for longer as well as providing it to various stakeholders while reducing cost is a key component in allowing Cloudinary to be more data driven in their operation and decision-making processes.

Using a transactional data lake architecture that uses Amazon S3, Apache Iceberg, and AWS Analytics services can greatly enhance an organization’s data infrastructure. This allows for sophisticated analytics and machine learning, fueling innovation while keeping costs down and allowing the use of a plethora of tools and services without limits.


About the Authors

Yonatan Dolan is a Principal Analytics Specialist at Amazon Web Services. He is located in Israel and helps customers harness AWS analytical services to leverage data, gain insights, and derive value. Yonatan is an Apache Iceberg evangelist.

Amit Gilad is a Senior Data Engineer on the Data Infrastructure team at Cloudinar. He is currently leading the strategic transition from traditional data warehouses to a modern data lakehouse architecture, utilizing Apache Iceberg to enhance scalability and flexibility.

Alex Dickman is a Staff Data Engineer on the Data Infrastructure team at Cloudinary. He focuses on engaging with various internal teams to consolidate the team’s data infrastructure and create new opportunities for data applications, ensuring robust and scalable data solutions for Cloudinary’s diverse requirements.

Itay Takersman is a Senior Data Engineer at Cloudinary data infrastructure team. Focused on building resilient data flows and aggregation pipelines to support Cloudinary’s data requirements.

Modernize your data observability with Amazon OpenSearch Service zero-ETL integration with Amazon S3

Post Syndicated from Joshua Bright original https://aws.amazon.com/blogs/big-data/modernize-your-data-observability-with-amazon-opensearch-service-zero-etl-integration-with-amazon-s3/

We are excited to announce the general availability of Amazon OpenSearch Service zero-ETL integration with Amazon Simple Storage Service (Amazon S3) for domains running 2.13 and above. The integration is new way for customers to query operational logs in Amazon S3 and Amazon S3-based data lakes without needing to switch between tools to analyze operational data. By querying across OpenSearch Service and S3 datasets, you can evaluate multiple data sources to perform forensic analysis of operational and security events. The new integration with OpenSearch Service supports AWS’s zero-ETL vision to reduce the operational complexity of duplicating data or managing multiple analytics tools by enabling you to directly query your operational data, reducing costs and time to action.

OpenSearch is an open source, distributed search and analytics suite derived from Elasticsearch 7.10. OpenSearch Service currently has tens of thousands of active customers with hundreds of thousands of clusters under management processing hundreds of trillions of requests per month.

Amazon S3 is an object storage service offering industry-leading scalability, data availability, security, and performance. Organizations of all sizes and industries can store and protect any amount of data for virtually any use case, such as data lakes, cloud-centered applications, and mobile apps. With cost-effective storage classes and user-friendly management features, you can optimize costs, organize data, and configure fine-tuned access controls to meet specific business, organizational, and compliance requirements. Let’s dig into this exciting new feature for OpenSearch Service.

Benefits of using OpenSearch Service zero-ETL integration with Amazon S3

OpenSearch Service zero-ETL integration with Amazon S3 allows you to use the rich analytics capabilities of OpenSearch Service SQL and PPL directly on infrequently queried data stored outside of OpenSearch Service in Amazon S3. It also integrates with other OpenSearch integrations so you can install prepackaged queries and visualizations to analyze your data, making it straightforward to quickly get started.

The following diagram illustrates how OpenSearch Service unlocks value stored in infrequently queried logs from popular AWS log types.

You can use OpenSearch Service direct queries to query data in Amazon S3. OpenSearch Service provides a direct query integration with Amazon S3 as a way to analyze operational logs in Amazon S3 and data lakes based in Amazon S3 without having to switch between services. You can now analyze data in cloud object stores and simultaneously use the operational analytics and visualizations of OpenSearch Service.

Many customers currently use Amazon S3 to store event data for their solutions. For operational analytics, Amazon S3 is typically used as a destination for VPC Flow Logs, Amazon S3 Access Logs, AWS Load Balancer Logs, and other event sources from AWS services. Customers also store data directly from application events in Amazon S3 for compliance and auditing needs. The durability and scalability of Amazon S3 makes it an obvious data destination for many customers that want a longer-term storage or archival option at a cost-effective price point.

Bringing data from these sources into OpenSearch Service stored in hot and warm storage tiers may be prohibitive due to the size and volume of the events being generated. For some of these event sources that are stored into OpenSearch Service indexes, the volume of queries run against the data doesn’t justify the cost to continue to store them in their cluster. Previously, you would pick and choose which event sources you brought in for ingestion into OpenSearch Service based on the storage provisioned in your cluster. Access to other data meant using different tools such as Amazon Athena to view the data on Amazon S3.

For a real-world example, let’s see how using the new integration benefited Arcesium.

“Arcesium provides advanced cloud-native data, operations, and analytics capabilities for the financial services industry. Our software platform processes many millions of transactions a day, emitting large volumes of log and audit records along the way. The volume of log data we needed to process, store, and analyze was growing exponentially given our retention and compliance needs. Amazon OpenSearch Service’s new zero-ETL integration with Amazon S3 is helping our business scale by allowing us to analyze infrequently queried logs already stored in Amazon S3 instead of incurring the operational expense of maintaining large and costly online OpenSearch clusters or building ad hoc ingestion pipelines.”

– Kyle George, SVP & Global Head of Infrastructure at Arcesium.

With direct queries with Amazon S3, you no longer need to build complex extract, transform, and load (ETL) pipelines or incur the expense of duplicating data in both OpenSearch Service and Amazon S3 storage.

Fundamental concepts

After configuring a direct query connection, you’ll need to create tables in the AWS Glue Data Catalog using the OpenSearch Service Query Workbench. The direct query connection relies on the metadata in Glue Data Catalog tables to query data stored in Amazon S3. Note that tables created by AWS Glue crawlers or Athena are not currently supported.

By combining the structure of Data Catalog tables, SQL indexing techniques, and OpenSearch Service indexes, you can accelerate query performance, unlock advanced analytics capabilities, and contain querying costs. Below are a few examples of how you can accelerate your data:

  • Skipping indexes – You ingest and index only the metadata of the data stored in Amazon S3. When you query a table with a skipping index, the query planner references the index and rewrites the query to efficiently locate the data, instead of scanning all partitions and files. This allows the skipping index to quickly narrow down the specific location of the stored data that’s relevant to your analysis.
  • Materialized views – With materialized views, you can use complex queries, such as aggregations, to power dashboard visualizations. Materialized views ingest a small amount of your data into OpenSearch Service storage.
  • Covering indexes – With a covering index, you can ingest data from a specified column in a table. This is the most performant of the three indexing types. Because OpenSearch Service ingests all data from your desired column, you get better performance and can perform advanced analytics. OpenSearch Service creates a new index from the covering index data. You can use this new index for dashboard visualizations and other OpenSearch Service functionality, such as anomaly detection or geospatial capabilities.

As new data comes in to your S3 bucket, you can configure a refresh interval for your materialized views and covering indexes to provide local access to the most current data on Amazon S3.

Solution overview

Let’s take a test drive using VPC Flow Logs as your source! As mentioned before, many AWS services emit logs to Amazon S3. VPC Flow Logs is a feature of Amazon Virtual Private Cloud (Amazon VPC) that enables you to capture information about the IP traffic going to and from network interfaces in your VPC. For this walkthrough, you perform the following steps:

  1. Create an S3 bucket if you don’t already have one available.
  2. Enable VPC Flow Logs using an existing VPC that can generate traffic and store the logs as Parquet on Amazon S3.
  3. Verify the logs exist in your S3 bucket.
  4. Set up a direct query connection to the Data Catalog and the S3 bucket that has your data.
  5. Install the integration for VPC Flow Logs.

Create an S3 bucket

If you have an existing S3 bucket, you can reuse that bucket by creating a new folder inside of the bucket. If you need to create a bucket, navigate to the Amazon S3 console and create an Amazon S3 bucket with a name that is suitable for your organization.

Enable VPC Flow Logs

Complete the following steps to enable VPC Flow Logs:

  1. On the Amazon VPC console, choose a VPC that has application traffic that can generate logs.
  2. On the Flow Logs tab, choose Create flow log.
  3. For Filter, choose ALL.
  4. Set Maximum aggregation interval to 1 minute.
  5. For Destination, choose Send to an Amazon S3 bucket and provide the S3 bucket ARN from the bucket you created earlier.
  6. For Log record format, choose Custom format and select Standard attributes.

For this post, we don’t select any of the Amazon Elastic Container Service (Amazon ECS) attributes because they’re not implemented with OpenSearch integrations as of this writing.

  1. For Log file format, choose Parquet.
  2. For Hive-compatible S3 prefix, choose Enable.
  3. Set Partition logs by time to every 1 hour (60 minutes).

Validate you are receiving logs in your S3 bucket

Navigate to the S3 bucket you created earlier to see that data is streaming into your S3 bucket. If you drill down and navigate the directory structure, you find that the logs are delivered in an hourly folder and emitted every minute.

Now that you have VPC Flow Logs flowing into an S3 bucket, you need to set up a connection between your data on Amazon S3 and your OpenSearch Service domain.

Set up a direct query data source

In this step, you create a direct query data source which uses Glue Data Catalog tables and your Amazon S3 data. The action creates all the necessary infrastructure to give you access to the Hive metastore (databases and tables in Glue Data Catalog and the data housed in Amazon S3 for the bucket and folder combination you want the data source to have access to. It will also wire in all the appropriate permissions with the Security plugin’s fine-grained access control so you don’t have to worry about permissions to get started.

Complete the following steps to set up your direct query data source:

  1. On the OpenSearch Service domain, choose Domains in the navigation pane.
  2. Choose your domain.
  3. On the Connections tab, choose Create new connection.
  4. For Name, enter a name without dashes, such as zero_etl_walkthrough.
  5. For Description, enter a descriptive name.
  6. For Data source type, choose Amazon S3 with AWS Glue Data Catalog.
  7. For IAM role, if this is your first time, let the direct query setup take care of the permissions by choosing Create a new role. You can edit it later based on your organization’s compliance and security needs. For this post, we name the role zero_etl_walkthrough.
  8. For S3 buckets, use the one you created.
  9. Do not select the check box to grant access to all new and existing buckets.
  10. For Checkpoint S3 bucket, use the same bucket you created. The checkpoint folders get created for you automatically.
  11. For AWS Glue tables, because you don’t have anything that you have created in the Data Catalog, enable Grant access to all existing and new tables.

The VPC Flow Logs OpenSearch integration will create resources in the Data Catalog, and you will need access to pick those resources up.

  1. Choose Create.

Now that the initial setup is complete, you can install the OpenSearch integration for VPC Flow Logs.

Install the OpenSearch integration for VPC Flow Logs

The integrations plugin contains a wide variety of prebuilt dashboards, visualizations, mapping templates, and other resources that make visualizing and working with data generated by your sources simpler. The integration for Amazon VPC installs a variety of resources to view your VPC Flow Logs data as it sits in Amazon S3.

In this section, we show you how to make sure you have the most up-to-date integration packages for installation. We then show you how to install the OpenSearch integration. In most cases, you will have the latest integrations such as VPC Flow Logs, NGINX, HA Proxy, or Amazon S3 (access logs) at the time of the release of a minor or major version. However, OpenSearch is an open source community-led project, and you can expect that there will be version changes and new integrations not yet included with your current deployment.

Verify the latest version of the OpenSearch integration for Amazon VPC

You may have upgraded from earlier versions of OpenSearch Service to OpenSearch Service version 2.13. Let’s confirm that your deployment matches what is present in this post.

On OpenSearch Dashboards, navigate to the Integrations tab and choose Amazon VPC. You will see a release version for the integration.

Confirm that you have version 1.1.0 or higher. If your deployment doesn’t have it, you can install the latest version of the integration from the OpenSearch catalog. Complete the following steps:

  1. Navigate to the OpenSearch catalog.
  2. Choose Amazon VPC Flow Logs.
  3. Download the 1.1.0 Amazon VPC Integration file from the repository folder labeled amazon_vpc_flow_1.1.0.
  4. In the OpenSearch Dashboard’s Dashboard Management plugin, choose Saved objects.
  5. Choose Import and browse your local folders.
  6. Import the downloaded file.

The file contains all the necessary objects to create an integration. After it’s installed, you can proceed to the steps to set up the Amazon VPC OpenSearch integration.

Set up the OpenSearch integration for Amazon VPC

Let’s jump in and install the integration:

  1. In OpenSearch Dashboards, navigate to the Integrations tab.
  2. Choose the Amazon VPC integration.
  3. Confirm the version is 1.1.0 or higher and choose Set Up.
  4. For Display Name, keep the default.
  5. For Connection Type, choose S3 Connection.
  6. For Data Source, choose the direct query connection alias you created in prior steps. In this post, we use zero_etl_walkthrough.
  7. For Spark Table Name, keep the prepopulated value of amazon_vpc_flow.
  8. For S3 Data Location, enter the S3 URI of your log folder created by VPC Flow Logs set up in the prior steps. In this post, we use s3://zero-etl-walkthrough/AWSLogs/.

S3 bucket names are globally unique, and you may want to consider using bucket names that conform to your company’s compliance guidance. UUIDs plus a descriptive name are good options to guarantee uniqueness.

  1. For S3 Checkpoint Location, enter the S3 URI of your checkpoint folder which you define. Checkpoints store metadata for the direct query feature. Make sure you pick any empty or unused path in the bucket you choose. In this post, we use s3://zero-etl-walkthrough/CP/, which is in the same bucket we created earlier.
  2. Select Queries (recommended) and Dashboards and Visualizations for Flint Integrations using live queries.

You get a message that states “Setting Up the Integration – this can take several minutes.” This particular integration sets up skipping indexes and materialized views on top of your data in Amazon S3. The materialized view aggregates the data into a backing index that occupies a significantly smaller data footprint in your cluster compared to ingesting all the data and building visualizations on top of it.

When the Amazon VPC integration installation is complete, you have a broad variety of assets to play with. If you navigate to the installed integrations, you will find queries, visualizations, and other assets that can help you jumpstart your data exploration using data sitting on Amazon S3. Let’s look at the dashboard that gets installed for this integration.

I love it! How much does it cost?

With OpenSearch Service direct queries, you only pay for the resources consumed by your workload. OpenSearch Service charges for only the compute needed to query your external data as well as maintain optional indexes in OpenSearch Service. The compute capacity is measured in OpenSearch Compute Units (OCUs). If no queries or indexing activities are active, no OCUs are consumed. The following table contains sample compute prices based on searching HTTP logs in IAD.

Data scanned per query (GB) OCU price per query (USD)
1-10 $0.026
100 $0.24
1000 $1.35

Because the price is based on the OCUs used per query, this solution is tailored for infrequently queried data. If your users query data often, it makes more sense to fully ingest into OpenSearch Service and take advantage of storage optimization techniques such as using OR1 instances or UltraWarm.

OCUs consumed by zero-ETL integrations will be populated in AWS Cost Explorer. This will be at the account level. You can account for OCU usage at the account level and set thresholds and alerts when thresholds have been crossed. The format of the usage type to filter on in Cost Explorer is RegionCode-DirectQueryOCU (OCU-hours). You can create a budget using AWS Budgets and configure an alert to be notified when DirectQueryOCU (OCU-Hours) usage meets the threshold you set. You can also optionally use an Amazon Simple Notification Service (Amazon SNS) topic with an AWS Lambda function as a target to turn off a data source when a threshold criterion is met.

Summary

Now that you have a high-level understanding of the direct query connection feature, OpenSearch integrations, and how the OpenSearch Service zero-ETL integration with Amazon S3 works, you should consider using the feature as part of your organization’s toolset. With OpenSearch Service zero-ETL integration with Amazon S3, you now have a new tool for event analysis. You can bring hot data into OpenSearch Service for near real-time analysis and alerting. For the infrequently queried, larger data, mainly used for post-event analysis and correlation, you can query that data on Amazon S3 without moving the data. The data stays in Amazon S3 for cost-effective storage, and you access that data as needed without building additional infrastructure to move the data into OpenSearch Service for analysis.

For more information, refer to Working with Amazon OpenSearch Service direct queries with Amazon S3.


About the authors

Joshua Bright is a Senior Product Manager at Amazon Web Services. Joshua leads data lake integration initiatives within the OpenSearch Service team. Outside of work, Joshua enjoys listening to birds while walking in nature.

Kevin Fallis is an Principal Specialist Search Solutions Architect at Amazon Web Services. His passion is to help customers leverage the correct mix of AWS services to achieve success for their business goals. His after-work activities include family, DIY projects, carpentry, playing drums, and all things music.


Sam Selvan
is a Principal Specialist Solution Architect with Amazon OpenSearch Service.

Integrate Tableau and Okta with Amazon Redshift using AWS IAM Identity Center

Post Syndicated from Debu Panda original https://aws.amazon.com/blogs/big-data/integrate-tableau-and-okta-with-amazon-redshift-using-aws-iam-identity-center/

This blog post is co-written with Sid Wray and Jake Koskela from Salesforce, and Adiascar Cisneros from Tableau. 

Amazon Redshift is a fast, scalable cloud data warehouse built to serve workloads at any scale. With Amazon Redshift as your data warehouse, you can run complex queries using sophisticated query optimization to quickly deliver results to Tableau, which offers a comprehensive set of capabilities and connectivity options for analysts to efficiently prepare, discover, and share insights across the enterprise. For customers who want to integrate Amazon Redshift with Tableau using single sign-on capabilities, we introduced AWS IAM Identity Center integration to seamlessly implement authentication and authorization.

IAM Identity Center provides capabilities to manage single sign-on access to AWS accounts and applications from a single location. Redshift now integrates with IAM Identity Center, and supports trusted identity propagation, making it possible to integrate with third-party identity providers (IdP) such as Microsoft Entra ID (Azure AD), Okta, Ping, and OneLogin. This integration positions Amazon Redshift as an IAM Identity Center-managed application, enabling you to use database role-based access control on your data warehouse for enhanced security. Role-based access control allows you to apply fine grained access control using row level, column level, and dynamic data masking in your data warehouse.

AWS and Tableau have collaborated to enable single sign-on support for accessing Amazon Redshift from Tableau. Tableau now supports single sign-on capabilities with Amazon Redshift connector to simplify the authentication and authorization. The Tableau Desktop 2024.1 and Tableau Server 2023.3.4 releases support trusted identity propagation with IAM Identity Center. This allows users to seamlessly access Amazon Redshift data within Tableau using their external IdP credentials without needing to specify AWS Identity and Access Management (IAM) roles in Tableau. This single sign-on integration is available for Tableau Desktop, Tableau Server, and Tableau Prep.

In this post, we outline a comprehensive guide for setting up single sign-on to Amazon Redshift using integration with IAM Identity Center and Okta as the IdP. By following this guide, you’ll learn how to enable seamless single sign-on authentication to Amazon Redshift data sources directly from within Tableau Desktop, streamlining your analytics workflows and enhancing security.

Solution overview

The following diagram illustrates the architecture of the Tableau SSO integration with Amazon RedShift, IAM Identity Center, and Okta.

Figure 1: Solution overview for Tableau integration with Amazon Redshift using IAM Identity Center and Okta

The solution depicted in Figure 1 includes the following steps:

  1. The user configures Tableau to access Redshift using IAM Identity Center authentication
  2. On a user sign-in attempt, Tableau initiates a browser-based OAuth flow and redirects the user to the Okta login page to enter the login credentials.
  3. On successful authentication, Okta issues an authentication token (id and access token) to Tableau
  4. Redshift driver then makes a call to Redshift-enabled IAM Identity Center application and forwards the access token.
  5. Redshift passes the token to Identity Center and requests an access token.
  6. Identity Center verifies/validates the token using the OIDC discovery connection to the trusted token issuer and returns an Identity Center generated access token for the same user. In Figure 1, Trusted Token Issuer (TTI) is the Okta server that Identity Center trusts to provide tokens that third-party applications like Tableau uses to call AWS services.
  7. Redshift then uses the token to obtain the user and group membership information from IAM Identity Center.
  8. Tableau user will be able to connect with Amazon Redshift and access data based on the user and group membership returned from IAM Identity Center.

Prerequisites

Before you begin implementing the solution, make sure that you have the following in place:

Walkthrough

In this walkthrough, you build the solution with following steps:

  • Set up the Okta OIDC application
  • Set up the Okta authorization server
  • Set up the Okta claims
  • Setup the Okta access policies and rules
  • Setup trusted token issuer in AWS IAM Identity Center
  • Setup client connections and trusted token issuers
  • Setup the Tableau OAuth config files for Okta
  • Install the Tableau OAuth config file for Tableau Desktop
  • Setup the Tableau OAuth config file for Tableau Server or Tableau Cloud
  • Federate to Amazon Redshift from Tableau Desktop
  • Federate to Amazon Redshift from Tableau Server

Set up the Okta OIDC application

To create an OIDC web app in Okta, you can follow the instructions in this video, or use the following steps to create the wep app in Okta admin console:

Note: The Tableau Desktop redirect URLs should always use localhost. The examples below also use localhost for the Tableau Server hostname for ease of testing in a test environment. For this setup, you should also access the server at localhost in the browser. If you decide to use localhost for early testing, you will also need to configure the gateway to accept localhost using this tsm command:

 tsm configuration set -k gateway.public.host -v localhost

In a production environment, or Tableau Cloud, you should use the full hostname that your users will access Tableau on the web, along with https. If you already have an environment with https configured, you may skip the localhost configuration and use the full hostname from the start.

  1. Sign in to your Okta organization as a user with administrative privileges.
  2. On the admin console, under Applications in the navigation pane, choose Applications.
  3. Choose Create App Integration.
  4. Select OIDC – OpenID Connect as the Sign-in method and Web Application as the Application type.
  5. Choose Next.
  6. In General Settings:
    1. App integration name: Enter a name for your app integration. For example, Tableau_Redshift_App.
    2. Grant type: Select Authorization Code and Refresh Token.
    3. Sign-in redirect URIs: The sign-in redirect URI is where Okta sends the authentication response and ID token for the sign-in request. The URIs must be absolute URIs. Choose Add URl and along with the default URl, add the following URIs.
      • http://localhost:55556/Callback
      • http://localhost:55557/Callback
      • http://localhost:55558/Callback
      • http://localhost/auth/add_oauth_token
    4. Sign-out redirect URIs: keep the default value as http://localhost:8080.
    5. Skip the Trusted Origins section and for Assignments, select Skip group assignment for now.
    6. Choose Save.
Figure 2: OIDC application

Figure 2: OIDC application

  1. In the General Settings section, choose Edit and select Require PKCE as additional verification under Proof Key for Code Exchange (PKCE). This option indicates if a PKCE code challenge is required to verify client requests.
  2. Choose Save.
Figure 3: OIDC App Overview

Figure 3: OIDC App Overview

  1. Select the Assignments tab and then choose Assign to Groups. In this example, we’re assigning awssso-finance and awssso-sales.
  2. Choose Done.

Figure 4: OIDC application group assignments

For more information on creating an OIDC app, see Create OIDC app integrations.

Set up the Okta authorization server

Okta allows you to create multiple custom authorization servers that you can use to protect your own resource servers. Within each authorization server you can define your own OAuth 2.0 scopes, claims, and access policies. If you have an Okta Developer Edition account, you already have a custom authorization server created for you called default.

For this blog post, we use the default custom authorization server. If your application has requirements such as requiring more scopes, customizing rules for when to grant scopes, or you need more authorization servers with different scopes and claims, then you can follow this guide.

Figure 5: Authorization server

Set up the Okta claims

Tokens contain claims that are statements about the subject (for example: name, role, or email address). For this example, we use the default custom claim sub. Follow this guide to create claims.

Figure 6: Create claims

Setup the Okta access policies and rules

Access policies are containers for rules. Each access policy applies to a particular OpenID Connect application. The rules that the policy contains define different access and refresh token lifetimes depending on the nature of the token request. In this example, you create a simple policy for all clients as shown in Figure 7 that follows. Follow this guide to create access policies and rules.

Figure 7: Create access policies

Rules for access policies define token lifetimes for a given combination of grant type, user, and scope. They’re evaluated in priority order and after a matching rule is found, no other rules are evaluated. If no matching rule is found, then the authorization request fails. This example uses the role depicted in Figure 8 that follows. Follow this guide to create rules for your use case.

Figure 8: Access policy rules

Setup trusted token issuer in AWS IAM Identity Center

At this point, you switch to setting up the AWS configuration, starting by adding a trusted token issuer (TTI), which makes it possible to exchange tokens. This involves connecting IAM Identity Center to the Open ID Connect (OIDC) discovery URL of the external OAuth authorization server and defining an attribute-based mapping between the user from the external OAuth authorization server and a corresponding user in Identity Center. In this step, you create a TTI in the centralized management account. To create a TTI:

  1. Open the AWS Management Console and navigate to IAM Identity Center, and then to the Settings page.
  2. Select the Authentication tab and under Trusted token issuers, choose Create trusted token issuer.
  3. On the Set up an external IdP to issue trusted tokens page, under Trusted token issuer details, do the following:
    • For Issuer URL, enter the OIDC discovery URL of the external IdP that will issue tokens for trusted identity propagation. The administrator of the external IdP can provide this URL (for example, https://prod-1234567.okta.com/oauth2/default).

To get the issuer URL from Okta, sign in as an admin to Okta and navigate to Security and then to API and choose default under the Authorization Servers tab and copy the Issuer URL

Figure 9: Authorization server issuer

  1. For Trusted token issuer name, enter a name to identify this trusted token issuer in IAM Identity Center and in the application console.
  2. Under Map attributes, do the following:
    • For Identity provider attribute, select an attribute from the list to map to an attribute in the IAM Identity Center identity store.
    • For IAM Identity Center attribute, select the corresponding attribute for the attribute mapping.
  3. Under Tags (optional), choose Add new tag, enter a value for Key and optionally for Value. Choose Create trusted token issuer. For information about tags, see Tagging AWS IAM Identity Center resources.

This example uses Subject (sub) as the Identity provider attribute to map with Email from the IAM identity Center attribute. Figure 10 that follows shows the set up for TTI.

Figure 10: Create Trusted Token Issuer

Setup client connections and trusted token issuers

In this step, the Amazon Redshift applications that exchange externally generated tokens must be configured to use the TTI you created in the previous step. Also, the audience claim (or aud claim) from Okta must be specified. In this example, you are configuring the Amazon Redshift application in the member account where the Amazon Redshift cluster or serverless instance exists.

  1. Select IAM Identity Center connection from Amazon Redshift console menu.

Figure 11: Amazon Redshift IAM Identity Center connection

  1. Select the Amazon Redshift application that you created as part of the prerequisites.
  2. Select the Client connections tab and choose Edit.
  3. Choose Yes under Configure client connections that use third-party IdPs.
  4. Select the checkbox for Trusted token issuer which you have created in the previous section.
  5. Enter the aud claim value under section Configure selected trusted token issuers. For example, okta_tableau_audience.

To get the audience value from Okta, sign in as an admin to Okta and navigate to Security and then to API and choose default under the Authorization Servers tab and copy the Audience value.

Figure 12: Authorization server audience

Note: The audience claim value must exactly match with IdP audience value otherwise your OIDC connection with third part application like Tableau will fail.

  1. Choose Save.

Figure 13: Adding Audience Claim for Trusted Token Issuer

Setup the Tableau OAuth config files for Okta

At this point, your IAM Identity Center, Amazon Redshift, and Okta configuration are complete. Next, you need to configure Tableau.

To integrate Tableau with Amazon Redshift using IAM Identity Center, you need to use a custom XML. In this step, you use the following XML and replace the values starting with the $ sign and highlighted in bold. The rest of the values can be kept as they are, or you can modify them based on your use case. For detailed information on each of the elements in the XML file, see the Tableau documentation on GitHub.

Note: The XML file will be used for all the Tableau products including Tableau Desktop, Server, and Cloud.

<?xml version="1.0" encoding="utf-8"?>
<pluginOAuthConfig>
<dbclass>redshift</dbclass>
<oauthConfigId>custom_redshift_okta</oauthConfigId>
<clientIdDesktop>$copy_client_id_from_okta_oidc_app</clientIdDesktop>
<clientSecretDesktop>$copy_client_secret_from_okta_oidc_app</clientSecretDesktop>
<redirectUrisDesktop>http://localhost:55556/Callback</redirectUrisDesktop>
<redirectUrisDesktop>http://localhost:55557/Callback</redirectUrisDesktop>
<redirectUrisDesktop>http://localhost:55558/Callback</redirectUrisDesktop>
<authUri>https://$copy_okta_host_value.okta.com/oauth2/default/v1/authorize</authUri>
<tokenUri>https://$copy_okta_host_value.okta.com/oauth2/default/v1/token</tokenUri>
<scopes>openid</scopes>
<scopes>email</scopes>
<scopes>profile</scopes>
<scopes>offline_access</scopes>
<capabilities>
<entry>
<key>OAUTH_CAP_FIXED_PORT_IN_CALLBACK_URL</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_PKCE_REQUIRES_CODE_CHALLENGE_METHOD</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_REQUIRE_PKCE</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_SUPPORTS_STATE</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_CLIENT_SECRET_IN_URL_QUERY_PARAM</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_SUPPORTS_GET_USERINFO_FROM_ID_TOKEN</key>
<value>true</value>
</entry>
</capabilities>
<accessTokenResponseMaps>
<entry>
<key>ACCESSTOKEN</key>
<value>access_token</value>
</entry>
<entry>
<key>REFRESHTOKEN</key>
<value>refresh_token</value>
</entry>
<entry>
<key>id-token</key>
<value>id_token</value>
</entry>
<entry>
<key>access-token-issue-time</key>
<value>issued_at</value>
</entry>
<entry>
<key>access-token-expires-in</key>
<value>expires_in</value>
</entry>
<entry>
<key>username</key>
<value>preferred_username</value>
</entry>
</accessTokenResponseMaps>
</pluginOAuthConfig>

The following is an example XML file:

<?xml version="1.0" encoding="utf-8"?>
<pluginOAuthConfig>
<dbclass>redshift</dbclass>
<oauthConfigId>custom_redshift_okta</oauthConfigId>
<clientIdDesktop>ab12345z-a5nvb-123b-123b-1c434ghi1234</clientIdDesktop>
<clientSecretDesktop>3243jkbkjb~~ewf.112121.3432423432.asd834k</clientSecretDesktop>
<redirectUrisDesktop>http://localhost:55556/Callback</redirectUrisDesktop>
<redirectUrisDesktop>http://localhost:55557/Callback</redirectUrisDesktop>
<redirectUrisDesktop>http://localhost:55558/Callback</redirectUrisDesktop>
<authUri>https://prod-1234567.okta.com/oauth2/default/v1/authorize</authUri>
<tokenUri>https://prod-1234567.okta.com/oauth2/default/v1/token</tokenUri>
<scopes>openid</scopes>
<scopes>email</scopes>
<scopes>profile</scopes>
<scopes>offline_access</scopes>
<capabilities>
<entry>
<key>OAUTH_CAP_FIXED_PORT_IN_CALLBACK_URL</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_PKCE_REQUIRES_CODE_CHALLENGE_METHOD</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_REQUIRE_PKCE</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_SUPPORTS_STATE</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_CLIENT_SECRET_IN_URL_QUERY_PARAM</key>
<value>true</value>
</entry>
<entry>
<key>OAUTH_CAP_SUPPORTS_GET_USERINFO_FROM_ID_TOKEN</key>
<value>true</value>
</entry>
</capabilities>
<accessTokenResponseMaps>
<entry>
<key>ACCESSTOKEN</key>
<value>access_token</value>
</entry>
<entry>
<key>REFRESHTOKEN</key>
<value>refresh_token</value>
</entry>
<entry>
<key>id-token</key>
<value>id_token</value>
</entry>
<entry>
<key>access-token-issue-time</key>
<value>issued_at</value>
</entry>
<entry>
<key>access-token-expires-in</key>
<value>expires_in</value>
</entry>
<entry>
<key>username</key>
<value>preferred_username</value>
</entry>
</accessTokenResponseMaps>
</pluginOAuthConfig>

Install the Tableau OAuth config file for Tableau Desktop

After the configuration XML file is created, it must be copied to a location to be used by Amazon Redshift Connector from Tableau Desktop. Save the file from the previous step as .xml and save it under Documents\My Tableau Repository\OAuthConfigs.

Note: Currently this integration isn’t supported in macOS because the Redshift ODBC 2.X driver isn’t supported yet for MAC. It will be supported soon.

Setup the Tableau OAuth config file for Tableau Server or Tableau Cloud

To integrate with Amazon Redshift using IAM Identity Center authentication, you must install the Tableau OAuth config file in Tableau Server or Tableau Cloud

  1. Sign in to the Tableau Server or Tableau Cloud using admin credentials.
  2. Navigate to Settings.
  3. Go to OAuth Clients Registry and select Add OAuth Client
  4. Choose following settings:
    • Connection Type: Amazon Redshift
    • OAuth Provider: Custom_IdP
    • Client ID: Enter your IdP client ID value
    • Client Secret: Enter your client secret value
    • Redirect URL: Enter http://localhost/auth/add_oauth_token. This example uses localhost for testing in a local environment. You should use the full hostname with https.
    • Choose OAuth Config File. Select the XML file that you configured in the previous section.
    • Select Add OAuth Client and choose Save.

Figure 14: Create an OAuth connection in Tableau Server or Tableau Cloud

Federate to Amazon Redshift from Tableau Desktop

Now you’re ready to connect to Amazon Redshift from Tableau through federated sign-in using IAM Identity Center authentication. In this step, you create a Tableau Desktop report and publish it to Tableau Server.

  1. Open Tableau Desktop.
  2. Select Amazon Redshift Connector and enter the following values:
    1. Server: Enter the name of the server that hosts the database and the name of the database you want to connect to.
    2. Port: Enter 5439.
    3. Database: Enter your database name. This example uses dev.
    4. Authentication: Select OAuth.
    5. Federation Type: Select Identity Center.
    6. Identity Center Namespace: You can leave this value blank.
    7. OAuth Provider: This value should automatically be pulled from your configured XML. It will be the value from the element oauthConfigId.
    8. Select Require SSL.
    9. Choose Sign in.

Figure 15: Tableau Desktop OAuth connection

  1. Enter your IdP credentials in the browser pop-up window.

Figure 16: Okta Login Page

  1. When authentication is successful, you will see the message shown in Figure 17 that follows.

Figure 17: Successful authentication using Tableau

Congratulations! You’re signed in using IAM Identity Center integration with Amazon Redshift and are ready to explore and analyze your data using Tableau Desktop.

Figure 18: Successfully connected using Tableau Desktop

Figure 19 is a screenshot from the Amazon Redshift system table (sys_query_history) showing that user Ethan from Okta is accessing the sales report.

Figure 19: User audit in sys_query_history

After signing in, you can create your own Tableau Report on the desktop version and publish it to your Tableau Server. For this example, we created and published a report named SalesReport.

Federate to Amazon Redshift from Tableau Server

After you have published the report from Tableau Desktop to Tableau Server, sign in as a non-admin user and view the published report (SalesReport in this example) using IAM Identity Center authentication.

  1. Sign in to the Tableau Server site as a non-admin user.
  2. Navigate to Explore and go to the folder where your published report is stored.
  3. Select the report and choose Sign In.

Figure 20: Tableau Server Sign In

  1. To authenticate, enter your non-admin Okta credentials in the browser pop-up.

Figure 21: Okta Login Page

  1. After your authentication is successful, you can access the report.

Figure 22: Tableau report

Clean up

Complete the following steps to clean up your resources:

  1. Delete the IdP applications that you have created to integrate with IAM Identity Center.
  2. Delete the IAM Identity Center configuration.
  3. Delete the Amazon Redshift application and the Amazon Redshift provisioned cluster or serverless instance that you created for testing.
  4. Delete the IAM role and IAM policy that you created for IAM Identity Center and Amazon Redshift integration.
  5. Delete the permission set from IAM Identity Center that you created for Amazon Redshift Query Editor V2 in the management account.

Conclusion

This post covered streamlining access management for data analytics by using Tableau’s capability to support single sign-on based on the OAuth 2.0 OpenID Connect (OIDC) protocol. The solution enables federated user authentication, where user identities from an external IdP are trusted and propagated to Amazon Redshift. You walked through the steps to configure Tableau Desktop and Tableau Server to integrate seamlessly with Amazon Redshift using IAM Identity Center for single sign-on. By harnessing this integration of a third party IdP with IAM Identity Center, users can securely access Amazon Redshift data sources within Tableau without managing separate database credentials.

Listed below are key resources to learn more about Amazon Redshift integration with IAM Identity Center


About the Authors

Debu-PandaDebu Panda is a Senior Manager, Product Management at AWS. He is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world.

Sid Wray is a Senior Product Manager at Salesforce based in the Pacific Northwest with nearly 20 years of experience in Digital Advertising, Data Analytics, Connectivity Integration and Identity and Access Management. He currently focuses on supporting ISV partners for Salesforce Data Cloud.

Adiascar Cisneros is a Tableau Senior Product Manager based in Atlanta, GA. He focuses on the integration of the Tableau Platform with AWS services to amplify the value users get from our products and accelerate their journey to valuable, actionable insights. His background includes analytics, infrastructure, network security, and migrations.

Jade Koskela is a Principal Software Engineer at Salesforce. He has over a decade of experience building Tableau with a focus on areas including data connectivity, authentication, and identity federation.

Harshida Patel is a Principal Solutions Architect, Analytics with AWS.

Maneesh Sharma is a Senior Database Engineer at AWS with more than a decade of experience designing and implementing large-scale data warehouse and analytics solutions. He collaborates with various Amazon Redshift Partners and customers to drive better integration.

Ravi Bhattiprolu is a Senior Partner Solutions Architect at Amazon Web Services (AWS). He collaborates with strategic independent software vendor (ISV) partners like Salesforce and Tableau to design and deliver innovative, well-architected cloud products, integrations, and solutions to help joint AWS customers achieve their business goals.

Simplify custom contact center insights with Amazon Connect analytics data lake

Post Syndicated from Donnie Prakoso original https://aws.amazon.com/blogs/aws/simplify-custom-contact-center-insights-with-amazon-connect-analytics-data-lake/

Analytics are vital to the success of a contact center. Having insights into each touchpoint of the customer experience allows you to accurately measure performance and adapt to shifting business demands. While you can find common metrics in the Amazon Connect console, sometimes you need to have more details and custom requirements for reporting based on the unique needs of your business. 

Starting today, the Amazon Connect analytics data lake is generally available. As announced last year as preview, this new capability helps you to eliminate the need to build and maintain complex data pipelines. Amazon Connect data lake is zero-ETL capable, so no extract, transform, or load (ETL) is needed.

Here’s a quick look at the Amazon Connect analytics data lake:

Improving your customer experience with Amazon Connect
Amazon Connect analytics data lake helps you to unify disparate data sources, including customer contact records and agent activity, into a single location. By having your data in a centralized location, you now have access to analyze contact center performance and gain insights while reducing the costs associated with implementing complex data pipelines.

With Amazon Connect analytics data lake, you can access and analyze contact center data, such as contact trace records and Amazon Connect Contact Lens data. This provides you the flexibility to prepare and analyze data with Amazon Athena and use the business intelligence (BI) tools of your choice, such as, Amazon QuickSight and Tableau

Get started with the Amazon Connect analytics data lake
To get started with the Amazon Connect analytics data lake, you’ll first need to have an Amazon Connect instance setup. You can follow the steps in the Create an Amazon Connect instance page to create a new Amazon Connect instance. Because I’ve already created my Amazon Connect instance, I will go straight to showing you how you can get started with Amazon Connect analytics data lake.

First, I navigate to the Amazon Connect console and select my instance.

Then, on the next page, I can set up my analytics data lake by navigating to Analytics tools and selecting Add data share.

This brings up a pop-up dialog, and I first need to define the target AWS account ID. With this option, I can set up a centralized account to receive all data from Amazon Connect instances running in multiple accounts. Then, under Data types, I can select the types I need to share with the target AWS account. To learn more about the data types that you can share in the Amazon Connect analytics data lake, please visit Associate tables for Analytics data lake.

Once it’s done, I can see the list of all the target AWS account IDs with which I have shared all the data types.

Besides using the AWS Management Console, I can also use the AWS Command Line Interface (AWS CLI) to associate my tables with the analytics data lake. The following is a sample command:

$> aws connect batch-associate-analytics-data-set --cli-input-json file:///input_batch_association.json

Where input_batch_association.json is a JSON file that contains association details. Here’s a sample:

{
	"InstanceId": YOUR_INSTANCE_ID,
	"DataSetIds": [
		"<DATA_SET_ID>"
		],
	"TargetAccountId": YOUR_ACCOUNT_ID
} 

Next, I need to approve (or reject) the request in the AWS Resource Access Manager (RAM) console in the target account. RAM is a service to help you securely share resources across AWS accounts. I navigate to AWS RAM and select Resource shares in the Shared with me section.

Then, I select the resource and select Accept resource share

At this stage, I can access shared resources from Amazon Connect. Now, I can start creating linked tables from shared tables in AWS Lake Formation. In the Lake Formation console, I navigate to the Tables page and select Create table.

I need to create a Resource link to a shared table. Then, I fill in the details and select the available Database and the Shared table’s region.

Then, when I select Shared table, it will list all the available shared tables that I can access.

Once I select the shared table, it will automatically populate Shared table’s database and Shared table’s owner ID. Once I’m happy with the configuration, I select Create.

To run some queries for the data, I go to the Amazon Athena console.The following is an example of a query that I ran:

With this configuration, I have access to certain Amazon Connect data types. I can even visualize the data by integrating with Amazon QuickSight. The following screenshot show some visuals in the Amazon QuickSight dashboard with data from Amazon Connect.

Customer voice
During the preview period, we heard lots of feedback from our customers about Amazon Connect analytics data lake. Here’s what our customer say:

Joulica is an analytics platform supporting insights for software like Amazon Connect and Salesforce. Tony McCormack, founder and CEO of Joulica, said, “Our core business is providing real-time and historical contact center analytics to Amazon Connect customers of all sizes. In the past, we frequently had to set up complex data pipelines, and so we are excited about using Amazon Connect analytics data lake to simplify the process of delivering actionable intelligence to our shared customers.”

Things you need to know

  • Pricing — Amazon Connect analytics data lake is available for you to use up to 2 years of data without any additional charges in Amazon Connect. You only need to pay for any services you use to interact with the data.
  • Availability — Amazon Connect analytics data lake is generally available in the following AWS Regions: US East (N. Virginia), US West (Oregon), Africa (Cape Town), Asia Pacific (Mumbai, Seoul, Singapore, Sydney, Tokyo), Canada (Central), and Europe (Frankfurt, London)
  • Learn more — For more information, please visit Analytics data lake documentation page.

Happy building,
Donnie

AWS named a Leader in IDC MarketScape: Worldwide Analytic Stream Processing Software 2024 Vendor Assessment

Post Syndicated from Anna Montalat original https://aws.amazon.com/blogs/big-data/aws-named-a-leader-in-idc-marketscape-worldwide-analytic-stream-processing-software-2024-vendor-assessment/

We’re thrilled to announce that AWS has been named a Leader in the IDC MarketScape: Worldwide Analytic Stream Processing Software 2024 Vendor Assessment (doc #US51053123, March 2024).

We believe this recognition validates the power and performance of Apache Flink for real-time data processing, and how AWS is leading the way to help customers build and run fully managed Apache Flink applications. You can read the full report from IDC.

Unleashing real-time insights for your organization

Apache Flink’s robust architecture enables real-time data processing at scale, making it a favored choice among organizations for its efficiency and speed. With its advanced features for event time processing and state management, Apache Flink empowers users to build complex stream processing applications, making it indispensable for modern data-driven organizations. Managed Service for Apache Flink takes the complexity out of Apache Flink deployment and management, letting you focus on building game-changing applications. With Managed Service for Apache Flink, you can transform and analyze streaming data in real time using Apache Flink and integrate applications with other AWS services. There are no servers and clusters to manage, and there is no compute and storage infrastructure to set up. You pay only for the resources you use.

But what does this mean for your organizations and IT teams? The following are some use cases and benefits:

  • Faster insights, quicker action – Analyze data streams as they arrive, allowing you to react promptly to changing conditions and make informed decisions based on the latest information, achieving agility and competitiveness in dynamic markets.
  • Real-time fraud detection – Identify suspicious activity the moment it occurs, enabling proactive measures to protect your customers and revenue from potential financial losses, bolstering trust and security in your business operations.
  • Personalized customer interactions – Gain insights from user behavior in real time, enabling personalized experiences and the ability to proactively address potential issues before they impact customer satisfaction, fostering loyalty and enhancing brand reputation.
  • Data-driven optimization – Utilize real-time insights from sensor data and machine logs to streamline processes, identify inefficiencies, and optimize resource allocation, driving operational excellence and cost savings while maintaining peak performance.
  • Advanced AI – Continuously feed real-time data to your machine learning (ML) and generative artificial intelligence (AI) models, allowing them to adapt and personalize outputs for more relevant and impactful results.

Beyond the buzzword: Apache Flink in action

Apache Flink’s versatility extends beyond single use cases. The following are just a few examples of how our customers are taking advantage of its capabilities:

  • The National Hockey League is the second oldest of the four major professional team sports leagues in North America. Predicting events such as face-off winning probabilities during a live game is a complex task that requires processing a significant amount of quality historical data and data streams in real time. The NHL constructed the Face-off Probability model using Apache Flink. Managed Service for Apache Flink provides the underlying infrastructure for the Apache Flink applications, removing the need to self-manage an Apache Flink cluster and reducing maintenance complexity and costs.
  • Arity is a technology company focused on making transportation smarter, safer, and more useful. They transform massive amounts of data into actionable insights to help partners better predict risk and make smarter decisions in real time. Arity uses the managed ability of Managed Service for Apache Flink to transform and analyze streaming data in near real time using Apache Flink. On Managed Service for Apache Flink, Arity generates driving behavior insights based on collated driving data.
  • SOCAR is the leading Korean mobility company with strong competitiveness in car sharing. SOCAR solves mobility-related social problems, such as parking difficulties and traffic congestion, and changes the car ownership-oriented mobility habits in Korea.

Join the leaders in stream processing

By choosing Managed Service for Apache Flink, you’re joining a growing community of organizations who are unlocking the power of real-time data analysis. Get started today and see how Apache Flink can transform your data strategy, including powering the next generation of generative AI applications.

Ready to learn more?

Contact us today and discover how Apache Flink can empower your business.


About the author

Anna Montalat is the Product Marketing lead for AWS analytics and streaming data services, including Amazon Managed Streaming for Apache Kafka (MSK), Kinesis Data Streams, Kinesis Video Streams, Amazon Data Firehose, and Amazon Managed Service for Apache Flink, among others. She is passionate about bringing new and emerging technologies to market, working closely with service teams and enterprise customers. Outside of work, Anna skis through winter time and sails through summer.