Tag Archives: Analytics

Use the new SQL commands MERGE and QUALIFY to implement and validate change data capture in Amazon Redshift

Post Syndicated from Yanzhu Ji original https://aws.amazon.com/blogs/big-data/use-the-new-sql-commands-merge-and-qualify-to-implement-and-validate-change-data-capture-in-amazon-redshift/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. Tens of thousands of customers use Amazon Redshift to process exabytes of data every day to power their analytics workloads.

Amazon Redshift has added many features to enhance analytical processing, such as ROLLUP, CUBE, and GROUPING SETS, which were demonstrated in the post Simplify Online Analytical Processing (OLAP) queries in Amazon Redshift using new SQL constructs such as ROLLUP, CUBE, and GROUPING SETS. Amazon Redshift has recently added many SQL commands and expressions. In this post, we talk about two new SQL features, the MERGE command and QUALIFY clause, which simplify data ingestion and data filtering.

One common task in most downstream applications is change data capture (CDC): applying changes from the source to target tables. This task requires examining the source data to determine whether it represents an update or an insert to the existing target data. Without the MERGE command, you needed to test the new dataset against the existing dataset using a business key. When the key didn't match, you inserted a new row into the existing dataset; otherwise, you updated the existing row with the new dataset values.

The MERGE command conditionally merges rows from a source table into a target table. Traditionally, this could only be achieved by using multiple insert, update, or delete statements separately. When using multiple statements to update or insert data, there is a risk of inconsistencies between the different operations. The MERGE operation reduces this risk by ensuring that all operations are performed together in a single transaction.

The QUALIFY clause filters the results of a previously computed window function according to user‑specified search conditions. You can use the clause to apply filtering conditions to the result of a window function without using a subquery. This is similar to the HAVING clause, which applies a condition to further filter rows from a WHERE clause. The difference between QUALIFY and HAVING is that filtered results from the QUALIFY clause could be based on the result of running window functions on the data. You can use both the QUALIFY and HAVING clauses in one query.

In this post, we demonstrate how to use the MERGE command to implement CDC and how to use QUALIFY to simplify validation of those changes.

Solution overview

In this use case, we have a data warehouse, in which we have a customer dimension table that needs to always get the latest data from the source system. This data must also reflect the initial creation time and last update time for auditing and tracking purposes.

A simple way to solve this is to fully overwrite the customer dimension every day; however, that won't provide the update tracking that the audit mandate requires, and it might not be feasible for bigger tables.

You can load sample data from Amazon S3 by following the instructions here. Using the existing customer table under sample_data_dev.tpcds, we create a customer dimension table and a source table that will contain both updates for existing customers and inserts for new customers. We use the MERGE command to merge the source table data with the target table (the customer dimension). We also show how to use the QUALIFY clause to simplify validating the changes in the target table.

To follow along with the steps in this post, we recommend downloading the accompanying notebook, which contains all the scripts to run for this post. To learn about authoring and running notebooks, refer to Authoring and running notebooks.

Prerequisites

You should have the following prerequisites:

Create and populate the dimension table

We use the existing customer table under sample_data_dev.tpcds to create a customer_dimension table. Complete the following steps:

  1. Create a table using a few selected fields, including the business key, and add a couple of maintenance fields for insert and update timestamps:
     -- create the customer dimension table
    DROP TABLE IF EXISTS customer_dim CASCADE;
    CREATE TABLE customer_dim ( 
    customer_dim_id     bigint GENERATED BY DEFAULT AS IDENTITY(1, 1), 
    c_customer_sk integer NOT NULL ENCODE az64 distkey,
    c_first_name character(20) ENCODE lzo,
    c_last_name character(30) ENCODE lzo,
    c_current_addr_sk integer ENCODE az64,
    c_birth_country character varying(20) ENCODE lzo,
    c_email_address character(50) ENCODE lzo,
    record_insert_ts    timestamp WITHOUT time ZONE DEFAULT current_timestamp ,
    record_upd_ts       timestamp WITHOUT time ZONE DEFAULT NULL
    )
    SORTKEY (c_customer_sk);

  2. Populate the dimension table:
    -- populate dimension 
    insert into customer_dim 
           (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
    select  c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address
    from "sample_data_dev"."tpcds"."customer";

  3. Validate the row count and the contents of the table:
    -- check customers count and look at sample data
    select count(1) from customer_dim; 
    select * from customer_dim limit 10;

Simulate customer table changes

Use the following code to simulate changes made to the table:

-- create a source table with some updates and some inserts
-- Update- Email has changed for 100 customers 
drop table if exists src_customer;
create table src_customer distkey(c_customer_sk) as 
select c_customer_sk , c_first_name , c_last_name, c_current_addr_sk, c_birth_country, 'x'||c_email_address as c_email_address, getdate() as effective_dt
from   customer_dim 
where  c_email_address is not null
limit 100;


-- also let's add three completely new customers
insert into src_customer values 
(15000001, 'Customer#15','000001', 10001 ,'USA'    , 'Customer#[email protected]', getdate() ),
(15000002, 'Customer#15','000002', 10002 ,'MEXICO' , 'Customer#[email protected]', getdate() ),
(15000003, 'Customer#15','000003', 10003 ,'CANADA' , 'Customer#[email protected]', getdate() );

-- check source count
select count(1) from src_customer;

Merge the source table into the target table

Now you have a source table with some changes you need to merge with the customer dimension table.

Before the MERGE command, this type of task needed two separate UPDATE and INSERT commands to implement:

-- merge changes to dim customer
BEGIN TRANSACTION;
-- update current records
UPDATE customer_dim
SET    c_first_name      = src.c_first_name      ,
       c_last_name       = src.c_last_name       , 
       c_current_addr_sk = src.c_current_addr_sk , 
       c_birth_country   = src.c_birth_country   , 
       c_email_address   = src.c_email_address   ,
       record_upd_ts     = current_timestamp
from   src_customer AS src
where  customer_dim.c_customer_sk = src.c_customer_sk ;
-- Insert new records
INSERT INTO customer_dim (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
select src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address 
from   src_customer AS src
where  src.c_customer_sk NOT IN (select c_customer_sk from customer_dim);
-- end merge operation
COMMIT TRANSACTION;

The MERGE command uses a more straightforward syntax, in which we use the key comparison result to decide if we perform an update DML operation (when matched) or an insert DML operation (when not matched):

MERGE INTO customer_dim using src_customer AS src ON customer_dim.c_customer_sk = src.c_customer_sk
WHEN MATCHED THEN UPDATE 
SET c_first_name      = src.c_first_name      ,
    c_last_name       = src.c_last_name       , 
    c_current_addr_sk = src.c_current_addr_sk , 
    c_birth_country   = src.c_birth_country   , 
    c_email_address   = src.c_email_address   ,
    record_upd_ts     = current_timestamp
WHEN NOT MATCHED THEN INSERT (c_customer_sk, c_first_name,c_last_name, c_current_addr_sk, c_birth_country, c_email_address) 
                      VALUES (src.c_customer_sk, src.c_first_name,src.c_last_name, src.c_current_addr_sk, src.c_birth_country, src.c_email_address );

Validate the data changes in the target table

Now we need to validate that the data has made it correctly into the target table. We can first check the updated data using the update timestamp. Because this was our first update, we can examine all rows where the update timestamp is not null:

-- Check the changes
-- to get updates
select * 
from customer_dim
where record_upd_ts is not null

Use QUALIFY to simplify validation of the data changes

We need to examine the data inserted in this table most recently. One way to do that is to rank the data by its insert timestamp and get those with the first rank. This requires using the window function rank() and also requires a subquery to get the results.

Before the availability of QUALIFY, we needed to build that using a subquery like the following:

select customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts 
from 
( select rank() OVER (ORDER BY DATE_TRUNC('second',record_insert_ts) desc) AS rnk, 
         customer_dim_id,c_customer_sk ,c_first_name ,c_last_name ,c_current_addr_sk,c_birth_country ,c_email_address ,record_insert_ts ,record_upd_ts 
  from customer_dim
  where record_upd_ts is null) as ranked_customers
where rnk = 1;

The QUALIFY clause eliminates the need for the subquery, as in the following code snippet:

-- to get the newly inserted rows we can make use of Qualify feature
select * 
from customer_dim
where record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC('second',record_insert_ts) desc) = 1;

Validate all data changes

We can union the results of both queries to get all the insert and update changes:

-- To get all changes
select *
from (
select 'Updates' as operations, cd.* 
from   customer_dim as cd
where  cd.record_upd_ts is not null
union 
select 'Inserts' as operations, cd.* 
from customer_dim cd
where cd.record_upd_ts is null
qualify rank() OVER (ORDER BY DATE_TRUNC('second',cd.record_insert_ts) desc) = 1 
) as all_changes order by 1;

Clean up

To clean up the resources used in the post, delete the Redshift provisioned cluster or Redshift Serverless workgroup and namespace you created for this post (this will also drop all the objects created).

If you used an existing Redshift provisioned cluster or Redshift Serverless workgroup and namespace, use the following code to drop these objects:

DROP TABLE IF EXISTS customer_dim CASCADE;
DROP TABLE IF EXISTS src_customer CASCADE;

Conclusion

When using multiple statements to update or insert data, there is a risk of inconsistencies between the different operations. The MERGE operation reduces this risk by ensuring that all operations are performed together in a single transaction. For Amazon Redshift customers who are migrating from other data warehouse systems or who regularly need to ingest fast-changing data into their Redshift warehouse, the MERGE command is a straightforward way to conditionally insert, update, and delete data from target tables based on existing and new source data.

Many analytic queries need to filter on the result of a window function, but window functions aren't permitted in the WHERE clause. Previously, you had to build a subquery that contains the required window function and then filter on its result in the parent query's WHERE clause. The QUALIFY clause eliminates the need for that subquery, which simplifies the SQL statement and makes it easier to write and read.

We encourage you to start using those new features and give us your feedback. For more details, refer to MERGE and QUALIFY clause.


About the authors

Yanzhu Ji is a Product Manager in the Amazon Redshift team. She has experience in product vision and strategy in industry-leading data products and platforms. She has outstanding skill in building substantial software products using web development, system design, database, and distributed programming techniques. In her personal life, Yanzhu likes painting, photography, and playing tennis.

Ahmed Shehata is a Senior Analytics Specialist Solutions Architect at AWS based in Toronto. He has more than two decades of experience helping customers modernize their data platforms. Ahmed is passionate about helping customers build efficient, performant, and scalable analytic solutions.

Ranjan Burman is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 16 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with cloud solutions.

Explore visualizations with AWS Glue interactive sessions

Post Syndicated from Annie Nelson original https://aws.amazon.com/blogs/big-data/explore-visualizations-with-aws-glue-interactive-sessions/

AWS Glue interactive sessions offer a powerful way to iteratively explore datasets and fine-tune transformations using Jupyter-compatible notebooks. Interactive sessions enable you to work with a choice of popular integrated development environments (IDEs) in your local environment or with AWS Glue or Amazon SageMaker Studio notebooks on the AWS Management Console, all while seamlessly harnessing the power of a scalable, on-demand Apache Spark backend. This post is part of a series exploring the features of AWS Glue interactive sessions.

AWS Glue interactive sessions now include native support for the matplotlib visualization library (AWS Glue version 3.0 and later). In this post, we look at how we can use matplotlib and Seaborn to explore and visualize data using AWS Glue interactive sessions, facilitating rapid insights without complex infrastructure setup.

Solution overview

You can quickly provision new interactive sessions directly from your notebook without needing to interact with the AWS Command Line Interface (AWS CLI) or the console. You can use magic commands to provide configuration options for your session and install any additional Python modules that are needed.
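For example, you might set session-level magics like the following in the first notebook cell, before any code runs; the values shown are illustrative placeholders, not recommendations:

%idle_timeout 60
%glue_version 3.0
%worker_type G.1X
%number_of_workers 2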

In this post, we use the classic Iris and MNIST datasets to navigate through a few commonly used visualization techniques using matplotlib on AWS Glue interactive sessions.

Create visualizations using AWS Glue interactive sessions

We start by installing the scikit-learn and Seaborn libraries using the additional_python_modules Jupyter magic command:

%additional_python_modules scikit-learn, seaborn

You can also upload Python wheel modules to Amazon Simple Storage Service (Amazon S3) and specify the full path as a parameter value to the additional_python_modules magic command.
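For example, a wheel you have uploaded to your own bucket can be listed alongside packages from PyPI; the bucket and wheel names below are placeholders:

%additional_python_modules s3://amzn-s3-demo-bucket/wheels/my_module-1.0-py3-none-any.whl,seaborn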

Now, let’s run a few visualizations on the Iris and MNIST datasets.

  1. Create a pair plot using Seaborn to uncover patterns within sepal and petal measurements across the iris species:
    import seaborn as sns
    import matplotlib.pyplot as plt
    
    # Load the Iris dataset
    iris = sns.load_dataset("iris")
    
    # Create a pair plot
    sns.pairplot(iris, hue="species")
    %matplot plt

  2. Create a violin plot to reveal the distribution of the sepal width measure across the three species of iris flowers:
    # Create a violin plot of the Sepal Width measure
    plt.figure(figsize=(10, 6))
    sns.violinplot(x="species", y="sepal_width", data=iris)
    plt.title("Violin Plot of Sepal Width by Species")
    plt.show()
    %matplot plt

  3. Create a heat map to display correlations across the iris dataset variables:
    # Calculate the correlation matrix (numeric columns only)
    correlation_matrix = iris.drop(columns=["species"]).corr()
    
    # Create a heatmap using Seaborn
    plt.figure(figsize=(8, 6))
    sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm")
    plt.title("Correlation Heatmap")
    %matplot plt

  4. Create a scatter plot on the MNIST dataset using PCA to visualize distributions among the handwritten digits:
    import matplotlib.pyplot as plt
    from sklearn.datasets import fetch_openml
    from sklearn.decomposition import PCA
    
    # Load the MNIST dataset
    mnist = fetch_openml('mnist_784', version=1)
    X, y = mnist['data'], mnist['target']
    
    # Apply PCA to reduce dimensions to 2 for visualization
    pca = PCA(n_components=2)
    X_pca = pca.fit_transform(X)
    
    # Scatter plot of the reduced data
    plt.scatter(X_pca[:, 0], X_pca[:, 1], c=y.astype(int), cmap='viridis', s=5)
    plt.xlabel("Principal Component 1")
    plt.ylabel("Principal Component 2")
    plt.title("PCA - MNIST Dataset")
    plt.colorbar(label="Digit Class")
    
    %matplot plt

  5. Create another visualization using matplotlib and the mplot3d toolkit:
    import numpy as np
    import matplotlib.pyplot as plt
    from mpl_toolkits.mplot3d import Axes3D
    
    # Generate mock data
    x = np.linspace(-5, 5, 100)
    y = np.linspace(-5, 5, 100)
    x, y = np.meshgrid(x, y)
    z = np.sin(np.sqrt(x**2 + y**2))
    
    # Create a 3D plot
    fig = plt.figure(figsize=(10, 8))
    ax = fig.add_subplot(111, projection='3d')
    
    # Plot the surface
    surface = ax.plot_surface(x, y, z, cmap='viridis')
    
    # Add color bar to map values to colors
    fig.colorbar(surface, ax=ax, shrink=0.5, aspect=10)
    
    # Set labels and title
    ax.set_xlabel('X')
    ax.set_ylabel('Y')
    ax.set_zlabel('Z')
    ax.set_title('3D Surface Plot Example')
    
    %matplot plt

As illustrated by the preceding examples, you can use any compatible visualization library by installing the required modules and then using the %matplot magic command.
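For instance, pandas' built-in plotting, which draws onto matplotlib figures under the hood, renders the same way; the following is a minimal sketch with made-up data:

import pandas as pd
import matplotlib.pyplot as plt

# Any library that draws onto a matplotlib figure can be displayed with %matplot.
df = pd.DataFrame({"channel": ["email", "sms", "push"], "events": [120, 80, 40]})
df.plot(kind="bar", x="channel", y="events", legend=False, title="Events by channel")
%matplot plt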

Conclusion

In this post, we discussed how extract, transform, and load (ETL) developers and data scientists can efficiently visualize patterns in their data using familiar libraries through AWS Glue interactive sessions. With this functionality, you're empowered to focus on extracting valuable insights from your data, while AWS Glue handles the infrastructure heavy lifting using a serverless compute model. To get started today, refer to Developing AWS Glue jobs with Notebooks and Interactive sessions.


About the authors

Annie Nelson is a Senior Solutions Architect at AWS. She is a data enthusiast who enjoys problem solving and tackling complex architectural challenges with customers.

Keerthi Chadalavada is a Senior Software Development Engineer at AWS Glue. She is passionate about designing and building end-to-end solutions to address customer data integration and analytic needs.

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop their enterprise data architecture on AWS.

Gal Heyne is a Product Manager for AWS Glue with a strong focus on AI/ML, data engineering, and BI. She is passionate about developing a deep understanding of customers' business needs and collaborating with engineers to design easy-to-use data products.

New! Rate Limiting analytics and throttling

Post Syndicated from Radwa Radwan original http://blog.cloudflare.com/new-rate-limiting-analytics-and-throttling/

Rate Limiting rules are essential in the toolbox of security professionals as they are very effective in managing targeted volumetric attacks, takeover attempts, scraping bots, or API abuse. Over the years we have received a lot of feature requests from users, but two stand out: suggesting rate limiting thresholds and implementing a throttle behavior. Today we released both to Enterprise customers!

When creating a rate limit rule, one of the common questions is “what rate should I put in to block malicious traffic without affecting legitimate users?”. If your traffic is authenticated, API Shield will suggest thresholds based on auth IDs (such as a session ID, cookie, or API key). However, when you don’t have authentication headers, you will need to create IP-based rules (like for a ‘/login’ endpoint) and you are left guessing the threshold. From today, we provide analytics tools to determine what rate of requests can be used for your rule.

So far, a rate limit rule could be created with a log, challenge, or block action. When ‘block’ was selected, all requests from the same source (for example, an IP) were blocked for the timeout period. Sometimes this is not ideal, as you would rather selectively block/allow requests to enforce a maximum rate of requests without an outright temporary ban. When using throttle, a rule lets through enough requests to keep the request rate from individual clients below a customer-defined threshold.

Continue reading to learn more about each feature.

Introducing Rate Limit Analysis in Security Analytics

The Security Analytics view was designed with the intention of offering complete visibility on HTTP traffic while adding an extra layer of security on top. It has proven valuable when it comes to crafting custom rules. Nevertheless, relying solely on Security Analytics when creating rate limiting rules can be somewhat challenging.

To create a rate limiting rule you can leverage Security Analytics to determine the filter — what requests are evaluated by the rule (for example, by filtering on mitigated traffic, or selecting other security signals like Bot scores). However, you’ll also need to determine the maximum rate you want to enforce, and that depends on the specific application, traffic pattern, time of day, endpoint, and so on. What’s the typical rate of legitimate users trying to access the login page at peak time? What’s the rate of requests generated by a botnet with the same JA3 fingerprint scraping prices from an ecommerce site? Until today, you couldn’t answer these questions from the analytics view.

That’s why we made the decision to integrate a rate limit helper into Security Analytics as a new tab called "Rate Limit Analysis," which concentrates on providing a tool to answer rate-related questions.

High level top statistics vs. granular Rate Limit Analysis

In Security Analytics, users can analyze traffic data by creating filters combining what we call top statistics. These statistics reveal the total volume of requests associated with a specific attribute of the HTTP requests. For example, you can filter the traffic from the ASNs that generated the most requests in the last 24 hours, or you can slice the data to look only at traffic reaching the most popular paths of your application. This tool is handy when creating rules based on traffic analysis.

However, for rate limits, a more detailed approach is required.

The new Rate Limit Analysis tab now displays data on request rate for traffic matching the selected filter and time period. You can select a rate defined on different time intervals, like one or five minutes, and the attribute of the request used to identify the rate, such as IP address, JA3 fingerprint, or a combination of both, which often improves accuracy. Once the attributes are selected, the chart displays the distribution of request rates for the top 50 unique clients (identified as unique IPs or JA3s) observed during the chosen time interval, in descending order.
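You can also approximate this analysis outside the dashboard on request logs you export yourself (for example, via Logpush). The following pandas sketch is illustrative only; the file name and column names (timestamp, client_ip, ja3, path) are assumptions you would adjust to your own export:

import pandas as pd

# Load a sample of HTTP request logs; column names are assumptions.
logs = pd.read_json("http_requests.ndjson", lines=True)
logs["timestamp"] = pd.to_datetime(logs["timestamp"])

# Keep only the endpoint you plan to rate limit.
login = logs[logs["path"] == "/login"]

# Requests per client per 1-minute window, using IP + JA3 as the client key.
rates = (
    login.groupby(["client_ip", "ja3", pd.Grouper(key="timestamp", freq="1min")])
         .size()
         .rename("requests_per_minute")
         .reset_index()
)

# Peak per-minute rate for each client, top 50 in descending order --
# roughly what the Rate Limit Analysis chart shows.
top_clients = (
    rates.groupby(["client_ip", "ja3"])["requests_per_minute"]
         .max()
         .sort_values(ascending=False)
         .head(50)
)
print(top_clients)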

You can use the slider to determine the impact of a rule with different thresholds. How many clients would have been caught by the rule and rate limited? Can I visually identify abusers with above-average rate vs. the long tail of average users? This information will guide you in assessing what’s the most appropriate rate for the selected filter.

Using Rate Limit Analysis to define rate thresholds

It now takes only a few minutes to build your rate limit rule. Let’s apply this to one of the common use cases: identifying the /login endpoint and creating an IP-based rate limit rule with a logging action.

Define a scope and rate.

  • In the HTTP requests tab (the default view), start by selecting a specific time period. If you’re looking for the normal rate distribution you can specify a period with non-peak traffic. Alternatively, you can analyze the rate of offending users by selecting a period when an attack was carried out.

  • Using the filters in the top statistics, select a specific endpoint (e.g., /login). We can also focus on non-automated/human traffic using the bot score quick filter on the right sidebar or the filter button on top of the chart. In the Rate limiting Analysis tab, you can choose the characteristic (JA3, IP, or both) and duration (1 min, 5 mins, or 1 hour) for your rate limit rule. At this point, moving the dotted line up and down can help you choose an appropriate rate for the rule. JA3 is only available to customers using Bot Management.
  • Looking at the distribution, we can exclude any IPs or ASNs that might be known to us, to have a better visual on end user traffic. One way to do this is to filter out the outliers right before the long tail begins. A rule with this setting will block the IPs/JA3 with a higher rate of requests.

Validate your rate. You can validate the rate by repeating this process but selecting a portion of traffic where you know there was an attack or traffic peak. The rate you've chosen should block the outliers during the attack and allow traffic during normal times. In addition to that, looking at the sampled logs can be helpful in verifying the fingerprints and filters chosen.

Create a rule. Selecting “Create rate limit rule” will take you to the rate limiting tab in the WAF with your filters pre-populated.

Choose your action and behavior in the rule. Depending on your needs you can choose to log, challenge, or block requests exceeding the selected threshold. It’s often a good idea to first deploy the rule with a log action to validate the threshold and then change the action to block or challenge when you are confident with the result. With every action, you can also choose between two behaviors: fixed action or throttle. Learn more about the difference in the next section.

Introducing the new throttle behavior

Until today, the only available behavior for Rate Limiting has been fixed action, where an action is triggered for a selected time period (also known as timeout). For example, did the IP 192.0.2.23 exceed the rate of 20 requests per minute? Then block (or log) all requests from this IP for, let’s say, 10 minutes.

In some situations, this type of penalty is too severe and risks affecting legitimate traffic. For example, if a device in a corporate network (think about NAT) exceeds the threshold, all devices sharing the same IP will be blocked outright.

With throttling, rate limiting selectively drops requests to maintain the rate within the specified threshold. It’s like a leaky bucket behavior (with the only difference that we do not implement a queuing system). For example, throttling a client to 20 requests per minute means that when a request comes from this client, we look at the last 60 seconds and see if (on average) we have received less than 20 requests. If this is true, the rule won’t perform any action. If the average is already at 20 requests then we will take action on that request. When another request comes in, we will check again. Since some time has passed the average rate might have dropped, making room for more requests.
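To make the throttling decision concrete, here is a minimal Python sketch of a sliding-window check. It illustrates the behavior described above and is not Cloudflare's actual implementation:

import time
from collections import defaultdict, deque

WINDOW_SECONDS = 60
THRESHOLD = 20  # maximum requests per client per window

# Timestamps of requests that were let through, per client.
history = defaultdict(deque)

def should_act_on(client_id, now=None):
    """Return True if this request exceeds the rate and should be blocked/logged/challenged."""
    now = time.time() if now is None else now
    window = history[client_id]

    # Drop timestamps that have fallen out of the 60-second window.
    while window and now - window[0] >= WINDOW_SECONDS:
        window.popleft()

    if len(window) >= THRESHOLD:
        # The client is already at the threshold: act on this request only,
        # instead of banning the client for a fixed timeout.
        return True

    window.append(now)
    return False

With a fixed action, by contrast, the first request over the threshold would put the client into a blocked state for the whole timeout period, regardless of whether its rate later drops.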

Throttling can be used with all actions: block, log, or challenge. When creating a rule, you can select the behavior after choosing the action.

When using any challenge action, we recommend using the fixed action behavior. As a result, when a client exceeds the threshold we will challenge all requests until a challenge is passed. The client will then be able to reach the origin again until the threshold is breached again.

Throttle behavior is available to Enterprise rate limiting plans.

Try it out!

Today we are introducing a new Rate Limiting analytics experience along with the throttle behavior for all Rate Limiting users on Enterprise plans. We will continue to work actively on providing a better experience to save our customers' time. Log in to the dashboard, try out the new experience, and let us know your feedback using the feedback button located on the top right side of the Analytics page or by reaching out to your account team directly.

Deploy Amazon QuickSight dashboard for Amazon Pinpoint engagement events

Post Syndicated from Pavlos Ioannou Katidis original https://aws.amazon.com/blogs/messaging-and-targeting/deploy-amazon-quicksight-dashboard-for-amazon-pinpoint-engagement-events/

Abstract

Business intelligence (BI) dashboards provide a graphical representation of metrics and key performance indicators (KPIs) to monitor the health of your business. By leveraging BI dashboards to analyze the performance of your customer communications, you gain valuable insights into how they are engaging with your messages and can make data-driven decisions to improve your marketing and communication strategies.

In this blog post we introduce a solution that automates the deployment of an Amazon QuickSight dashboard that enables marketers to analyze their long-term Amazon Pinpoint customer engagement data. These dashboards can be customized further depending on the use case. This solution alleviates the need to create data pipelines for storage and analysis of Amazon Pinpoint’s engagement data, while offering a greater variety of widgets and views across email, SMS, campaigns, journeys, and transactional messages compared to Amazon Pinpoint’s native dashboards.

Amazon Pinpoint is a flexible, scalable marketing communications service that connects you with customers over email, SMS, push notifications, or voice. The service offers ready-to-use dashboards to view key performance indicators (KPIs) for the various messaging channels and provides 90 days of events for analysis. However, the raw events used to populate Amazon Pinpoint’s dashboards can be streamed using Amazon Kinesis Data Firehose to a destination of your choice. This blog will walk you through leveraging this feature to create a data lake to store and analyze data beyond the initial 90 days.

Amazon QuickSight is a cloud-scale business intelligence (BI) service that you can use to deliver easy-to-understand insights in an interactive visual environment.

The solution leverages the AWS Cloud Development Kit (AWS CDK) to deploy the needed infrastructure and dashboards.

Use Case(s)

The Amazon QuickSight dashboards deployed through this solution are designed to serve several use cases. Here are just a few examples:

  • View email and SMS costs per Campaign and Journey.
  • Deep dive into engagement insights and performance (for example, SMS events, email events, campaign events, and journey events).
  • Schedule reports to various business stakeholders.
  • Track individual email & SMS statuses to specific endpoints.
  • Analyze open and click rates based on the message send time.

These are just some of the use cases for these dashboards; with all the data points available in Amazon QuickSight, you can create your own views and widgets based on your specific requirements.

Solution Overview

This solution builds upon the Digital User Engagement (DUE) Event Database AWS Solution. It creates a long-term Amazon Pinpoint event data lake. This solution also builds a QuickSight dashboard to visualize and analyze this data. It leverages several other AWS services to tie it all together. It uses AWS Lambda for processing AWS CloudTrail data, Amazon Athena to build views using SQL for Amazon QuickSight, AWS CloudTrail to record any new campaign, journey, and segment updates, and Amazon DynamoDB to store the campaign, journey, and segment metadata. This solution can be segmented into three logical portions: 1) Pinpoint campaign/journey/segment lookup tables, 2) Amazon Athena views, and 3) Amazon QuickSight resources.

The AWS Cloud Development Kit (CDK) is used to deploy this solution to your account. AWS CDK is an open-source software development framework for defining cloud infrastructure as code with modern programming languages and deploying it through AWS CloudFormation.

Pinpoint campaign/journey/segment lookup tables

Architecture diagram

  1. A CloudFormation AWS Lambda-backed custom resource function adds current Pinpoint campaign, journey and segment meta data to Amazon DynamoDB lookup tables. An AWS CloudFormation custom resource is managed by a Lambda function that runs only upon the deployment, update and deletion of the AWS CloudFormation stack.
  2. AWS CloudTrail logs record API actions to an S3 bucket every 5 minutes.
  3. When an AWS CloudTrail log is written to the S3 bucket an AWS Lambda function is invoked and checks for Amazon Pinpoint campaigns/journeys/segments management events such as create, update and delete.
  4. For every Amazon Pinpoint action the AWS Lambda function finds, it queries Amazon Pinpoint to get the respective resource details.
  5. The AWS Lambda function will create or update records in the Amazon DynamoDB table to reflect the changes.
  6. This solution also deploys an Amazon Athena DynamoDB connector. Amazon Athena uses this to query the Amazon DynamoDB lookup tables to enrich the data in the Amazon Pinpoint event data lake.
  7. The Amazon Athena to Amazon DynamoDB connector requires an Amazon S3 spill bucket for any data that exceeds the AWS Lambda function limits.

Amazon Athena views

Amazon Athena views are crucial for querying and organizing the data. These views allow QuickSight to interact with the Pinpoint event data lake through standard SQL queries and views. Here’s how they’re set up:

The application creates several named queries (called saved queries in the Amazon Athena console). Each named query uses a SQL statement to create a database view containing a subset of the data from the Pinpoint event data lake (or joins data from a previous view with the Amazon DynamoDB tables created above). The views are also created using an AWS Lambda-backed custom resource.

Amazon QuickSight resources

Amazon QuickSight resources diagram

  1. This solution creates several Amazon QuickSight resources to support the deployed dashboard. These include data sources, datasets, refresh schedules, and an analysis. The refresh schedule determines the frequency that Amazon QuickSight queries the Amazon Athena views to update the datasets.
  2. Amazon Athena retrieves live data from the DUE event database data lake and the Athena DynamoDB Connector whenever the Amazon QuickSight refresh schedule runs.

Prerequisites

  • Deploy the Digital User Engagement (DUE) Event Database solution before continuing
    • After you have deployed this solution, gather the following data from the stack’s Resources section.
      • DUES3DataLake: You will need the bucket name
      • PinpointProject: You will need the project Id
      • PinpointEventDatabase: This is the name of the Glue Database. You will only need this if you used something other than the default of due_eventdb

Note: If you are installing the DUE event database for the first time as part of these instructions, your dashboard will not have any data to show until new events start to come in from your Amazon Pinpoint project.

Once you have the DUE event database installed, you are ready to begin your deployment.

Implementation steps

Step 1 – Ensure that Amazon Athena is setup to store query results

Amazon Athena uses workgroups to separate users, teams, applications, or workloads, to set limits on the amount of data each query or the entire workgroup can process, and to track costs. There is a default workgroup called “primary”. However, before you can use this workgroup, it needs to be configured with an Amazon S3 bucket for storing the query results.

  1. If you do not have an existing S3 bucket you can use for the output, create a new Amazon S3 bucket.
  2. Navigate to the Amazon Athena console and from the menu select Workgroups > primary > Edit > Query result configuration
    1. Select the Amazon S3 bucket and any specific directory for the Athena query result location (or configure it programmatically, as sketched after this list)
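If you prefer to set the query result location programmatically rather than through the console, a boto3 sketch like the following should work; the bucket name is a placeholder:

import boto3

athena = boto3.client("athena")

# Point the primary workgroup's query results at an S3 location you own.
athena.update_work_group(
    WorkGroup="primary",
    ConfigurationUpdates={
        "ResultConfigurationUpdates": {
            "OutputLocation": "s3://amzn-s3-demo-bucket/athena-results/"
        }
    },
)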

Note: If you choose to use a workgroup other than the default “primary” workgroup, take note of the workgroup name; it will be used later.

Step 2 – Enable Amazon QuickSight

Amazon QuickSight offers two types of datasets: Direct Query datasets, which provide real-time access to data sources, and SPICE (Super-fast, Parallel, In-memory Calculation Engine) datasets, which are pre-aggregated and cached for faster performance and scalability and can be refreshed on a schedule.

This solution uses SPICE datasets set to incrementally refresh on a cycle of your choice (daily or hourly). If you have already set up Amazon QuickSight, navigate to Amazon QuickSight in the AWS console and skip to Step 3.

  1. Navigate to Amazon QuickSight on the AWS console
  2. Set up an Amazon QuickSight account by clicking the “Sign up for QuickSight” button.
    1. You will need to setup an Enterprise account for this solution.
    2. To complete the process for the Amazon QuickSight account setup follow the instructions at this link
  3. Ensure you have the Admin Role
    1. Choose the profile icon in the top right corner, select Manage QuickSight and click on Manage Users
    2. Subscription details should display on the screen.
  4. Ensure you have enough SPICE capacity for the datasets
    1. Choose the profile icon, and then select Manage QuickSight
    2. Click on SPICE Capacity
  5. Make sure you have enough SPICE for all three datasets
    1. If you are still in the free tier, you should have enough for initial testing.
    2. You will need about 2 GB of capacity for every 1,000,000 Pinpoint events that will be ingested into SPICE.
    3. Note: If you do not have enough SPICE capacity, the deployment will fail.
  6. Please note the Amazon QuickSight username. You can find this by clicking the profile icon. Example username: Admin/user-name

Step 3 – Collect the Amazon QuickSight Service Role name in IAM

For Amazon Athena, Amazon S3, and Athena Query Federation connections, Amazon QuickSight uses the following IAM “consumer” role by default: aws-quicksight-s3-consumers-role-v0

If the “consumer” role is not present, then QuickSight uses the following “service” role instead: aws-quicksight-service-role-v0.

The version number at the end of the role could be different in your account. Please validate your role name with the following steps.

  1. Navigate to the Identity and Access Management (IAM) console
  2. Go to Roles and search QuickSight
  3. If the consumer role exists, please note its full name
  4. If you only find the service role, please note its full name
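Alternatively, you can list the QuickSight roles programmatically; a small boto3 sketch (filtering on the role-name prefix client-side) could look like this:

import boto3

iam = boto3.client("iam")

# Collect the QuickSight consumer/service roles present in the account.
quicksight_roles = []
for page in iam.get_paginator("list_roles").paginate():
    for role in page["Roles"]:
        if role["RoleName"].startswith("aws-quicksight-"):
            quicksight_roles.append(role["RoleName"])

print(quicksight_roles)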

Note: For more details on these service roles, please see the QuickSight User Guide

Step 4 – Prepare the CDK Application

Deploying this solution requires no previous experience with the AWS CDK toolkit. If you would like to familiarize yourself with CDK, the AWS CDK Workshop is a great place to start.

  1. Setup your integrated development environment (IDE)
    1. Option 1 (recommended for first time CDK users): Use AWS Cloud9 – a cloud-based IDE that lets you write, run, and debug your code with just a browser
      1. Navigate to Cloud9 in the AWS console and click the Create Environment button
      2. Provide a descriptive name for your environment (e.g., PinpointAnalysis)
      3. Leave the rest of the values as their default values and click Create
      4. Open the Cloud9 IDE
        1. Node, TypeScript, and CDK should come pre-installed. Test this by running the following commands in your terminal.
          1. node --version
          2. tsc --version
          3. cdk --version
          4. If dependencies are not installed, follow the Step 1 instructions from this article
        2. Using AWS Cloud9 will incur a nominal charge if you are no longer Free Tier eligible. However, it simplifies setup if you do not already have a local environment with the AWS CDK and the AWS CLI installed.
    2. Option 2: local IDE such as VS Code
      1. Setup CDK locally using this documentation
      2. Install Node, TypeScript and the AWS CLI
        1. Once the CLI is installed, configure your AWS credentials
          1. aws configure
  2. Clone the Pinpoint Dashboard Solution from your terminal by running the command below:
    1. git clone https://github.com/aws-samples/digital-user-engagement-events-dashboards.git
  3. Install the required npm packages from package.json by running the commands below:
    1. cd digital-user-engagement-events-dashboards
    2. npm install

Open the file at digital-user-engagement-events-dashboards/bin/pinpoint-bi-analysis.ts for editing in your IDE.

Edit the following code block for your solution with the information you have gathered in the previous steps. Please reference Table 1 for a description of each editable field.

const resourcePrefix = "pinpoint_analytics_";

...

new MainApp(app, "PinpointAnalytics", {
  env: {
    region: "us-east-1",
  },
  
  //Attributes to change
  dueDbBucketName: "{bucket-name}",
  pinpointProjectId: "{pinpoint-project-id}",
  qsUserName: "{quicksight-username}",

  //Default settings
  athenaWorkGroupName: "primary",
  dataLakeDbName: "due_eventdb",
  dateRangeNumberOfMonths: 6,
  qsUserRegion: "us-east-1", 
  qsDefaultServiceRole: "aws-quicksight-service-role-v0", 
  spiceRefreshInterval: "HOURLY",

  //Constants
  athena_util: athena_util,
  qs_util: qs_util,
});
Table 1 – Attributes, definitions, and examples

  • resourcePrefix – The prefix for all created Athena and QuickSight resources. Example: pinpoint_analytics_
  • region – Where new resources will be deployed. This must be the same region in which the DUE event database solution was deployed. Example: us-east-1
  • dueDbBucketName – The name of the DUE event database S3 bucket. Example: due-database-xxxxxxxxxxus-east-1
  • qsUserName – The name of your QuickSight user. Example: Admin/my-user
  • athenaWorkGroupName – The Athena workgroup that was previously configured. Example: primary
  • dataLakeDbName – The Glue database created during the DUE event database solution deployment. By default the database name is “due_eventdb”. Example: due_eventdb
  • dateRangeNumberOfMonths – The number of months of data the Athena views will contain. QuickSight SPICE datasets will contain this many months of data initially and on full refresh; the QuickSight dataset will add new data incrementally without deleting historical data. Example: 6
  • qsUserRegion – The region where your QuickSight user exists. By default, new users will be created in us-east-1. You can check your user location with the AWS CLI: aws quicksight list-users --aws-account-id {account-id} --namespace default and look for the region in the ARN. Example: us-east-1
  • qsDefaultServiceRole – The service role collected during Step 3. Example: aws-quicksight-service-role-v0
  • spiceRefreshInterval – Options include HOURLY and DAILY; this is how often the SPICE 7-day incremental window will be refreshed. Example: DAILY

Step 5 – Deploy

  1. CDK requires you to bootstrap in each region of an account. This creates an S3 bucket for deployment. You only need to bootstrap once per account/region
    1. cdk bootstrap
  2. Deploy the application
    1. cdk deploy

Step 6 – Explore

Once your solution deploys, look for the Outputs provided by the CDK CLI. You will find a link to your new Amazon QuickSight analysis, or dashboard, as well as a few other key resources. Also, explore the resources sections of the deployed stacks in AWS CloudFormation for a complete list of deployed resources. In AWS CloudFormation, you should have two stacks: the main stack, called PinpointAnalytics, and a nested stack.

Pricing

The total cost to run this solution will depend on several factors. To help explore what the costs might look like for you, please look at the following examples.

All costs outlined below will assume the following:

  • 1 Amazon QuickSight author
  • 100 Amazon QuickSight analysis reader sessions
  • 100k write API actions for all services in AWS account
  • A total of 1k Amazon Pinpoint campaigns, journeys, and segments resulting in 1k Amazon DynamoDB records
  • 5 million monthly Amazon Pinpoint events – email send, email delivered, etc.

Base Costs:

  • 1 Amazon QuickSight author – can edit all Amazon QuickSight resources
    • $24 – There is a 30-day trial for 4 authors in the free tier
  • 100 Amazon QuickSight analysis reader sessions OR 6 readers with unlimited access – max $5 per month per reader
    • $30
  • Total Monthly Costs: $54 / month

Variable Costs:

Even with the assumptions listed above, the costs will vary depending on the chosen data retention window as well as the refresh schedule.

  • SPICE data storage costs.
    • Total size of storage will depend on how many months you choose to display in the dashboard
    • For the above assumptions, the SPICE datasets will cost roughly $3.25 for each month stored in the datasets.
  • Amazon Athena data volume costs
    • With Athena you are charged for the total number of bytes scanned in a query. The solution implements incremental data refreshes in SPICE. Amazon QuickSight will only query and update the most recent 7 days of data during each refresh cycle. This can be adjusted as needed.

Scenario 1 – 6-month data analysis with daily refresh:

  • Fixed costs: $57
  • SPICE datasets: $19.50
  • Athena Scans: $1.25
  • Total Costs: $77.75 / Month

Scenario 2 – 12-month data analysis with daily refresh:

  • Fixed costs: $57
  • SPICE datasets: $39
  • Athena Scans: $1.25
  • Total Costs: $97.25 / Month

Scenario 3 – 12-month data analysis with hourly refresh:

  • Fixed costs: $57
  • SPICE datasets: $39
  • Athena Scans: $27.50
  • Total Costs: $123.50 / Month

Note: Several services were not mentioned in the above scenarios (e.g., DynamoDB, CloudTrail, Lambda). The limited usage of these services resulted in a combined cost of less than a few US dollars per month. Even at a greater scale, the costs from these services will not increase in any significant way.

Clean up

  • Delete the CDK stack by running the following from your command line
    • cdk destroy
  • Delete QuickSight account
  • Delete Athena views
    • Go to Glue > Data Catalog > Databases > Your Database Name
    • Delete the Athena views that are no longer needed. Views created by this solution will start with the resourcePrefix specified in the bin/athena-quicksight-cdk.ts file
  • Delete S3 buckets
    • DynamoDB cloud watch log bucket
    • Dynamo Athena Connector Spill bucket
    • Athena workgroup output bucket
  • Delete DynamoDB tables
    • This solution creates two DynamoDB lookup tables prefixed with the Stack name

Conclusion

In this blog, you have deployed a solution that visualizes Amazon Pinpoint’s email and SMS engagement data using Amazon QuickSight. This solution provides you with a functional Amazon QuickSight dashboard as well as a foundation to design and build new Amazon QuickSight dashboards that meet your bespoke requirements. Parts of the solution, such as the Amazon Athena views, can be consumed by other business intelligence tools that your business might already be using.

Next steps

This solution can be expanded to include Amazon Pinpoint engagement events from other channels such as push notifications, Amazon Connect outbound calls, in-app, and custom events. This will require certain updates to the Amazon Athena views and consequently to the Amazon QuickSight dashboards. Furthermore, the Amazon DynamoDB tables store only campaign, journey, and segment metadata. You can extend this part of the solution to include message template metadata, which will help to analyze performance per message template.

Considerations / Troubleshooting

  • An Amazon QuickSight Standard account can be upgraded to an Enterprise account. Enterprise accounts cannot be downgraded to a Standard account.
  • SPICE capacity is allocated separately for each AWS Region. Default SPICE capacity is automatically allocated to your home AWS Region. For each AWS account, SPICE capacity is shared by all the people using QuickSight in a single AWS Region. The other AWS Regions have no SPICE capacity unless you choose to purchase some.
  • The QuickSight analysis event rates are calculated at the Pinpoint message_id and endpoint_id grain – the click rate will be the same whether a user clicks an email link once or multiple times.
  • All timestamps are in UTC. To display data in another time zone, edit the event_timestamp_timezone calculated field in every dataset.
  • Data inside Amazon QuickSight will refresh depending on the schedule set during deployment. Current options include hourly and daily refreshes.
  • AWS CloudTrail allows a maximum of five trails per AWS Region in an AWS account.

About the Authors

Spencer Harrison

Spencer was a 2023 WWPS Solution Architect intern at Amazon Web Services. He will graduate with his Masters of Information Systems Management from Brigham Young University in the spring of 2024. After graduation he is aspiring to find opportunities as a solution architect, cloud engineer, or DevOps engineer. Outside of work, Spencer loves going outdoors to wake surf, downhill ski, and play pickle ball.

Daniel Wells

With over 20 years of IT experience, Daniel has held many architecture and director positions supporting a wide variety of technologies. He currently works as an AWS Solutions Architect supporting Education Technology companies striving to make a difference for learners and educators worldwide. Daniel’s interests outside of work include music, family, health, education and anything that allows him to express himself creatively.

Pavlos Ioannou Katidis

Pavlos Ioannou Katidis is an Amazon Pinpoint and Amazon Simple Email Service Senior Specialist Solutions Architect at AWS. He enjoys diving deep into customers’ technical issues and helping them design communication solutions. In his spare time, he enjoys playing tennis, watching crime TV series, playing FPS PC games, and coding personal projects.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 2

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints-part-2/

This post is a continuation of a two-part series. In the first part, we delved into Apache Flink‘s internal mechanisms for checkpointing, in-flight data buffering, and handling backpressure. We covered these concepts in order to understand how buffer debloating and unaligned checkpoints allow us to enhance performance for specific conditions in Apache Flink applications.

In Part 1, we introduced and examined how to use buffer debloating to improve in-flight data processing. In this post, we focus on unaligned checkpoints. This feature has been available since Apache Flink 1.11 and has received many improvements since then. Unaligned checkpoints help, under specific conditions, to reduce checkpointing time for applications suffering temporary backpressure, and can now be enabled in Amazon Managed Service for Apache Flink applications running Apache Flink 1.15.2 through a support ticket.

Even though this feature might improve performance for your checkpoints, if your application is constantly failing because of checkpoints timing out, or is suffering from having constant backpressure, you may require a deeper analysis and redesign of your application.

Aligned checkpoints

As discussed in Part 1, Apache Flink checkpointing allows applications to record state in case of failure. We’ve already discussed how checkpoints, when triggered by the job manager, signal all source operators to snapshot their state and then broadcast a special record called a checkpoint barrier. This process achieves exactly-once consistency for state in a distributed streaming application through the alignment of these barriers.

Let’s walk through the process of aligned checkpoints in a standard Apache Flink application. Remember that Apache Flink distributes the workload horizontally: each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks based on its parallelism.

Barrier alignment

The alignment of checkpoint barriers is crucial for achieving exactly-once consistency in Apache Flink applications during checkpoint runs. To recap, when a job manager triggers a checkpoint, all sub-tasks of source operators receive a signal to initiate the checkpoint process. Each sub-task independently snapshots its state to the state backend and broadcasts a special record known as a checkpoint barrier to all outgoing streams.

When an application operates with a parallelism higher than 1, multiple instances of each task—referred to as sub-tasks—enable parallel message consumption and processing. A sub-task can receive distinct partitions of the same stream from different upstream sub-tasks, such as after a stream repartitioning with keyBy or rebalance operations. To maintain exactly-once consistency, all sub-tasks must wait for the arrival of all checkpoint barriers before taking a snapshot of the state. The following diagram illustrates the checkpoint barriers flow.

Checkpoint Barriers flow in the Buffer Queues

This phase is called checkpoint alignment. During alignment, the sub-task stops processing records from the partitions from which it has already received barriers, as shown in the following figure.

The first Barrier reaches the sub-task: Checkpointing Alignment begins

However, it continues to process partitions that are behind the barrier.

Processing continues only for partitions behind the barrier

When barriers from all upstream partitions have arrived, the sub-task takes a snapshot of its state.

Barrier alignment complete: snapshot state

Then it broadcasts the barrier downstream.

Emit Barriers downstream, and continue processing

The time a sub-task spends waiting for all barriers to arrive is measured by the checkpoint Alignment Duration metric, which can be observed in the Apache Flink UI.

If the application experiences backpressure, an increase in this metric could lead to longer checkpoint durations and even checkpoint failures due to timeouts. This is where unaligned checkpoints become a viable option to potentially enhance checkpointing performance.

Unaligned checkpoints

Unaligned checkpoints address situations where backpressure is not just a temporary spike, but results in timeouts for aligned checkpoints, due to barrier queuing within the stream. As discussed in Part 1, checkpoint barriers can’t overtake regular records. Therefore, significant backpressure can slow down the movement of barriers across the application, potentially causing checkpoint timeouts.

The objective of unaligned checkpoints is to enable barrier overtaking, allowing barriers to move swiftly from source to sink even when the data flow is slower than anticipated.

Building on what we saw in Part 1 concerning checkpoints and what aligned checkpoints are, let’s explore how unaligned checkpoints modify the checkpointing mechanism.

Upon emission, each source’s checkpoint barrier is injected into the stream flowing across sub-tasks. It travels from the source output network buffer queue into the input network buffer queue of the subsequent operator.

Upon the arrival of the first barrier in the input network buffer queue, the operator initially waits for barrier alignment. If the specified alignment timeout expires because not all barriers have reached the end of the input network buffer queue, the operator switches to unaligned checkpoint mode.

The alignment timeout can be set programmatically by env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30)), but modifying the default is not recommended in Apache Flink 1.15.

Checkpoint barriers flow in the buffer queues

The operator waits until all checkpoint barriers are present in the input network buffer queue before triggering the checkpoint. Unlike aligned checkpoints, the operator doesn’t need to wait for all barriers to reach the queue’s end, allowing the operator to have in-flight data from the buffer that hasn’t been processed before checkpoint initiation.

All barriers are in the input queues

After all barriers have arrived in the input network buffer queue, the operator advances the barrier to the end of the output network buffer queue. This enhances checkpointing speed because the barrier can smoothly traverse the application from source to sink, independent of the application’s end-to-end latency.

Barriers can overtake in-flight messages

After forwarding the barrier to the output network buffer queue, the operator initiates the snapshot of in-flight data between the barriers in the input and output network buffer queues, along with the snapshot of the state.

Although processing is momentarily paused during this process, the actual writing to the remote persistent state storage occurs asynchronously, preventing potential bottlenecks.

Snapshot state and in-flight messages

The local snapshot, encompassing in-flight messages and state, is saved asynchronously in the remote persistent state store, while the barrier continues its journey through the application.

Processing continues

When to use unaligned checkpoints

Remember, barrier alignment only occurs between partitions coming from different sub-tasks of the same operator. Therefore, if an operator is experiencing temporary backpressure, enabling unaligned checkpoints may be beneficial. This way, the application doesn’t have to wait for all barriers to reach the operator before performing the snapshot of state or moving the barrier forward.

Temporary backpressure could arise from the following:

  • A surge in data ingestion
  • Backfilling or catching up with historical data
  • Increased message processing time due to delayed external systems

Another scenario where unaligned checkpoints prove advantageous is when working with exactly-once sinks. With sinks that use the two-phase commit protocol for exactly-once guarantees, unaligned checkpoints can speed up checkpoint completion, thereby reducing end-to-end latency.

When not to use unaligned checkpoints

Unaligned checkpoints won’t reduce the time required for savepoints (called snapshots in the Amazon Managed Service for Apache Flink implementation) because savepoints exclusively utilize aligned checkpoints. Furthermore, because Apache Flink doesn’t permit concurrent unaligned checkpoints, savepoints won’t occur simultaneously with unaligned checkpoints, potentially elongating savepoint durations.

Unaligned checkpoints won’t fix any underlying issue in your application design. If your application is suffering from persistent backpressure or constant checkpointing timeouts, this might indicate data skewness or underprovisioning, which may require improving and tuning the application.

Using unaligned checkpoints with buffer debloating

One alternative for reducing the risks associated with an increased state size is to combine unaligned checkpoints with buffer debloating. This approach results in having less in-flight data to snapshot and store in the state, along with less data to be used for recovery in case of failure. This synergy facilitates enhanced performance and efficient checkpoint runs, leading to smaller checkpointing sizes and faster recovery times. When testing the use of unaligned checkpoints, we recommend doing so with buffer debloating to prevent the state size from increasing.

Limitations

Unaligned checkpoints are subject to the following limitations:

  • They provide no benefit for operators with a parallelism of 1.
  • They only improve performance for operators where barrier alignment would have occurred. This alignment happens only if records are coming from different sub-tasks of the same operator, for example, through repartitioning or keyBy operations.
  • Operators receiving input from multiple sources or participating in joins might not experience improvements, because the operator would be receiving data from different operators in those cases.
  • Although checkpoint barriers can surpass records in the network’s buffer queue, this won’t occur if the sub-task is currently processing a message. If processing a message takes too much time (for example, a flat-map operation emitting numerous records for each input record), barrier handling will be delayed.
  • As we have seen, savepoints always use aligned checkpoints. If the savepoints of your applications are slow due to barrier alignment, unaligned checkpoints will not help.
  • Additional limitations affect watermarks, message ordering, and broadcast state in recovery. For more details, refer to Limitations.

Considerations

Considerations for implementing unaligned checkpoints:

  • Unaligned checkpoints introduce additional I/O to checkpoint storage
  • Checkpoints encompass not only operator state but also in-flight data within network buffer queues, leading to increased state size

Recommendations

We offer the following recommendations:

  • Consider enabling unaligned checkpoints only if both of the following conditions are true:
    • Checkpoints are timing out.
    • The average checkpoint Async Duration of any operator is more than 50% of the total checkpoint duration for the operator (sum of Sync Duration + Async Duration).
  • Consider enabling buffer debloating first, and evaluate whether it solves the problem of checkpoints timing out.
  • If buffer debloating doesn’t help, consider enabling unaligned checkpoints along with buffer debloating. Buffer debloating mitigates the drawbacks of unaligned checkpoints, reducing the amount of in-flight data.
  • If unaligned checkpoints and buffer debloating together don’t improve checkpoint alignment duration, consider testing unaligned checkpoints alone.

Decision flow

Finally, but most importantly, always test unaligned checkpoints in a non-production environment first, running some comparative performance testing with a realistic workload, and verify that unaligned checkpoints actually reduce checkpoint duration.

Conclusion

This two-part series explored advanced strategies for optimizing checkpointing within your Amazon Managed Service for Apache Flink applications. By harnessing the potential of buffer debloating and unaligned checkpoints, you can unlock significant performance improvements and streamline checkpoint processes. However, it’s important to understand when these techniques will provide improvements and when they will not. If you believe your application may benefit from faster checkpoints, you can enable these features in your Amazon Managed Service for Apache Flink version 1.15 applications. We recommend first enabling buffer debloating and testing the application. If you still don’t see the expected outcome, enable unaligned checkpoints together with buffer debloating; the reduced amount of in-flight data limits the additional state size and I/O to the state backend that unaligned checkpoints would otherwise introduce. Lastly, you can try unaligned checkpoints on their own, bearing in mind the considerations we’ve mentioned.

With a deeper understanding of these techniques and their applicability, you are better equipped to maximize the efficiency of checkpoints and mitigate the effect of backpressure in your Apache Flink application.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Optimize checkpointing in your Amazon Managed Service for Apache Flink applications with buffer debloating and unaligned checkpoints – Part 1

Post Syndicated from Lorenzo Nicora original https://aws.amazon.com/blogs/big-data/part-1-optimize-checkpointing-in-your-amazon-managed-service-for-apache-flink-applications-with-buffer-debloating-and-unaligned-checkpoints/

This post is the first of a two-part series regarding checkpointing mechanisms and in-flight data buffering. In this first part, we explain some of the fundamental Apache Flink internals and cover the buffer debloating feature. In the second part, we focus on unaligned checkpoints.

Apache Flink is an open-source distributed engine for stateful processing over unbounded datasets (streams) and bounded datasets (batches). Amazon Managed Service for Apache Flink, formerly known as Amazon Kinesis Data Analytics, is the AWS service offering fully managed Apache Flink.

Apache Flink is designed for stateful processing at scale, for high throughput and low latency. It scales horizontally, distributing processing and state across multiple nodes, and is designed to withstand failures without compromising the exactly-once consistency it provides.

Internally, Apache Flink uses clever mechanisms to maintain exactly-once state consistency, while also optimizing for throughput and reduced latency. The default behavior works well for most use cases. Recent versions introduced two functionalities that can be optionally enabled to improve application performance under particular conditions: buffer debloating and unaligned checkpoints.

Buffer debloating and unaligned checkpoints can be enabled on Amazon Managed Service for Apache Flink version 1.15.

To understand how these functionalities can help and when to use them, we need to dive deep into some of the fundamental internal mechanisms of Apache Flink: checkpointing, in-flight data buffering, and backpressure.

Maintaining state consistency through failures with checkpointing

Apache Flink checkpointing periodically saves the internal application state for recovery in case of failure. Each of the distributed components of an application asynchronously snapshots its state to an external persistent datastore. The challenge is taking snapshots while guaranteeing exactly-once consistency. A naïve “stop-the-world, take a snapshot” implementation would never meet the high throughput and low latency goals Apache Flink has been designed for.
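For reference, the following minimal sketch shows how checkpointing is typically enabled and tuned through the open-source Apache Flink API. In Amazon Managed Service for Apache Flink, checkpointing is enabled and configured by the service, so the interval, timeout, and pause values below are purely illustrative.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointingBasics {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Trigger a checkpoint every 60 seconds with exactly-once semantics
        // (illustrative values; Managed Service for Apache Flink sets these for you).
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // Fail the checkpoint if it does not complete within 10 minutes,
        // and leave at least 5 seconds between consecutive checkpoints.
        env.getCheckpointConfig().setCheckpointTimeout(600_000);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5_000);

        // ... define the pipeline, then run it
        env.execute("checkpointing-basics");
    }
}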

Let’s walk through the process of checkpointing in a simple streaming application.

As shown in the following figure, Apache Flink distributes the work horizontally. Each operator (a node in the logical flow of your application, including sources and sinks) is split into multiple sub-tasks, based on its parallelism. The application is coordinated by a job manager. Checkpoints are periodically initiated by the job manager, sending a signal to all source operators’ sub-tasks.

Checkpoint initiated by the Job Manager

On receiving the signal, each source sub-task independently snapshots its state (for example, the offsets of the Kafka topic it is consuming) to a persistent storage, and then broadcasts a special record called a checkpoint barrier (“CB” in the following diagrams) to all outgoing streams. Checkpoint barriers work similarly to watermarks in Apache Flink, flowing in-band, along with normal records. A barrier does not overtake normal records and is not overtaken.

Source operators emit checkpoint barriers

When a downstream operator’s sub-task receives all checkpoint barriers from all input channels, it starts snapshotting its state.

A sub-task does not pause processing while saving its state to the remote, persistent state backend. This is a two-phase operation. First, the sub-task takes a snapshot of the state, on the local file system or in memory, depending on application configuration. This operation is blocking but very fast. When the snapshot is complete, it restarts processing records, while the state is asynchronously saved to the external, persistent state store. When the state is successfully saved to the state store, the sub-task acknowledges to the job manager that its checkpointing is complete.

The time a sub-task spends on the synchronous and asynchronous parts of the checkpoint is measured by the Sync Duration and Async Duration metrics, shown in the Apache Flink UI.

Sub-tasks acknowledge checkpoint completion

Checkpoint barriers propagate through all operators, down to the sinks. When all sink sub-tasks have acknowledged the checkpoint to the job manager, the checkpoint is declared complete and can be used to recover the application, for example in case of failure.

Sink operators acknowledge checkpoint is complete

Checkpoint barrier alignment

A sub-task may receive different partitions of the same stream from different upstream sub-tasks, for example when a stream is repartitioned with a keyBy or a rebalance. Each upstream sub-task will emit a checkpoint barrier independently. To maintain exactly-once consistency, the sub-task must wait for the barriers to arrive on all input partitions before taking a snapshot of its state.

This phase is called checkpoint alignment. During the alignment, the sub-task stops processing records from the partitions it already received the barrier from and continues processing the partitions that are behind the barrier.

After the barriers from all upstream partitions have arrived, the sub-task takes the snapshot of its state and then broadcasts the barrier downstream.

The time spent by a sub-task while aligning barriers is measured by the Checkpoint Alignment Duration metric, shown by the Apache Flink UI.

Checkpoint barrier alignment

In-flight data buffering

To optimize for throughput, Apache Flink tries to keep each sub-task always busy. This is achieved by transmitting records over the network in blocks and by buffering in-flight data. Note that this is data transmission optimization; Flink operators always process records one at a time.

Data is handed over between sub-tasks in units called network buffers. A network buffer has a fixed size, in bytes.

Sub-tasks also buffer in-flight input and output data. These buffers are called network buffer queues. Each queue is composed of multiple network buffers. Each sub-task has an input network buffer queue for each upstream sub-task and an output network buffer queue for each downstream sub-task.

Each record emitted by the sub-task is serialized, put into network buffers, and published to the output network buffer queue. To use all the available space, multiple messages can be packed into a single network buffer or split across subsequent network buffers.

A separate thread sends full network buffers over the network, where they are stored in the destination sub-task’s input network buffer queue.

When the destination sub-task thread is free, it deserializes the network buffers, rebuilds the records, and processes them one at a time.

Network Buffer Queue

Backpressure

If a sub-task can’t keep up with processing records at the same pace they are received, the input queue fills up. When the input queue is full, the upstream sub-task stops sending data.

Data accumulates in the sender’s output queue. When this is also full, the sender sub-task stops processing records, accumulating received data in its own input queue, and the effect propagates upstream.

This is the backpressure that Apache Flink uses to control the internal flow, preventing slow operators from being overwhelmed by slowing down the upstream flow. Backpressure is a safety mechanism that maximizes the application throughput. It can be temporary, for example during an unexpected peak of ingested data. If it is not temporary, it is usually a symptom, rather than the cause, of an application that is not designed correctly or that has insufficient resources to process the workload.

Full Network Buffer Queue generates backpressure

In-flight buffering and checkpoint barriers

As checkpoint barriers flow with normal records, they also flow in the network buffers, through the input and output queues. In normal conditions, barriers don’t overtake records, and they are never overtaken. If records are queueing up due to backpressure, checkpoint barriers are also stuck in the queue, taking longer to propagate from the sources to the sinks and delaying the completion of the checkpoint.

In the second part of this series, we will see how unaligned checkpoints can let barriers overtake records under specific conditions. For now, let’s see how we can optimize the size of input and output queues with buffer debloating.

Buffer debloating to optimize in-flight data

The default network buffer queue size is a good compromise for most applications. You can modify this size, but it applies to all sub-tasks, and it may be difficult to optimize this one-size-fits-all across different operators.

Longer queues support higher throughput, but they may slow down checkpoint barriers that have to traverse them, causing a longer End to End Checkpoint Duration. Ideally, the amount of in-flight data should be adjusted based on the actual throughput.

In version 1.14, Apache Flink introduced buffer debloating, which can be enabled to adjust the amount of in-flight data for each sub-task based on the throughput the sub-task is currently processing, and to periodically reassess and readjust it.
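As a rough sketch, buffer debloating is controlled by TaskManager-level configuration keys in open-source Apache Flink; in Amazon Managed Service for Apache Flink, the feature is enabled for you (see later in this post). The keys below normally belong in the Flink cluster configuration and are set programmatically here only to keep the example self-contained; the values are illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class BufferDebloatingConfig {
    public static void main(String[] args) throws Exception {
        // These keys normally live in the Flink cluster configuration (flink-conf.yaml);
        // they are set programmatically here only to keep the sketch self-contained.
        Configuration config = new Configuration();
        config.setString("taskmanager.network.memory.buffer-debloat.enabled", "true");
        // Target time to consume the buffered in-flight data (1 s is the default).
        config.setString("taskmanager.network.memory.buffer-debloat.target", "1s");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(config);

        // ... define and run the pipeline as usual
        env.execute("buffer-debloating-example");
    }
}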

How buffer debloating helps your application

Consider a streaming application, ingesting records from a streaming source and publishing the results to a streaming destination after some transformations. Under normal conditions, the application is sized to process the incoming throughput smoothly. Our destination has limited capacity, for example a Kafka topic throttled via quotas, sufficient to handle the normal throughput, with some margin.

In-flight data buffering under normal throughput

Imagine that the ingestion throughput has occasional peaks. These peaks exceed the limits of the streaming destination (throughput quota of the Kafka topic), which starts throttling.

Full in-flight data buffer to the sink backpressure the preceding operator

Because the sink can’t process the full throughput, in-flight data accumulates upstream of the sink, causing backpressure on the upstream operator. The effect eventually propagates up to the source, and the source starts lagging behind the most recent record in the source stream.

Backpressure propagates upstream, up to the source operator

As long as this is a temporary condition, backpressure and lagging are not a problem per se, provided the application is able to catch up when the peak has finished.

Unfortunately, accumulating in-flight data also slows down the propagation of the checkpoint barriers. Checkpoint End to End Duration goes up, and checkpoints may eventually time out.

Full in-flight data buffers slow down checkpoint barrier propagation, under backpressure

The situation is even worse if the sink uses two-phase commit for exactly-once guarantees. For example, KafkaSink uses Kafka transactions committed on checkpoints. If checkpoints become too slow, transactions are committed later, significantly increasing the latency of any downstream consumer using a read-committed isolation level.
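As an illustration of such a sink, the following sketch configures Flink’s KafkaSink with two-phase commit semantics; the broker address, topic name, and transactional ID prefix are placeholder values.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;

public class ExactlyOnceKafkaSink {
    public static KafkaSink<String> build() {
        return KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092")          // placeholder
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.builder()
                                .setTopic("output-topic")      // placeholder
                                .setValueSerializationSchema(new SimpleStringSchema())
                                .build())
                // Records become visible to read-committed consumers only when the
                // Kafka transaction is committed, which happens on checkpoint completion.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("my-app")            // placeholder, required for EXACTLY_ONCE
                .build();
    }
}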

Slow checkpoints under backpressure may also cause a vicious cycle. A slowed-down application eventually crashes, and recovers from the last checkpoint that is quite old. This causes a long reprocessing that, in turn, induces more backpressure and even slower checkpoints.

In this case, buffer debloating can help by adjusting the amount of in-flight data based on the throughput each sub-task is actually processing. When a sub-task is throttled by backpressure, the amount of in-flight data is reduced, also reducing the time checkpoint barriers take to go through all operators. Checkpoint End to End Duration goes down, and checkpoints do not time out.

Buffer debloating internals

Buffer debloating estimates the throughput a sub-task is capable of processing, assuming no idling, and limits the upstream in-flight data buffers to contain just enough data to be processed in 1 second (by default). For example, if a sub-task is currently processing about 10 MB/s, buffer debloating aims to keep roughly 10 MB of in-flight data buffered upstream of it.

For efficiency, the network buffers in the queues have a fixed size. Buffer debloating caps the usable size of each network buffer, making it smaller when the sub-task is processing slowly.

Buffer debloating speeds up barrier propagation, reducing the volume of in-flight data

The benefits of less in-flight data depend on whether Apache Flink is using standard checkpoint alignment, the default behavior described so far, or unaligned checkpoints. We will examine unaligned checkpoints in the second part of this series, but let’s briefly look at the effect of buffer debloating in both cases.

  • With aligned checkpoints (default behavior) – Less in-flight data makes checkpoint barrier propagation faster, ultimately reducing the end-to-end checkpoint duration but also making it more predictable
  • With unaligned checkpoints (optional) – Less in-flight data reduces the amount of in-flight records stored with the checkpoint, ultimately reducing the checkpoint size

What buffer debloating does not do

Note that the problem we are trying to solve is slow checkpointing (or excessive checkpointing size, with unaligned checkpoints). Buffer debloating helps make checkpointing faster.

Buffer debloating does not remove backpressure. Backpressure is the internal protective mechanism that Apache Flink uses when some part of the application is not able to cope with the incoming throughput. To reduce backpressure, you have to work on other aspects of the application. When backpressure is only temporary, for example under peak conditions, the only way of removing it would be sizing the end-to-end system for the peak, rather than normal workload. But this could be impossible or too expensive.

Buffer debloating helps reduce and keep checkpoint duration stable under exceptional and temporary conditions. If an application experiences backpressure under its normal workload, or checkpoints are too slow under normal conditions, you should investigate the implementation of your application to understand the root cause.

When the automatic throughput prediction fails

Buffer debloating doesn’t have any particular drawback, but in corner cases, the mechanism may incorrectly estimate the throughput, and the resulting amount of in-flight data may not be optimal.

Estimating the throughput is complex when an operator receives data from multiple upstream operators, connected streams, or unions with very different throughput. It may also take time to adjust to a sudden spike, causing temporarily suboptimal buffering. When the estimate is off:

  • Too small in-flight data may reduce the throughput the sub-task can process (it will be idling), causing more backpressure upstream
  • Too large buffers may slow down checkpointing and increase the checkpoint size (with unaligned checkpoints)

Conclusion

The checkpointing mechanism makes Apache Flink fault tolerant, providing exactly-once state consistency. In-flight data buffering and backpressure control the data flow within the distributed streaming application to maximize throughput. Apache Flink default behaviors and configurations are good for most workloads.

The effectiveness of buffer debloating depends on the characteristics of the workload and the application. The general recommendation is to test the functionality in a non-production environment with a realistic workload to verify it actually helps with your use case.

You can request to enable buffer debloating on your Amazon Managed Service for Apache Flink application.

Under particular conditions, the combined effect of backpressure and in-flight data buffering may slow down checkpointing, increase checkpointing size (with unaligned checkpoints), and even cause checkpoints to fail. In these cases, enabling unaligned checkpointing may help reduce checkpoint duration or size.

In the second part of this series, we will better understand unaligned checkpoints and how they can help your application checkpoint efficiently in the presence of backpressure, especially in combination with buffer debloating.


About the Authors

Lorenzo Nicora works as Senior Streaming Solution Architect at AWS, helping customers across EMEA. He has been building cloud-native, data-intensive systems for over 25 years, working in the finance industry both through consultancies and for FinTech product companies. He has leveraged open-source technologies extensively and contributed to several projects, including Apache Flink.

Francisco Morillo is a Streaming Solutions Architect at AWS. Francisco works with AWS customers helping them design real-time analytics architectures using AWS services, supporting Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Capacity Management and Amazon EMR Managed Scaling improvements for Amazon EMR on EC2 clusters

Post Syndicated from Sushant Majithia original https://aws.amazon.com/blogs/big-data/capacity-management-and-amazon-emr-managed-scaling-improvements-for-amazon-emr-on-ec2-clusters/

In 2022, we told you about the new enhancements we made in Amazon EMR Managed Scaling, which helped improve cluster utilization and reduce cluster costs. In 2023, we are happy to report that the Amazon EMR team has been hard at work. We worked backward from customer requirements and launched multiple new features to enhance the capacity management and scaling experience of your Amazon EMR on EC2 clusters.

Amazon EMR is the cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Customers asked us for features that would further improve the capacity management and scaling experience of their EMR on EC2 clusters, including their large, long-running clusters. We have been hard at work to meet those needs. The following are some of the key enhancements:

  • Enhanced customer transparency and flexibility with provisioning timeout for Spot Instances
  • Optimized task nodes scale-up for Amazon EMR on EC2 clusters launched with instance groups
  • Improved job resiliency with enhanced protection for Spark Drivers

Let’s dive deeper and discuss the new Amazon EMR on EC2 features in detail.

Enhanced customer transparency and flexibility with provisioning timeout for Spot Instances

Many Amazon EMR customers use EC2 Spot Instances for their EMR on EC2 clusters to reduce costs. Spot Instances are spare Amazon Elastic Compute Cloud (Amazon EC2) compute capacity offered at discounts of up to 90% compared to On-Demand pricing. Amazon EMR offers you the capability to scale your cluster either manually or by using Automatic Scaling. You can also use the Amazon EMR Managed Scaling feature to automatically resize your cluster based on workload and utilization.

To enhance the customer experience when scaling up using Spot Instances, for EMR on EC2 clusters launched using instance fleets, you can now specify a provisioning timeout for Spot Instances. A provisioning timeout tells Amazon EMR to stop provisioning Spot Instance capacity if the cluster exceeds a specified time threshold during cluster scaling operations. You can configure the Spot Instance provisioning timeout for clusters that are resized manually or by using Amazon EMR Managed Scaling or automatic scaling.

Additionally, to provide better transparency, when the timeout period expires, Amazon EMR will also automatically send events to an Amazon CloudWatch Events stream. With these CloudWatch events, you can create rules that match events according to a specified pattern, and then route the events to targets to take action. To learn more, please refer to Customize a provisioning timeout period for cluster resize in Amazon EMR.

The following summarizes the experience for different scenarios when you configure a provisioning timeout period during a resize of your Amazon EMR on EC2 cluster:

  • Amazon EMR provisions the desired Spot capacity before the provisioning timeout expires – Amazon EMR automatically scales up the cluster to the desired capacity, and no action is needed from the customer.
  • Amazon EMR can’t provision any Spot capacity, or can only provision partial Spot capacity, and the provisioning timeout has expired – Amazon EMR cancels the resize request and stops its attempts to provision additional Spot capacity. Amazon EMR also publishes events to an Amazon CloudWatch Events stream, which customers can use to create rules and take appropriate actions.
  • The Spot Instances in your Amazon EMR on EC2 cluster are interrupted because Amazon EC2 needs the capacity back – Amazon EMR automatically triggers a new resize request to rebalance your cluster, replacing instances with any of the available types in the cluster, and uses the same provisioning timeout that was configured on the cluster. No action is needed from the customer.

You should consider the criticality of capacity availability when specifying the provisioning timeout value:

  • When your workload capacity availability is critical – To ensure the desired capacity is available, we recommend configuring the resize provisioning timeout based on the time it takes to run the application and the application SLA. For example, if the application SLA is 60 minutes and it takes 30 minutes for the application to complete, set the resize provisioning timeout to 30 minutes or less. Amazon EMR will try to provision the Spot capacity until the timeout expires (30 minutes or less) and then publish a CloudWatch event so that you can take appropriate actions.
  • When your workload is time flexible and capacity availability is not a factor – To maximize the likelihood of getting the desired Spot capacity, you can configure a higher value for the resize provisioning timeout.

Optimized task nodes scale-up for Amazon EMR on EC2 clusters launched with Instance groups

Instance groups offer a simpler setup to launch EMR on EC2 clusters. Each cluster launched using instance groups can include up to 50 instance groups: one primary instance group that contains one EC2 instance, a core instance group that contains one or more EC2 instances, and up to 48 optional task instance groups. You can scale each instance group by adding and removing EC2 instances manually, or you can set up automatic scaling. You can also use the Amazon EMR Managed Scaling feature to automatically resize your cluster based on workload and utilization.

To enhance the customer experience for instance groups on EMR on EC2 clusters when scaling up task nodes using Amazon EMR Managed Scaling, we have enhanced the managed scaling algorithm to choose the task instance groups that have the highest likelihood of acquiring capacity. Furthermore, when managed scaling is not able to acquire capacity with a single task instance group, to reduce any scale-up delays, Amazon EMR will automatically switch to another task group and fulfill the capacity by using multiple task instance groups. Consequently, the more flexible you are about your instance types, the higher the chances of provisioning capacity. To learn more, refer to Best practices for instance and Availability Zone flexibility.

Improved job resiliency with enhanced protection for Spark Drivers

In 2022, to improve the job resiliency when using Amazon EMR Managed Scaling, we enhanced managed scaling to be Spark shuffle data aware, which prevents scale-down of instances that store intermediate shuffle data for Apache Spark. This helps prevent job reattempts and recomputations, which leads to better performance and lower cost.

To further improve job resiliency when using Amazon EMR Managed Scaling, we have further enhanced managed scaling to be Spark Driver aware, which ensures that during cluster scale-down, Amazon EMR Managed Scaling prioritizes the scale-down of nodes that don’t have an active Spark Driver running on them. This helps minimize job failures and job retries, helping further improve performance and reduce costs. This enhancement is enabled by default for EMR clusters using Amazon EMR versions 5.34.0 and later, and Amazon EMR versions 6.4.0 and later.

To confirm which nodes in your cluster are running Spark Driver, you can visit the Spark History Server and filter for the driver on the Executors tab of your Spark application ID.

Conclusion

In this post, we highlighted the improvements that we made in capacity management and Amazon EMR Managed Scaling for EMR on EC2 clusters. We focused on improving job resiliency, enhanced flexibility and transparency when provisioning Spot Instances, and optimizing the scale-up experience when using managed scaling with instance groups on Amazon EMR on EC2 clusters. Although we have launched multiple features so far in 2023 and the pace of innovation continues to accelerate, it remains day 1 and we look forward to hearing from you on how these features help you unlock more value for your organizations. We invite you to try these new features and get in touch with us through your AWS account team if you have further comments.


About the authors

Sushant Majithia is a Principal Product Manager for EMR at AWS.

Ankur Goyal is an SDM with the Amazon EMR Big Data Platform team. He builds large-scale distributed applications and cluster optimization algorithms. Ankur is interested in topics of Analytics, Machine Learning and Forecasting.

Matthew Liem is a Senior Solution Architecture Manager at AWS.

Tarun Chanana is an SDM with the Amazon EMR Big Data Platform team.

Introducing Amazon MSK as a source for Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/introducing-amazon-msk-as-a-source-for-amazon-opensearch-ingestion/

Ingesting a high volume of streaming data has been a defining characteristic of operational analytics workloads with Amazon OpenSearch Service. Many of these workloads involve either self-managed Apache Kafka or Amazon Managed Streaming for Apache Kafka (Amazon MSK) to satisfy their data streaming needs. Consuming data from Amazon MSK and writing to OpenSearch Service has been a challenge for customers. AWS Lambda, custom code, Kafka Connect, and Logstash have been used for ingesting this data. These methods involve tools that must be built and maintained. In this post, we introduce Amazon MSK as a source to Amazon OpenSearch Ingestion, a serverless, fully managed, real-time data collector for OpenSearch Service that makes this ingestion even easier.

Solution overview

The following diagram shows the flow from data sources to Amazon OpenSearch Service.

The flow contains the following steps:

  1. Data sources produce data and send that data to Amazon MSK.
  2. OpenSearch Ingestion consumes the data from Amazon MSK.
  3. OpenSearch Ingestion transforms, enriches, and writes the data into OpenSearch Service.
  4. Users search, explore, and analyze the data with OpenSearch Dashboards.

Prerequisites

You will need a provisioned MSK cluster created with appropriate data sources. The sources, as producers, write data into Amazon MSK. The cluster should be created with the appropriate Availability Zone, storage, compute, security and other configurations to suit your workload needs. To provision your MSK cluster and have your sources producing data, see Getting started using Amazon MSK.

As of this writing, OpenSearch Ingestion supports Amazon MSK provisioned, but not Amazon MSK Serverless. However, OpenSearch Ingestion can reside in the same or a different account from where Amazon MSK is present. OpenSearch Ingestion uses AWS PrivateLink to read data, so you must turn on multi-VPC connectivity on your MSK cluster. For more information, see Amazon MSK multi-VPC private connectivity in a single Region. OpenSearch Ingestion can write data to Amazon Simple Storage Service (Amazon S3), provisioned OpenSearch Service domains, and Amazon OpenSearch Serverless collections. In this solution, we use a provisioned OpenSearch Service domain as a sink for OpenSearch Ingestion (OSI). Refer to Getting started with Amazon OpenSearch Service to create a provisioned OpenSearch Service domain. You will need appropriate permission to read data from Amazon MSK and write data to OpenSearch Service. The following sections outline the required permissions.

Permissions required

To read from Amazon MSK and write to Amazon OpenSearch Service, you need to create an AWS Identity and Access Management (IAM) role that is used by Amazon OpenSearch Ingestion. In this post, we use a role called PipelineRole for this purpose. To create this role, see Creating IAM roles.

Reading from Amazon MSK

OpenSearch Ingestion will need permission to create a PrivateLink connection and perform other actions on your MSK cluster. Edit your MSK cluster policy to include the following snippet with appropriate permissions. If your OpenSearch Ingestion pipeline resides in an account different from your MSK cluster, you will need a second statement to allow this pipeline. Use the appropriate cluster, topic, and group ARNs when providing the permissions, and remove the comments from the policy before using it.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "osis-pipelines.aws.internal"
      },
      "Action": [
        "kafka:CreateVpcConnection",
        "kafka:GetBootstrapBrokers",
        "kafka:DescribeCluster"
      ],
      # Change this to your msk arn
      "Resource": "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
    },    
    ### Following permissions are required if msk cluster is in different account than osi pipeline
    {
      "Effect": "Allow",
      "Principal": {
        # Change this to your sts role arn used in the pipeline
        "AWS": "arn:aws:iam:: XXXXXXXXXXXX:role/PipelineRole"
      },
      "Action": [
        "kafka-cluster:*",
        "kafka:*"
      ],
      "Resource": [
        # Change this to your msk arn
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx",
        # Change this as per your cluster name & kafka topic name
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/*",
        # Change this as per your cluster name
        "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:group/test-cluster/*"
      ]
    }
  ]
}

Edit the pipeline role’s inline policy to include the following permissions. Ensure that you have removed the comments before using the policy.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:Connect",
                "kafka-cluster:AlterCluster",
                "kafka-cluster:DescribeCluster",
                "kafka:DescribeClusterV2",
                "kafka:GetBootstrapBrokers"
            ],
            "Resource": [
                # Change this to your msk arn
                "arn:aws:kafka:us-east-1:XXXXXXXXXXXX:cluster/test-cluster/xxxxxxxx-xxxx-xx"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:*Topic*",
                "kafka-cluster:ReadData"
            ],
            "Resource": [
                # Change this to your kafka topic and cluster name
                "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:topic/test-cluster/xxxxxxxx-xxxx-xx/topic-to-consume"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "kafka-cluster:AlterGroup",
                "kafka-cluster:DescribeGroup"
            ],
            "Resource": [
                # change this as per your cluster name
                "arn:aws:kafka:us-east-1: XXXXXXXXXXXX:group/test-cluster/*"
            ]
        }
    ]
}

Writing to OpenSearch Service

In this section, you provide the pipeline role with the necessary permissions to write to OpenSearch Service. As a best practice, we recommend using fine-grained access control in OpenSearch Service. Use OpenSearch Dashboards to map the pipeline role to an appropriate backend role. For more information on mapping roles to users, see Managing permissions. For example, all_access is a built-in role that grants administrative permission to all OpenSearch functions. When deploying to a production environment, ensure that you use a role with enough permissions to write to your OpenSearch domain.

Creating OpenSearch Ingestion pipelines

The pipeline role now has the correct set of permissions to read from Amazon MSK and write to OpenSearch Service. Navigate to the OpenSearch Service console, choose Pipelines, then choose Create pipeline.

Choose a suitable name for the pipeline, and set the pipeline capacity with appropriate minimum and maximum OpenSearch Compute Units (OCUs). Then choose AWS-MSKPipeline from the dropdown menu, as shown below.

Use the provided template to fill in all the required fields. The snippet in the following section shows the fields that need to be filled in, highlighted in red.

Configuring Amazon MSK source

The following sample configuration snippet shows every setting you need to get the pipeline running:

msk-pipeline: 
  source: 
    kafka: 
      acknowledgments: true                     # Default is false  
      topics: 
         - name: "<topic name>" 
           group_id: "<consumer group id>" 
           serde_format: json                   # Remove, if Schema Registry is used. (Other option is plaintext)  
 
           # Below defaults can be tuned as needed 
           # fetch_max_bytes: 52428800          Optional 
           # fetch_max_wait: 500                Optional (in msecs) 
           # fetch_min_bytes: 1                 Optional (in MB) 
           # max_partition_fetch_bytes: 1048576 Optional 
           # consumer_max_poll_records: 500     Optional                                
           # auto_offset_reset: "earliest"      Optional (other option is "latest") 
           # key_mode: include_as_field         Optional (another option is discard)  
 
       
 
      # Enable this configuration if Glue schema registry is used            
      # schema:                                 
      #   type: aws_glue 
 
      aws: 
        # Provide the Role ARN with access to MSK. This role should have a trust relationship with osis-pipelines.amazonaws.com 
        # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
        # Provide the region of the domain. 
        # region: "us-west-2" 
        msk: 
          # Provide the MSK ARN.  
          arn: "arn:aws:kafka:us-west-2:XXXXXXXXXXXX:cluster/msk-prov-1/id" 
 
  sink: 
      - opensearch: 
          # Provide an AWS OpenSearch Service domain endpoint 
          # hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com" ] 
          aws: 
          # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com 
          # sts_role_arn: "arn:aws:iam::XXXXXXXXXXXX:role/Example-Role" 
          # Provide the region of the domain. 
          # region: "us-east-1" 
          # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection 
          # serverless: true 
          # index name can be auto-generated from topic name 
          index: "index_${getMetadata(\"kafka_topic\")}-%{yyyy.MM.dd}" 
          # Enable 'distribution_version' setting if the AWS OpenSearch Service domain is of version Elasticsearch 6.x 
          # distribution_version: "es6" 
          # Enable the S3 DLQ to capture any failed requests in an S3 bucket 
          # dlq: 
            # s3: 
            # Provide an S3 bucket 

We use the following parameters:

  • acknowledgments – Set to true for OpenSearch Ingestion to ensure that the data is delivered to the sinks before committing the offsets in Amazon MSK. The default value is set to false.
  • name – This specifies the topic that OpenSearch Ingestion reads from. You can read a maximum of four topics per pipeline.
  • group_id – This parameter specifies the consumer group that the pipeline is part of. With this setting, a single consumer group can be scaled to as many pipelines as needed for very high throughput.
  • serde_format – Specifies a deserialization method to be used for the data read from Amazon MSK. The options are JSON and plaintext.
  • AWS sts_role_arn and OpenSearch sts_role_arn – Specifies the role OpenSearch Ingestion uses for reading and writing. Specify the ARN of the role you created in the previous section. OpenSearch Ingestion currently uses the same role for reading and writing.
  • MSK arn – Specifies the MSK cluster to consume data from.
  • OpenSearch host and index – Specifies the OpenSearch domain URL and where the index should write.

When you have configured the Kafka source, choose the network access type and log publishing options. Public pipelines do not involve PrivateLink and they will not incur a cost associated with PrivateLink. Choose Next and review all configurations. When you are satisfied, choose Create pipeline.

Log in to OpenSearch Dashboards to see your indexes and search the data.

Recommended compute units (OCUs) for the MSK pipeline

Each compute unit has one consumer per topic. Brokers balance partitions among these consumers for a given topic. However, when the number of partitions is greater than the number of consumers, Amazon MSK hosts multiple partitions on every consumer. OpenSearch Ingestion has built-in auto scaling to scale up or down based on CPU usage or the number of pending records in the pipeline. For optimal performance, partitions should be distributed across many compute units for parallel processing. If topics have a large number of partitions (for example, more than 96, the maximum OCUs per pipeline), we recommend configuring a pipeline with 1–96 OCUs because it will auto scale as needed. If a topic has a low number of partitions (for example, less than 96), keep the maximum compute units the same as the number of partitions. When a pipeline has more than one topic, you can pick the topic with the highest number of partitions as a reference to configure the maximum compute units. By adding another pipeline with a new set of OCUs to the same topic and consumer group, you can scale the throughput almost linearly.

Clean up

To avoid future charges, clean up any unused resources from your AWS account.

Conclusion

In this post, you saw how to use Amazon MSK as a source for OpenSearch Ingestion. This not only addresses the ease of data consumption from Amazon MSK, but it also relieves you of the burden of self-managing and manually scaling consumers for varying and unpredictable high-speed, streaming operational analytics data. Refer to the sources list under the supported plugins section for an exhaustive list of sources from which you can ingest data.


About the authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Arjun Nambiar is a Product Manager with Amazon OpenSearch Service. He focuses on ingestion technologies that enable ingesting data from a wide variety of sources into Amazon OpenSearch Service at scale. Arjun is interested in large-scale distributed systems and cloud-native technologies and is based out of Seattle, Washington.

Raj Sharma is a Sr. SDM with Amazon OpenSearch Service. He builds large-scale distributed applications and solutions. Raj is interested in the topics of Analytics, databases, networking and security, and is based out of Palo Alto, California.

Query your Iceberg tables in data lake using Amazon Redshift (Preview)

Post Syndicated from Rohit Bansal original https://aws.amazon.com/blogs/big-data/query-your-iceberg-tables-in-data-lake-using-amazon-redshift-preview/

Amazon Redshift is a fast, fully managed petabyte-scale cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Tens of thousands of customers today use Amazon Redshift to analyze exabytes of data and run analytical queries, making it the most widely used cloud data warehouse. Amazon Redshift is available in both serverless and provisioned configurations.

Amazon Redshift enables you to directly access data stored in Amazon Simple Storage Service (Amazon S3) using SQL queries and join data across your data warehouse and data lake. With Amazon Redshift, you can query the data in your S3 data lake using a central AWS Glue metastore from your Redshift data warehouse.

Amazon Redshift supports querying a wide variety of data formats, such as CSV, JSON, Parquet, and ORC, and table formats like Apache Hudi and Delta. Amazon Redshift also supports querying nested data with complex data types such as struct, array, and map.

With this capability, Amazon Redshift extends your petabyte-scale data warehouse to an exabyte-scale data lake on Amazon S3 in a cost-effective manner.

Apache Iceberg is the latest table format supported by Amazon Redshift, now in preview. In this post, we show you how to query Iceberg tables using Amazon Redshift, and explore Iceberg support and options.

Solution overview

Apache Iceberg is an open table format for very large petabyte-scale analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon S3.

The Iceberg catalog stores a pointer to the table’s current metadata file. When a SELECT query reads an Iceberg table, the query engine first goes to the Iceberg catalog, then retrieves the location of the latest metadata file, as shown in the following diagram.

Amazon Redshift now provides support for Apache Iceberg tables, which allows data lake customers to run read-only analytics queries in a transactionally consistent way. This enables you to easily manage and maintain your tables on transactional data lakes.

Amazon Redshift supports Apache Iceberg’s native schema and partition evolution capabilities using the AWS Glue Data Catalog, eliminating the need to alter table definitions to add new partitions or to move and process large amounts of data to change the schema of an existing data lake table. Amazon Redshift uses the column statistics stored in the Apache Iceberg table metadata to optimize its query plans and reduce the file scans required to run queries.

In this post, we use the Yellow taxi public dataset from NYC Taxi & Limousine Commission as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We use Amazon Athena to convert this Parquet dataset and then use Amazon Redshift Spectrum to query and join with a Redshift local table, perform row-level deletes and updates and partition evolution, all coordinated through the AWS Glue Data Catalog in an S3 data lake.

Prerequisites

You should have the following prerequisites:

Convert Parquet data to an Iceberg table

For this post, you need the Yellow taxi public dataset from the NYC Taxi & Limousine Commission available in Iceberg format. You can download the files and then use Athena to convert the Parquet dataset into an Iceberg table, or refer to Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue blog post to create the Iceberg table.

In this post, we use Athena to convert the data. Complete the following steps:

  1. Download the files using the previous link or use the AWS Command Line Interface (AWS CLI) to copy the files from the public S3 bucket for year 2020 and 2021 to your S3 bucket using the following command:
    aws s3 cp "s3://nyc-tlc/trip data/" s3://<Your S3 bucket name>/Parquet/  --exclude "*"  --include  "yellow_tripdata_2020*" –recursive
    aws s3 cp "s3://nyc-tlc/trip data/" s3://<Your S3 bucket name>/Parquet/  --exclude "*"  --include  "yellow_tripdata_2021*" –recursive

For more information, refer to Setting up the Amazon Redshift CLI.

  1. Create a database Icebergdb and create a table using Athena pointing to the Parquet format files using the following statement:
    CREATE DATABASE Icebergdb; 
    CREATE EXTERNAL TABLE icebergdb.nyc_taxi_yellow_parquet(
    	vendorid int,
    	tpep_pickup_datetime timestamp,
    	tpep_dropoff_datetime timestamp,
    	passenger_count bigint,
    	trip_distance double,
    	ratecodeid bigint,
    	store_and_fwd_flag string,
    	pulocationid int,
    	dolocationid int,
    	payment_type integer,
    	fare_amount double,
    	extra double,
    	mta_tax double,
    	tip_amount double,
    	tolls_amount double,
    	improvement_surcharge double,
    	total_amount double,
    	congestion_surcharge double,
    	airport_fee double
    )
    STORED AS PARQUET
    LOCATION 's3://<Your S3 Bucket>/Parquet/'

  2. Validate the data in the Parquet table using the following SQL:
    SELECT vendorid,
    	tpep_pickup_datetime,
    	tpep_dropoff_datetime,
    	trip_distance,
    	fare_amount,
    	tip_amount,
    	tolls_amount,
    	total_amount,
    	congestion_surcharge,
    	airport_fee
    FROM icebergdb.nyc_taxi_yellow_parquet
    limit 5;

  3. Create an Iceberg table in Athena with the following code. You can see the table type properties as an Iceberg table with Parquet format and snappy compression in the following create table statement. You need to update the S3 location before running the SQL. Also note that the Iceberg table is partitioned with the Year key.
    CREATE  TABLE nyc_taxi_yellow_iceberg(
      vendorid int, 
      tpep_pickup_datetime timestamp, 
      tpep_dropoff_datetime timestamp, 
      passenger_count bigint, 
      trip_distance double, 
      ratecodeid bigint, 
      store_and_fwd_flag string, 
      pulocationid int, 
      dolocationid int, 
      payment_type bigint, 
      fare_amount double, 
      extra double, 
      mta_tax double, 
      tip_amount double, 
      tolls_amount double, 
      improvement_surcharge double, 
      total_amount double, 
      congestion_surcharge double, 
      airport_fee double)
    PARTITIONED BY (year(tpep_pickup_datetime))
    LOCATION 's3://<Your S3 bucket name>/iceberg/iceberg'
    TBLPROPERTIES (
      'table_type'='iceberg',
      'write_compression'='snappy',
      'format'='parquet');

  4. After you create the table, load the data into the Iceberg table using the previously loaded Parquet table nyc_taxi_yellow_parquet with the following SQL:
    insert into nyc_taxi_yellow_iceberg (
    	vendorid,tpep_pickup_datetime,
    	tpep_dropoff_datetime,
    	passenger_count,trip_distance,
    	ratecodeid,store_and_fwd_flag,
    	pulocationid,dolocationid,
    	payment_type,fare_amount,
    	extra,mta_tax,tip_amount,
    	tolls_amount,total_amount,
    	congestion_surcharge,airport_fee
    	)
    select vendorid,tpep_pickup_datetime,
    	tpep_dropoff_datetime,
    	passenger_count,trip_distance,
    	ratecodeid,store_and_fwd_flag,
    	pulocationid,dolocationid,
    	payment_type,fare_amount,
    	extra,mta_tax,tip_amount,
    	tolls_amount,total_amount,
    	congestion_surcharge,airport_fee
    from nyc_taxi_yellow_parquet;

  5. When the SQL statement is complete, validate the data in the Iceberg table nyc_taxi_yellow_iceberg. This step is required before moving to the next step.
    SELECT * FROM nyc_taxi_yellow_iceberg LIMIT 5;

  6. You can validate that the nyc_taxi_yellow_iceberg table is in Iceberg format table and partitioned on the Year column using the following command:
    SHOW CREATE TABLE nyc_taxi_yellow_iceberg;

Create an external schema in Amazon Redshift

In this section, we demonstrate how to create an external schema in Amazon Redshift pointing to the AWS Glue database icebergdb to query the Iceberg table nyc_taxi_yellow_iceberg that we saw in the previous section using Athena.

Log in to Amazon Redshift via Query Editor v2 or a SQL client and run the following command (note that the AWS Glue database icebergdb and Region information are being used):

CREATE external schema spectrum_iceberg_schema
from data catalog
database 'icebergdb'
region 'us-east-1'
iam_role default;

To learn about creating external schemas in Amazon Redshift, refer to CREATE EXTERNAL SCHEMA.

After you create the external schema spectrum_iceberg_schema, you can query the Iceberg table in Amazon Redshift.

Query the Iceberg table in Amazon Redshift

Run the following query in Query Editor v2. Note that spectrum_iceberg_schema is the name of the external schema created in Amazon Redshift and nyc_taxi_yellow_iceberg is the table in the AWS Glue database used in the query:

SELECT * FROM "dev"."spectrum_iceberg_schema"."nyc_taxi_yellow_iceberg";

The query data output in the following screenshot shows that the AWS Glue table with Iceberg format is queryable using Redshift Spectrum.

Check the explain plan of querying the Iceberg table

You can use the following query to get the explain plan output, which shows the format is ICEBERG:

EXPLAIN 
SELECT vendorid,count(*) 
FROM "dev"."spectrum_iceberg_schema"."nyc_taxi_yellow_iceberg"
GROUP BY vendorid; 

Validate updates for data consistency

After the update is complete on the Iceberg table, you can query Amazon Redshift to see the transactionally consistent view of the data. Let's run a query for a specific vendorid and a certain pick-up and drop-off time:

SELECT * FROM nyc_taxi_yellow_iceberg
WHERE vendorid=1
AND tpep_pickup_datetime=cast('2021-06-24 21:53:26' AS timestamp)
AND tpep_dropoff_datetime=cast('2021-06-24 22:02:46' AS timestamp)
LIMIT 5;

Next, update the value of passenger_count to 4 and trip_distance to 9.4 for the same vendorid and pick-up and drop-off times using Athena:

UPDATE nyc_taxi_yellow_iceberg
SET passenger_count=4,trip_distance=9.4
WHERE vendorid=1
AND tpep_pickup_datetime=cast('2021-06-24 21:53:26' AS timestamp)
AND tpep_dropoff_datetime=cast('2021-06-24 22:02:46' AS timestamp);

Finally, run the following query in Query Editor v2 to see the updated value of passenger_count and trip_distance:

SELECT * 
FROM "dev"."spectrum_iceberg_schema"."nyc_taxi_yellow_iceberg"
WHERE vendorid=1
AND tpep_pickup_datetime=cast('2021-06-24 21:53:26' AS timestamp)
AND tpep_dropoff_datetime=cast('2021-06-24 22:02:46' AS timestamp)
LIMIT 5;

As shown in the following screenshot, the update operations on the Iceberg table are available in Amazon Redshift.

Create a unified view of the local table and historical data in Amazon Redshift

As a modern data architecture strategy, you can organize historical data or less frequently accessed data in the data lake and keep frequently accessed data in the Redshift data warehouse. This provides the flexibility to manage analytics at scale and find the most cost-effective architecture solution.

In this example, we load 2 years of data in a Redshift table; the rest of the data stays on the S3 data lake because that dataset is less frequently queried.

  1. Use the following code to load 2 years of data in the nyc_taxi_yellow_recent table in Amazon Redshift, sourcing from the Iceberg table:
    CREATE TABLE nyc_taxi_yellow_recent
    AS
    SELECT *
    FROM "dev"."spectrum_iceberg_schema"."nyc_taxi_yellow_iceberg"
    WHERE extract(year from tpep_pickup_datetime)>2020;

  2. Next, you can remove the last 2 years of data from the Iceberg table using the following command in Athena because you loaded the data into a Redshift table in the previous step:
    DELETE FROM nyc_taxi_yellow_iceberg 
    WHERE EXTRACT(year from tpep_pickup_datetime)>2020;

After you complete these steps, the Redshift table holds the most recent 2 years of data and the rest of the data remains in the Iceberg table in Amazon S3.

  1. Create a view using the nyc_taxi_yellow_iceberg Iceberg table and nyc_taxi_yellow_recent table in Amazon Redshift:
    create or replace view nyc_taxi_yellow as
    select 'nyc_taxi_yellow_iceberg' as source,vendorid,tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,trip_distance,
        ratecodeid,store_and_fwd_flag,
        pulocationid,dolocationid,
        payment_type,fare_amount,
        extra,mta_tax,tip_amount,
        tolls_amount,total_amount,
        congestion_surcharge,airport_fee
    from "dev"."spectrum_iceberg_schema"."nyc_taxi_yellow_iceberg"
    union all
    select 'nyc_taxi_yellow_recent' as source,vendorid,tpep_pickup_datetime,
        tpep_dropoff_datetime,
        passenger_count,trip_distance,
        ratecodeid,store_and_fwd_flag,
        pulocationid,dolocationid,
        payment_type,fare_amount,
        extra,mta_tax,tip_amount,
        tolls_amount,total_amount,
        congestion_surcharge,airport_fee
    from  public.nyc_taxi_yellow_recent
    with no schema binding;

  2. Now query the view. Depending on the filter conditions, Redshift Spectrum scans either the Iceberg data, the Redshift table, or both; a filtered example that targets only the recent data follows this list. The following query returns the number of records from each source table by scanning both tables:
    SELECT source,count(1)
    FROM  nyc_taxi_yellow
    GROUP BY source;
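If your queries filter on the pick-up date, the predicate is pushed into both branches of the UNION ALL, so Redshift can often avoid scanning the branch that cannot match. The following is a hedged example; the date is illustrative, so adjust it to match how your data is split between the two tables:

SELECT source, count(1)
FROM nyc_taxi_yellow
WHERE tpep_pickup_datetime >= cast('2022-01-01 00:00:00' AS timestamp)
GROUP BY source;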

Partition evolution

Iceberg uses hidden partitioning, which means you don’t need to manually add partitions for your Apache Iceberg tables. New partition values or new partition specs (add or remove partition columns) in Apache Iceberg tables are automatically detected by Amazon Redshift and no manual operation is needed to update partitions in the table definition. The following example demonstrates this.

In our example, if the Iceberg table nyc_taxi_yellow_iceberg was originally partitioned by year and the column vendorid was later added as an additional partition column, then Amazon Redshift can seamlessly query the table across both partition schemes over time.
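For example, with an engine that supports Iceberg partition spec evolution DDL, the vendorid partition field could be added with a statement like the following. This is a hedged sketch in Spark SQL syntax (the Iceberg extensions support ADD PARTITION FIELD); check whether your Athena engine version offers an equivalent statement before relying on it. Existing data files keep the original year-based partitioning, while newly written data also uses vendorid:

ALTER TABLE nyc_taxi_yellow_iceberg ADD PARTITION FIELD vendorid;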

Considerations when querying Iceberg tables using Amazon Redshift

During the preview period, consider the following when using Amazon Redshift with Iceberg tables:

  • Only Iceberg tables defined in the AWS Glue Data Catalog are supported.
  • CREATE or ALTER external table commands are not supported, which means the Iceberg table should already exist in an AWS Glue database.
  • Time travel queries are not supported.
  • Iceberg versions 1 and 2 are supported. For more details on Iceberg format versions, refer to Format Versioning.
  • For a list of supported data types with Iceberg tables, refer to Supported data types with Apache Iceberg tables (preview).
  • Pricing for querying an Iceberg table is the same as accessing any other data formats using Amazon Redshift.

For additional details on considerations for Iceberg format tables preview, refer to Using Apache Iceberg tables with Amazon Redshift (preview).

Customer feedback

“Tinuiti, the largest independent performance marketing firm, handles large volumes of data on a daily basis and must have a robust data lake and data warehouse strategy for our market intelligence teams to store and analyze all our customer data in an easy, affordable, secure, and robust way,” says Justin Manus, Chief Technology Officer at Tinuiti. “Amazon Redshift’s support for Apache Iceberg tables in our data lake, which is the single source of truth, addresses a critical challenge in optimizing performance and accessibility and further simplifies our data integration pipelines to access all the data ingested from different sources and to power our customers’ brand potential.”

Conclusion

In this post, we showed you an example of querying an Iceberg table in Amazon Redshift using files stored in Amazon S3 and cataloged as a table in the AWS Glue Data Catalog, and we demonstrated some of the key features, like efficient row-level update and delete and the schema evolution experience, to help you unlock the power of big data using Athena.

You can use Amazon Redshift to run queries on data lake tables in various file and table formats, such as Apache Hudi and Delta Lake, and now Apache Iceberg (preview), which provides additional options for your modern data architecture needs.

We hope this gives you a great starting point for querying Iceberg tables in Amazon Redshift.


About the Authors

Rohit Bansal is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build next-generation analytics solutions using other AWS Analytics services.

Satish Sathiya is a Senior Product Engineer at Amazon Redshift. He is an avid big data enthusiast who collaborates with customers around the globe to achieve success and meet their data warehousing and data lake architecture needs.

Ranjan Burman is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 16 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with cloud solutions.

Deploy Amazon OpenSearch Serverless with Terraform

Post Syndicated from Joshua Luo original https://aws.amazon.com/blogs/big-data/deploy-amazon-opensearch-serverless-with-terraform/

Amazon OpenSearch Serverless provides the search and analytical functionality of OpenSearch without the manual overhead of configuring, managing, and scaling OpenSearch clusters. It automatically scales the resources based on your workload, and you only pay for the resources consumed. Managing OpenSearch Serverless is simple, but with infrastructure as code (IaC) software like Terraform, you can simplify your resource management even more.

This post demonstrates how to use Terraform to create, deploy, and clean up OpenSearch Serverless infrastructure.

Solution overview

To create and deploy an OpenSearch Serverless collection with security and access policies using Terraform, you need to follow these steps:

  1. Initialize the Terraform configuration.
  2. Create an encryption policy.
  3. Create an OpenSearch Serverless collection.
  4. Create a network policy.
  5. Create a virtual private cloud (VPC) endpoint.
  6. Create a data access policy.
  7. Deploy using Terraform.

Prerequisites

This post assumes that you’re familiar with GitHub and Git commands.

For this walkthrough, you need the following:

Initialize the Terraform configuration

The sample code is available in the Terraform GitHub repo in the terraform-provider-aws/examples/opensearchserverless directory. This configuration will get you started with OpenSearch Serverless. First, clone the repository to your workstation and navigate to the directory:

$ git clone https://github.com/hashicorp/terraform-provider-aws.git && \ 
cd ./terraform-provider-aws/examples/opensearchserverless

Initialize the configuration to install the aws provider by running the following command:

$ terraform init

The Terraform configuration first defines the version of Terraform required and configures the AWS provider to launch resources in the Region defined by the aws_region variable:

# Require Terraform 0.12 or greater
terraform {
  required_version = ">= 0.12"
}

# Set AWS provider region
provider "aws" {
  region = var.aws_region
}

The variables used in this Terraform configuration are defined in the variables.tf file. This post assumes the default values are used:

variable "aws_region" {
  description = "The AWS region to create things in."
  default     = "us-east-1"
}

variable "collection_name" {
  description = "Name of the OpenSearch Serverless collection."
  default     = "example-collection"
}

Create an encryption policy

Now that the provider is installed and configured, the Terraform configuration moves on to defining OpenSearch Serverless policies for security. OpenSearch Serverless uses AWS Key Management Service (AWS KMS) to encrypt your data. The encryption is managed by an encryption policy. To create an encryption policy, use the aws_opensearchserverless_security_policy resource, which has a name parameter, a type of encryption, a JSON string that defines the policy, and an optional description:

# Creates an encryption security policy
resource "aws_opensearchserverless_security_policy" "encryption_policy" {
  name        = "example-encryption-policy"
  type        = "encryption"
  description = "encryption policy for ${var.collection_name}"
  policy = jsonencode({
    Rules = [
      {
        Resource = [
          "collection/${var.collection_name}"
        ],
        ResourceType = "collection"
      }
    ],
    AWSOwnedKey = true
  })
}

This encryption policy is named example-encryption-policy, applies to a collection named example-collection, and uses an AWS owned key to encrypt the data.

Create an OpenSearch Serverless collection

You can organize your OpenSearch indexes into a logical grouping called a collection. Create a collection using the aws_opensearchserverless_collection resource, which has a name parameter, and optionally, description, tags, and type:

# Creates a collection
resource "aws_opensearchserverless_collection" "collection" {
  name = var.collection_name

  depends_on = [aws_opensearchserverless_security_policy.encryption_policy]
}

This collection is named example-collection. If type is not specified, a time series collection is created. Supported collection types can be found in the Terraform documentation for the aws_opensearchserverless_collection resource. OpenSearch Serverless requires encryption at rest, so an applicable encryption policy is required before a collection can be created. The Terraform configuration explicitly defines this dependency using the depends_on meta-argument. Errors can arise if this dependency is not defined.
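If you want to surface the collection and dashboard endpoints after deployment (similar to the outputs shown later in the terraform apply output), a minimal outputs sketch might look like the following. The attribute names collection_endpoint and dashboard_endpoint are assumptions based on the provider documentation, so verify them for your provider version:

# Illustrative sketch: expose the collection endpoints as Terraform outputs
output "collection_endpoint" {
  value = aws_opensearchserverless_collection.collection.collection_endpoint
}

output "dashboard_endpoint" {
  value = aws_opensearchserverless_collection.collection.dashboard_endpoint
}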

Now that a collection has been created with an AWS owned KMS key, the Terraform configuration goes on to define the network and data access policy to configure access to the collection.

Create a network policy

A network policy allows access to your collection either over the public internet or through OpenSearch Serverless-managed VPC endpoints. Similar to the encryption policy, to create a network policy, use the aws_opensearchserverless_security_policy resource, which has a name parameter, a type of network, a JSON string that defines the policy, and an optional description:

# Creates a network security policy
resource "aws_opensearchserverless_security_policy" "network_policy" {
  name        = "example-network-policy"
  type        = "network"
  description = "public access for dashboard, VPC access for collection endpoint"
  policy = jsonencode([
    {
      Description = "VPC access for collection endpoint",
      Rules = [
        {
          ResourceType = "collection",
          Resource = [
            "collection/${var.collection_name}"
          ]
        }
      ],
      AllowFromPublic = false,
      SourceVPCEs = [
        aws_opensearchserverless_vpc_endpoint.vpc_endpoint.id
      ]
    },
    {
      Description = "Public access for dashboards",
      Rules = [
        {
          ResourceType = "dashboard"
          Resource = [
            "collection/${var.collection_name}"
          ]
        }
      ],
      AllowFromPublic = true
    }
  ])
}

This network policy is named example-network-policy and applies to the collection named example-collection. This policy only allows access to the collection’s OpenSearch endpoint through a VPC endpoint, but allows public access to the OpenSearch Dashboards endpoint.

You’ll notice the VPC endpoint has not been defined yet, but it is referenced in the network policy. Terraform determines this dependency automatically and will not create the network policy until the VPC endpoint has been created.

Create a VPC endpoint

A VPC endpoint enables you to privately access your OpenSearch Serverless collection using AWS PrivateLink (for more information, refer to Access AWS services through AWS PrivateLink). Create a VPC endpoint using the aws_opensearchserverless_vpc_endpoint resource, where you define name, vpc_id, subnet_ids, and optionally, security_group_ids:

# Creates a VPC endpoint
resource "aws_opensearchserverless_vpc_endpoint" "vpc_endpoint" {
  name               = "example-vpc-endpoint"
  vpc_id             = aws_vpc.vpc.id
  subnet_ids         = [aws_subnet.subnet.id]
  security_group_ids = [aws_security_group.security_group.id]
}

Creating a VPC and all the required networking resources is out of scope for this post, but the minimum required VPC resources are created here in a separate file to demonstrate the VPC endpoint functionality. Refer to Getting Started with Amazon VPC to learn more.
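For reference, a minimal sketch of those networking resources might look like the following. The CIDR ranges and the HTTPS-only ingress rule are assumptions for illustration; the separate file in the example repository remains the authoritative version:

# Illustrative sketch of the networking resources referenced by the VPC endpoint
resource "aws_vpc" "vpc" {
  cidr_block = "10.0.0.0/16"
}

resource "aws_subnet" "subnet" {
  vpc_id     = aws_vpc.vpc.id
  cidr_block = "10.0.1.0/24"
}

resource "aws_security_group" "security_group" {
  vpc_id = aws_vpc.vpc.id

  # Allow HTTPS traffic from within the VPC to reach the endpoint
  ingress {
    description = "HTTPS from within the VPC"
    from_port   = 443
    to_port     = 443
    protocol    = "tcp"
    cidr_blocks = [aws_vpc.vpc.cidr_block]
  }
}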

Create a data access policy

The configuration defines a data source that looks up information about the context Terraform is currently running in. This data source is used when defining the data access policy. More information can be found in the Terraform documentation for aws_caller_identity.

# Gets access to the effective Account ID in which Terraform is authorized
data "aws_caller_identity" "current" {}

A data access policy allows you to define who has access to collections and indexes. The data access policy is defined using the aws_opensearchserverless_access_policy resource, which has a name parameter, a type parameter set to data, a JSON string that defines the policy, and an optional description:

# Creates a data access policy
resource "aws_opensearchserverless_access_policy" "data_access_policy" {
  name        = "example-data-access-policy"
  type        = "data"
  description = "allow index and collection access"
  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "index",
          Resource = [
            "index/${var.collection_name}/*"
          ],
          Permission = [
            "aoss:*"
          ]
        },
        {
          ResourceType = "collection",
          Resource = [
            "collection/${var.collection_name}"
          ],
          Permission = [
            "aoss:*"
          ]
        }
      ],
      Principal = [
        data.aws_caller_identity.current.arn
      ]
    }
  ])
}

This data access policy allows the current AWS role or user to perform collection-related actions on the collection named example-collection and index-related actions on the indexes in the collection.
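The sample policy grants aoss:* for simplicity. For tighter scoping, you could list individual permissions instead. The following is a hedged sketch of what that might look like; the permission names are assumptions to verify against the OpenSearch Serverless data access policy documentation before use:

# Hedged sketch: a data access policy with narrower permissions than aoss:*
resource "aws_opensearchserverless_access_policy" "scoped_data_access_policy" {
  name        = "example-scoped-data-access-policy"
  type        = "data"
  description = "scoped index and collection access"
  policy = jsonencode([
    {
      Rules = [
        {
          ResourceType = "index",
          Resource     = ["index/${var.collection_name}/*"],
          Permission   = ["aoss:DescribeIndex", "aoss:ReadDocument", "aoss:WriteDocument"]
        },
        {
          ResourceType = "collection",
          Resource     = ["collection/${var.collection_name}"],
          Permission   = ["aoss:DescribeCollectionItems"]
        }
      ],
      Principal = [data.aws_caller_identity.current.arn]
    }
  ])
}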

Deploy using Terraform

Now that you have configured the necessary resources, apply the configuration using terraform apply. Before creating the resources, Terraform will describe all the resources that will be created so you can verify your configuration:

$ terraform apply

...

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  + create

Terraform will perform the following actions:

...

Plan: 13 to add, 0 to change, 0 to destroy.

Changes to Outputs:
  + collection_enpdoint = (known after apply)
  + dashboard_endpoint  = (known after apply)

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value:

Answer yes to proceed.

If this is the first OpenSearch Serverless collection in your account, applying the configuration may take over 10 minutes because Terraform waits for the collection to become active.

Apply complete! Resources: 13 added, 0 changed, 0 destroyed.

Outputs:

collection_enpdoint = "..."
dashboard_endpoint = "..."

You have now deployed an OpenSearch Serverless time series collection with policies to configure encryption and access to the collection!

Clean up

The resources will incur costs as long as they are running, so clean up the resources when you are done using them. Use the terraform destroy command to do this:

$ terraform destroy

...

Terraform used the selected providers to generate the following execution plan. Resource actions are indicated with the following symbols:
  - destroy

Terraform will perform the following actions:

  ...

Plan: 0 to add, 0 to change, 13 to destroy.

Changes to Outputs:

...

Do you really want to destroy all resources?
  Terraform will destroy all your managed infrastructure, as shown above.
  There is no undo. Only 'yes' will be accepted to confirm.

  Enter a value:

Answer yes to run this plan and destroy the infrastructure.

Destroy complete! Resources: 13 destroyed.

All resources created during this walkthrough have now been deleted.

Conclusion

In this post, you created an OpenSearch Serverless collection. Using IaC software like Terraform can make it simple to manage your resources like OpenSearch Serverless collections, encryption, network, and data access policies, and VPC endpoints.

Thank you to all the open-source contributors who help maintain OpenSearch and Terraform.

Try using OpenSearch Serverless with Terraform to simplify your resource management. Check out the Getting started with Amazon OpenSearch Serverless workshop and the Amazon OpenSearch Serverless Developer Guide to learn more about OpenSearch Serverless.


About the authors

Joshua Luo is a Software Development Engineer for Amazon OpenSearch Serverless. He works on the systems that enable customers to manage and monitor their OpenSearch Serverless resources. He enjoys bouldering, photography, and videography in his free time.

Satish Nandi is a Senior Technical Product Manager for Amazon OpenSearch Service.

Build an ETL process for Amazon Redshift using Amazon S3 Event Notifications and AWS Step Functions

Post Syndicated from Ziad Wali original https://aws.amazon.com/blogs/big-data/build-an-etl-process-for-amazon-redshift-using-amazon-s3-event-notifications-and-aws-step-functions/

Data warehousing provides a business with several benefits such as advanced business intelligence and data consistency. It plays a big role within an organization by helping to make the right strategic decision at the right moment, which could have a huge impact in a competitive market. One of the major and essential parts of a data warehouse is the extract, transform, and load (ETL) process, which extracts data from different sources, applies business rules and aggregations, and then makes the transformed data available for business users.

This process is always evolving to reflect new business and technical requirements, especially in a competitive market. Nowadays, more verification steps are applied to source data before processing, which often adds administrative overhead. Hence, automatic notifications are increasingly required to accelerate data ingestion, facilitate monitoring, and provide accurate tracking of the process.

Amazon Redshift is a fast, fully managed, cloud data warehouse that allows you to process and run your complex SQL analytics workloads on structured and semi-structured data. It also helps you to securely access your data in operational databases, data lakes or third-party datasets with minimal movement or copying. AWS Step Functions is a fully managed service that gives you the ability to orchestrate and coordinate service components. Amazon S3 Event Notifications is an Amazon S3 feature that you can enable in order to receive notifications when specific events occur in your S3 bucket.

In this post, we discuss how to build, in a few steps, an ETL process for Amazon Redshift that uses Amazon S3 Event Notifications to automatically verify source data upon arrival and send notifications in specific cases, and how to use AWS Step Functions to orchestrate the data pipeline. It can be considered a starting point for teams that want to build an event-driven data pipeline from data source to data warehouse, helping them track each phase and respond to failures quickly. Alternatively, you can also use Amazon Redshift auto-copy from Amazon S3 to simplify data loading from Amazon S3 into Amazon Redshift.

Solution overview

The workflow is composed of the following steps:

  1. A Lambda function is triggered by an S3 event whenever a source file arrives at the S3 bucket. It does the necessary verifications and then classifies the file before processing by sending it to the appropriate Amazon S3 prefix (accepted or rejected).
  2. There are two possibilities:
    • If the file is moved to the rejected Amazon S3 prefix, an Amazon S3 event sends a message to Amazon SNS for further notification.
    • If the file is moved to the accepted Amazon S3 prefix, an Amazon S3 event is triggered and sends a message with the file path to Amazon SQS.
  3. An Amazon EventBridge scheduled event triggers the AWS Step Functions workflow.
  4. The workflow executes a Lambda function that pulls out the messages from the Amazon SQS queue and generates a manifest file for the COPY command.
  5. Once the manifest file is generated, the workflow starts the ETL process using a stored procedure.

The following image shows the workflow.

Prerequisites

Before configuring the previous solution, you can use the following AWS CloudFormation template to set up and create the infrastructure:

  • Give the stack a name, select a deployment VPC and define the master user for the Amazon Redshift cluster by filling in the two parameters MasterUserName and MasterUserPassword.

The template will create the following services:

  • An S3 bucket
  • An Amazon Redshift cluster composed of two ra3.xlplus nodes
  • An empty AWS Step Functions workflow
  • An Amazon SQS queue
  • An Amazon SNS topic
  • An Amazon EventBridge scheduled rule with a 5-minute rate
  • Two empty AWS Lambda functions
  • IAM roles and policies for the services to communicate with each other

The names of the created services are usually prefixed by the stack’s name or the word blogdemo. You can find the names of the created services in the stack’s resources tab.

Step 1: Configure Amazon S3 Event Notifications

Create the following four folders in the S3 bucket:

  • received
  • rejected
  • accepted
  • manifest

In this scenario, we will create the following three Amazon S3 event notifications:

  1. Trigger an AWS Lambda function on the received folder.
  2. Send a message to the Amazon SNS topic on the rejected folder.
  3. Send a message to Amazon SQS on the accepted folder.

To create an Amazon S3 event notification:

  1. Go to the bucket’s Properties tab.
  2. In the Event Notifications section, select Create Event Notification.
  3. Fill in the necessary properties:
    • Give the event a name.
    • Specify the appropriate prefix or folder (accepted/, rejected/ or received/).
    • Select All object create events as an event type.
    • Select and choose the destination (AWS lambda, Amazon SNS or Amazon SQS).
      Note: for an AWS Lambda destination, choose the function that starts with ${stackname}-blogdemoVerify_%

At the end, you should have three Amazon S3 events:

  • An event for the received prefix with an AWS Lambda function as a destination type.
  • An event for the accepted prefix with an Amazon SQS queue as a destination type.
  • An event for the rejected prefix with an Amazon SNS topic as a destination type.

The following image shows what you should have after creating the three Amazon S3 events:

Step 2: Create objects in Amazon Redshift

Connect to the Amazon Redshift cluster and create the following objects:

  1. Three schemas:
    create schema blogdemo_staging; -- for staging tables
    create schema blogdemo_core; -- for target tables
    create schema blogdemo_proc; -- for stored procedures

  2. A table in the blogdemo_staging and blogdemo_core schemas:
    create table ${schemaname}.rideshare
    (
      id_ride bigint not null,
      date_ride timestamp not null,
      country varchar (20),
      city varchar (20),
      distance_km smallint,
      price decimal (5,2),
      feedback varchar (10)
    ) distkey(id_ride);

  3. A stored procedure to extract and load data into the target schema (a sample manual invocation follows this list):
    create or replace procedure blogdemo_proc.elt_rideshare (bucketname in varchar(200),manifestfile in varchar (500))
    as $$
    begin
    -- purge staging table
    truncate blogdemo_staging.rideshare;
    
    -- copy data from s3 bucket to staging schema
    execute 'copy blogdemo_staging.rideshare from ''s3://' || bucketname || '/' || manifestfile || ''' iam_role default delimiter ''|'' manifest;';
    
    -- apply transformation rules here
    
    -- insert data into target table
    insert into blogdemo_core.rideshare
    select * from blogdemo_staging.rideshare;
    
    end;
    $$ language plpgsql;

  4. Set the role ${stackname}-blogdemoRoleRedshift_% as a default role:
    1. In the Amazon Redshift console, go to clusters and click on the cluster blogdemoRedshift%.
    2. Go to the Properties tab.
    3. In the Cluster permissions section, select the role ${stackname}-blogdemoRoleRedshift%.
    4. Click on Set default then Make default.
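To smoke-test the stored procedure created in step 3 outside the workflow, you can call it directly from your SQL client. The bucket name and manifest key below are placeholders (the key format matches what the manifest-generating Lambda function writes), so replace them with values that exist in your bucket:

-- Replace the bucket name and manifest key with real values before running
call blogdemo_proc.elt_rideshare('<your-bucket-name>', 'manifest/files_list_20230801-120000.manifest');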

Step 3: Configure Amazon SQS queue

The Amazon SQS queue can be used as is, meaning with the default values. The only thing you need to do for this demo is go to the created queue ${stackname}-blogdemoSQS% and purge any test messages generated by the Amazon S3 event configuration. Copy its URL to a text file for later use (more precisely, in one of the AWS Lambda functions).

Step 4: Setup Amazon SNS topic

  1. In the Amazon SNS console, go to the topic ${stackname}-blogdemoSNS%
  2. Click on the Create subscription button.
  3. Choose the blogdemo topic ARN, email protocol, type your email and then click on Create subscription.
  4. Confirm your subscription in your email that you received.

Step 5: Customize the AWS Lambda functions

  1. The following code verifies the name of an incoming file. If the name respects the naming convention, the function moves the file to the accepted folder; otherwise, it moves it to the rejected one. Copy it to the AWS Lambda function ${stackname}-blogdemoLambdaVerify and then deploy it:
    import boto3
    import re
    
    def lambda_handler (event, context):
        objectname = event['Records'][0]['s3']['object']['key']
        bucketname = event['Records'][0]['s3']['bucket']['name']
        
        result = re.match(r'received/rideshare_data_20[0-5][0-9](0[1-9]|1[0-2])(0[1-9]|[1-2][0-9]|3[0-1])\.csv', objectname)
        targetfolder = ''
        
        if result: targetfolder = 'accepted'
        else: targetfolder = 'rejected'
        
        s3 = boto3.resource('s3')
        copy_source = {
            'Bucket': bucketname,
            'Key': objectname
        }
        target_objectname=objectname.replace('received',targetfolder)
        s3.meta.client.copy(copy_source, bucketname, target_objectname)
        
        s3.Object(bucketname,objectname).delete()
        
        return {'Result': targetfolder}

  2. The second AWS Lambda function ${stackname}-blogdemoLambdaGenerate% retrieves the messages from the Amazon SQS queue and generates and stores a manifest file in the S3 bucket manifest folder. Copy the following content, replace the variable ${sqs_url} with the value retrieved in Step 3, and then click on Deploy.
    import boto3
    import json
    import datetime
    
    def lambda_handler(event, context):
    
        sqs_client = boto3.client('sqs')
        queue_url='${sqs_url}'
        bucketname=''
        keypath='none'
        
        manifest_content='{\n\t"entries": ['
        
        while True:
            response = sqs_client.receive_message(
                QueueUrl=queue_url,
                AttributeNames=['All'],
                MaxNumberOfMessages=1
            )
            try:
                message = response['Messages'][0]
            except KeyError:
                break
            
            message_body=message['Body']
            message_data = json.loads(message_body)
            
            objectname = message_data['Records'][0]['s3']['object']['key']
            bucketname = message_data['Records'][0]['s3']['bucket']['name']
    
            manifest_content = manifest_content + '\n\t\t{"url":"s3://' +bucketname + '/' + objectname + '","mandatory":true},'
            receipt_handle = message['ReceiptHandle']
    
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle
            )
            
        if bucketname != '':
            manifest_content=manifest_content[:-1]+'\n\t]\n}'
            s3 = boto3.resource("s3")
            encoded_manifest_content=manifest_content.encode('utf-8')
            current_datetime=datetime.datetime.now()
            keypath='manifest/files_list_'+current_datetime.strftime("%Y%m%d-%H%M%S")+'.manifest'
            s3.Bucket(bucketname).put_object(Key=keypath, Body=encoded_manifest_content)
    
        sf_tasktoken = event['TaskToken']
        
        step_function_client = boto3.client('stepfunctions')
        step_function_client.send_task_success(taskToken=sf_tasktoken,output='{"manifestfilepath":"' + keypath + '",\"bucketname":"' + bucketname +'"}')

Step 6: Add tasks to the AWS Step Functions workflow

Create the following workflow in the state machine ${stackname}-blogdemoStepFunctions%.

If you would like to accelerate this step, you can drag and drop the content of the following JSON file into the definition part when you click on Edit (a minimal sketch of the definition's structure follows this list). Make sure to replace the three variables:

  • ${GenerateManifestFileFunctionName} by the ${stackname}-blogdemoLambdaGenerate% arn.
  • ${RedshiftClusterIdentifier} by the Amazon Redshift cluster identifier.
  • ${MasterUserName} by the username that you defined while deploying the CloudFormation template.
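For orientation, the following is a minimal, hedged sketch of what the state machine definition might look like: a callback task that invokes the manifest-generating Lambda function with a task token, followed by a Redshift Data API task that runs the stored procedure. It is not the full definition from the JSON file; in particular, the real definition composes the SQL statement from the bucketname and manifestfilepath values returned by the Lambda function (for example, with the States.Format intrinsic), the database name dev is an assumption, and the placeholders below must be replaced:

{
  "Comment": "Minimal sketch: generate a manifest, then run the ELT stored procedure",
  "StartAt": "GenerateManifestFile",
  "States": {
    "GenerateManifestFile": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke.waitForTaskToken",
      "Parameters": {
        "FunctionName": "${GenerateManifestFileFunctionName}",
        "Payload": { "TaskToken.$": "$$.Task.Token" }
      },
      "Next": "RunEltStoredProcedure"
    },
    "RunEltStoredProcedure": {
      "Type": "Task",
      "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
      "Parameters": {
        "ClusterIdentifier": "${RedshiftClusterIdentifier}",
        "Database": "dev",
        "DbUser": "${MasterUserName}",
        "Sql": "call blogdemo_proc.elt_rideshare('<bucket-name>', '<manifest-key>');"
      },
      "End": true
    }
  }
}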

Step 7: Enable Amazon EventBridge rule

Enable the rule and add the AWS Step Functions workflow as a rule target:

  1. Go to the Amazon EventBridge console.
  2. Select the rule created by the AWS CloudFormation template and click on Edit.
  3. Enable the rule and click Next.
  4. You can change the rate if you want. Then select Next.
  5. Add the AWS Step Functions state machine created by the CloudFormation template (blogdemoStepFunctions%) as a target, and use the existing role created by the CloudFormation template (${stackname}-blogdemoRoleEventBridge%).
  6. Click on Next and then Update rule.

Test the solution

In order to test the solution, the only thing you need to do is upload some CSV files to the received prefix of the S3 bucket. Here are some sample data files; each contains 1,000 rows of rideshare data.

If you upload them all at once, you should receive an email because ridesharedata2022.csv does not respect the naming convention. The other three files will be loaded into the target table blogdemo_core.rideshare. You can check the Step Functions workflow to verify that the process finished successfully.

Clean up

  1. Go to the Amazon EventBridge console and delete the rule ${stackname}-blogdemoevenbridge%.
  2. In the Amazon S3 console, select the bucket created by the CloudFormation template ${stackname}-blogdemobucket% and click on Empty.
  3. Go to Subscriptions in the Amazon SNS console and delete the subscription created in Step 4.
  4. In the AWS CloudFormation console, select the stack and delete it.

Conclusion

In this post, we showed how different AWS services can be easily implemented together in order to create an event-driven architecture and automate its data pipeline, which targets the cloud data warehouse Amazon Redshift for business intelligence applications and complex queries.


About the Author

Ziad WALI is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 10 years of experience in databases and data warehousing where he enjoys building reliable, scalable and efficient solutions. Outside of work, he enjoys sports and spending time in nature.

Announcing Amazon Managed Service for Apache Flink Renamed from Amazon Kinesis Data Analytics

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/announcing-amazon-managed-service-for-apache-flink-renamed-from-amazon-kinesis-data-analytics/

Today we are announcing the rename of Amazon Kinesis Data Analytics to Amazon Managed Service for Apache Flink, a fully managed and serverless service for you to build and run real-time streaming applications using Apache Flink.

We continue to deliver the same experience in your Flink applications without any impact on ongoing operations, developments, or business use cases. All your existing running applications in Kinesis Data Analytics will work as is without any changes.

Many customers use Apache Flink for data processing, including support for diverse use cases with a vibrant open-source community. While Apache Flink applications are robust and popular, they can be difficult to manage because they require scaling and coordination of parallel compute or container resources. With the explosion of data volumes, data types, and data sources, customers need an easier way to access, process, secure, and analyze their data to gain faster and deeper insights without compromising on performance and costs.

Using Amazon Managed Service for Apache Flink, you can set up and integrate data sources or destinations with minimal code, process data continuously with sub-second latencies from hundreds of data sources like Amazon Kinesis Data Streams and Amazon Managed Streaming for Apache Kafka (Amazon MSK), and respond to events in real-time. You can also analyze streaming data interactively with notebooks in just a few clicks with Amazon Managed Service for Apache Flink Studio with built-in visualizations powered by Apache Zeppelin.

With Amazon Managed Service for Apache Flink, you can deploy secure, compliant, and highly available applications. There are no servers and clusters to manage, no compute and storage infrastructure to set up, and you only pay for the resources your applications consume.

A History to Support Apache Flink
Since we launched Amazon Kinesis Data Analytics based on a proprietary SQL engine in 2016, we learned that SQL alone was not sufficient to provide the capabilities that customers needed for efficient stateful stream processing. So, we started investing in Apache Flink, a popular open-source framework and engine for processing real-time data streams.

In 2018, we provided support for Amazon Kinesis Data Analytics for Java as a programmable option for customers to build streaming applications using Apache Flink libraries and choose their own integrated development environment (IDE) to build their applications. In 2020, we repositioned Amazon Kinesis Data Analytics for Java to Amazon Kinesis Data Analytics for Apache Flink to emphasize our continued support for Apache Flink. In 2021, we launched Kinesis Data Analytics Studio (now, Amazon Managed Service for Apache Flink Studio) with a simple, familiar notebook interface for rapid development powered by Apache Zeppelin and using Apache Flink as the processing engine.

Since 2019, we have worked more closely with the Apache Flink community, increasing code contributions in the area of AWS connectors for Apache Flink such as those for Kinesis Data Streams and Kinesis Data Firehose, as well as sponsoring annual Flink Forward events. Recently, we contributed Async Sink to the Flink 1.15 release, which improved cloud interoperability and added more sink connectors and formats, among other updates.

Beyond connectors, we continue to work with the Flink community to contribute availability improvements and deployment options. To learn more, see Making it Easier to Build Connectors with Apache Flink: Introducing the Async Sink in the AWS Open Source Blog.

New Features in Amazon Managed Service for Apache Flink
As I mentioned, you can continue to run your existing Flink applications in Kinesis Data Analytics (now Amazon Managed Service for Apache Flink) without making any changes. I also want to let you know about the console change and a new feature: blueprints, which let you create an end-to-end data pipeline with just one click.

First, you can use the new console of Amazon Managed Service for Apache Flink directly under the Analytics section in AWS. To get started, you can easily create Streaming applications or Studio notebooks in the new console, with the same experience as before.

To create a streaming application in the new console, choose Create from scratch or Use a blueprint. With a new blueprint option, you can create and set up all the resources that you need to get started in a single step using AWS CloudFormation.

The blueprint is a curated collection of Apache Flink applications. The first of these has demo data being read from a Kinesis Data Stream and written to an Amazon Simple Storage Service (Amazon S3) bucket.

After creating the demo application, you can configure, run, and open the Apache Flink dashboard to monitor your Flink application’s health with the same experiences as before. You can change a code sample in the GitHub repository to perform different operations using the Flink libraries in your own local development environment.

Blueprints are designed to be extensible, and you can leverage them to create more complex applications to solve your business challenges based on Amazon Managed Service for Apache Flink. Learn more about how to use Apache Flink libraries in the AWS documentation.

You can also use a blueprint to create your Studio notebook using Apache Zeppelin as a new setup option. With this new blueprint option, you can also create and set up all the resources that you need to get started in a single step using AWS CloudFormation.

This blueprint includes Apache Flink applications with demo data being sent to an Amazon MSK topic and read in Managed Service for Apache Flink. With an Apache Zeppelin notebook, you can view, query, and analyze your streaming data. Deploying the blueprint and setting up the Studio notebook takes about ten minutes. Go get a cup of coffee while we set it up!

After creating the new Studio notebook, you can open an Apache Zeppelin notebook to run SQL queries in your note with the same experiences as before. You can view a code sample in the GitHub repository to learn more about how to use Apache Flink libraries.

You can run more SQL queries on this demo data such as user-defined functions, tumbling and hopping windows, Top-N queries, and delivering data to an S3 bucket for streaming.

You can also use Java, Python, or Scala to power up your SQL queries and deploy your note as a continuously running application, as shown in the blog posts about using the Studio notebook and querying your Amazon MSK topics.

To learn more about blueprint samples, see GitHub repositories such as reading from MSK Serverless and writing to Amazon S3, reading from MSK Serverless and writing to MSK Serverless, and reading from MSK Serverless and writing to Amazon S3.

Now Available
You can now use Amazon Managed Service for Apache Flink, renamed from Amazon Kinesis Data Analytics. All your existing running applications in Kinesis Data Analytics will work as is without any changes.

To learn more, visit the new product page and developer guide. You can send feedback to AWS re:Post for Amazon Managed Service for Apache Flink, or through your usual AWS Support contacts.

Channy

Monitoring Amazon OpenSearch Serverless using AWS User Notifications

Post Syndicated from Raj Ramasubbu original https://aws.amazon.com/blogs/big-data/monitoring-amazon-opensearch-serverless-using-aws-user-notifications/

Amazon OpenSearch Serverless is a serverless deployment option for Amazon OpenSearch Service that makes it simple for you to run search and analytics workloads without having to think about infrastructure management. The compute capacity used for data ingestion, and search and query, in OpenSearch Serverless is measured in OpenSearch Compute Units (OCUs). Customers can configure maximum OCU limits in their AWS account to control costs. In the past, customers had to monitor resource usage metrics to make sure that the collections did not deplete their configured storage and computational capacity. With the new AWS User Notifications integration, you can configure the system to send notifications whenever the capacity threshold is breached. The User Notifications feature eliminates the need to monitor the service constantly. AWS User Notifications enables users to centrally set up and view notifications from various AWS services across accounts and Regions in a human-friendly format. Users can view notifications in a Console Notifications Center and also configure various delivery channels.

You can now use AWS User Notifications to set up delivery channels to get notified when resource usage is nearing or exceeding the capacity threshold. You receive a notification when an event matches a rule that you specify. There are multiple channels for receiving notifications, including email, AWS Chatbot chat notifications, and AWS Console Mobile Application push notifications. In this post, you will see how you can use AWS User Notifications to receive notifications for OCU threshold breaches across all your OpenSearch Serverless collections.

Solution overview

OpenSearch Serverless allows you to receive OCU utilization notifications for both search and indexing in the two scenarios listed below. OCU utilization percent is calculated based on your configured maximum capacity limit and the current OCU consumption.

  • OCU Utilization Approaching Max Limit – OpenSearch Serverless sends this event through AWS User Notifications when current OCU usage percent reaches greater than or equal to 75 percent of the configured maximum OCU capacity.
  • OCU Utilization Reached Max Limit – OpenSearch Serverless sends this event when OCU usage percent reaches 100 percent of the configured maximum OCU capacity.

If you receive an OCU utilization notification, you can adjust the maximum capacity limit for your collection using either the console or the AWS CLI.
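For example, the maximum OCU limits could be raised from the command line with something like the following. This is a hedged sketch based on the UpdateAccountSettings API; the command shape, parameter names, and the limit values of 20 OCUs are assumptions to verify against the current AWS CLI reference and your own workload requirements:

# Illustrative values; choose limits that fit your workload and budget
aws opensearchserverless update-account-settings \
  --capacity-limits '{"maxIndexingCapacityInOCU": 20, "maxSearchCapacityInOCU": 20}'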

The following sections detail the steps you can take to receive both these OCU utilization notifications for all collections.

Prerequisites

As a prerequisite to receive OCU utilization notifications, you will set up notification configuration in notification hubs to store the notifications data. This has to be done only once, and at least one AWS region should be selected. The following screenshot shows a sample notification hub configuration for the US East (Ohio) AWS Region.

Also, please configure the maximum OCU capacity depending on your requirements for all collections.

Set up OCU utilization notifications

To set up the notifications, complete the following steps:

  1. On the AWS User Notifications console, create a notification configuration to receive notifications about OCU utilization.
  2. Choose Amazon OpenSearch Serverless for the service name and choose OCU Utilization Approaching Max Limit for the event type.
  3. Click Add another event rule and choose OCU Utilization Reached Max Limit for the event type.
  4. Using AWS User Notifications, you can receive the notifications as soon as they occur or receive them within 5 minutes to avoid receiving too many notifications all at once. We recommend receiving notifications every 5 minutes. Choose Receive within 5 minutes (recommended).
  5. You can opt to receive notifications through many delivery channels like the AWS Console Mobile Application and chat channels like Slack. For this blog post, to keep it simple, you can add your email address as the delivery channel, as shown in the following image.
  6. After you complete the configuration, your notification configurations page should look like the following screenshot.
  7. Once the OCU consumption for any of your collections reaches greater than or equal to 75 percent of the configured maximum OCU capacity, you will get a notification to your configured email address within 5 minutes, as shown in the following image.
  8. Once the OCU consumption reaches 100 percent of allocated OCU capacity for any of your collections, you will get a notification to your configured email address within 5 minutes, as shown in the following image.
  9. You can also see the notifications in the AWS User Notifications console, as shown in the following screenshots.

Summary

You can use AWS User Notifications to get notifications from various AWS services in one place. Now with Amazon OpenSearch Serverless integration, you can receive OCU utilization notifications as well. In this post, we explored how to enable notifications for all Amazon OpenSearch Serverless collections and receive notifications using an email delivery channel. If you have any questions or suggestions, please write to us in the comments section.


About the Author

Raj Ramasubbu is a Senior Analytics Specialist Solutions Architect focused on big data and analytics and AI/ML with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS. Raj provided technical expertise and leadership in building data engineering, big data analytics, business intelligence, and data science solutions for over 18 years prior to joining AWS. He helped customers in various industry verticals like healthcare, medical devices, life science, retail, asset management, car insurance, residential REIT, agriculture, title insurance, supply chain, document management, and real estate.

Generate machine learning insights for Amazon Security Lake data using Amazon SageMaker

Post Syndicated from Jonathan Nguyen original https://aws.amazon.com/blogs/security/generate-machine-learning-insights-for-amazon-security-lake-data-using-amazon-sagemaker/

Amazon Security Lake automatically centralizes the collection of security-related logs and events from integrated AWS and third-party services. With the increasing amount of security data available, it can be challenging knowing what data to focus on and which tools to use. You can use native AWS services such as Amazon QuickSight, Amazon OpenSearch, and Amazon SageMaker Studio to visualize, analyze, and interactively identify different areas of interest to focus on, and prioritize efforts to increase your AWS security posture.

In this post, we go over how to generate machine learning insights for Security Lake using SageMaker Studio. SageMaker Studio is a web integrated development environment (IDE) for machine learning that provides tools for data scientists to prepare, build, train, and deploy machine learning models. With this solution, you can quickly deploy a base set of Python notebooks focusing on AWS Security Hub findings in Security Lake, which can also be expanded to incorporate other AWS sources or custom data sources in Security Lake. After you’ve run the notebooks, you can use the results to help you identify and focus on areas of interest related to security within your AWS environment. As a result, you might implement additional guardrails or create custom detectors to alert on suspicious activity.

Prerequisites

  1. Specify a delegated administrator account to manage the Security Lake configuration for all member accounts within your organization.
  2. Security Lake has been enabled in the delegated administrator AWS account.
  3. As part of the solution in this post, we focus on Security Hub as a data source. AWS Security Hub must be enabled for your AWS Organizations. When enabling Security Lake, select All log and event sources to include AWS Security Hub findings.
  4. Configure subscriber query access to Security Lake. Security Lake uses AWS Lake Formation cross-account table sharing to support subscriber query access. Accept the resource share request in the subscriber AWS account in AWS Resource Access Manager (AWS RAM). Subscribers with query access can query the data that Security Lake collects. These subscribers query Lake Formation tables in an Amazon Simple Storage Service (Amazon S3) bucket with Security Lake data using services such as Amazon Athena.

Solution overview

Figure 1 that follows depicts the architecture of the solution.

Figure 1 SageMaker machine learning insights architecture for Security Lake

Figure 1 SageMaker machine learning insights architecture for Security Lake

The deployment builds the architecture by completing the following steps:

  1. A Security Lake is set up in an AWS account with supported log sources — such as Amazon VPC Flow Logs, AWS Security Hub, AWS CloudTrail, and Amazon Route53 — configured.
  2. Subscriber query access is created from the Security Lake AWS account to a subscriber AWS account.

    Note: See Prerequisite #4 for more information.

  3. The AWS RAM resource share request must be accepted in the subscriber AWS account where this solution is deployed.

    Note: See Prerequisite #4 for more information.

  4. A resource link database in Lake Formation is created in the subscriber AWS account and grants access for the Athena tables in the Security Lake AWS account.
  5. A VPC is provisioned for SageMaker with an internet gateway (IGW), a NAT gateway, and VPC endpoints for the AWS services used in the solution. The IGW and NAT gateway are required to install external open-source packages.
  6. A SageMaker Domain for SageMaker Studio is created in VPCOnly mode with a single SageMaker user profile that is tied to a dedicated AWS Identity and Access Management (IAM) role.
  7. A dedicated IAM role is created to restrict access to create and access the presigned URL for the SageMaker Domain from a specific CIDR for accessing the SageMaker notebook.
  8. An AWS CodeCommit repository containing Python notebooks is used for the AI and ML workflow by the SageMaker user-profile.
  9. An Athena workgroup is created for the Security Lake queries with an S3 bucket for output location (access logging configured for the output bucket).

Deploy the solution

You can deploy the SageMaker solution by using either the AWS Management Console or the AWS Cloud Development Kit (AWS CDK).

Option 1: Deploy the solution with AWS CloudFormation using the console

Use the console to sign in to your subscriber AWS account and then choose the Launch Stack button to open the AWS CloudFormation console pre-loaded with the template for this solution. It takes approximately 10 minutes for the CloudFormation stack to complete.

Select this image to open a link that starts building the CloudFormation stack

Option 2: Deploy the solution by using the AWS CDK

You can find the latest code for the SageMaker solution in the SageMaker machine learning insights GitHub repository, where you can also contribute to the sample code. For instructions and more information on using the AWS CDK, see Get Started with AWS CDK.

To deploy the solution by using the AWS CDK

  1. To build the app when navigating to the project’s root folder, use the following commands:
    npm install -g aws-cdk-lib
    npm install

  2. Update IAM_role_assumption_for_sagemaker_presigned_url and security_lake_aws_account default values in source/lib/sagemaker_domain.ts with their respective appropriate values.
  3. Run the following commands in your terminal while authenticated in your subscriber AWS account. Be sure to replace <INSERT_AWS_ACCOUNT> with your account number and replace <INSERT_REGION> with the AWS Region that you want the solution deployed to.
    cdk bootstrap aws://<INSERT_AWS_ACCOUNT>/<INSERT_REGION>
    cdk deploy

Post deployment steps

Now that you’ve deployed the SageMaker solution, you must grant the SageMaker user profile in the subscriber AWS account query access to your Security Lake. You can Grant permission for the SageMaker user profile to Security Lake in Lake Formation in the subscriber AWS account.

Grant permission to the Security Lake database

  1. Copy the SageMaker user-profile Amazon resource name (ARN) arn:aws:iam::<account-id>:role/sagemaker-user-profile-for-security-lake
  2. Go to Lake Formation in the console.
  3. Select the amazon_security_lake_glue_db_us_east_1 database.
  4. From the Actions Dropdown, select Grant.
  5. In Grant Data Permissions, select SAML Users and Groups.
  6. Paste the SageMaker user profile ARN from Step 1.
  7. In Database Permissions, select Describe and then Grant.

Grant permission to Security Lake – Security Hub table

  1. Copy the SageMaker user-profile ARN arn:aws:iam::<account-id>:role/sagemaker-user-profile-for-security-lake
  2. Go to Lake Formation in the console.
  3. Select the amazon_security_lake_glue_db_us_east_1 database.
  4. Choose View Tables.
  5. Select the amazon_security_lake_table_us_east_1_sh_findings_1_0 table.
  6. From Actions Dropdown, select Grant.
  7. In Grant Data Permissions, select SAML Users and Groups.
  8. Paste the SageMaker user-profile ARN from Step 1.
  9. In Table Permissions, select Describe and then Grant.
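Once both grants are in place, you can optionally confirm query access by running a quick Athena query from the subscriber AWS account against the shared Security Hub findings table. The database and table names below match the ones used in the grant steps and assume the us-east-1 Region; depending on how your Lake Formation setup exposes the share, you may need to query through the resource link database created in your subscriber account instead:

SELECT *
FROM "amazon_security_lake_glue_db_us_east_1"."amazon_security_lake_table_us_east_1_sh_findings_1_0"
LIMIT 10;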

Launch your SageMaker Studio application

Now that you have granted permissions for a SageMaker user-profile, we can move on to launching the SageMaker application associated to that user-profile.

  1. Navigate to the SageMaker Studio domain in the console.
  2. Select the SageMaker domain security-lake-ml-insights-<account-id>.
  3. Select the SageMaker user profile sagemaker-user-profile-for-security-lake.
  4. Select the Launch drop-down and select Studio.

    Figure 2: SageMaker domain user-profile AWS console screen

Clone Python notebooks

You’ll do most of your work in the SageMaker Studio app associated with the user profile you just launched. As part of the solution deployment, we’ve created Python notebooks in CodeCommit that you will need to clone.

To clone the Python notebooks

  1. Navigate to CloudFormation in the console.
  2. In the Stacks section, select the SageMakerDomainStack.
  3. Select the Outputs tab.
  4. Copy the value for sagemakernotebookmlinsightsrepositoryURL. (For example: https://git-codecommit.us-east-1.amazonaws.com/v1/repos/sagemaker_ml_insights_repo)
  5. Go back to your SageMaker app.
  6. In Studio, in the left sidebar, choose the Git icon (identified by a diamond with two branches), then choose Clone a Repository.

    Figure 3: SageMaker clone CodeCommit repository

  7. Paste the CodeCommit repository link from Step 4 under the Git repository URL (git). After you paste the URL, select Clone “https://git-codecommit.us-east-1.amazonaws.com/v1/repos/sagemaker_ml_insights_repo”, then select Clone.

    NOTE: If you don’t select from the auto-populated drop-down, SageMaker won’t be able to clone the repository.


    Figure 4: SageMaker clone CodeCommit URL

Generating machine learning insights using SageMaker Studio

You’ve successfully pulled the base set of Python notebooks into your SageMaker app and they can be accessed at sagemaker_ml_insights_repo/notebooks/tsat/. The notebooks provide you with a starting point for running machine learning analysis using Security Lake data. These notebooks can be expanded to existing native or custom data sources being sent to Security Lake.


Figure 5: SageMaker cloned Python notebooks

Notebook #1 – Environment setup

The 0.0-tsat-environ-setup notebook handles the installation of the required libraries and dependencies needed for the subsequent notebooks within this blog. For our notebooks, we use an open-source Python library called Kats, which is a lightweight, generalizable framework to perform time series analysis.

  1. Select the 0.0-tsat-environ-setup.ipynb notebook for the environment setup.

    Note: If you have already provisioned a kernel, you can skip steps 2 and 3.

  2. In the right-hand corner, select No Kernel.
  3. In the Set up notebook environment pop-up, leave the defaults and choose Select.

    Figure 6: SageMaker application environment settings

  4. After the kernel has successfully started, choose the Terminal icon to open the image terminal.

    Figure 7: SageMaker application terminal

  5. To install open-source packages from https instead of http, you must update the sources.list file. After the terminal opens, run the following commands:
    cd /etc/apt
    sed -i 's/http:/https:/g' sources.list

  6. Go back to the 0.0-tsat-environ-setup.ipynb notebook and select the Run drop-down and select Run All Cells. Alternatively, you can run each cell independently, but it’s not required. Grab a coffee! This step will take about 10 minutes.

    IMPORTANT: If you complete the installation out of order or update the requirements.txt file, you might not be able to successfully install Kats, and you will need to rebuild your environment by using a new SageMaker user profile.

  7. After installing all the prerequisites, check the Kats version to determine if it was successfully installed.

    Figure 8: Kats installation verification

  8. Install PyAthena (a Python DB API client for Amazon Athena), which is used to query your data in Security Lake.

You’ve successfully set up the SageMaker app environment! You can now load the appropriate dataset and create a time series.

Notebook #2 – Load data

The 0.1-load-data notebook establishes the Athena connection to query data in Security Lake and creates the resulting time series dataset. The time series dataset will be used for subsequent notebooks to identify trends, outliers, and change points.
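As a rough sketch of what that connection and query look like with PyAthena (the staging location is a placeholder, and the database and table names assume the default Security Lake names used earlier in this post; the notebook’s own code may differ):

from pyathena import connect
import pandas as pd

# Connect to Athena; the staging directory is the Athena output bucket created by the stack (placeholder value).
con = connect(
    s3_staging_dir="s3://<athena-output-bucket>/results/",
    region_name="us-east-1",
)

# Pull a small sample of Security Hub findings from the Security Lake table shared through Lake Formation.
query = """
SELECT *
FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0
LIMIT 10
"""
df = pd.read_sql(query, con)
print(df.head())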

  1. Select the 0.1-load-data.ipynb notebook.
  2. If you deployed the solution outside of us-east-1, update the con details to the appropriate Region. In this example, we’re focusing on Security Hub data within Security Lake. If you want to change the underlying data source, you can update the TABLE value.

    Figure 9: SageMaker notebook load Security Lake data settings

  3. In the Query section, there’s an Athena query to pull specific data from Security Hub; this can be narrowed to a subset of products or expanded to include all products within Security Hub. The query below pulls Security Hub information after 01:00:00 1/1/2022 from the products listed in productname.

    Figure 10: SageMaker notebook Athena query

  4. After the values have been updated, you can create your time series dataset. For this notebook, we recommend running each cell individually instead of running all cells at once so you can get a bit more familiar with the process. Select the first cell and choose the Run icon.

    Figure 11: SageMaker run Python notebook code

  5. Follow the same process as Step 4 for the subsequent cells.

    Note: If you encounter any issues with querying the table, make sure you completed the post-deployment step for Grant permission to Security Lake – Security Hub table.

You’ve successfully loaded your data and created a time series! You can now move on to generating machine learning insights from your time series.
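Creating the time series essentially means aggregating the query results into daily counts and wrapping them in a Kats TimeSeriesData object. A minimal sketch, assuming a DataFrame df from the previous query and illustrative column names (not the notebook’s exact code):

import pandas as pd
from kats.consts import TimeSeriesData

# Aggregate findings into a daily count series (column names are illustrative).
daily = (
    df.groupby("eventday")["finding_count"]
    .sum()
    .reset_index()
    .rename(columns={"eventday": "time", "finding_count": "value"})
)
daily["time"] = pd.to_datetime(daily["time"])

# Kats expects a 'time' column plus one or more value columns.
ts = TimeSeriesData(daily)
print(ts)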

Notebook #3 – Trend detector

The 1.1-trend-detector.ipynb notebook handles trend detection in your data. Trend represents a directional change in the level of a time series. This directional change can be either upward (increase in level) or downward (decrease in level). Trend detection helps detect a change while ignoring the noise from natural variability. Each environment is different, and trends help us identify where to look more closely to determine why a trend is positive or negative.
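Conceptually, a slope is just the coefficient of a straight-line fit through the daily counts. A rough illustration of that idea with numpy, reusing the daily DataFrame from the earlier sketch (this is not the notebook’s exact code):

import numpy as np

# x is elapsed days, y is the daily finding count for one finding type.
x = np.arange(len(daily))
y = daily["value"].to_numpy()

# Fit y = slope * x + intercept; a positive slope means findings are increasing over time.
slope, intercept = np.polyfit(x, y, deg=1)
print(f"slope per day: {slope:.2f}")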

  1. Select 1.1-trend-detector.ipynb notebook for trend detection.
  2. Slopes are created to identify the relationship between x (time) and y (counts).

    Figure 12: SageMaker notebook slope view

  3. If the counts are increasing with time, then it’s considered a positive slope and the reverse is considered a negative slope. A positive slope isn’t necessarily a good thing because in an ideal state we would expect counts of a finding type to come down with time.

    Figure 13: SageMaker notebook trend view

  4. Now you can plot the top five positive and negative trends to identify the top movers.

    Figure 14: SageMaker notebook trend results view

Notebook #4 – Outlier detection

The 1.2-outlier-detection.ipynb notebook handles outlier detection. This notebook performs a seasonal decomposition of the input time series, with additive or multiplicative decomposition as specified (additive is the default). It then works with the residual time series, produced by removing only the trend, or both the trend and seasonality if the seasonality is strong. The intent is to discover useful, abnormal, and irregular patterns within datasets, allowing you to pinpoint areas of interest.
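The notebook relies on Kats for this, but the underlying residual-plus-IQR idea can be sketched with statsmodels and pandas, reusing the daily series from earlier (an illustration of the approach, not the notebook’s code):

from statsmodels.tsa.seasonal import seasonal_decompose

# Decompose the daily series (illustrative period of 7 days) and keep the residual.
decomposition = seasonal_decompose(daily.set_index("time")["value"], model="additive", period=7)
residual = decomposition.resid.dropna()

# Flag residuals outside iqr_mult times the inter-quartile range.
iqr_mult = 5
q1, q3 = residual.quantile([0.25, 0.75])
iqr = q3 - q1
outliers = residual[(residual < q1 - iqr_mult * iqr) | (residual > q3 + iqr_mult * iqr)]
print(outliers)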

  1. To start, it detects points in the residual that are over 5 times the inter-quartile range.
  2. Inter-quartile range (IQR) is the difference between the seventy-fifth and twenty-fifth percentiles of residuals or the spread of data within the middle two quartiles of the entire dataset. IQR is useful in detecting the presence of outliers by looking at values that might lie outside of the middle two quartiles.
  3. The IQR multiplier controls the sensitivity of the range and the decision to identify outliers. A larger value for the iqr_mult_thresh parameter in OutlierDetector makes detection less sensitive, so fewer data points are flagged as outliers, while a smaller value flags more data points as outliers.

    Note: If you don’t have enough data, decrease iqr_mult_thresh to a lower value (for example iqr_mult_thresh=3).


    Figure 15: SageMaker notebook outlier setting

  4. Along with outlier detection plots, investigation SQL will be displayed as well, which can help with further investigation of the outliers.

    In the diagram that follows, you can see that there are several outliers in the number of findings, related to failed AWS Firewall Manager policies, which have been identified by the vertical red lines within the line graph. These are outliers because they deviate from the normal behavior and number of findings on a day-to-day basis. When you see outliers, you can look at the resources that might have caused an unusual increase in Firewall Manager policy findings. Depending on the findings, it could be related to an overly permissive or noncompliant security group or a misconfigured AWS WAF rule group.


    Figure 16: SageMaker notebook outlier results view

Notebook #5 – Change point detection

The 1.3-changepoint-detector.ipynb notebook handles change point detection. Change point detection is a method to detect changes in a time series that persist over time, such as a change in the mean value. A baseline is established, and the detector identifies the points at which the series shifts away from that baseline. Change points occur when there’s an increase or decrease in the average number of findings within a dataset.
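A minimal sketch of change point detection with Kats, assuming the CUSUMDetector API and the ts object built earlier (the notebook’s own code may differ):

from kats.detectors.cusum_detection import CUSUMDetector

# Look for mean-level shifts in either direction in the findings time series.
detector = CUSUMDetector(ts)
change_points = detector.detector(change_directions=["increase", "decrease"])

# Each result describes where the series shifted and with what confidence.
for cp in change_points:
    print(cp)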

  1. Along with identifying change points within the data set, the investigation SQL is generated to further investigate the specific change point if applicable.

    In the following diagram, you can see there’s a change point decrease after July 27, 2022, with confidence of 99.9 percent. It’s important to note that change points differ from outliers, which are sudden changes in the data set observed. This diagram means there was some change in the environment that resulted in an overall decrease in the number of findings for S3 buckets with block public access being disabled. The change could be the result of an update to the CI/CD pipelines provisioning S3 buckets or automation to enable all S3 buckets to block public access. Conversely, if you saw a change point that resulted in an increase, it could mean that there was a change that resulted in a larger number of S3 buckets with a block public access configuration consistently being disabled.


    Figure 17: SageMaker changepoint detector view

By now, you should be familiar with the setup and deployment of SageMaker Studio and how you can use Python notebooks to generate machine learning insights for your Security Lake data. You can take what you’ve learned and start to curate specific datasets and data sources within Security Lake, create a time series, detect trends, and identify outliers and change points. By doing so, you can answer a variety of security-related questions, such as:

  • CloudTrail

    Is there a large volume of Amazon S3 download or copy commands to an external resource? Are you seeing a large volume of S3 delete object commands? Is it possible there’s a ransomware event going on?

  • VPC Flow Logs

    Is there an increase in the number of requests from your VPC to external IPs? Is there an increase in the number of requests from your VPC to your on-premises CIDR? Is there a possibility of internal or external data exfiltration occurring?

  • Route53

    Which resources are making DNS requests that they haven’t typically made within the last 30–45 days? When did it start? Is there a potential command and control session occurring on an Amazon Elastic Compute Cloud (Amazon EC2) instance?

It’s important to note that this isn’t a solution to replace Amazon GuardDuty, which uses foundational data sources to detect communication with known malicious domains and IP addresses and identify anomalous behavior, or Amazon Detective, which provides customers with prebuilt data aggregations, summaries, and visualizations to help security teams conduct faster and more effective investigations. One of the main benefits of using Security Lake and SageMaker Studio is the ability to interactively create and tailor machine learning insights specific to your AWS environment and workloads.

Clean up

If you deployed the SageMaker machine learning insights solution by using the Launch Stack button in the AWS Management Console or the CloudFormation template sagemaker_ml_insights_cfn, do the following to clean up:

  1. In the CloudFormation console for the account and Region where you deployed the solution, choose the SageMakerML stack.
  2. Choose the option to Delete the stack.

If you deployed the solution by using the AWS CDK, run the command cdk destroy.

Conclusion

Amazon Security Lake gives you the ability to normalize and centrally store your security data from various log sources to help you analyze, visualize, and correlate appropriate security logs. You can then use this data to increase your overall security posture by implementing additional security guardrails or take appropriate remediation actions within your AWS environment.

In this blog post, you learned how you can use SageMaker to generate machine learning insights for your Security Hub findings in Security Lake. Although the example solution focuses on a single data source within Security Lake, you can expand the notebooks to incorporate other native or custom data sources in Security Lake.

There are many different use-cases for Security Lake that can be tailored to fit your AWS environment. Take a look at this blog post to learn how you can ingest, transform and deliver Security Lake data to Amazon OpenSearch to help your security operations team quickly analyze security data within your AWS environment. In supported Regions, new Security Lake account holders can try the service free for 15 days and gain access to its features.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.

Want more AWS Security news? Follow us on Twitter.


Jonathan Nguyen

Jonathan is a Principal Security Architect at AWS. His background is in AWS security, with a focus on threat detection and incident response. He helps enterprise customers develop a comprehensive AWS security strategy, deploy security solutions at scale, and train customers on AWS security best practices.


Madhunika Reddy Mikkili

Madhunika is a Data and Machine Learning Engineer with the AWS Professional Services Shared Delivery Team. She is passionate about helping customers achieve their goals through the use of data and machine learning insights. Outside of work, she loves traveling and spending time with family and friends.

Generate security insights from Amazon Security Lake data using Amazon OpenSearch Ingestion

Post Syndicated from Muthu Pitchaimani original https://aws.amazon.com/blogs/big-data/generate-security-insights-from-amazon-security-lake-data-using-amazon-opensearch-ingestion/

Amazon Security Lake centralizes access and management of your security data by aggregating security event logs from AWS environments, other cloud providers, on-premises infrastructure, and other software as a service (SaaS) solutions. By converting logs and events using the Open Cybersecurity Schema Framework (OCSF), an open standard for storing security events in a common and shareable format, Security Lake optimizes and normalizes your security data for analysis using your preferred analytics tool.

Amazon OpenSearch Service continues to be a tool of choice for many enterprises for searching and analyzing large volumes of security data. In this post, we show you how to ingest and query Amazon Security Lake data with Amazon OpenSearch Ingestion, a serverless, fully managed data collector with configurable ingestion pipelines. Using OpenSearch Ingestion to ingest data into your OpenSearch Service cluster, you can derive insights quicker for time-sensitive security investigations. You can respond swiftly to security incidents, helping you protect your business-critical data and systems.

Solution overview

The following architecture outlines the flow of data from Security Lake to OpenSearch Service.

The workflow contains the following steps:

  1. Security Lake persists OCSF schema normalized data in an Amazon Simple Storage Service (Amazon S3) bucket determined by the administrator.
  2. Security Lake notifies subscribers through the chosen subscription method, in this case Amazon Simple Queue Service (Amazon SQS).
  3. OpenSearch Ingestion registers as a subscriber to get the necessary context information.
  4. OpenSearch Ingestion reads Parquet formatted security data from the Security Lake managed Amazon S3 bucket and transforms the security logs into JSON documents.
  5. OpenSearch Ingestion ingests this OCSF compliant data into OpenSearch Service.
  6. You download and import the provided dashboards to analyze and gain quick insights into the security data.

OpenSearch Ingestion provides a serverless ingestion framework to easily ingest Security Lake data into OpenSearch Service with just a few clicks.

Prerequisites

Complete the following prerequisite steps:

  1. Create an Amazon OpenSearch Service domain. For instructions, refer to Creating and managing Amazon OpenSearch Service domains.
  2. You must have access to the AWS account in which you wish to set up this solution.

Set up Amazon Security Lake

In this section, we present the steps to set up Amazon Security Lake, which includes enabling the service and creating a subscriber.

Enable Amazon Security Lake

Identify the account in which you want to activate Amazon Security Lake. Note that for accounts that are part of organizations, you have to designate a delegated Security Lake administrator from your management account. For instructions, refer to Managing multiple accounts with AWS Organizations.

  1. Sign in to the AWS Management Console using the credentials of the delegated account.
  2. On the Amazon Security Lake console, choose your preferred Region, then choose Get started.

Amazon Security Lake collects log and event data from a variety of sources and across your AWS accounts and Regions.

Now you’re ready to enable Amazon Security Lake.

  1. You can either select All log and event sources or choose specific logs by selecting Specific log and event sources.
  2. Data is ingested from all Regions. We recommend selecting All supported Regions so that activities are logged even for accounts that you might not use frequently. However, you also have the option to select Specific Regions.
  3. For Select accounts, you can select the accounts in which you want Amazon Security Lake enabled. For this post, we select All accounts.

  4. You’re prompted to either create a new AWS Identity and Access Management (IAM) role or use an existing IAM role. This gives Amazon Security Lake the required permissions to collect the logs and events. Choose the option appropriate for your situation.
  5. Choose Next.
  6. Optionally, specify the Amazon S3 storage class for the data in Amazon Security Lake. For more information, refer to Lifecycle management in Security Lake.
  7. Choose Next.
  8. Review the details and create the data lake.

Create an Amazon Security Lake subscriber

To access and consume data in your Security Lake managed Amazon S3 buckets, you must set up a subscriber.

Complete the following steps to create your subscriber:

  1. On the Amazon Security Lake console, choose Summary in the navigation pane.

Here, you can see the number of Regions selected.

  2. Choose Create subscriber.

A subscriber consumes logs and events from Amazon Security Lake. In this case, the subscriber is OpenSearch Ingestion, which consumes security data and ingests it into OpenSearch Service.

  3. For Subscriber name, enter OpenSearchIngestion.
  4. Enter a description.
  5. Region is automatically populated based on the currently selected Region.
  6. For Log and event sources, select whether the subscriber is authorized to consume all log and event sources or specific log and event sources.
  7. For Data access method, select S3.
  8. For Subscriber credentials, enter the subscriber’s AWS account ID and an external ID, for example OpenSearchIngestion-<AWS account ID>.
  9. For Notification details, select SQS queue.

This prompts Amazon Security Lake to create an SQS queue that the subscriber can poll for object notifications.

  10. Choose Create.

Install templates and dashboards for Amazon Security Lake data

Your subscriber for OpenSearch Ingestion is now ready. Before you configure OpenSearch Ingestion to process the security data, let’s configure an OpenSearch sink (destination to write data) with index templates and dashboards.

Index templates are predefined mappings for security data that select the correct OpenSearch field types for the corresponding Open Cybersecurity Schema Framework (OCSF) schema definitions. In addition, index templates contain index-specific settings for particular index patterns. OCSF classifies security data into different categories such as system activity, findings, identity and access management, network activity, application activity, and discovery.
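The downloadable templates later in this section cover the full OCSF mappings; as a simplified illustration only, an index template for the Security Hub findings category (2001) might look like the following (field names and settings here are trimmed-down assumptions):

import json
import requests

# Minimal illustrative template; the real ones in index_templates.zip are far more complete.
template = {
    "index_patterns": ["ocsf-2001*"],
    "template": {
        "settings": {"index.number_of_shards": 2},
        "mappings": {
            "properties": {
                "time": {"type": "date"},
                "class_uid": {"type": "integer"},
                "severity": {"type": "keyword"},
            }
        },
    },
}

# Change the domain URL and credentials to values appropriate for your environment.
response = requests.put(
    "https://my-opensearch-domain.es.amazonaws.com/_index_template/ocsf-2001",
    auth=("adminuser", "password"),
    headers={"Content-Type": "application/json"},
    data=json.dumps(template),
)
print(response.status_code, response.text)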

Amazon Security Lake publishes events from four different AWS sources: AWS CloudTrail with subsets for AWS Lambda and Amazon Simple Storage Service (Amazon S3), Amazon Virtual Private Cloud (Amazon VPC) Flow Logs, Amazon Route 53, and AWS Security Hub. The following table details the event sources and their corresponding OCSF categories and OpenSearch index patterns.

Amazon Security Lake Source OCSF Category ID OpenSearch Index Pattern
CloudTrail (Lambda and Amazon S3 API subsets) 3005 ocsf-3005*
VPC Flow Logs 4001 ocsf-4001*
Route 53 4003 ocsf-4003*
Security Hub 2001 ocsf-2001*

To easily identify OpenSearch indices containing Security Lake data, we recommend following a structured index naming pattern that includes the log category and its OCSF-defined class in the name of the index. An example is provided below:

ocsf-cuid-${/class_uid}-${/metadata/product/name}-${/class_name}-%{yyyy.MM.dd}

Complete the following steps to install the index templates and dashboards for your data:

  1. Download the component_templates.zip and index_templates.zip files and unzip them on your local device.

Component templates are composable modules with settings, mappings, and aliases that can be shared and used by index templates.

  2. Upload the component templates before the index templates. For example, the following Linux command line shows how to use the OpenSearch _component_template API to upload to your OpenSearch Service domain (change the domain URL and the credentials to appropriate values for your environment):
    ls component_templates | awk -F'_body' '{print $1}' | xargs -I{} curl  -u adminuser:password -X PUT -H 'Content-Type: application/json' -d @component_templates/{}_body.json https://my-opensearch-domain.es.amazonaws.com/_component_template/{}

  3. Once the component templates are successfully uploaded, proceed to upload the index templates:
    ls index_templates | awk -F'_body' '{print $1}' | xargs -I{} curl -u adminuser:password -X PUT -H 'Content-Type: application/json' -d @index_templates/{}_body.json https://my-opensearch-domain.es.amazonaws.com/_index_template/{}

  4. To verify that the index templates and component templates were uploaded successfully, navigate to OpenSearch Dashboards, choose the hamburger menu, then choose Index Management.

  5. In the navigation pane, choose Templates to see all the OCSF index templates.

  6. Choose Component templates to verify the OCSF component templates.

  7. After successfully uploading the templates, download the pre-built dashboards and other components required to visualize the Security Lake data in OpenSearch indices.
  8. To upload these to OpenSearch Dashboards, choose the hamburger menu, and under Management, choose Stack Management.
  9. In the navigation pane, choose Saved Objects.

  10. Choose Import.

  11. Choose Import, navigate to the downloaded file, then choose Import.

  12. Confirm the dashboard objects are imported correctly, then choose Done.

All the necessary index and component templates, index patterns, visualizations, and dashboards are now successfully installed.

Configure OpenSearch Ingestion

Each OpenSearch Ingestion pipeline will have a single data source with one or more sub-pipelines, processors, and sink. In our solution, Security Lake managed Amazon S3 is the source and your OpenSearch Service cluster is the sink. Before setting up OpenSearch Ingestion, you need to create the following IAM roles and set up the required permissions:

  • Pipeline role – Defines permissions to read from Amazon Security Lake and write to the OpenSearch Service domain
  • Management role – Defines permissions that allow the user to create, update, delete, and validate the pipeline and perform other management operations

The following figure shows the permissions and roles you need and how they interact with the solution services.

Before you create an OpenSearch Ingestion pipeline, the principal or the user creating the pipeline must have permissions to perform management actions on a pipeline (create, update, list, and validate). Additionally, the principal must have permission to pass the pipeline role to OpenSearch Ingestion. If you are performing these operations as a non-administrator, add the following permissions to the user creating the pipelines (replace {your-account-id} with your AWS account ID):

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Resource": "*",
			"Action": [
				"osis:CreatePipeline",
				"osis:ListPipelineBlueprints",
				"osis:ValidatePipeline",
				"osis:UpdatePipeline"
			]
		},
		{
			"_comment": "Replace {your-account-id} with your AWS account ID",
			"Resource": [
				"arn:aws:iam::{your-account-id}:role/pipeline-role"
			],
			"Effect": "Allow",
			"Action": [
				"iam:PassRole"
			]
		}
	]
}

Configure a read policy for the pipeline role

Security Lake subscribers only have access to the source data in the Region you selected when you created the subscriber. To give a subscriber access to data from multiple Regions, refer to Managing multiple Regions. To create a policy for read permissions, you need the name of the Amazon S3 bucket and the Amazon SQS queue created by Security Lake.

Complete the following steps to configure a read policy for the pipeline role:

  1. On the Security Lake console, choose Regions in the navigation pane.
  2. Choose the S3 location corresponding to the Region of the subscriber you created.

  3. Make a note of this Amazon S3 bucket name.

  4. Choose Subscribers in the navigation pane.
  5. Choose the subscriber OpenSearchIngestion that you created earlier.

  6. Take note of the Amazon SQS queue ARN under Subscription endpoint.

  7. On the IAM console, choose Policies in the navigation pane.
  8. Choose Create policy.
  9. In the Specify permissions section, choose JSON to open the policy editor.
  10. Remove the default policy and enter the following code (replace the S3 bucket name, Region, AWS account ID, and SQS queue name with the corresponding values):
    {
    	"Version": "2012-10-17",
    	"Statement": [
    		{
    			"Sid": "ReadFromS3",
    			"Effect": "Allow",
    			"Action": "s3:GetObject",
    			"Resource": "arn:aws:s3:::{bucket-name}/*"
    		},
    		{
    			"Sid": "ReceiveAndDeleteSqsMessages",
    			"Effect": "Allow",
    			"Action": [
    				"sqs:DeleteMessage",
    				"sqs:ReceiveMessage"
    			],
    			"_comment": "Replace {your-account-id} with your AWS account ID",
    			"Resource": "arn:aws:sqs:{region}:{your-account-id}:{sqs-queue-name}"
    		}
    	]
    }

  11. Choose Next.
  12. For policy name, enter read-from-securitylake.
  13. Choose Create policy.

You have successfully created the policy to read data from Security Lake and receive and delete messages from the Amazon SQS queue.


Configure a write policy for the pipeline role

We recommend using fine-grained access control (FGAC) with OpenSearch Service. When you use FGAC, you don’t have to use a domain access policy; you can skip the rest of this section and proceed to creating your pipeline role with the necessary permissions. If you use a domain access policy, you need to create a second policy (for this post, we call it write-to-opensearch) as an added step to the steps in the previous section. Use the following policy code:

{
	"Version": "2012-10-17",
	"Statement": [
		{
			"Effect": "Allow",
			"Action": "es:DescribeDomain",
			"Resource": "arn:aws:es:*:{your-account-id}:domain/*"
		},
		{
			"Effect": "Allow",
			"Action": "es:ESHttp*",
			"Resource": "arn:aws:es:*:{your-account-id}:domain/{domain-name}/*"
		}
	]
}

If the configured role has permissions to access Amazon S3 and Amazon SQS across accounts, OpenSearch Ingestion can ingest data across accounts.

Create the pipeline role with necessary permissions

Now that you have created the policies, you can create the pipeline role. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Choose Create role.
  3. For Use cases for other AWS services, select OpenSearch Ingestion pipelines.
  4. Choose Next.
  5. Search for and select the policy read-from-securitylake.
  6. Search for and select the policy write-to-opensearch (if you’re using a domain access policy).
  7. Choose Next.
  8. For Role Name, enter pipeline-role.
  9. Choose Create.

Keep note of the role name; you will use it when configuring the OpenSearch Ingestion pipeline.

Now you can map the pipeline role to an OpenSearch backend role if you’re using FGAC. You can map the ingestion role to one of the predefined roles or create your own with the necessary permissions. For example, all_access is a built-in role that grants administrative permission to all OpenSearch functions. When deploying to a production environment, make sure to use a role with just enough permissions to write to your Amazon OpenSearch Service domain.

Create the OpenSearch Ingestion pipeline

In this section, you use the pipeline role you created to create an OpenSearch Ingestion pipeline. Complete the following steps:

  1. On the OpenSearch Service console, choose OpenSearch Ingestion in the navigation pane.
  2. Choose Create pipeline.
  3. For Pipeline name, enter a name, such as security-lake-osi.
  4. In the Pipeline configuration section, choose Configuration blueprints and choose AWS-SecurityLakeS3ParquetOCSFPipeline.

  5. Under source, update the following information:
    1. Update the queue_url in the sqs section. (This is the SQS queue that Amazon Security Lake created when you created a subscriber. To get the URL, navigate to the Amazon SQS console and look for the queue ARN created with the format AmazonSecurityLake-abcde-Main-Queue.)
    2. Enter the Region to use for aws credentials.

  6. Under sink, update the following information:
    1. Replace the hosts value in the OpenSearch section with the Amazon OpenSearch Service domain endpoint.
    2. For sts_role_arn, enter the ARN of pipeline-role.
    3. Set region as us-east-1.
    4. For index, enter the index name that was defined in the template created in the previous section ("ocsf-cuid-${/class_uid}-${/metadata/product/name}-${/class_name}-%{yyyy.MM.dd}").
  7. Choose Validate pipeline to verify the pipeline configuration.

If the configuration is valid, a successful validation message appears; you can now proceed to the next steps.

  8. Under Network, select Public for this post. Our recommendation is to select VPC access for an added layer of security.
  9. Choose Next.
  10. Review the details and create the pipeline.

When the pipeline is active, you should see the security data ingested into your Amazon OpenSearch Service domain.

Visualize the security data

After OpenSearch Ingestion starts writing your data into your OpenSearch Service domain, you should be able to visualize the data using the pre-built dashboards you imported earlier. Navigate to OpenSearch Dashboards and choose any one of the installed dashboards.

For example, choosing DNS Activity will give you dashboards of all DNS activity published in Amazon Security Lake.

This dashboard shows the top DNS queries by account and hostname. It also shows the number of queries per account. OpenSearch Dashboards are flexible; you can add, delete, or update any of these visualizations to suit your organization and business needs.

Clean up

To avoid unwanted charges, delete the OpenSearch Service domain and OpenSearch Ingestion pipeline, and disable Amazon Security Lake.

Conclusion

In this post, you successfully configured Amazon Security Lake to send security data from different sources to OpenSearch Service through serverless OpenSearch Ingestion. You installed pre-built templates and dashboards to quickly get insights from the security data. Refer to Amazon OpenSearch Ingestion to find additional sources from which you can ingest data. For additional use cases, refer to Use cases for Amazon OpenSearch Ingestion.


About the authors

Muthu Pitchaimani is a Search Specialist with Amazon OpenSearch Service. He builds large-scale search applications and solutions. Muthu is interested in the topics of networking and security, and is based out of Austin, Texas.

Aish Gunasekar is a Specialist Solutions architect with a focus on Amazon OpenSearch Service. Her passion at AWS is to help customers design highly scalable architectures and help them in their cloud adoption journey. Outside of work, she enjoys hiking and baking.

Jimish Shah is a Senior Product Manager at AWS with 15+ years of experience bringing products to market in log analytics, cybersecurity, and IP video streaming. He’s passionate about launching products that offer delightful customer experiences, and solve complex customer problems. In his free time, he enjoys exploring cafes, hiking, and taking long walks.

Automate the archive and purge data process for Amazon RDS for PostgreSQL using pg_partman, Amazon S3, and AWS Glue

Post Syndicated from Anand Komandooru original https://aws.amazon.com/blogs/big-data/automate-the-archive-and-purge-data-process-for-amazon-rds-for-postgresql-using-pg_partman-amazon-s3-and-aws-glue/

The post Archive and Purge Data for Amazon RDS for PostgreSQL and Amazon Aurora with PostgreSQL Compatibility using pg_partman and Amazon S3 proposes data archival as a critical part of data management and shows how to efficiently use PostgreSQL’s native range partition to partition current (hot) data with pg_partman and archive historical (cold) data in Amazon Simple Storage Service (Amazon S3). Customers need a cloud-native automated solution to archive historical data from their databases. Customers want the business logic to be maintained and run from outside the database to reduce the compute load on the database server. This post proposes an automated solution by using AWS Glue for automating the PostgreSQL data archiving and restoration process, thereby streamlining the entire procedure.

AWS Glue is a serverless data integration service that makes it easier to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development. There is no need to pre-provision, configure, or manage infrastructure. It can also automatically scale resources to meet the requirements of your data processing job, providing a high level of abstraction and convenience. AWS Glue integrates seamlessly with AWS services like Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon DynamoDB, Amazon Kinesis Data Streams, and Amazon DocumentDB (with MongoDB compatibility) to offer a robust, cloud-native data integration solution.

The features of AWS Glue, which include a scheduler for automating tasks, code generation for ETL (extract, transform, and load) processes, notebook integration for interactive development and debugging, as well as robust security and compliance measures, make it a convenient and cost-effective solution for archival and restoration needs.

Solution overview

The solution combines PostgreSQL’s native range partitioning feature with pg_partman, the Amazon S3 export and import functions in Amazon RDS, and AWS Glue as an automation tool.

The solution involves the following steps:

  1. Provision the required AWS services and workflows using the provided AWS Cloud Development Kit (AWS CDK) project.
  2. Set up your database.
  3. Archive the older table partitions to Amazon S3 and purge them from the database with AWS Glue.
  4. Restore the archived data from Amazon S3 to the database with AWS Glue when there is a business need to reload the older table partitions.

The solution is based on AWS Glue, which takes care of archiving and restoring databases with Availability Zone redundancy. The solution comprises the following technical components:

  • An Amazon RDS for PostgreSQL Multi-AZ database runs in two private subnets.
  • AWS Secrets Manager stores database credentials.
  • An S3 bucket stores Python scripts and database archives.
  • An S3 Gateway endpoint allows Amazon RDS and AWS Glue to communicate privately with Amazon S3.
  • AWS Glue uses a Secrets Manager interface endpoint to retrieve database secrets from Secrets Manager.
  • AWS Glue ETL jobs run in either private subnet. They use the S3 endpoint to retrieve Python scripts. The AWS Glue jobs read the database credentials from Secrets Manager to establish JDBC connections to the database.

You can create an AWS Cloud9 environment in one of the private subnets available in your AWS account to set up test data in Amazon RDS. The following diagram illustrates the solution architecture.

Solution Architecture

Prerequisites

For instructions to set up your environment for implementing the solution proposed in this post, refer to Deploy the application in the GitHub repo.

Provision the required AWS resources using AWS CDK

Complete the following steps to provision the necessary AWS resources:

  1. Clone the repository to a new folder on your local desktop.
  2. Create a virtual environment and install the project dependencies.
  3. Deploy the stacks to your AWS account.

The CDK project includes three stacks: vpcstack, dbstack, and gluestack, implemented in the vpc_stack.py, db_stack.py, and glue_stack.py modules, respectively.

These stacks have preconfigured dependencies to simplify the process for you. app.py declares Python modules as a set of nested stacks. It passes a reference from vpcstack to dbstack, and a reference from both vpcstack and dbstack to gluestack.

gluestack reads the following attributes from the parent stacks:

  • The S3 bucket, VPC, and subnets from vpcstack
  • The secret, security group, database endpoint, and database name from dbstack

The deployment of the three stacks creates the technical components listed earlier in this post.

Set up your database

Prepare the database using the information provided in Populate and configure the test data on GitHub.

Archive the historical table partition to Amazon S3 and purge it from the database with AWS Glue

The “Maintain and Archive” AWS Glue workflow created in the first step consists of two jobs: “Partman run maintenance” and “Archive Cold Tables.”

The “Partman run maintenance” job runs the partman.run_maintenance_proc() procedure to create new partitions and detach old partitions based on the retention setup in the previous step for the configured table. The “Archive Cold Tables” job identifies the detached old partitions and exports the historical data to an Amazon S3 destination using aws_s3.query_export_to_s3. In the end, the job drops the archived partitions from the database, freeing up storage space. The following screenshot shows the results of running this workflow on demand from the AWS Glue console.

Archive job run result
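A rough sketch of the SQL those two jobs rely on, wrapped in Python with psycopg2 (the connection details and object names are placeholders; the actual Glue jobs read credentials from Secrets Manager and connect over JDBC):

import psycopg2

# Connect to the RDS for PostgreSQL instance (placeholder connection details).
conn = psycopg2.connect(host="<db-endpoint>", dbname="<db-name>", user="<user>", password="<password>")
conn.autocommit = True
cur = conn.cursor()

# Detach old partitions and create new ones according to the pg_partman retention settings.
cur.execute("CALL partman.run_maintenance_proc();")

# Export a detached partition to Amazon S3, then drop it to reclaim storage (placeholder bucket and key).
cur.execute("""
    SELECT aws_s3.query_export_to_s3(
        'SELECT * FROM ticket_purchase_hist_p2020_01',
        aws_commons.create_s3_uri('<data-lake-bucket>', 'archive/ticket_purchase_hist_p2020_01.csv', 'us-east-1'),
        options := 'format csv'
    );
""")
cur.execute("DROP TABLE ticket_purchase_hist_p2020_01;")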

Additionally, you can set up this AWS Glue workflow to be triggered on a schedule, on demand, or with an Amazon EventBridge event. Use your business requirements to select the right trigger.

Restore archived data from Amazon S3 to the database

The “Restore from S3” Glue workflow created in the first step consists of one job: “Restore from S3.”

This job initiates the run of the partman.create_partition_time procedure to create a new table partition based on your specified month. It subsequently calls aws_s3.table_import_from_s3 to restore the matched data from Amazon S3 to the newly created table partition.
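A similar sketch of the SQL behind the restore job (again with placeholder connection details; the parent table name and partition date are assumptions for illustration):

import psycopg2

conn = psycopg2.connect(host="<db-endpoint>", dbname="<db-name>", user="<user>", password="<password>")
cur = conn.cursor()

# Recreate the partition for the month being restored (placeholder parent table and date).
cur.execute("""
    SELECT partman.create_partition_time(
        'public.ticket_purchase_hist',
        ARRAY['2020-01-01 00:00:00+00']::timestamptz[]
    );
""")

# Import the archived CSV from Amazon S3 into the newly created partition.
cur.execute("""
    SELECT aws_s3.table_import_from_s3(
        'ticket_purchase_hist_p2020_01',
        '',
        '(format csv)',
        aws_commons.create_s3_uri('<data-lake-bucket>', 'archive/ticket_purchase_hist_p2020_01.csv', 'us-east-1')
    );
""")
conn.commit()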

To start the “Restore from S3” workflow, navigate to the workflow on the AWS Glue console and choose Run.

The following screenshot shows the “Restore from S3” workflow run details.

Restore job run result

Validate the results

The solution provided in this post automated the PostgreSQL data archival and restoration process using AWS Glue.

You can use the following steps to confirm that the historical data in the database is successfully archived after running the “Maintain and Archive” AWS Glue workflow:

  1. On the Amazon S3 console, navigate to your S3 bucket.
  2. Confirm the archived data is stored in an S3 object as shown in the following screenshot.
    Archived data in S3
  3. From a psql command line tool, use the \dt command to list the available tables and confirm the archived table ticket_purchase_hist_p2020_01 does not exist in the database.
    List table result after post archival

You can use the following steps to confirm that the archived data is restored to the database successfully after running the “Restore from S3” AWS Glue workflow.

  1. From a psql command line tool, use the \dt command to list the available tables and confirm the archived table ticket_purchase_hist_p2020_01 is restored to the database.
    List table results after restore

Clean up

Use the information provided in Cleanup to clean up your test environment created for testing the solution proposed in this post.

Summary

This post showed how to use AWS Glue workflows to automate the archive and restore process for RDS for PostgreSQL database table partitions using Amazon S3 as archive storage. The automation is run on demand but can be set up to be triggered on a recurring schedule. It allows you to define the sequence and dependencies of jobs, track the progress of each workflow job, view run logs, and monitor the overall health and performance of your tasks. Although we used Amazon RDS for PostgreSQL as an example, the same solution works for Amazon Aurora PostgreSQL-Compatible Edition as well. Modernize your database cron jobs using AWS Glue by using this post and the GitHub repo. Gain a high-level understanding of AWS Glue and its components by using the hands-on workshop.


About the Authors

Anand Komandooru is a Senior Cloud Architect at AWS. He joined AWS Professional Services organization in 2021 and helps customers build cloud-native applications on AWS cloud. He has over 20 years of experience building software and his favorite Amazon leadership principle is “Leaders are right a lot.”

Li Liu is a Senior Database Specialty Architect with the Professional Services team at Amazon Web Services. She helps customers migrate traditional on-premise databases to the AWS Cloud. She specializes in database design, architecture, and performance tuning.

Neil Potter is a Senior Cloud Application Architect at AWS. He works with AWS customers to help them migrate their workloads to the AWS Cloud. He specializes in application modernization and cloud-native design and is based in New Jersey.

Vivek Shrivastava is a Principal Data Architect, Data Lake in AWS Professional Services. He is a big data enthusiast and holds 14 AWS Certifications. He is passionate about helping customers build scalable and high-performance data analytics solutions in the cloud. In his spare time, he loves reading and finds areas for home automation.

Try semantic search with the Amazon OpenSearch Service vector engine

Post Syndicated from Stavros Macrakis original https://aws.amazon.com/blogs/big-data/try-semantic-search-with-the-amazon-opensearch-service-vector-engine/

Amazon OpenSearch Service has long supported both lexical and vector search, since the introduction of its kNN plugin in 2020. With recent developments in generative AI, including AWS’s launch of Amazon Bedrock earlier in 2023, you can now use Amazon Bedrock-hosted models in conjunction with the vector database capabilities of OpenSearch Service, allowing you to implement semantic search, retrieval augmented generation (RAG), recommendation engines, and rich media search based on high-quality vector search. The recent launch of the vector engine for Amazon OpenSearch Serverless makes it even easier to deploy such solutions.

OpenSearch Service supports a variety of search and relevance ranking techniques. Lexical search looks for words in the documents that appear in the queries. Semantic search, supported by vector embeddings, embeds documents and queries into a semantic high-dimension vector space where texts with related meanings are nearby in the vector space and therefore semantically similar, so that it returns similar items even if they don’t share any words with the query.

We’ve put together two demos on the public OpenSearch Playground to show you the strengths and weaknesses of the different techniques: one comparing textual vector search to lexical search, the other comparing cross-modal textual and image search to textual vector search. With OpenSearch’s Search Comparison Tool, you can compare the different approaches. For the demo, we’re using the Amazon Titan foundation model hosted on Amazon Bedrock for embeddings, with no fine tuning. The dataset consists of a selection of Amazon clothing, jewelry, and outdoor products.

Background

A search engine is a special kind of database, allowing you to store documents and data and then run queries to retrieve the most relevant ones. End-user search queries usually consist of text entered in a search box. Two important techniques for using that text are lexical search and semantic search. In lexical search, the search engine compares the words in the search query to the words in the documents, matching word for word. Only items that have all or most of the words the user typed match the query. In semantic search, the search engine uses a machine learning (ML) model to encode text from the source documents as a dense vector in a high-dimensional vector space; this is also called embedding the text into the vector space. It similarly codes the query as a vector and then uses a distance metric to find nearby vectors in the multi-dimensional space. The algorithm for finding nearby vectors is called kNN (k Nearest Neighbors). Semantic search does not match individual query terms—it finds documents whose vector embedding is near the query’s embedding in the vector space and therefore semantically similar to the query, so the user can retrieve items that don’t have any of the words that were in the query, even though the items are highly relevant.
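To make the mechanics concrete, here is a minimal sketch of a kNN query against an OpenSearch index, with the query text embedded by the Amazon Titan model on Amazon Bedrock (the index name comes from the demo, but the vector field and document fields are assumptions for illustration):

import json
import boto3
from opensearchpy import OpenSearch

# Embed the query text with Amazon Titan on Amazon Bedrock.
bedrock = boto3.client("bedrock-runtime", region_name="us-east-1")
response = bedrock.invoke_model(
    modelId="amazon.titan-embed-text-v1",
    body=json.dumps({"inputText": "tennis clothes"}),
)
embedding = json.loads(response["body"].read())["embedding"]

# Run a kNN query against a vector field (field and source names are illustrative).
client = OpenSearch(hosts=["https://my-opensearch-domain.es.amazonaws.com"], http_auth=("user", "password"))
knn_query = {
    "size": 5,
    "query": {"knn": {"product_embedding": {"vector": embedding, "k": 5}}},
}
results = client.search(index="amazon_products_text_embedding", body=knn_query)
for hit in results["hits"]["hits"]:
    print(hit["_score"], hit["_source"].get("title"))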

Textual vector search

The demo of textual vector search shows how vector embeddings can capture the context of your query beyond just the words that compose it.

In the text box at the top, enter the query tennis clothes. On the left (Query 1), there’s an OpenSearch DSL (Domain Specific Language for queries) semantic query using the amazon_products_text_embedding index, and on the right (Query 2), there’s a simple lexical query using the amazon_products_text index. You’ll see that lexical search doesn’t know that clothes can be tops, shorts, dresses, and so on, but semantic search does.

Search Comparison Tool

Compare semantic and lexical results

Similarly, in a search for warm-weather hat, the semantic results find lots of hats suitable for warm weather, whereas the lexical search returns results mentioning the words “warm” and “hat,” all of which are warm hats suitable for cold weather, not warm-weather hats. Similarly, if you’re looking for long dresses with long sleeves, you might search for long long-sleeved dress. A lexical search ends up finding some short dresses with long sleeves and even a child’s dress shirt because the word “dress” appears in the description, whereas the semantic search finds much more relevant results: mostly long dresses with long sleeves, with a couple of errors.

Cross-modal image search

The demo of cross-modal textual and image search shows searching for images using textual descriptions. This works by finding images that are related to your textual descriptions using a pre-production multi-modal embedding. We’ll compare searching for visual similarity (on the left) and textual similarity (on the right). In some cases, we get very similar results.

Search Comparison Tool

Compare image and textual embeddings

For example, sailboat shoes does a good job with both approaches, but white sailboat shoes does much better using visual similarity. The query canoe finds mostly canoes using visual similarity—which is probably what a user would expect—but a mixture of canoes and canoe accessories such as paddles using textual similarity.

If you are interested in exploring the multi-modal model, please reach out to your AWS specialist.

Building production-quality search experiences with semantic search

These demos give you an idea of the capabilities of vector-based semantic vs. word-based lexical search and what can be accomplished by utilizing the vector engine for OpenSearch Serverless to build your search experiences. Of course, production-quality search experiences use many more techniques to improve results. In particular, our experimentation shows that hybrid search, combining lexical and vector approaches, typically results in a 15% improvement in search result quality over lexical or vector search alone on industry-standard test sets, as measured by the NDCG@10 metric (Normalized Discounted Cumulative Gain in the first 10 results). The improvement is because lexical outperforms vector for very specific names of things, and semantic works better for broader queries. For example, in the semantic vs. lexical comparison, the query saranac 146, a brand of canoe, works very well in lexical search, whereas semantic search doesn’t return relevant results. This demonstrates why the combination of semantic and lexical search provides superior results.

Conclusion

OpenSearch Service includes a vector engine that supports semantic search as well as classic lexical search. The examples shown in the demo pages show the strengths and weaknesses of different techniques. You can use the Search Comparison Tool on your own data in OpenSearch 2.9 or higher.

Further information

For further information about OpenSearch’s semantic search capabilities, see the following:


About the author

Stavros Macrakis is a Senior Technical Product Manager on the OpenSearch project of Amazon Web Services. He is passionate about giving customers the tools to improve the quality of their search results.

Introducing AWS Glue crawler and create table support for Apache Iceberg format

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/introducing-aws-glue-crawler-and-create-table-support-for-apache-iceberg-format/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3) and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. Iceberg has become very popular for its support for ACID transactions in data lakes and features like schema and partition evolution, time travel, and rollback. Iceberg captures metadata information on the state of datasets as they evolve and change over time.

AWS Glue crawlers now support Iceberg tables, enabling you to use the AWS Glue Data Catalog and migrate from other Iceberg catalogs easier. AWS Glue crawlers will extract schema information and update the location of Iceberg metadata and schema updates in the Data Catalog. You can then query the Data Catalog Iceberg tables across all analytics engines and apply AWS Lake Formation fine-grained permissions.

The Iceberg catalog helps you manage a collection of Iceberg tables and tracks the table’s current metadata. Iceberg provides several implementation options for the Iceberg catalog, including the AWS Glue Data Catalog, Hive Metastore, and JDBC catalogs. Customers prefer using or migrating to the AWS Glue Data Catalog because of its integrations with AWS analytical services such as Amazon Athena, AWS Glue, Amazon EMR, and Lake Formation.

With today’s launch, you can create and schedule an AWS Glue crawler to catalog existing Iceberg tables in the Data Catalog. You can then provide one or multiple S3 paths where the Iceberg tables are located. You have the option to provide the maximum depth of S3 paths that the crawler can traverse. With each crawler run, the crawler inspects each of the S3 paths and catalogs the schema information, such as new tables, deletes, and updates to schemas in the Data Catalog. Crawlers support schema merging across all snapshots and update the latest metadata file location in the Data Catalog that AWS analytical engines can directly use.
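As a sketch of how such a crawler might be defined with the AWS SDK for Python (Boto3), assuming the new IcebergTargets crawler target; the role and S3 path are placeholders:

import boto3

glue = boto3.client("glue")

# Define a crawler that scans an S3 prefix for Iceberg table metadata (placeholder names).
glue.create_crawler(
    Name="iceberg-products-crawler",
    Role="arn:aws:iam::<account-id>:role/<glue-crawler-role>",
    DatabaseName="icebergcrawlerblogdb",
    Targets={
        "IcebergTargets": [
            {
                "Paths": ["s3://<data-lake-bucket>/iceberg/"],
                "MaximumTraversalDepth": 10,
            }
        ]
    },
)

# Run it on demand; you can also attach a schedule.
glue.start_crawler(Name="iceberg-products-crawler")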

Additionally, AWS Glue is launching support for creating new (empty) Iceberg tables in the Data Catalog using the AWS Glue console or AWS Glue CreateTable API. Before the launch, customers who wanted to adopt Iceberg table format were required to generate Iceberg’s metadata.json file on Amazon S3 using PutObject separately in addition to CreateTable. Often, customers have used the create table statement on analytics engines such as Athena, AWS Glue, and so on. The new CreateTable API eliminates the need to create the metadata.json file separately, and automates generating metadata.json based on the given API input. Also, customers who manage deployments using AWS CloudFormation templates can now create Iceberg tables using the CreateTable API. For more details, refer to Creating Apache Iceberg tables.
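A sketch of what such a CreateTable call might look like with Boto3; the columns and location are placeholders, and the OpenTableFormatInput structure is what tells AWS Glue to generate the Iceberg metadata:

import boto3

glue = boto3.client("glue")

# Create a new (empty) Iceberg table; Glue generates the initial metadata.json for you.
glue.create_table(
    DatabaseName="icebergcrawlerblogdb",
    TableInput={
        "Name": "product_new",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "product_id", "Type": "string"},
                {"Name": "price", "Type": "double"},
            ],
            "Location": "s3://<data-lake-bucket>/iceberg/product_new/",
        },
        "TableType": "EXTERNAL_TABLE",
    },
    OpenTableFormatInput={
        "IcebergInput": {
            "MetadataOperation": "CREATE",
            "Version": "2",
        }
    },
)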

For accessing the data using Athena, you can also use Lake Formation to secure your Iceberg table using fine-grained access control permissions when you register the Amazon S3 data location with Lake Formation. For source data in Amazon S3 and metadata that is not registered with Lake Formation, access is determined by AWS Identity and Access Management (IAM) permissions policies for Amazon S3 and AWS Glue actions.

Solution overview

For our example use case, a customer uses Amazon EMR for data processing and Iceberg format for the transactional data. They store their product data in Iceberg format on Amazon S3 and host the metadata of their datasets in Hive Metastore on the EMR primary node. The customer wants to make product data accessible to analyst personas for interactive analysis using Athena. Many AWS analytics services don’t integrate natively with Hive Metastore, so we use an AWS Glue crawler to populate the metadata in the AWS Glue Data Catalog. Athena supports Lake Formation permissions on Iceberg tables, so we apply fine-grained access for data access.

We configure the crawler to onboard the Iceberg schema to the Data Catalog and use Lake Formation access control for crawling. We apply Lake Formation grants on the database and crawled table to enable analyst users to query the data and verify using Athena.

After we populate the schema of the existing Iceberg dataset to the Data Catalog, we onboard new Iceberg tables to the Data Catalog and load data into the newly created data using Athena. We apply Lake Formation grants on the database and newly created table to enable analyst users to query the data and verify using Athena.

The following diagram illustrates the solution architecture.

Set up resources with AWS CloudFormation

To set up the solution resources using AWS CloudFormation, complete the following steps:

  1. Log in to the AWS Management Console as IAM administrator.
  2. Choose Launch Stack to deploy a CloudFormation template.
  3. Choose Next.
  4. On the next page, choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

The CloudFormation template generates the following resources:

  • VPC, subnet, and security group for the EMR cluster
  • Data lake bucket to store Iceberg table data and metadata
  • IAM roles for the crawler and Lake Formation registration
  • EMR cluster and steps to create an Iceberg table with Hive Metastore
  • Analyst role for data access
  • Athena bucket path for results

  1. When the stack is complete, on the AWS CloudFormation console, navigate to the Resources tab of the stack.
  2. Note down the values of EmrClusterId, DataLakeBucketName, LFRegisterLocationServiceRole, AWSGlueServiceRole, AthenaBucketName, and LFBusinessAnalystRole.
  3. Navigate to the Amazon EMR console and choose the EMR cluster you created.
  4. Navigate to the Steps tab and verify that the steps were run.

This script run creates the database icebergcrawlerblogdb using Hive and the Iceberg table product. It uses the Hive Metastore server on Amazon EMR as the metastore and stores the data on Amazon S3.

  1. Navigate to the S3 bucket you created and verify if the data and metadata are created for the Iceberg table.

Some of the resources that this stack deploys incur costs when in use.

Now that the data is on Amazon S3, we can register the bucket with Lake Formation to implement access control and centralize the data governance.

Set up Lake Formation permissions

To use the AWS Glue Data Catalog in Lake Formation, complete the following steps to update the Data Catalog settings to use Lake Formation permissions to control Data Catalog resources instead of IAM-based access control:

  1. Sign in to the Lake Formation console as admin.
    • If this is the first time accessing the Lake Formation console, add yourself as the data lake administrator.
  2. In the navigation pane, under Data catalog, choose Settings.
  3. Deselect Use only IAM access control for new databases.
  4. Deselect Use only IAM access control for new tables in new databases.
  5. Choose Version 3 for Cross account version settings.
  6. Choose Save.

Now you can set up Lake Formation permissions.

Register the data lake S3 bucket with Lake Formation

To register the data lake S3 bucket, complete the following steps:

  1. On the Lake Formation console, in the navigation pane, choose Data lake locations.
  2. Choose Register location.
  3. For Amazon S3 path, enter the data lake bucket path.
  4. For IAM role, choose the role noted from the CloudFormation template for LFRegisterLocationServiceRole.
  5. Choose Register location.

Grant crawler role access to the data location

To grant access to the crawler, complete the following steps:

  1. On the Lake Formation console, in the navigation pane, choose Data locations.
  2. Choose Grant.
  3. For IAM users and roles, choose the role for the crawler.
  4. For Storage locations, enter the data lake bucket path.
  5. Choose Grant.

Create database and grant access to the crawler role

Complete the following steps to create your database and grant access to the crawler role:

  1. On the Lake Formation console, in the navigation pane, choose Databases.
  2. Choose Create database.
  3. Provide the name icebergcrawlerblogdb for the database.
  4. Make sure Use only IAM access control for new tables in this database option is not selected.
  5. Choose Create database.
  6. On the Action menu, choose Grant.
  7. For IAM users and roles, choose the role for the crawler.
  8. Leave the database specified as icebergcrawlerblogdb.
  9. Select Create table, Describe, and Alter for Database permissions.
  10. Choose Grant.

Configure the crawler for Iceberg

To configure your crawler for Iceberg, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose Crawlers.
  2. Choose Create crawler.
  3. Enter a name for the crawler. For this post, we use icebergcrawler.
  4. Under Data source configuration, choose Add data source.
  5. For Data source, choose Iceberg.
  6. For S3 path, enter s3://<datalakebucket>/icebergcrawlerblogdb.db/.
  7. Choose Add an Iceberg data source.

Support for Iceberg tables is also available through the CreateCrawler and UpdateCrawler APIs by adding IcebergTarget as an additional target, with the following properties:

  • connectionId – If your Iceberg tables are stored in buckets that require VPC authorization, you can set your connection properties here
  • icebergTables – This is an array of icebergPaths strings, each indicating the folder in which the metadata files for an Iceberg table reside

See the following code:

{
    "IcebergTarget": {
        "connectionId": "iceberg-connection-123",
        "icebergMetaDataPaths": [
            "s3://bucketA/",
            "s3://bucketB/",
            "s3://bucket3/financedb/financetable/"
        ],
        "exclusions": ["departments/**", "employees/folder/**"],
        "maximumDepth": 5
    }
}
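
If you prefer to automate this setup instead of using the console, the same configuration can be expressed through the CreateCrawler API. The following is a minimal boto3 sketch, assuming the SDK exposes the Iceberg target through an IcebergTargets key with Paths and MaximumTraversalDepth properties (the exact key names may differ from the JSON above, so verify them against the current AWS Glue API reference); the role ARN and bucket path are placeholders.

# A minimal boto3 sketch for creating and starting an Iceberg crawler programmatically.
# The IcebergTargets key names and the Lake Formation configuration shown here are
# assumptions based on this launch; verify them against the current AWS Glue API reference.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_crawler(
    Name="icebergcrawler",
    Role="arn:aws:iam::<account_id>:role/<AWSGlueServiceRole>",  # crawler role from the stack
    DatabaseName="icebergcrawlerblogdb",
    Targets={
        "IcebergTargets": [
            {
                "Paths": ["s3://<datalakebucket>/icebergcrawlerblogdb.db/"],
                "MaximumTraversalDepth": 5,
            }
        ]
    },
    LakeFormationConfiguration={"UseLakeFormationCredentials": True},
)

# Start the crawler after it has been created
glue.start_crawler(Name="icebergcrawler")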
  1. Choose Next.
  2. For Existing IAM role, enter the crawler role created by the stack.
  3. Under Lake Formation configuration, select Use Lake Formation credentials for crawling S3 data source.
  4. Choose Next.
  5. Under Set output and scheduling, specify the target database as icebergcrawlerblogdb.
  6. Choose Next.
  7. Choose Create crawler.
  8. Run the crawler.

During each crawl, for each icebergTable path provided, the crawler calls the Amazon S3 List API to find the most recent metadata file under that Iceberg table’s metadata folder and updates the metadata_location parameter in the Data Catalog to point to the latest metadata file.

The following screenshot shows the details after a successful run.

The crawler was able to crawl the S3 data source and successfully populate the schema for Iceberg data in the Data Catalog.

You can now start using the Data Catalog as your primary metastore and create new Iceberg tables directly in the Data Catalog from the console or by using the CreateTable API.

Create a new Iceberg table

To create an Iceberg table in the Data Catalog using the console, complete the steps in this section. Alternatively, you can use a CloudFormation template to create an Iceberg table using the following code:

Type: AWS::Glue::Table
Properties:
  CatalogId: "<account_id>"
  DatabaseName: "icebergcrawlerblogdb"
  TableInput:
    Name: "product_details"
    StorageDescriptor:
      Columns:
        - Name: "product_id"
          Type: "string"
        - Name: "manufacture_name"
          Type: "string"
        - Name: "product_rating"
          Type: "int"
      Location: "s3://<datalakebucket>/icebergcrawlerblogdb.db/"
    TableType: "EXTERNAL_TABLE"
  OpenTableFormatInput:
    IcebergInput:
      MetadataOperation: "CREATE"
      Version: "2"
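
The same table definition can also be created programmatically with the AWS Glue CreateTable API. The following boto3 sketch mirrors the CloudFormation template above; the OpenTableFormatInput structure is assumed to match the template’s IcebergInput properties, so confirm the exact parameter names against the current AWS Glue API reference before using it.

# A minimal boto3 sketch that mirrors the CloudFormation definition above. The
# OpenTableFormatInput/IcebergInput structure is assumed from the template; confirm the
# parameter names against the current AWS Glue CreateTable API reference.
import boto3

glue = boto3.client("glue", region_name="us-east-1")

glue.create_table(
    CatalogId="<account_id>",
    DatabaseName="icebergcrawlerblogdb",
    TableInput={
        "Name": "product_details",
        "StorageDescriptor": {
            "Columns": [
                {"Name": "product_id", "Type": "string"},
                {"Name": "manufacture_name", "Type": "string"},
                {"Name": "product_rating", "Type": "int"},
            ],
            "Location": "s3://<datalakebucket>/icebergcrawlerblogdb.db/",
        },
        "TableType": "EXTERNAL_TABLE",
    },
    OpenTableFormatInput={
        "IcebergInput": {"MetadataOperation": "CREATE", "Version": "2"}
    },
)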

Grant the IAM role access to the data location

First, grant the IAM role access to the data location:

  1. On the Lake Formation console, in the navigation pane, choose Data locations.
  2. Choose Grant.
  3. Select Admin IAM role for IAM users and roles.
  4. For Storage location, enter the data lake bucket path.
  5. Choose Grant.

Create the Iceberg table

Complete the following steps to create the Iceberg table:

  1. On the Lake Formation console, in the navigation pane, choose Tables.
  2. Choose Create table.
  3. For Name, enter product_details.
  4. Choose icebergcrawlerblogdb for Database.
  5. Select Apache Iceberg table for Table format.
  6. Provide the path for <datalakebucket>/icebergcrawlerblogdb.db/ for Table location.
  7. Provide the following schema and choose Upload schema:
    [
         {
             "Name": "product_id",
             "Type": "string"
         },
         {
             "Name": "manufacture_name",
             "Type": "string"
         },
         {
             "Name": "product_rating",
             "Type": "int"
         }
     ]

  8. Choose Submit to create the table.

Add a record to the new Iceberg table

Complete the following steps to add a record to the Iceberg table:

  1. On the Athena console, navigate to the query editor.
  2. Choose Edit settings to configure the Athena query results bucket using the value noted from the CloudFormation output for AthenaBucketName.
  3. Choose Save.
  4. Run the following query to add a record to the table:
    insert into icebergcrawlerblogdb.product_details values('00001','ABC Company',10)

Configure Lake Formation permissions on the Iceberg table in the Data Catalog

Athena supports Lake Formation permission on Iceberg tables, so for this post, we show you how to set up fine-grained access on the tables and query them using Athena.

Now the data lake admin can delegate permissions on the database and table to the LFBusinessAnalystRole-IcebergBlog IAM role via the Lake Formation console.

Grant the role access to the database and describe permissions

To grant the LFBusinessAnalystRole-IcebergBlog IAM role access to the database with describe permissions, complete the following steps:

  1. On the Lake Formation console, under Permissions in the navigation pane, choose Data lake permissions.
  2. Choose Grant.
  3. Under Principals, select IAM users and roles.
  4. Choose the IAM role LFBusinessAnalystRole-IcebergBlog.
  5. Under LF-Tags or catalog resources, choose icebergcrawlerblogdb for Databases.
  6. Select Describe for Database permissions.
  7. Choose Grant to apply the permissions.

Grant column access to the role

Next, grant column access to the LFBusinessAnalystRole-IcebergBlog IAM role:

  1. On the Lake Formation console, under Permissions in the navigation pane, choose Data lake permissions.
  2. Choose Grant.
  3. Under Principals, select IAM users and roles.
  4. Choose the IAM role LFBusinessAnalystRole-IcebergBlog.
  5. Under LF-Tags or catalog resources, choose icebergcrawlerblogdb for Databases and product for Tables.
  6. Choose Select for Table permissions.
  7. Under Data permissions, select Column-based access.
  8. Select Include columns and choose product_name and price.
  9. Choose Grant to apply the permissions.

Grant table access to the role

Lastly, grant table access to the LFBusinessAnalystRole-IcebergBlog IAM role:

  1. On the Lake Formation console, under Permissions in the navigation pane, choose Data lake permissions.
  2. Choose Grant.
  3. Under Principals, select IAM users and roles.
  4. Choose the IAM role LFBusinessAnalystRole-IcebergBlog.
  5. Under LF-Tags or catalog resources, choose icebergcrawlerblogdb for Databases and product_details for Tables.
  6. Choose Select and Describe for Table permissions.
  7. Choose Grant to apply the permissions.

Verify the tables using Athena

To verify the tables using Athena, switch to the LFBusinessAnalystRole-IcebergBlog role and complete the following steps:

  1. On the Athena console, navigate to the query editor.
  2. Choose Edit settings to configure the Athena query results bucket using the value noted from the CloudFormation output for AthenaBucketName.
  3. Choose Save.
  4. Run the queries on product and product_details to validate access.

The following screenshot shows column permissions on product.

The following screenshot shows table permissions on product_details.

We have successfully crawled the Iceberg dataset created from Hive Metastore with data on Amazon S3 and created an AWS Glue Data Catalog table with the schema populated. We registered the data lake bucket with Lake Formation and enabled crawling access to the data lake using Lake Formation permissions. We granted Lake Formation permissions on the database and table to the analyst user and validated access to the data using Athena.

Clean up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the CloudFormation console as the IAM admin used for creating the CloudFormation stack.
  2. Delete the CloudFormation stack you created.

Conclusion

With the support for Iceberg crawlers, you can quickly move to using the AWS Glue Data Catalog as your primary Iceberg table catalog. You can automatically register Iceberg tables in the Data Catalog by running an AWS Glue crawler, which doesn’t require any DDL or manual schema definition. You can start building your serverless transactional data lake on AWS using the AWS Glue crawler, create new tables using the Data Catalog, and use Lake Formation fine-grained access controls to query Iceberg tables with Athena.

Refer to Working with other AWS services for Lake Formation support for Iceberg tables across various AWS analytical services.

Special thanks to everyone who contributed to this crawler and CreateTable feature launch: Theo Xu, Kyle Duong, Anshuman Sharma, Atreya Srivathsan, Eric Wu, Jack Ye, Himani Desai, Masoud Shahamiri, and Sachet Saurabh.

If you have questions or suggestions, submit them in the comments section.


About the authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Srividya Parthasarathy is a Senior Big Data Architect on the AWS Lake Formation team. She enjoys building data mesh solutions and sharing them with the community.

Mahesh Mishra is a Principal Product Manager with the AWS Lake Formation team. He works with many of AWS’ largest customers on emerging technology needs, and leads several data and analytics initiatives within AWS, including strong support for transactional data lakes.

Implement a serverless CDC process with Apache Iceberg using Amazon DynamoDB and Amazon Athena

Post Syndicated from Vijay Velpula original https://aws.amazon.com/blogs/big-data/implement-a-serverless-cdc-process-with-apache-iceberg-using-amazon-dynamodb-and-amazon-athena/

Apache Iceberg is an open table format for very large analytic datasets. Iceberg manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. The Iceberg specification allows seamless table evolution such as schema and partition evolution, and its design is optimized for usage on Amazon Simple Storage Service (Amazon S3). Iceberg also helps guarantee data correctness under concurrent write scenarios.

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. Change Data Capture (CDC) in the context of a data lake refers to the process of capturing and propagating changes made to source data. Source systems often lack the capability to publish data that is modified or changed. This requires data pipelines to consume full load datasets every day, which increases the data processing duration and the storage cost. If the source data is in tabular format, there are mechanisms to identify the data changes easily. However, the complexity increases if the data is in semi-structured format and changes to the source data must be propagated to the data lake in near-real time.

This post presents a solution to handle incoming semi-structured datasets from source systems and effectively determine changed records and load them into Iceberg tables. With this approach, we will not only use Athena to query data source files in Amazon S3, but also achieve ACID compliance.

Solution overview

We demonstrate this solution with an end-to-end serverless CDC process. We use a sample JSON file as input to Amazon DynamoDB. We identify changed records by utilizing Amazon DynamoDB Streams and AWS Lambda to update the data lake with changed records. We then utilize an Iceberg table to demonstrate CDC functionality for a sample employee dataset. This data represents employee details such as name, address, date joined, and other fields.

The architecture is implemented as follows:

  1. Source systems ingest a semi-structured (JSON) dataset into a DynamoDB table.
  2. The DynamoDB table stores the semi-structured dataset, and these tables have DynamoDB Streams enabled. DynamoDB Streams helps identify if the incoming data is new, modified, or deleted based on the keys defined and delivers the ordered messages to a Lambda function.
  3. For every stream event, the Lambda function parses the event and builds dynamic DML SQL statements.
  4. The constructed DML SQL statements are run on the corresponding Iceberg tables to reflect the changes.

The following diagram illustrates this workflow.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Deploy the solution

For this solution, we provide a CloudFormation template that sets up the services included in the architecture, to enable repeatable deployments.

Note: Deploying the CloudFormation stack in your account incurs AWS usage charges.

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation stack.
  2. Enter a stack name.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the CloudFormation stack deployment is complete, navigate to the AWS CloudFormation console and note the following resources on the Outputs tab:

  • Data lake S3 bucket – iceberg-cdc-xxxxx-us-east-1-xxxxx
  • AthenaWorkGroupName – AthenaWorkgroup-xxxxxx
  • DataGeneratorLambdaFunction – UserRecordsFunction-xxxxxx
  • DynamoDBTableName – users_xxxxxx
  • LambdaDMLFunction – IcebergUpsertFunction-xxxxxx
  • AthenaIcebergTableName – users_xxxxxx

Generate sample employee data and load into the DynamoDB table using Lambda

To test the solution, trigger the UserRecordsFunction-XXXXX function by creating a test event, which loads sample data into the DynamoDB table.

  1. On the Lambda console, open the Lambda function with the name UserRecordsFunction-XXXXX.
  2. On the Code tab, choose Test, then Configure test event.
  3. Configure a test event with the default hello-world template event JSON.
  4. Provide an event name without any changes to the template and save the test event.
  5. On the Test tab, choose Test to trigger the SampleEvent test event. This will invoke the data generator Lambda function to load data into the users_xxxxxx DynamoDB table. When the test event is complete, you should notice a success notification as shown in the following screenshot.
  6. On the DynamoDB console, navigate to the users_XXXXXX table and choose Explore table items to verify the data loaded into the table.
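
You can also trigger the data generator outside the console. The following is a minimal boto3 sketch that invokes the function directly; the function name is a placeholder for the UserRecordsFunction-XXXXX value from the CloudFormation outputs, and an empty JSON payload is assumed to behave the same as the default hello-world test event.

# A minimal sketch that invokes the data generator Lambda function programmatically instead
# of using a console test event. The function name is a placeholder, and the empty payload
# is assumed to be equivalent to the default hello-world test event.
import json
import boto3

lambda_client = boto3.client("lambda", region_name="us-east-1")

response = lambda_client.invoke(
    FunctionName="UserRecordsFunction-XXXXX",  # placeholder; use the name from the stack outputs
    InvocationType="RequestResponse",          # synchronous invocation
    Payload=json.dumps({}),
)
print(response["StatusCode"], response["Payload"].read().decode())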

The data loads performed on the DynamoDB table are cascaded to the Athena table with the help of the IcebergUpsertFunction-xxxxx Lambda function deployed by the CloudFormation template.

In the following sections, we simulate and validate various scenarios to demonstrate Iceberg capabilities, including DML operations, time travel, and optimizations.

Simulate the scenarios and validate CDC functionality in Athena

After the first run of the data generator Lambda function, navigate to the Athena query editor, choose the AthenaWorkgroup-XXXXX workgroup, and preview the users_XXXXXX Iceberg table to query the records.

With the data inserted into the DynamoDB table, all the data change activities such as inserts, updates, and deletes are captured in DynamoDB Streams. DynamoDB Streams triggers the IcebergUpsertFunction-xxxxx Lambda function, which processes the events in the order they are received. The function performs the following steps:

  • Receives the stream event
  • Parses the stream event based on the DynamoDB eventType (insert, update, or delete) and generates an Athena DML SQL statement
  • Runs the SQL statement in Athena

Let’s dive deep into the IcebergUpsertFunction-xxxxx function code and see how it handles various scenarios.

IcebergUpsertFunction-xxxxx function code

As indicated in the following Lambda function code block, the function categorizes the DynamoDB Streams event based on its eventName: INSERT, MODIFY, or REMOVE. Any other event type raises an InvalidEventTypeException. MODIFY is treated as an update event and REMOVE as a delete event.

All the DML operations are run on the users_XXXXXX table in Athena. We fetch the metadata of the users_xxxxxx table from Athena. The following are a few important considerations regarding how the Lambda function handles Iceberg table metadata changes:

  • In this approach, target metadata takes precedence during DML operations.
  • Any columns that are missing in the target will be excluded in the DML command.
  • It’s imperative that the source and target metadata match. If new columns and attributes are added to the source table, the current solution is configured to skip them.
  • This solution can be enhanced further to cascade source system metadata changes to the target table in Athena.

The following is the Lambda function code:

def iceberg_upsert(event, database, tablename):
    response ={}
    logger.info(f'Started iceberg_upsert executing.')
    logger.info(f'Started parsing received event.')
    
    # Determine type of event
    resp=event
    eventName=resp['eventName']
    
    # call for athena function 
    athresp=retrieve_athena_table_metadata(database,tablename) 
    try:
        AthenTblMd=athresp['TableMetadata']['Columns']
    except Exception as e:
        logger.error(f"Athena Metadata does not have column information. Please check table {tablename} and database {database} ")
        raise(e)
    else: # else block for try/except
        logger.info(f"{AthenTblMd}")
        
    try:
        if eventName == "INSERT":
            sqlstmt=insert_stmt(resp,AthenTblMd,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "MODIFY":
            sqlstmt=update_stmt(resp,AthenTblMd,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        elif eventName == "REMOVE":
            sqlstmt=del_stmt(resp,database,tablename)
            logger.info(sqlstmt)
            response=run_query(sqlstmt, database_name, athena_workgroup, output_location,wait_time)
        else:
            raise InvalidEventTypeException
        
    except InvalidEventTypeException:
        logger.warning(f'Event type should be INSERT/MODIFY/REMOVE. Received event type is : {eventName}.')
        logger.warning(f'Skipping the DML operation.')
    except Exception as e:
        logger.error("iceberg_upsert function failed with error")
        raise(e)
    else : # else block for try/except
        return response

The following code uses the Athena Boto3 client to fetch the table metadata:

def retrieve_athena_table_metadata(databaseName, tableName, catalogName=None):
    if catalogName is None:
        catalogName='AWSDATACATALOG' # default value 
    try:
        athenaTblMd=client.get_table_metadata(CatalogName=catalogName,DatabaseName=databaseName,TableName=tableName)
    except Exception as e:
        logger.error("Athena Table Metadata retrieval function Failed.Please check exception", e)
        raise(e) 
    else: # else block for try except
        return athenaTblMd
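
The iceberg_upsert function also relies on a run_query helper (and on module-level settings such as database_name, athena_workgroup, output_location, and wait_time) that aren’t shown in this post. The following is a minimal sketch of what such a helper could look like, based only on the call sites above; it submits the DML statement to Athena with the Boto3 start_query_execution call and polls get_query_execution until the query reaches a terminal state.

# A minimal sketch of a run_query helper, assumed from the call sites in iceberg_upsert.
# It submits the statement to Athena and polls until the query finishes or the wait time elapses.
import time
import boto3

athena = boto3.client("athena")

def run_query(sql, database_name, athena_workgroup, output_location, wait_time=30):
    start = athena.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database_name},
        WorkGroup=athena_workgroup,
        ResultConfiguration={"OutputLocation": output_location},
    )
    query_id = start["QueryExecutionId"]

    # Poll the query status once per second until it reaches a terminal state
    for _ in range(wait_time):
        status = athena.get_query_execution(QueryExecutionId=query_id)
        state = status["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return {"QueryExecutionId": query_id, "State": state}
        time.sleep(1)
    return {"QueryExecutionId": query_id, "State": "TIMED_OUT"}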

Insert operations

Now let’s see how insert operations are handled with the sample data generated in the DynamoDB table.

  1. On the DynamoDB console, navigate to the users_XXXXX table.
  2. Choose Create item.
  3. Enter a sample record with the following code:
    {
      "emp_no": {
         "N": "11"
      },
      "country": {
         "S": "USA"
      },
      "dateOfBirth": {
         "S": "1991-10-23"
      },
      "first_name": {
         "S": "Tom"
      },
      "isContractAthlete": {
         "BOOL": false
      },
      "job": {
         "S": "Sr Manager"
      },
      "last_name": {
         "S": "Carter"
      },
      "phone_number": {
         "S": "+1-226-333-789"
      },
      "sex": {
         "S": "male"
      },
      "ssn": {
         "S": "434-98-2345"
      }
    }
    

  4. Choose Create item to insert the new record into the DynamoDB table.
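
You can insert the same record programmatically as well. The following boto3 sketch uses the low-level DynamoDB client, which accepts the attribute-value document shown above unchanged; the table name is a placeholder for the users_XXXXXX table created by the stack.

# A minimal sketch that inserts the same sample record with the low-level DynamoDB client.
# The table name is a placeholder for the users_XXXXXX table created by the CloudFormation stack.
import boto3

dynamodb = boto3.client("dynamodb", region_name="us-east-1")

dynamodb.put_item(
    TableName="users_XXXXXX",  # placeholder table name
    Item={
        "emp_no": {"N": "11"},
        "country": {"S": "USA"},
        "dateOfBirth": {"S": "1991-10-23"},
        "first_name": {"S": "Tom"},
        "isContractAthlete": {"BOOL": False},
        "job": {"S": "Sr Manager"},
        "last_name": {"S": "Carter"},
        "phone_number": {"S": "+1-226-333-789"},
        "sex": {"S": "male"},
        "ssn": {"S": "434-98-2345"},
    },
)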

After the item is created in the DynamoDB table, a stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates an equivalent INSERT SQL statement to run on the Athena table. The following screenshot shows the INSERT SQL that was generated by the Lambda function on the Athena console in the Recent queries section.

The IcebergUpsertFunction-xxxxx Lambda code has modularized functions for each eventType. The following code highlights the function, which processes insert eventType streams:

def insert_stmt(insert_event_resp,AthenTblMd,database,tablename):
    resp=insert_event_resp
    
    Tablevalues=resp['dynamodb']['NewImage']
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    
    val_list=unpack_dict(Tblvalues,AthenTblMd)
    col_nm,val_for_col=[],[]
 
    for item in val_list:
        
        if item.get('data') is not None:
            col_nm.append(item['Name'])
            if item['Type'] != 'string':
                val_for_col.append(f"CAST ({(item['data'])} AS {item['Type']})" )
            else:
                val_for_col.append(str((item['data'])))
 
    colnames_with_doublequotes=",".join([f'"{i}"' for i in col_nm])
    values_formatted=",".join([f"{i}" if i.startswith('CAST') else f"'{i}'" for i in val_for_col] )
 
    return f"insert into {database}.{tablename} ({colnames_with_doublequotes}) values ({values_formatted})"

This function parses the create item stream event and constructs an INSERT SQL statement in the following format:

INSERT into <tablename> values (val1, val2....)

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena.

Update operations

For our update operation, let’s identify the current state of a record in the Athena table. We see emp_no=5 and its column values in Athena and compare them to the DynamoDB table. If there are no changes, the records should be the same, as shown in the following screenshots.

Let’s initiate an edit item operation in the DynamoDB table. We modify the following values:

  • IsContractAthlete – True
  • Phone_number – 123-456-789

After the item is edited in the DynamoDB table, a MODIFY stream event is generated in DynamoDB Streams, which triggers the Lambda function. The function processes the event and generates the equivalent UPDATE SQL statement to run on the Athena table.

MODIFY DynamoDB Streams events have two components: the old image and the new image. Here we parse only the new image data section to construct an UPDATE ANSI SQL statement and run it on the Athena tables.

The following update_stmt code block parses the modify item stream event and constructs the corresponding UPDATE SQL statement with new image data. The code block performs the following steps:

  • Finds the key columns for the WHERE clause
  • Finds columns for the SET clause
  • Ensures key columns are not part of the SET command

The function returns a string that is a SQL ANSI compliant statement that can be run directly in Athena. For example:

UPDATE <TABLENAME> SET col = value where key = value

See the following code:

def update_stmt(update_event_resp,AthenTblMd,database,tablename):
    resp=update_event_resp
    
    Tablevalues=resp['dynamodb']['NewImage']
    primary_key_col_names=resp['dynamodb']['Keys']     
    
    Tblvalues={ k.lower():v for k,v in Tablevalues.items()} # converting key names to lowercase to prevent case-sensitive mismatches
    
    new_upd_AthenaTblMd=AthenTblMd.copy()
    where_nm,set_nm=[],[]
    forUpdate=Tblvalues.copy()
 
    # removing primary keys from the stream dictionary so that SET command for Update can be constructed.
    for col_pkey in primary_key_col_names.keys():
        forUpdate.pop(col_pkey,None)
    
 
    for position,item in enumerate(AthenTblMd):
        if forUpdate.get(item.get('Name')) is not None:
            datafromsource=(list(forUpdate.get(item.get('Name')).values())[0])
            new_upd_AthenaTblMd[position]['data']=datafromsource
 
    # For set clause
    for item in new_upd_AthenaTblMd:
        if item.get('data') is not None:
            if item['Type'] != 'string':
                set_nm.append(f"{item['Name']} = CAST ('{(item['data'])}' AS {item['Type']})")
            else:
                set_nm.append(f" {item['Name']} = '{item['data']}' ")
    
    set_cmd=f" set {','.join(set_nm)}"
    
    # for where clause
    for key, val in primary_key_col_names.items():
        where_nm.append(f" {key} = {list(val.values())[0]}")
 
    where_cmd=f" where {' and '.join(where_nm)}"
 
    return (f" UPDATE {database}.{tablename} {set_cmd}  {where_cmd}")

In the Athena table, we can see the columns IsContractAthlete and Phone_number have been updated to the recent values. The other column values remain the same because they weren’t modified.

Delete operations

For delete operations, let’s identify the current state of a record in the Athena table. We choose emp_no=6 for this activity.

  1. On the DynamoDB console, navigate to the user table.
  2. Select the record for emp_no=6.
  3. On the Actions menu, choose Delete items.

After the delete item operation is performed on the DynamoDB table, it generates a REMOVE event type in the DynamoDB stream, which triggers the IcebergUpsertFunction-xxxxx Lambda function.

The DELETE function removes the data based on key columns in the stream. The following function parses the stream to identify key columns of the deleted item. We construct a DELETE DML SQL statement with a WHERE clause of emp_no=6:

DELETE FROM <TABLENAME> WHERE key = value

See the following code:

def del_stmt(del_event_resp,database,tablename):
    
    resp=del_event_resp
    
    primary_key_col_names=resp['dynamodb']['Keys'] 
    del_where_nm=[]
    
    for key, val in primary_key_col_names.items():
        del_where_nm.append(f" {key} = {list(val.values())[0]}")
 
    del_where_cmd=f" where {' and '.join(del_where_nm)}"
    return f" DELETE FROM {database}.{tablename} {del_where_cmd} "   

The function returns a string, which is an ANSI SQL compliant statement that can be run directly in Athena. The following screenshot shows the DELETE statement that was run in Athena.

As you can see from the following screenshot, the emp_no=6 record no longer exists in the Iceberg table when queried with Athena.

Time travel

Time travel queries in Athena query Amazon S3 for historical data from a consistent snapshot as of a specified date and time, and version travel queries query historical data as of a specified snapshot ID. Iceberg tables provide this capability by maintaining a versioned manifest of the S3 objects that they contain; previous versions of the manifest can be used for time travel and version travel queries. The Iceberg format tracks every change that happened to the table in the tablename$iceberg_history metadata table, and querying it shows the timestamps when the changes occurred.

Let’s find the timestamp when the DELETE statement was applied to the Athena table. In our query, it corresponds to the time 2023-04-18 21:34:13.970. With this timestamp, let’s query the main table to see if emp_no=6 exists in it.
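
The queries for this step aren’t reproduced in the post, so the following is a sketch of what they could look like, assuming the Athena engine version 3 Iceberg time travel syntax (FOR TIMESTAMP AS OF) and the table name used in this example. The statements are shown as Python strings so they can be run with the run_query helper sketched earlier, or pasted directly into the Athena query editor.

# A sketch of the time travel queries for this step, assuming Athena engine version 3 syntax.

# 1. Inspect the table history to find when each change, including the DELETE, was committed.
history_sql = 'SELECT * FROM "users_73591300$iceberg_history"'

# 2. Query the table as of the timestamp identified above to see the state before the delete.
time_travel_sql = (
    "SELECT * FROM users_73591300 "
    "FOR TIMESTAMP AS OF TIMESTAMP '2023-04-18 21:34:13.970 UTC' "
    "WHERE emp_no = 6"
)

# run_query(time_travel_sql, database_name, athena_workgroup, output_location, wait_time)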

As shown in the following screenshot, the query result shows that the deleted record exists, and this can be used to reinsert data if required.

Optimize Iceberg tables

Every insert and update operation on an Iceberg table creates a separate data file and metadata file. If there are multiple such update and insert operations, it might lead to many small fragmented files. Having these small files results in an unnecessarily large amount of metadata and less efficient queries. Use the Athena OPTIMIZE command to compact these small files.

OPTIMIZE

The OPTIMIZE table REWRITE DATA compaction action rewrites data files into a more optimized layout based on their size and number of associated delete files.

The following query shows the number of data files that exist before the compaction process:

SELECT * FROM "users_73591300$iceberg_files"

The following query performs compaction on the Iceberg table:

OPTIMIZE "users_73591300$iceberg_files" REWRITE DATA USING BIN_PACK

We can observe that the compaction process merged multiple data files into a larger file.

VACUUM

The VACUUM statement on Iceberg tables removes data files that are no longer relevant, which reduces metadata size and storage consumption. VACUUM removes unwanted files older than the amount of time that is specified by the vacuum_max_snapshot_age_seconds table property (default 432000), as shown in the following code:

ALTER TABLE users_73591300 SET TBLPROPERTIES ('vacuum_max_snapshot_age_seconds'='259200')

The following query performs a vacuum operation on the Iceberg table:

VACUUM users_73591300

Clean up

When you have finished experimenting with this solution, clean up your resources to prevent AWS charges from being incurred:

  1. Empty the S3 buckets.
  2. Delete the stack from the AWS CloudFormation console.

Conclusion

In this post, we introduced a serverless CDC solution for semi-structured data using DynamoDB Streams and processing the changes in Iceberg tables. We demonstrated how to ingest semi-structured data into DynamoDB, identify changed data using DynamoDB Streams, and process the changes in Iceberg tables. We can expand the solution to build SCD type-2 functionality in data lakes to track historical data changes. This solution is appropriate for a low frequency of updates; for high-frequency updates and larger volumes of data, we can aggregate the changes in a separate intermediate table using DynamoDB Streams and Amazon Kinesis Data Firehose, and then run periodic MERGE operations into the main Iceberg table.

We hope this post provided insights on how to process semi-structured data in a data lake when source systems lack CDC capability.


About the authors

Vijay Velpula is a Data Lake Architect with AWS Professional Services. He helps customers build modern data platforms by implementing big data and analytics solutions. Outside of work, he enjoys spending time with family, traveling, hiking, and biking.

Karthikeyan Ramachandran is a Data Architect with AWS Professional Services. He specializes in MPP systems, helping customers build and maintain data warehouse environments. Outside of work, he likes to binge-watch TV shows and loves playing cricket and volleyball.

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include technology strategy, data analytics, and data science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing the tabla.

Derive operational insights from application logs using Automated Data Analytics on AWS

Post Syndicated from Aparajithan Vaidyanathan original https://aws.amazon.com/blogs/big-data/derive-operational-insights-from-application-logs-using-automated-data-analytics-on-aws/

Automated Data Analytics (ADA) on AWS is an AWS solution that enables you to derive meaningful insights from data in a matter of minutes through a simple and intuitive user interface. ADA offers an AWS-native data analytics platform that is ready to use out of the box by data analysts for a variety of use cases. With ADA, teams can ingest, transform, govern, and query diverse datasets from a range of data sources without requiring specialist technical skills. ADA provides a set of pre-built connectors to ingest data from a wide range of sources including Amazon Simple Storage Service (Amazon S3), Amazon Kinesis Data Streams, Amazon CloudWatch, Amazon CloudTrail, and Amazon DynamoDB as well as many others.

ADA provides a foundational platform that can be used by data analysts in a diverse set of use cases including IT, finance, marketing, sales, and security. ADA’s out-of-the-box CloudWatch data connector allows data ingestion from CloudWatch logs in the same AWS account in which ADA has been deployed, or from a different AWS account.

In this post, we demonstrate how an application developer or application tester is able to use ADA to derive operational insights of applications running in AWS. We also demonstrate how you can use the ADA solution to connect to different data sources in AWS. We first deploy the ADA solution into an AWS account and set up the ADA solution by creating data products using data connectors. We then use the ADA Query Workbench to join the separate datasets and query the correlated data, using familiar Structured Query Language (SQL), to gain insights. We also demonstrate how ADA can be integrated with business intelligence (BI) tools such as Tableau to visualize the data and to build reports.

Solution overview

In this section, we present the solution architecture for the demo and explain the workflow. For the purposes of demonstration, the bespoke application is simulated using an AWS Lambda function that emits logs in Apache Log Format at a preset interval using Amazon EventBridge. This standard format can be produced by many different web servers and be read by many log analysis programs. The application (Lambda function) logs are sent to a CloudWatch log group. The historical application logs are stored in an S3 bucket for reference and for querying purposes. A lookup table with a list of HTTP status codes along with the descriptions is stored in a DynamoDB table. These three serve as sources from which data is ingested into ADA for correlation, query, and analysis. We deploy the ADA solution into an AWS account and set up ADA. We then create the data products within ADA for the CloudWatch log group, S3 bucket, and DynamoDB. As the data products are configured, ADA provisions data pipelines to ingest the data from the sources. With the ADA Query Workbench, you can query the ingested data using plain SQL for application troubleshooting or issue diagnosis.

The following diagram provides an overview of the architecture and workflow of using ADA to gain insights into application logs.

The workflow includes the following steps:

  1. A Lambda function is scheduled to be triggered at 2-minute intervals using EventBridge.
  2. The Lambda function emits logs that are stored at a specified CloudWatch log group under /aws/lambda/CdkStack-AdaLogGenLambdaFunction. The application logs are generated using the Apache Log Format schema but stored in the CloudWatch log group in JSON format.
  3. The data products for CloudWatch, Amazon S3, and DynamoDB are created in ADA. The CloudWatch data product connects to the CloudWatch log group where the application (Lambda function) logs are stored. The Amazon S3 connector connects to an S3 bucket folder where the historical logs are stored. The DynamoDB connector connects to a DynamoDB table where the status codes that are referred by the application and historical logs are stored.
  4. For each of the data products, ADA deploys the data pipeline infrastructure to ingest data from the sources. When the data ingestion is complete, you can write queries using SQL via the ADA Query Workbench.
  5. You can log in to the ADA portal and compose SQL queries from the Query Workbench to gain insights in to the application logs. You can optionally save the query and share the query with other ADA users in the same domain. The ADA query feature is powered by Amazon Athena, which is a serverless, interactive analytics service that provides a simplified, flexible way to analyze petabytes of data.
  6. Tableau is configured to access the ADA data products via ADA egress endpoints. You then create a dashboard with two charts. The first chart is a heat map that shows the prevalence of HTTP error codes correlated with the application API endpoints. The second chart is a bar chart that shows the top 10 application APIs with a total count of HTTP error codes from the historical data.

Prerequisites

For this post, you need to complete the following prerequisites:

  1. Install the AWS Command Line Interface (AWS CLI), AWS Cloud Development Kit (AWS CDK) prerequisites, TypeScript-specific prerequisites, and git.
  2. Deploy the ADA solution in your AWS account in the us-east-1 Region.
    1. Provide an admin email while launching the ADA AWS CloudFormation stack. This is needed for ADA to send the root user password. An admin phone number is required to receive a one-time password message if multi-factor authentication (MFA) is enabled. For this demo, MFA is not enabled.
  3. Build and deploy the sample application (available on the GitHub repo) solution so that the following resources can be provisioned in your account in the us-east-1 Region:
    1. A Lambda function that simulates the logging application and an EventBridge rule that invokes the application function at 2-minute intervals.
    2. An S3 bucket with the relevant bucket policies and a CSV file that contains the historical application logs.
    3. A DynamoDB table with the lookup data.
    4. Relevant AWS Identity and Access Management (IAM) roles and permissions required for the services.
  4. Optionally, install Tableau Desktop, a third-party BI provider. For this post, we use Tableau Desktop version 2021.2. There is a cost involved in using a licensed version of the Tableau Desktop application. For additional details, refer to the Tableau licensing information.

Deploy and set up ADA

After ADA is deployed successfully, you can log in using the admin email provided during the installation. You then create a domain named CW_Domain. A domain is a user-defined collection of data products. For example, a domain might be a team or a project. Domains provide a structured way for users to organize their data products and manage access permissions.

  1. On the ADA console, choose Domains in the navigation pane.
  2. Choose Create domain.
  3. Enter a name (CW_Domain) and description, then choose Submit.

Set up the sample application infrastructure using AWS CDK

The AWS CDK solution that deploys the demo application is hosted on GitHub. The steps to clone the repo and to set up the AWS CDK project are detailed in this section. Before you run these commands, be sure to configure your AWS credentials. Create a folder, open the terminal, and navigate to the folder where the AWS CDK solution needs to be installed. Run the following code:

gh repo clone aws-samples/operational-insights-with-automated-data-analytics-on-aws
cd operational-insights-with-automated-data-analytics-on-aws
npm install
npm run build
cdk synth
cdk deploy

These steps perform the following actions:

  • Install the library dependencies
  • Build the project
  • Generate a valid CloudFormation template
  • Deploy the stack using AWS CloudFormation in your AWS account

The deployment takes about 1–2 minutes and creates the DynamoDB lookup table, Lambda function, and S3 bucket containing the historical log files as outputs. Copy these values to a text editing application, such as Notepad.

Create ADA data products

We create three different data products for this demo, one for each data source that you’ll be querying to gain operational insights. A data product is a dataset (a collection of data such as a table or a CSV file) that has been successfully imported into ADA and that can be queried.

Create a CloudWatch data product

First, we create a data product for the application logs by setting up ADA to ingest the CloudWatch log group for the sample application (Lambda function). Use the CdkStack.LambdaFunction output to get the Lambda function ARN and locate the corresponding CloudWatch log group ARN on the CloudWatch console.

Then complete the following steps:

  1. On the ADA console, navigate to the ADA domain and create a CloudWatch data product.
  2. For Name, enter a name.
  3. For Source type, choose Amazon CloudWatch.
  4. Disable Automatic PII.

ADA has a feature that automatically detects personally identifiable information (PII) data during import that is enabled by default. For this demo, we disable this option for the data product because the discovery of PII data is not in the scope of this demo.

  1. Choose Next.
  2. Search for and choose the CloudWatch log group ARN copied from the previous step.
  3. Copy the log group ARN.
  4. On the data product page, enter the log group ARN.
  5. For CloudWatch Query, enter a query that you want ADA to get from the log group.

In this demo, we query the @message field because we’re interested in getting the application logs from the log group.

  1. Select how the data updates are triggered after initial import.

ADA can be configured to ingest the data from the source at flexible intervals (up to 15 minutes or later) or on demand. For the demo, we set the data updates to run hourly.

  1. Choose Next.

Next, ADA will connect to the log group and query the schema. Because the logs are in Apache Log Format, we transform the logs into separate fields so that we can run queries on the specific log fields. ADA provides four default transformations and supports custom transformation through a Python script. In this demo, we run a custom Python script to transform the JSON message field into Apache Log Format fields.
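
The transform script itself ships with the sample repository. Independent of ADA’s transform interface, the core of such a script is a regular expression that splits an Apache-format log line into named fields. The following standalone sketch is not the actual apache-log-extractor-transform.py and does not use ADA’s transform API; it only illustrates the parsing step, and the field names mirror the columns referenced later in this post.

# A standalone sketch of the parsing logic behind an Apache log extractor. This is not the
# actual apache-log-extractor-transform.py from the repository and does not use ADA's
# transform interface; it only shows how a JSON @message field in Apache Log Format can be
# split into separate fields with a regular expression.
import re

APACHE_LOG_PATTERN = re.compile(
    r'(?P<client_ip>\S+) \S+ \S+ \[(?P<timestamp>[^\]]+)\] '
    r'"(?P<http_request>\S+) (?P<endpoint>\S+) \S+" '
    r'(?P<status_code>\d{3}) (?P<request_size>\S+)'
)

def extract_fields(cloudwatch_record: dict) -> dict:
    """Parse the @message field of a CloudWatch record into Apache log fields."""
    message = cloudwatch_record.get("@message", "")
    match = APACHE_LOG_PATTERN.match(message)
    return match.groupdict() if match else {}

# Example usage with a fabricated log line
sample = {"@message": '10.0.0.1 - - [18/Apr/2023:21:34:13 +0000] "GET /v1/server/admin HTTP/1.1" 500 1024'}
print(extract_fields(sample))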

  1. Choose Transform schema.
  2. Choose Create new transform.
  3. Upload the apache-log-extractor-transform.py script from the /asset/transform_logs/ folder.
  4. Choose Submit.

ADA will transform the CloudWatch logs using the script and present the processed schema.

  1. Choose Next.
  2. In the last step, review the steps and choose Submit.

ADA will start the data processing, create the data pipelines, and prepare the CloudWatch log groups to be queried from the Query Workbench. This process will take a few minutes to complete and will be shown on the ADA console under Data Products.

Create an Amazon S3 data product

We repeat the steps to add the historical logs from the Amazon S3 data source and look up reference data from the DynamoDB table. For these two data sources, we don’t create custom transforms because the data formats are in CSV (for historical logs) and key attributes (for reference lookup data).

  1. On the ADA console, create a new data product.
  2. Enter a name (hist_logs) and choose Amazon S3.
  3. Copy the Amazon S3 URI (the text after arn:aws:s3:::) from the CdkStack.S3 output variable and navigate to the Amazon S3 console.
  4. In the search box, enter the copied text, open the S3 bucket, select the /logs folder, and choose Copy S3 URI.

The historical logs are stored in this path.

  1. Navigate back to the ADA console and enter the copied S3 URI for S3 location.
  2. For Update Trigger, select On Demand because the historical logs are updated at an unspecified frequency.
  3. For Update Policy, select Append to append newly imported data to the existing data.
  4. Choose Next.

ADA processes the schema for the files in the selected folder path. Because the logs are in CSV format, ADA is able to read the column names without requiring additional transformations. However, the columns status_code and request_size are inferred as long type by ADA. We want to keep the column data types consistent among the data products so that we can join the data tables and query the data. The column status_code will be used to create joins across the data tables.

  1. Choose Transform schema to change the data types of the two columns to string data type.

Note the highlighted column names in the Schema preview pane prior to applying the data type transformations.

  1. In the Transform plan pane, under Built-in transforms, choose Apply Mapping.

This option allows you to change the data type from one type to another.

  1. In the Apply Mapping section, deselect Drop other fields.

If this option is not disabled, only the transformed columns will be preserved and all other columns will be dropped. Because we want to retain all the columns, we disable this option.

  1. Under Field Mappings, for Old name and New name, enter status_code and for New type, enter string.
  2. Choose Add Item.
  3. For Old name and New name, enter request_size and for New data type, enter string.
  4. Choose Submit.

ADA will apply the mapping transformation on the Amazon S3 data source. Note the column types in the Schema preview pane.

  1. Choose View sample to preview the data with the transformation applied.

ADA will display the PII data acknowledgement to ensure that either only authorized users can view the data or that the dataset doesn’t contain any PII data.

  1. Choose Agree to continue to view the sample data.

Note that the schema is identical to the CloudWatch log group schema because both the current application and historical application logs are in Apache Log Format.

  1. In the final step, review the configuration and choose Submit.

ADA starts processing the data from the Amazon S3 source, creates the backend infrastructure, and prepares the data product. This process takes a few minutes depending upon the size of the data.

Create a DynamoDB data product

Lastly, we create a DynamoDB data product. Complete the following steps:

  1. On the ADA console, create a new data product.
  2. Enter a name (lookup) and choose Amazon DynamoDB.
  3. Enter the Cdk.DynamoDBTable output variable for DynamoDB Table ARN.

This table contains key attributes that will be used as a lookup table in this demo. For the lookup data, we are using the HTTP codes and long and short descriptions of the codes. You can also use PostgreSQL, MySQL, or a CSV file source as an alternative.

  1. For Update Trigger, select On-Demand.

The updates will be on demand because the lookup data is mostly for reference purposes while querying, and any changes to the lookup data can be applied in ADA using on-demand triggers.

  1. Choose Next.

ADA reads the schema from the underlying DynamoDB schema and presents the column name and type for optional transformation. We will proceed with the default schema selection because the column types are consistent with the types from the CloudWatch log group and Amazon S3 CSV data source. Having data types that are consistent across the data sources allows us to write queries to fetch records by joining the tables using the column fields. For example, the column key in the DynamoDB schema corresponds to the status_code in the Amazon S3 and CloudWatch data products. We can write queries that can join the three tables using the column name key. An example is shown in the next section.

  1. Choose Continue with current schema.
  2. Review the configuration and choose Submit.

ADA will process the data from the DynamoDB table data source and prepare the data product. Depending upon the size of the data, this process takes a few minutes.

Now we have all the three data products processed by ADA and available for you to run queries.

Use the Query Workbench to query the data

ADA allows you to run queries against the data products while abstracting the data source and making it accessible using SQL (Structured Query Language). You can write queries and join the tables just as you would query against tables in a relational database. We demonstrate ADA’s querying capability via two user scenarios. In both the scenarios, we join an application log dataset to the error codes lookup table. In the first use case, we query the current application logs to identify the top 10 most accessed application endpoints along with the corresponding HTTP status codes:

--Query the top 10 Application endpoints along with the corresponding HTTP request type and HTTP status code.

SELECT logs.endpoint AS Application_EndPoint, logs.http_request AS REQUEST, count(logs.endpoint) as Endpoint_Count, ref.key as HTTP_Status_Code, ref.short as Description
FROM cw_domain.cloud_watch_application_logs logs
INNER JOIN cw_domain.lookup ref ON logs.status_code = ref.key
where logs.status_code LIKE '4%%' OR logs.status_code LIKE '5%%' -- = '/v1/server'
GROUP BY logs.endpoint, logs.http_request, ref.key, ref.short
ORDER BY Endpoint_Count DESC
LIMIT 10

In the second example, we query the historical logs table to get the top 10 application endpoints with the most errors to understand the endpoint call pattern:

-- Query Historical Logs to get the top 10 Application Endpoints with most number of errors along with an explanation of the error code.

SELECT endpoint as Application_EndPoint, count(status_code) as Error_Count, ref.long as Description FROM cw_domain.hist_logs hist
INNER JOIN cw_domain.lookup ref ON hist.status_code = ref.key
WHERE hist.status_code LIKE '4%%' OR hist.status_code LIKE '5%%'
GROUP BY endpoint, status_code, ref.long
ORDER BY Error_Count desc
LIMIT 10

In addition to querying, you can optionally save the query and share the saved query with other users in the same domain. The shared queries are accessible directly from the Query Workbench. The query results can also be exported to CSV format.

Visualize ADA data products in Tableau

ADA offers the ability to connect to third-party BI tools to visualize data and create reports from the ADA data products. In this demo, we use ADA’s native integration with Tableau to visualize the data from the three data products we configured earlier. Using Tableau’s Athena connector and following the steps in Tableau configuration, you can configure ADA as a data source in Tableau. After a successful connection has been established between Tableau and ADA, Tableau will populate the three data products under the Tableau catalog cw_domain.

We then establish a relationship across the three data products using the HTTP status code as the joining column, as shown in the following screenshot. Tableau allows us to work in online and offline mode with the data sources. In online mode, Tableau will connect to ADA and query the data products live. In offline mode, we can use the Extract option to extract the data from ADA and import the data into Tableau. In this demo, we import the data into Tableau to make the querying more responsive. We then save the Tableau workbook. We can inspect the data from the data sources by choosing the database and Update Now.

With the data source configurations in place in Tableau, we can create custom reports, charts, and visualizations on the ADA data products. Let’s consider two use cases for visualizations.

As shown in the following figure, we visualized the frequency of the HTTP errors by application endpoints using Tableau’s built-in heat map chart. We filtered out the HTTP status codes to only include error codes in the 4xx and 5xx range.

We also created a bar chart to depict the application endpoints from the historical logs ordered by the count of HTTP error codes. In this chart, we can see that the /v1/server/admin endpoint has generated the most HTTP error status codes.

Clean up

Cleaning up the sample application infrastructure is a two-step process. First, to remove the infrastructure provisioned for the purposes of this demo, run the following command in the terminal:

cdk destroy

For the following question, enter y and AWS CDK will delete the resources deployed for the demo:

Are you sure you want to delete: CdkStack (y/n)? y

Alternatively, you can remove the resources via the AWS CloudFormation console by navigating to the CdkStack stack and choosing Delete.

The second step is to uninstall ADA. For instructions, refer to Uninstall the solution.

Conclusion

In this post, we demonstrated how to use the ADA solution to derive insights from application logs stored across two different data sources. We demonstrated how to install ADA on an AWS account and deploy the demo components using AWS CDK. We created data products in ADA and configured the data products with the respective data sources using the ADA’s built-in data connectors. We demonstrated how to query the data products using standard SQL queries and generate insights on the log data. We also connected the Tableau Desktop client, a third-party BI product, to ADA and demonstrated how to build visualizations against the data products.

ADA automates the process of ingesting, transforming, governing, and querying diverse datasets and simplifying the lifecycle management of data. ADA’s pre-built connectors allow you to ingest data from diverse data sources. Software teams with basic knowledge of AWS products and services will be able to set up an operational data analytics platform in a few hours and provide secure access to the data. The data can then be easily and quickly queried using an intuitive and standalone web user interface.

Try out ADA today to easily manage and gain insights from data.


About the authors

Aparajithan Vaidyanathan is a Principal Enterprise Solutions Architect at AWS. He helps enterprise customers migrate and modernize their workloads on the AWS Cloud. He is a Cloud Architect with 23+ years of experience designing and developing enterprise, large-scale, and distributed software systems. He specializes in machine learning and data analytics, with a focus on data and feature engineering. He is an aspiring marathon runner, and his hobbies include hiking, bike riding, and spending time with his wife and two boys.

Rashim Rahman is a Software Developer based out of Sydney, Australia with 10+ years of experience in software development and architecture. He works primarily on building large scale open-source AWS solutions for common customer use cases and business problems. In his spare time, he enjoys sports and spending time with friends and family.

Hafiz Saadullah is a Principal Technical Product Manager at Amazon Web Services. Hafiz focuses on AWS Solutions, designed to help customers by addressing common business problems and use cases.